Commit f5313266 authored by Cedric Roux's avatar Cedric Roux

integration of local tracer into tracee - step 3

get rid of forward.c, make everything static inside local.c
parent 3b226410
......@@ -6,7 +6,7 @@ CFLAGS=-Wall -g -pthread -DT_TRACER
LIBS += -lrt
PROG=tracer_local
OBJS=local.o forward.o
OBJS=local.o
$(PROG): $(OBJS)
$(CC) $(CFLAGS) -o $(PROG) $(OBJS) $(LIBS)
......
#include "forward.h"
#include <stdlib.h>
#include <stdio.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
/* from local.c */
int get_connection(char *addr, int port);
void new_thread(void *(*f)(void *), void *data);
typedef struct databuf {
char *d;
int l;
struct databuf *next;
} databuf;
typedef struct {
int s;
int sc;
pthread_mutex_t lock;
pthread_mutex_t datalock;
pthread_cond_t datacond;
databuf * volatile head, *tail;
uint64_t memusage;
uint64_t last_warning_memusage;
} forward_data;
static void *data_sender(void *_f)
{
forward_data *f = _f;
databuf *cur;
char *buf, *b;
int size;
wait:
if (pthread_mutex_lock(&f->datalock)) abort();
while (f->head == NULL)
if (pthread_cond_wait(&f->datacond, &f->datalock)) abort();
cur = f->head;
buf = cur->d;
size = cur->l;
f->head = cur->next;
f->memusage -= size;
if (f->head == NULL) f->tail = NULL;
if (pthread_mutex_unlock(&f->datalock)) abort();
free(cur);
goto process;
process:
if (pthread_mutex_lock(&f->lock)) abort();
b = buf;
while (size) {
int l = write(f->s, b, size);
if (l <= 0) { printf("forward error\n"); exit(1); }
size -= l;
b += l;
}
if (pthread_mutex_unlock(&f->lock)) abort();
free(buf);
goto wait;
}
static void do_forward(forward_data *f, int from, int to, int lock)
{
int l, len;
char *b;
char buf[1024];
while (1) {
len = read(from, buf, 1024);
if (len <= 0) break;
b = buf;
if (lock) if (pthread_mutex_lock(&f->lock)) abort();
while (len) {
l = write(to, b, len);
if (l <= 0) break;
len -= l;
b += l;
}
if (lock) if (pthread_mutex_unlock(&f->lock)) abort();
}
}
static void *forward_s_to_sc(void *_f)
{
forward_data *f = _f;
do_forward(f, f->s, f->sc, 0);
return NULL;
}
void forward_start_client(void *_f, int s)
{
forward_data *f = _f;
f->sc = s;
new_thread(forward_s_to_sc, f);
}
void *forwarder(int port)
{
forward_data *f;
f = malloc(sizeof(*f)); if (f == NULL) abort();
pthread_mutex_init(&f->lock, NULL);
pthread_mutex_init(&f->datalock, NULL);
pthread_cond_init(&f->datacond, NULL);
f->sc = -1;
f->head = f->tail = NULL;
f->memusage = 0;
f->last_warning_memusage = 0;
printf("waiting for remote tracer on port %d\n", port);
f->s = get_connection("127.0.0.1", port);
printf("connected\n");
new_thread(data_sender, f);
return f;
}
void forward(void *_forwarder, char *buf, int size)
{
forward_data *f = _forwarder;
int32_t ssize = size;
databuf *new;
new = malloc(sizeof(*new)); if (new == NULL) abort();
if (pthread_mutex_lock(&f->datalock)) abort();
new->d = malloc(size + 4); if (new->d == NULL) abort();
/* put the size of the message at the head */
memcpy(new->d, &ssize, 4);
memcpy(new->d+4, buf, size);
new->l = size+4;
new->next = NULL;
if (f->head == NULL) f->head = new;
if (f->tail != NULL) f->tail->next = new;
f->tail = new;
f->memusage += size+4;
/* warn every 100MB */
if (f->memusage > f->last_warning_memusage &&
f->memusage - f->last_warning_memusage > 100000000) {
f->last_warning_memusage += 100000000;
printf("WARNING: memory usage is over %"PRIu64"MB\n",
f->last_warning_memusage / 1000000);
} else
if (f->memusage < f->last_warning_memusage &&
f->last_warning_memusage - f->memusage > 100000000) {
f->last_warning_memusage = (f->memusage/100000000) * 100000000;
}
if (pthread_cond_signal(&f->datacond)) abort();
if (pthread_mutex_unlock(&f->datalock)) abort();
}
#ifndef _FORWARD_H_
#define _FORWARD_H_
void *forwarder(int port);
void forward(void *forwarder, char *buf, int size);
void forward_start_client(void *forwarder, int socket);
#endif /* _FORWARD_H_ */
......@@ -8,18 +8,53 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <inttypes.h>
#define DEFAULT_PORT 2021
#include "forward.h"
#include "../T_defs.h"
T_cache_t *T_cache;
int T_busylist_head;
int T_pos;
static T_cache_t *T_cache;
static int T_busylist_head;
typedef struct databuf {
char *d;
int l;
struct databuf *next;
} databuf;
typedef struct {
int socket_local;
int socket_remote;
pthread_mutex_t lock;
pthread_cond_t cond;
databuf * volatile head, *tail;
uint64_t memusage;
uint64_t last_warning_memusage;
} forward_data;
int get_connection(char *addr, int port)
/****************************************************************************/
/* utility functions */
/****************************************************************************/
static void new_thread(void *(*f)(void *), void *data)
{
pthread_t t;
pthread_attr_t att;
if (pthread_attr_init(&att))
{ fprintf(stderr, "pthread_attr_init err\n"); exit(1); }
if (pthread_attr_setdetachstate(&att, PTHREAD_CREATE_DETACHED))
{ fprintf(stderr, "pthread_attr_setdetachstate err\n"); exit(1); }
if (pthread_attr_setstacksize(&att, 10000000))
{ fprintf(stderr, "pthread_attr_setstacksize err\n"); exit(1); }
if (pthread_create(&t, &att, f, data))
{ fprintf(stderr, "pthread_create err\n"); exit(1); }
if (pthread_attr_destroy(&att))
{ fprintf(stderr, "pthread_attr_destroy err\n"); exit(1); }
}
static int get_connection(char *addr, int port)
{
struct sockaddr_in a;
socklen_t alen;
......@@ -49,12 +84,142 @@ int get_connection(char *addr, int port)
return t;
}
void wait_message(void)
/****************************************************************************/
/* forward functions */
/****************************************************************************/
static void *data_sender(void *_f)
{
forward_data *f = _f;
databuf *cur;
char *buf, *b;
int size;
wait:
if (pthread_mutex_lock(&f->lock)) abort();
while (f->head == NULL)
if (pthread_cond_wait(&f->cond, &f->lock)) abort();
cur = f->head;
buf = cur->d;
size = cur->l;
f->head = cur->next;
f->memusage -= size;
if (f->head == NULL) f->tail = NULL;
if (pthread_mutex_unlock(&f->lock)) abort();
free(cur);
goto process;
process:
b = buf;
while (size) {
int l = write(f->socket_remote, b, size);
if (l <= 0) { printf("forward error\n"); exit(1); }
size -= l;
b += l;
}
free(buf);
goto wait;
}
static void *forward_remote_messages(void *_f)
{
forward_data *f = _f;
int from = f->socket_remote;
int to = f->socket_local;
int l, len;
char *b;
char buf[1024];
while (1) {
len = read(from, buf, 1024);
if (len <= 0) break;
b = buf;
while (len) {
l = write(to, b, len);
if (l <= 0) break;
len -= l;
b += l;
}
}
return NULL;
}
static void *forwarder(int port, int s)
{
forward_data *f;
f = malloc(sizeof(*f)); if (f == NULL) abort();
pthread_mutex_init(&f->lock, NULL);
pthread_cond_init(&f->cond, NULL);
f->socket_local = s;
f->head = f->tail = NULL;
f->memusage = 0;
f->last_warning_memusage = 0;
printf("waiting for remote tracer on port %d\n", port);
f->socket_remote = get_connection("127.0.0.1", port);
printf("connected\n");
new_thread(data_sender, f);
new_thread(forward_remote_messages, f);
return f;
}
static void forward(void *_forwarder, char *buf, int size)
{
forward_data *f = _forwarder;
int32_t ssize = size;
databuf *new;
new = malloc(sizeof(*new)); if (new == NULL) abort();
if (pthread_mutex_lock(&f->lock)) abort();
new->d = malloc(size + 4); if (new->d == NULL) abort();
/* put the size of the message at the head */
memcpy(new->d, &ssize, 4);
memcpy(new->d+4, buf, size);
new->l = size+4;
new->next = NULL;
if (f->head == NULL) f->head = new;
if (f->tail != NULL) f->tail->next = new;
f->tail = new;
f->memusage += size+4;
/* warn every 100MB */
if (f->memusage > f->last_warning_memusage &&
f->memusage - f->last_warning_memusage > 100000000) {
f->last_warning_memusage += 100000000;
printf("WARNING: memory usage is over %"PRIu64"MB\n",
f->last_warning_memusage / 1000000);
} else
if (f->memusage < f->last_warning_memusage &&
f->last_warning_memusage - f->memusage > 100000000) {
f->last_warning_memusage = (f->memusage/100000000) * 100000000;
}
if (pthread_cond_signal(&f->cond)) abort();
if (pthread_mutex_unlock(&f->lock)) abort();
}
/****************************************************************************/
/* local functions */
/****************************************************************************/
static void wait_message(void)
{
while (T_cache[T_busylist_head].busy == 0) usleep(1000);
}
void init_shm(void)
static void init_shm(void)
{
int i;
int s = shm_open(T_SHM_FILENAME, O_RDWR | O_CREAT /*| O_SYNC*/, 0666);
......@@ -74,24 +239,7 @@ void init_shm(void)
for (i = 0; i < T_CACHE_SIZE; i++) T_cache[i].busy = 0;
}
void new_thread(void *(*f)(void *), void *data)
{
pthread_t t;
pthread_attr_t att;
if (pthread_attr_init(&att))
{ fprintf(stderr, "pthread_attr_init err\n"); exit(1); }
if (pthread_attr_setdetachstate(&att, PTHREAD_CREATE_DETACHED))
{ fprintf(stderr, "pthread_attr_setdetachstate err\n"); exit(1); }
if (pthread_attr_setstacksize(&att, 10000000))
{ fprintf(stderr, "pthread_attr_setstacksize err\n"); exit(1); }
if (pthread_create(&t, &att, f, data))
{ fprintf(stderr, "pthread_create err\n"); exit(1); }
if (pthread_attr_destroy(&att))
{ fprintf(stderr, "pthread_attr_destroy err\n"); exit(1); }
}
void usage(void)
static void usage(void)
{
printf(
"tracer - local side\n"
......@@ -129,8 +277,7 @@ int main(int n, char **v)
if (write(s, &t, 1) != 1) abort();
}
f = forwarder(port);
forward_start_client(f, s);
f = forwarder(port, s);
/* read messages */
while (1) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment