local_tracer.c 9.33 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
#include <stdio.h>
#include <string.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
11
#include <inttypes.h>
12
#include <signal.h>
13

14
15
#include "T.h"
#include "T_messages.txt.h"
Cedric Roux's avatar
Cedric Roux committed
16
#include "T_defs.h"
17
#include "T_IDs.h"
18

19
static T_cache_t *T_local_cache;
20
21
22
23
24
25
26
27
28
29
static int T_busylist_head;

typedef struct databuf {
  char *d;
  int l;
  struct databuf *next;
} databuf;

typedef struct {
  int socket_local;
30
31
  volatile int socket_remote;
  int remote_port;
32
33
34
35
36
37
  pthread_mutex_t lock;
  pthread_cond_t cond;
  databuf * volatile head, *tail;
  uint64_t memusage;
  uint64_t last_warning_memusage;
} forward_data;
38

39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/****************************************************************************/
/*                      utility functions                                   */
/****************************************************************************/

static void new_thread(void *(*f)(void *), void *data)
{
  pthread_t t;
  pthread_attr_t att;

  if (pthread_attr_init(&att))
    { fprintf(stderr, "pthread_attr_init err\n"); exit(1); }
  if (pthread_attr_setdetachstate(&att, PTHREAD_CREATE_DETACHED))
    { fprintf(stderr, "pthread_attr_setdetachstate err\n"); exit(1); }
  if (pthread_attr_setstacksize(&att, 10000000))
    { fprintf(stderr, "pthread_attr_setstacksize err\n"); exit(1); }
  if (pthread_create(&t, &att, f, data))
    { fprintf(stderr, "pthread_create err\n"); exit(1); }
  if (pthread_attr_destroy(&att))
    { fprintf(stderr, "pthread_attr_destroy err\n"); exit(1); }
}

static int get_connection(char *addr, int port)
61
62
63
64
65
{
  struct sockaddr_in a;
  socklen_t alen;
  int s, t;

Cedric Roux's avatar
Cedric Roux committed
66
  printf("T tracer: waiting for connection on %s:%d\n", addr, port);
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

  s = socket(AF_INET, SOCK_STREAM, 0);
  if (s == -1) { perror("socket"); exit(1); }
  t = 1;
  if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &t, sizeof(int)))
    { perror("setsockopt"); exit(1); }

  a.sin_family = AF_INET;
  a.sin_port = htons(port);
  a.sin_addr.s_addr = inet_addr(addr);

  if (bind(s, (struct sockaddr *)&a, sizeof(a))) { perror("bind"); exit(1); }
  if (listen(s, 5)) { perror("bind"); exit(1); }
  alen = sizeof(a);
  t = accept(s, (struct sockaddr *)&a, &alen);
  if (t == -1) { perror("accept"); exit(1); }
  close(s);

Cedric Roux's avatar
Cedric Roux committed
85
  printf("T tracer: connected\n");
86
87
88
89

  return t;
}

90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
static void forward(void *_forwarder, char *buf, int size);

void send_T_messages_txt(void *forwarder)
{
  char buf[T_BUFFER_MAX];
  char *T_LOCAL_buf = buf;
  int T_LOCAL_size;
  unsigned char *src;
  int src_len;

  /* trace T_message.txt
   * Send several messages -1 with content followed by message -2.
   */
  src = T_messages_txt;
  src_len = T_messages_txt_len;
  while (src_len) {
    int send_size = src_len;
    if (send_size > T_PAYLOAD_MAXSIZE - sizeof(int))
      send_size = T_PAYLOAD_MAXSIZE - sizeof(int);
    /* TODO: be careful, we use internal T stuff, to rewrite? */
    T_LOCAL_size = 0;
    T_HEADER(T_ID(-1));
    T_PUT_buffer(1, ((T_buffer){addr:(src), length:(send_size)}));
    forward(forwarder, buf, T_LOCAL_size);
    src += send_size;
    src_len -= send_size;
  }
  T_LOCAL_size = 0;
  T_HEADER(T_ID(-2));
  forward(forwarder, buf, T_LOCAL_size);
}

122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/****************************************************************************/
/*                      forward functions                                   */
/****************************************************************************/

static void *data_sender(void *_f)
{
  forward_data *f = _f;
  databuf *cur;
  char *buf, *b;
  int size;

wait:
  if (pthread_mutex_lock(&f->lock)) abort();
  while (f->head == NULL)
    if (pthread_cond_wait(&f->cond, &f->lock)) abort();
  cur = f->head;
  buf = cur->d;
  size = cur->l;
  f->head = cur->next;
  f->memusage -= size;
  if (f->head == NULL) f->tail = NULL;
  if (pthread_mutex_unlock(&f->lock)) abort();
  free(cur);
  goto process;

process:
  b = buf;
149
  if (f->socket_remote != -1)
150
151
  while (size) {
    int l = write(f->socket_remote, b, size);
152
    if (l <= 0) {
Cedric Roux's avatar
Cedric Roux committed
153
      printf("T tracer: forward error\n");
154
155
156
157
      close(f->socket_remote);
      f->socket_remote = -1;
      break;
    }
158
159
160
161
162
163
164
165
166
167
168
    size -= l;
    b += l;
  }

  free(buf);

  goto wait;
}

static void *forward_remote_messages(void *_f)
{
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#define PUT(x) do { \
    if (bufsize == bufmaxsize) { \
      bufmaxsize += 4096; \
      buf = realloc(buf, bufmaxsize); \
      if (buf == NULL) abort(); \
    } \
    buf[bufsize] = x; \
    bufsize++; \
  } while (0)
#define PUT_BUF(x, l) do { \
    char *zz = (char *)(x); \
    int len = l; \
    while (len) { PUT(*zz); zz++; len--; } \
  } while (0)

184
  forward_data *f = _f;
185
186
  int from;
  int to;
187
188
  int l, len;
  char *b;
189
190
191
192
  char *buf = NULL;
  int bufsize = 0;
  int bufmaxsize = 0;
  char t;
193
194
195

again:

196
  while (1) {
197
198
    from = f->socket_remote;
    to = f->socket_local;
199

200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
    bufsize = 0;

    /* let's read and process messages */
    len = read(from, &t, 1); if (len <= 0) goto dead;
    PUT(t);

    switch (t) {
    case 0:
    case 1:
      /* message 0 and 1: get a length and then 'length' numbers */
      if (read(from, &len, sizeof(int)) != sizeof(int)) goto dead;
      PUT_BUF(&len, 4);
      while (len) {
        if (read(from, &l, sizeof(int)) != sizeof(int)) goto dead;
        PUT_BUF(&l, 4);
        len--;
      }
      break;

    case 2: break;
Cedric Roux's avatar
Cedric Roux committed
220
221
222
223
    default:
      printf("%s:%d:%s: unhandled message type %d\n",
          __FILE__, __LINE__, __FUNCTION__, t);
      abort();
224
225
226
227
228
    }

    b = buf;
    while (bufsize) {
      l = write(to, b, bufsize);
229
      if (l <= 0) abort();
230
      bufsize -= l;
231
232
233
      b += l;
    }
  }
234

235
dead:
236
  /* socket died, let's stop all traces and wait for another tracer */
237
  /* TODO: be careful with those write, they might write less than wanted */
238
239
240
241
242
243
244
245
246
247
248
  buf[0] = 1;
  if (write(to, buf, 1) != 1) abort();
  len = T_NUMBER_OF_IDS;
  if (write(to, &len, sizeof(int)) != sizeof(int)) abort();
  l = 0;
  while (len) {
    if (write(to, &l, sizeof(int)) != sizeof(int)) abort();
    len--;
  };

  close(f->socket_remote);
249
  f->socket_remote = get_connection("0.0.0.0", f->remote_port);
250
  send_T_messages_txt(f);
251
252
  goto again;

253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
  return NULL;
}

static void *forwarder(int port, int s)
{
  forward_data *f;

  f = malloc(sizeof(*f)); if (f == NULL) abort();

  pthread_mutex_init(&f->lock, NULL);
  pthread_cond_init(&f->cond, NULL);

  f->socket_local = s;
  f->head = f->tail = NULL;

  f->memusage = 0;
  f->last_warning_memusage = 0;

Cedric Roux's avatar
Cedric Roux committed
271
  printf("T tracer: waiting for remote tracer on port %d\n", port);
272

273
  f->remote_port = port;
274
  f->socket_remote = get_connection("0.0.0.0", port);
275
  send_T_messages_txt(f);
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302

  new_thread(data_sender, f);
  new_thread(forward_remote_messages, f);

  return f;
}

static void forward(void *_forwarder, char *buf, int size)
{
  forward_data *f = _forwarder;
  int32_t ssize = size;
  databuf *new;

  new = malloc(sizeof(*new)); if (new == NULL) abort();

  if (pthread_mutex_lock(&f->lock)) abort();

  new->d = malloc(size + 4); if (new->d == NULL) abort();
  /* put the size of the message at the head */
  memcpy(new->d, &ssize, 4);
  memcpy(new->d+4, buf, size);
  new->l = size+4;
  new->next = NULL;
  if (f->head == NULL) f->head = new;
  if (f->tail != NULL) f->tail->next = new;
  f->tail = new;

Cedric Roux's avatar
Cedric Roux committed
303
304
305
306
307
308
309
310
311
312
313
314
315
316
#if BASIC_SIMULATOR
  /* When runnng the basic simulator, the tracer may be too slow.
   * Let's not take too much memory in the tracee and
   * wait if there is too much data to send. 200MB is
   * arbitrary.
   */
  while (f->memusage > 200 * 1024 * 1024) {
    if (pthread_cond_signal(&f->cond)) abort();
    if (pthread_mutex_unlock(&f->lock)) abort();
    usleep(1000);
    if (pthread_mutex_lock(&f->lock)) abort();
  }
#endif /* BASIC_SIMULATOR */

317
318
319
320
321
  f->memusage += size+4;
  /* warn every 100MB */
  if (f->memusage > f->last_warning_memusage &&
      f->memusage - f->last_warning_memusage > 100000000) {
    f->last_warning_memusage += 100000000;
Cedric Roux's avatar
Cedric Roux committed
322
    printf("T tracer: WARNING: memory usage is over %"PRIu64"MB\n",
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
           f->last_warning_memusage / 1000000);
  } else
  if (f->memusage < f->last_warning_memusage &&
      f->last_warning_memusage - f->memusage > 100000000) {
    f->last_warning_memusage = (f->memusage/100000000) * 100000000;
  }

  if (pthread_cond_signal(&f->cond)) abort();
  if (pthread_mutex_unlock(&f->lock)) abort();
}

/****************************************************************************/
/*                      local functions                                     */
/****************************************************************************/

static void wait_message(void)
339
{
Cedric Roux's avatar
Cedric Roux committed
340
  while ((T_local_cache[T_busylist_head].busy & 0x02) == 0) usleep(1000);
341
342
}

Cedric Roux's avatar
Cedric Roux committed
343
void T_local_tracer_main(int remote_port, int wait_for_tracer,
Cedric Roux's avatar
Cedric Roux committed
344
    int local_socket, void *shm_array)
345
346
{
  int s;
Cedric Roux's avatar
Cedric Roux committed
347
348
  int port = remote_port;
  int dont_wait = wait_for_tracer ? 0 : 1;
349
  void *f;
Cedric Roux's avatar
Cedric Roux committed
350

351
  /* write on a socket fails if the other end is closed and we get SIGPIPE */
Cedric Roux's avatar
Cedric Roux committed
352
  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
353
354
       printf("local tracer received SIGPIPE\n");
       abort();
Cedric Roux's avatar
Cedric Roux committed
355
356
357
  }

  T_local_cache = shm_array;
358

Cedric Roux's avatar
Cedric Roux committed
359
  s = local_socket;
Cedric Roux's avatar
Cedric Roux committed
360

361
362
363
364
  if (dont_wait) {
    char t = 2;
    if (write(s, &t, 1) != 1) abort();
  }
Cedric Roux's avatar
Cedric Roux committed
365

366
  f = forwarder(port, s);
Cedric Roux's avatar
Cedric Roux committed
367

368
369
370
371
  /* read messages */
  while (1) {
    wait_message();
    __sync_synchronize();
372
373
374
    forward(f, T_local_cache[T_busylist_head].buffer,
            T_local_cache[T_busylist_head].length);
    T_local_cache[T_busylist_head].busy = 0;
375
376
377
378
    T_busylist_head++;
    T_busylist_head &= T_CACHE_SIZE - 1;
  }
}