/* local_tracer.c */
#include <stdio.h>
#include <string.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <inttypes.h>
#include <signal.h>
#include <errno.h>

#include "T.h"
#include "T_messages.txt.h"
#include "T_defs.h"
#include "T_IDs.h"
18

19
/* Array of trace slots; set from the shm_array argument of
 * T_local_tracer_main() and indexed with T_busylist_head.
 */
static T_cache_t *T_local_cache;
20
21
22
23
24
25
26
27
28
29
/* Index in T_local_cache of the next slot to forward; advanced (and wrapped
 * with T_CACHE_SIZE - 1) by the main loop of T_local_tracer_main().
 */
static int T_busylist_head;

/* One queued outgoing message: a node of the singly linked list that
 * forward() appends to and data_sender() consumes.
 */
typedef struct databuf {
  char *d;               /* heap-allocated bytes to send; freed by data_sender() */
  int l;                 /* number of bytes in 'd' */
  struct databuf *next;  /* next queued message, NULL for the last one */
} databuf;

/* State shared by the threads that forward traces to the remote tracer.
 * Allocated and initialized by forwarder().
 */
typedef struct {
  /* socket to the traced program; forward_remote_messages() writes the
   * remote tracer's messages to it
   */
  int socket_local;
  /* socket to the remote tracer; set to -1 by data_sender() after a write
   * error and replaced by forward_remote_messages() on reconnection
   */
  volatile int socket_remote;
  /* TCP port on which a (new) remote tracer connection is accepted */
  int remote_port;
  /* 'lock' is held around accesses to the queue and to the memory
   * accounting below; 'cond' is signaled when a buffer is queued
   */
  pthread_mutex_t lock;
  pthread_cond_t cond;
  /* queue of buffers waiting to be sent: forward() appends at 'tail',
   * data_sender() removes from 'head'
   */
  struct databuf *volatile head, *tail;
  /* number of queued bytes */
  uint64_t memusage;
  /* level of 'memusage' at which the last warning was printed */
  uint64_t last_warning_memusage;
} forward_data;
38

39
40
41
42
/****************************************************************************/
/*                      utility functions                                   */
/****************************************************************************/

frtabu's avatar
frtabu committed
43
/* Start a detached thread running f(data) with a 10000000-byte stack.
 * Any pthread failure is reported on stderr and terminates the process
 * with exit status 1.
 */
static void new_thread(void *(*f)(void *), void *data) {
  pthread_attr_t attr;
  pthread_t tid;
  const char *failed_call = NULL;

  /* the calls are chained so that the first failure stops the sequence */
  if (pthread_attr_init(&attr) != 0)
    failed_call = "pthread_attr_init";
  else if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0)
    failed_call = "pthread_attr_setdetachstate";
  else if (pthread_attr_setstacksize(&attr, 10000000) != 0)
    failed_call = "pthread_attr_setstacksize";
  else if (pthread_create(&tid, &attr, f, data) != 0)
    failed_call = "pthread_create";
  else if (pthread_attr_destroy(&attr) != 0)
    failed_call = "pthread_attr_destroy";

  if (failed_call != NULL) {
    fprintf(stderr, "%s err\n", failed_call);
    exit(1);
  }
}

frtabu's avatar
frtabu committed
73
/* Wait for one incoming TCP connection.
 *
 * Binds a listening socket to addr:port (addr is a dotted IPv4 address),
 * blocks until a client connects, closes the listening socket and returns
 * the connected socket.
 *
 * Any failure is reported with perror() and terminates the process with
 * exit status 1, so the function only returns a valid descriptor.
 */
static int get_connection(char *addr, int port) {
  struct sockaddr_in a;
  socklen_t alen;
  int s, t;

  printf("T tracer: waiting for connection on %s:%d\n", addr, port);
  s = socket(AF_INET, SOCK_STREAM, 0);

  if (s == -1) {
    perror("socket");
    exit(1);
  }

  /* allow quick rebinding of the port after a restart */
  t = 1;

  if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &t, sizeof(t))) {
    perror("setsockopt");
    exit(1);
  }

  /* clear the whole structure so that no field is passed uninitialized */
  memset(&a, 0, sizeof(a));
  a.sin_family = AF_INET;
  a.sin_port = htons(port);
  a.sin_addr.s_addr = inet_addr(addr);

  if (bind(s, (struct sockaddr *)&a, sizeof(a))) {
    perror("bind");
    exit(1);
  }

  if (listen(s, 5)) {
    perror("listen");
    exit(1);
  }

  alen = sizeof(a);
  t = accept(s, (struct sockaddr *)&a, &alen);

  if (t == -1) {
    perror("accept");
    exit(1);
  }

  /* only one peer is served at a time: stop listening */
  close(s);
  printf("T tracer: connected\n");
  return t;
}

119
120
/* forward() is defined further down but is needed by send_T_messages_txt() */
static void forward(void *_forwarder, char *buf, int size);

frtabu's avatar
frtabu committed
121
void send_T_messages_txt(void *forwarder) {
122
123
124
125
126
127
128
129
130
131
  char buf[T_BUFFER_MAX];
  char *T_LOCAL_buf = buf;
  int T_LOCAL_size;
  unsigned char *src;
  int src_len;
  /* trace T_message.txt
   * Send several messages -1 with content followed by message -2.
   */
  src = T_messages_txt;
  src_len = T_messages_txt_len;
frtabu's avatar
frtabu committed
132

133
134
  while (src_len) {
    int send_size = src_len;
frtabu's avatar
frtabu committed
135

136
137
    if (send_size > T_PAYLOAD_MAXSIZE - sizeof(int))
      send_size = T_PAYLOAD_MAXSIZE - sizeof(int);
frtabu's avatar
frtabu committed
138

139
140
141
    /* TODO: be careful, we use internal T stuff, to rewrite? */
    T_LOCAL_size = 0;
    T_HEADER(T_ID(-1));
frtabu's avatar
frtabu committed
142
143
144
145
146
    T_PUT_buffer(1, ((T_buffer) {
addr:
(src), length:
      (send_size)
    }));
147
148
149
150
    forward(forwarder, buf, T_LOCAL_size);
    src += send_size;
    src_len -= send_size;
  }
frtabu's avatar
frtabu committed
151

152
153
154
155
156
  T_LOCAL_size = 0;
  T_HEADER(T_ID(-2));
  forward(forwarder, buf, T_LOCAL_size);
}

157
158
159
160
/****************************************************************************/
/*                      forward functions                                   */
/****************************************************************************/

frtabu's avatar
frtabu committed
161
static void *data_sender(void *_f) {
162
163
164
165
166
  forward_data *f = _f;
  databuf *cur;
  char *buf, *b;
  int size;
wait:
frtabu's avatar
frtabu committed
167

168
  if (pthread_mutex_lock(&f->lock)) abort();
frtabu's avatar
frtabu committed
169

170
171
  while (f->head == NULL)
    if (pthread_cond_wait(&f->cond, &f->lock)) abort();
frtabu's avatar
frtabu committed
172

173
174
175
176
177
  cur = f->head;
  buf = cur->d;
  size = cur->l;
  f->head = cur->next;
  f->memusage -= size;
frtabu's avatar
frtabu committed
178

179
  if (f->head == NULL) f->tail = NULL;
frtabu's avatar
frtabu committed
180

181
  if (pthread_mutex_unlock(&f->lock)) abort();
frtabu's avatar
frtabu committed
182

183
184
185
186
  free(cur);
  goto process;
process:
  b = buf;
frtabu's avatar
frtabu committed
187

188
  if (f->socket_remote != -1)
frtabu's avatar
frtabu committed
189
190
191
192
193
194
195
196
197
198
199
200
    while (size) {
      int l = write(f->socket_remote, b, size);

      if (l <= 0) {
        printf("T tracer: forward error\n");
        close(f->socket_remote);
        f->socket_remote = -1;
        break;
      }

      size -= l;
      b += l;
201
    }
202
203
204
205
206

  free(buf);
  goto wait;
}

frtabu's avatar
frtabu committed
207
208

static void *forward_remote_messages(void *_f) {
209
210
211
212
213
214
215
216
217
218
219
220
221
222
#define PUT(x) do { \
    if (bufsize == bufmaxsize) { \
      bufmaxsize += 4096; \
      buf = realloc(buf, bufmaxsize); \
      if (buf == NULL) abort(); \
    } \
    buf[bufsize] = x; \
    bufsize++; \
  } while (0)
#define PUT_BUF(x, l) do { \
    char *zz = (char *)(x); \
    int len = l; \
    while (len) { PUT(*zz); zz++; len--; } \
  } while (0)
223
  forward_data *f = _f;
224
225
  int from;
  int to;
226
227
  int l, len;
  char *b;
228
229
230
231
  char *buf = NULL;
  int bufsize = 0;
  int bufmaxsize = 0;
  char t;
232
233
again:

234
  while (1) {
235
236
    from = f->socket_remote;
    to = f->socket_local;
237
238
    bufsize = 0;
    /* let's read and process messages */
frtabu's avatar
frtabu committed
239
240
241
242
    len = read(from, &t, 1);

    if (len <= 0) goto dead;

243
244
245
    PUT(t);

    switch (t) {
frtabu's avatar
frtabu committed
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
      case 0:
      case 1:

        /* message 0 and 1: get a length and then 'length' numbers */
        if (read(from, &len, sizeof(int)) != sizeof(int)) goto dead;

        PUT_BUF(&len, 4);

        while (len) {
          if (read(from, &l, sizeof(int)) != sizeof(int)) goto dead;

          PUT_BUF(&l, 4);
          len--;
        }

        break;

      case 2:
        break;
265

frtabu's avatar
frtabu committed
266
267
268
269
      default:
        printf("%s:%d:%s: unhandled message type %d\n",
               __FILE__, __LINE__, __FUNCTION__, t);
        abort();
270
271
272
    }

    b = buf;
frtabu's avatar
frtabu committed
273

274
275
    while (bufsize) {
      l = write(to, b, bufsize);
frtabu's avatar
frtabu committed
276

277
      if (l <= 0) abort();
frtabu's avatar
frtabu committed
278

279
      bufsize -= l;
280
281
282
      b += l;
    }
  }
283

284
dead:
285
  /* socket died, let's stop all traces and wait for another tracer */
286
  /* TODO: be careful with those write, they might write less than wanted */
287
  buf[0] = 1;
frtabu's avatar
frtabu committed
288

289
  if (write(to, buf, 1) != 1) abort();
frtabu's avatar
frtabu committed
290

291
  len = T_NUMBER_OF_IDS;
frtabu's avatar
frtabu committed
292

293
  if (write(to, &len, sizeof(int)) != sizeof(int)) abort();
frtabu's avatar
frtabu committed
294

295
  l = 0;
frtabu's avatar
frtabu committed
296

297
298
  while (len) {
    if (write(to, &l, sizeof(int)) != sizeof(int)) abort();
frtabu's avatar
frtabu committed
299

300
301
302
303
    len--;
  };

  close(f->socket_remote);
frtabu's avatar
frtabu committed
304

305
  f->socket_remote = get_connection("0.0.0.0", f->remote_port);
frtabu's avatar
frtabu committed
306

307
  send_T_messages_txt(f);
frtabu's avatar
frtabu committed
308

309
310
  goto again;

311
312
313
  return NULL;
}

frtabu's avatar
frtabu committed
314
static void *forwarder(int port, int s) {
315
  forward_data *f;
frtabu's avatar
frtabu committed
316
  f = malloc(sizeof(*f));
317

frtabu's avatar
frtabu committed
318
  if (f == NULL) abort();
319
320
321
322
323
324
325

  pthread_mutex_init(&f->lock, NULL);
  pthread_cond_init(&f->cond, NULL);
  f->socket_local = s;
  f->head = f->tail = NULL;
  f->memusage = 0;
  f->last_warning_memusage = 0;
Cedric Roux's avatar
Cedric Roux committed
326
  printf("T tracer: waiting for remote tracer on port %d\n", port);
327
  f->remote_port = port;
328
  f->socket_remote = get_connection("0.0.0.0", port);
329
  send_T_messages_txt(f);
330
331
332
333
334
  new_thread(data_sender, f);
  new_thread(forward_remote_messages, f);
  return f;
}

frtabu's avatar
frtabu committed
335
static void forward(void *_forwarder, char *buf, int size) {
336
337
338
  forward_data *f = _forwarder;
  int32_t ssize = size;
  databuf *new;
frtabu's avatar
frtabu committed
339
  new = malloc(sizeof(*new));
340

frtabu's avatar
frtabu committed
341
  if (new == NULL) abort();
342
343
344

  if (pthread_mutex_lock(&f->lock)) abort();

frtabu's avatar
frtabu committed
345
346
347
348
  new->d = malloc(size + 4);

  if (new->d == NULL) abort();

349
350
351
352
353
  /* put the size of the message at the head */
  memcpy(new->d, &ssize, 4);
  memcpy(new->d+4, buf, size);
  new->l = size+4;
  new->next = NULL;
frtabu's avatar
frtabu committed
354

355
  if (f->head == NULL) f->head = new;
frtabu's avatar
frtabu committed
356

357
358
  if (f->tail != NULL) f->tail->next = new;

frtabu's avatar
frtabu committed
359
  f->tail = new;
Cedric Roux's avatar
Cedric Roux committed
360
#if BASIC_SIMULATOR
frtabu's avatar
frtabu committed
361

Cedric Roux's avatar
Cedric Roux committed
362
363
364
365
366
367
368
  /* When runnng the basic simulator, the tracer may be too slow.
   * Let's not take too much memory in the tracee and
   * wait if there is too much data to send. 200MB is
   * arbitrary.
   */
  while (f->memusage > 200 * 1024 * 1024) {
    if (pthread_cond_signal(&f->cond)) abort();
frtabu's avatar
frtabu committed
369

Cedric Roux's avatar
Cedric Roux committed
370
    if (pthread_mutex_unlock(&f->lock)) abort();
frtabu's avatar
frtabu committed
371

Cedric Roux's avatar
Cedric Roux committed
372
    usleep(1000);
frtabu's avatar
frtabu committed
373

Cedric Roux's avatar
Cedric Roux committed
374
375
376
    if (pthread_mutex_lock(&f->lock)) abort();
  }

frtabu's avatar
frtabu committed
377
#endif /* BASIC_SIMULATOR */
378
  f->memusage += size+4;
frtabu's avatar
frtabu committed
379

380
381
382
383
  /* warn every 100MB */
  if (f->memusage > f->last_warning_memusage &&
      f->memusage - f->last_warning_memusage > 100000000) {
    f->last_warning_memusage += 100000000;
Cedric Roux's avatar
Cedric Roux committed
384
    printf("T tracer: WARNING: memory usage is over %"PRIu64"MB\n",
385
           f->last_warning_memusage / 1000000);
frtabu's avatar
frtabu committed
386
387
  } else if (f->memusage < f->last_warning_memusage &&
             f->last_warning_memusage - f->memusage > 100000000) {
388
389
390
391
    f->last_warning_memusage = (f->memusage/100000000) * 100000000;
  }

  if (pthread_cond_signal(&f->cond)) abort();
frtabu's avatar
frtabu committed
392

393
394
395
396
397
398
399
  if (pthread_mutex_unlock(&f->lock)) abort();
}

/****************************************************************************/
/*                      local functions                                     */
/****************************************************************************/

frtabu's avatar
frtabu committed
400
static void wait_message(void) {
Cedric Roux's avatar
Cedric Roux committed
401
  while ((T_local_cache[T_busylist_head].busy & 0x02) == 0) usleep(1000);
402
403
}

Cedric Roux's avatar
Cedric Roux committed
404
void T_local_tracer_main(int remote_port, int wait_for_tracer,
frtabu's avatar
frtabu committed
405
                         int local_socket, void *shm_array) {
406
  int s;
Cedric Roux's avatar
Cedric Roux committed
407
408
  int port = remote_port;
  int dont_wait = wait_for_tracer ? 0 : 1;
409
  void *f;
Cedric Roux's avatar
Cedric Roux committed
410

411
  /* write on a socket fails if the other end is closed and we get SIGPIPE */
Cedric Roux's avatar
Cedric Roux committed
412
  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
frtabu's avatar
frtabu committed
413
414
    printf("local tracer received SIGPIPE\n");
    abort();
Cedric Roux's avatar
Cedric Roux committed
415
416
417
  }

  T_local_cache = shm_array;
Cedric Roux's avatar
Cedric Roux committed
418
  s = local_socket;
Cedric Roux's avatar
Cedric Roux committed
419

420
421
  if (dont_wait) {
    char t = 2;
frtabu's avatar
frtabu committed
422

423
424
    if (write(s, &t, 1) != 1) abort();
  }
Cedric Roux's avatar
Cedric Roux committed
425

426
  f = forwarder(port, s);
Cedric Roux's avatar
Cedric Roux committed
427

428
429
430
431
  /* read messages */
  while (1) {
    wait_message();
    __sync_synchronize();
432
433
434
    forward(f, T_local_cache[T_busylist_head].buffer,
            T_local_cache[T_busylist_head].length);
    T_local_cache[T_busylist_head].busy = 0;
435
436
437
438
    T_busylist_head++;
    T_busylist_head &= T_CACHE_SIZE - 1;
  }
}