Commit f5b381ea authored by Sebastien Decugis's avatar Sebastien Decugis
Browse files

Added a limit on fifo queues to avoid memory exaustion when messages are...

Added a limit on fifo queues to avoid memory exaustion when messages are received faster than handled
parent f71cbfe9
......@@ -326,7 +326,7 @@ int rgw_work_start(void)
memset(workers, 0, sizeof(workers));
CHECK_FCT( fd_fifo_new ( &work_stack ) );
CHECK_FCT( fd_fifo_new ( &work_stack, 30 ) );
/* Create the worker thread(s) */
for (i = 0; i < NB_WORKERS; i++) {
......
......@@ -41,9 +41,9 @@ struct fifo {
};
%extend fifo {
fifo() {
fifo(int max = 0) {
struct fifo * q = NULL;
int ret = fd_fifo_new(&q);
int ret = fd_fifo_new(&q, max);
if (ret != 0) {
DI_ERROR(ret, NULL, NULL);
return NULL;
......@@ -146,7 +146,7 @@ struct fifo {
ret = fd_fifo_tryget($self, &obj);
if (ret == EWOULDBLOCK) {
Py_XINCREF(Py_None);
Py_INCREF(Py_None);
return Py_None;
}
if (ret != 0) {
......@@ -181,7 +181,7 @@ struct fifo {
ret = fd_fifo_timedget($self, &obj, &ts);
if (ret == ETIMEDOUT) {
Py_XINCREF(Py_None);
Py_INCREF(Py_None);
return Py_None;
}
if (ret != 0) {
......
......@@ -2758,6 +2758,8 @@ struct fifo;
*
* PARAMETERS:
* queue : Upon success, a pointer to the new queue is saved here.
* max : max number of items in the queue. Above this number, adding a new item becomes a
* blocking operation. Use 0 to disable this maximum.
*
* DESCRIPTION:
* Create a new empty queue.
......@@ -2767,7 +2769,7 @@ struct fifo;
* EINVAL : The parameter is invalid.
* ENOMEM : Not enough memory to complete the creation.
*/
int fd_fifo_new ( struct fifo ** queue );
int fd_fifo_new ( struct fifo ** queue, int max );
/*
* FUNCTION: fd_fifo_del
......
......@@ -89,7 +89,7 @@ static struct cnxctx * fd_cnx_init(int full)
memset(conn, 0, sizeof(struct cnxctx));
if (full) {
CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming ), return NULL );
CHECK_FCT_DO( fd_fifo_new ( &conn->cc_incoming, 5 ), return NULL );
}
return conn;
......
......@@ -68,7 +68,7 @@ int fd_conf_init()
fd_g_config->cnf_orstateid = (uint32_t) time(NULL);
CHECK_FCT( fd_dict_init(&fd_g_config->cnf_dict) );
CHECK_FCT( fd_fifo_new(&fd_g_config->cnf_main_ev) );
CHECK_FCT( fd_fifo_new(&fd_g_config->cnf_main_ev, 0) );
/* TLS parameters */
CHECK_GNUTLS_DO( gnutls_certificate_allocate_credentials (&fd_g_config->cnf_sec_data.credentials), return ENOMEM );
......
......@@ -855,7 +855,7 @@ int fd_psm_begin(struct fd_peer * peer )
CHECK_PARAMS( fd_peer_getstate(peer) == STATE_NEW );
/* Create the FIFO for events */
CHECK_FCT( fd_fifo_new(&peer->p_events) );
CHECK_FCT( fd_fifo_new(&peer->p_events, 0) );
/* Create the PSM controler thread */
CHECK_POSIX( pthread_create( &peer->p_psm, NULL, p_psm_th, peer ) );
......
......@@ -76,7 +76,7 @@ int fd_peer_alloc(struct fd_peer ** ptr)
fd_list_init(&p->p_actives, p);
fd_list_init(&p->p_expiry, p);
CHECK_FCT( fd_fifo_new(&p->p_tosend) );
CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) );
p->p_hbh = lrand48();
fd_list_init(&p->p_sr.srs, p);
......
......@@ -44,9 +44,9 @@ struct fifo * fd_g_local = NULL;
int fd_queues_init(void)
{
TRACE_ENTRY();
CHECK_FCT( fd_fifo_new ( &fd_g_incoming ) );
CHECK_FCT( fd_fifo_new ( &fd_g_outgoing ) );
CHECK_FCT( fd_fifo_new ( &fd_g_local ) );
CHECK_FCT( fd_fifo_new ( &fd_g_incoming, 20 ) );
CHECK_FCT( fd_fifo_new ( &fd_g_outgoing, 30 ) );
CHECK_FCT( fd_fifo_new ( &fd_g_local, 25 ) );
return 0;
}
......
......@@ -496,7 +496,7 @@ int fd_sctps_init(struct cnxctx * conn)
for (i = 0; i < conn->cc_sctp_para.pairs; i++) {
conn->cc_sctps_data.array[i].parent = conn;
conn->cc_sctps_data.array[i].strid = i;
CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv) );
CHECK_FCT( fd_fifo_new(&conn->cc_sctps_data.array[i].raw_recv, 10) );
}
/* Set push/pull functions in the master session, using fifo in array[0] */
......
......@@ -52,12 +52,16 @@ struct fifo {
int eyec; /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */
pthread_mutex_t mtx; /* Mutex protecting this queue */
pthread_cond_t cond; /* condition variable of the list */
pthread_cond_t cond_pull; /* condition variable for pulling threads */
pthread_cond_t cond_push; /* condition variable for pushing threads */
struct fd_list list; /* sentinel for the list of elements */
int count; /* number of objects in the list */
int thrs; /* number of threads waiting for a new element (when count is 0) */
int max; /* maximum number of items to accept if not 0 */
int thrs_push; /* number of threads waitnig to push an item */
uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */
uint16_t low; /* Low level threshhold */
void *data; /* Opaque pointer for threshold callbacks */
......@@ -74,8 +78,8 @@ struct fifo {
#define CHECK_FIFO( _queue ) (( (_queue) != NULL) && ( (_queue)->eyec == FIFO_EYEC) )
/* Create a new queue */
int fd_fifo_new ( struct fifo ** queue )
/* Create a new queue, with max number of items -- use 0 for no max */
int fd_fifo_new ( struct fifo ** queue, int max )
{
struct fifo * new;
......@@ -91,7 +95,9 @@ int fd_fifo_new ( struct fifo ** queue )
new->eyec = FIFO_EYEC;
CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
CHECK_POSIX( pthread_cond_init(&new->cond, NULL) );
CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) );
CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) );
new->max = max;
fd_list_init(&new->list, NULL);
......@@ -118,6 +124,7 @@ void fd_fifo_dump(int level, char * name, struct fifo * queue, void (*dump_item)
CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), /* continue */ );
fd_log_debug(" %d elements in queue / %d threads waiting\n", queue->count, queue->thrs);
fd_log_debug(" %d elements max / %d threads waiting to push\n", queue->max, queue->thrs_push);
fd_log_debug(" thresholds: %d / %d (h:%d), cb: %p,%p (%p), highest: %d\n",
queue->high, queue->low, queue->highest,
queue->h_cb, queue->l_cb, queue->data,
......@@ -161,11 +168,8 @@ int fd_fifo_del ( struct fifo ** queue )
/* Have all waiting threads return an error */
while (q->thrs) {
CHECK_POSIX( pthread_mutex_unlock( &q->mtx ));
CHECK_POSIX( pthread_cond_signal(&q->cond) );
sched_yield();
if (loops >= 10)
/* sleep for a few milliseconds */
usleep(50000);
CHECK_POSIX( pthread_cond_signal(&q->cond_pull) );
usleep(1000);
CHECK_POSIX( pthread_mutex_lock( &q->mtx ) );
ASSERT( ++loops < 20 ); /* detect infinite loops */
......@@ -177,7 +181,9 @@ int fd_fifo_del ( struct fifo ** queue )
/* And destroy it */
CHECK_POSIX( pthread_mutex_unlock( &q->mtx ) );
CHECK_POSIX_DO( pthread_cond_destroy( &q->cond ), );
CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_pull ), );
CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_push ), );
CHECK_POSIX_DO( pthread_mutex_destroy( &q->mtx ), );
......@@ -206,26 +212,29 @@ int fd_fifo_move ( struct fifo * old, struct fifo * new, struct fifo ** loc_upda
/* Lock the queues */
CHECK_POSIX( pthread_mutex_lock( &old->mtx ) );
CHECK_PARAMS_DO( (! old->thrs_push), {
pthread_mutex_unlock( &old->mtx );
return EINVAL;
} );
CHECK_POSIX( pthread_mutex_lock( &new->mtx ) );
/* Any waiting thread on the old queue returns an error */
old->eyec = 0xdead;
while (old->thrs) {
CHECK_POSIX( pthread_mutex_unlock( &old->mtx ));
CHECK_POSIX( pthread_cond_signal(&old->cond) );
sched_yield();
if (loops >= 10)
/* sleep for a few milliseconds */
usleep(50000);
CHECK_POSIX( pthread_cond_signal( &old->cond_pull ) );
usleep(1000);
CHECK_POSIX( pthread_mutex_lock( &old->mtx ) );
ASSERT( ++loops < 20 ); /* detect infinite loops */
ASSERT( loops < 20 ); /* detect infinite loops */
}
/* Move all data from old to new */
fd_list_move_end( &new->list, &old->list );
if (old->count && (!new->count)) {
CHECK_POSIX( pthread_cond_signal(&new->cond) );
CHECK_POSIX( pthread_cond_signal(&new->cond_pull) );
}
new->count += old->count;
......@@ -295,6 +304,24 @@ int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h
return 0;
}
/* This handler is called when a thread is blocked on a queue, and cancelled */
static void fifo_cleanup_push(void * queue)
{
struct fifo * q = (struct fifo *)queue;
TRACE_ENTRY( "%p", queue );
/* The thread has been cancelled, therefore it does not wait on the queue anymore */
q->thrs_push--;
/* Now unlock the queue, and we're done */
CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ );
/* End of cleanup handler */
return;
}
/* Post a new item in the queue */
int fd_fifo_post_int ( struct fifo * queue, void ** item )
{
......@@ -306,15 +333,32 @@ int fd_fifo_post_int ( struct fifo * queue, void ** item )
/* Check the parameters */
CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
/* lock the queue */
CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
if (queue->max) {
while (queue->count >= queue->max) {
int ret = 0;
/* We have to wait for an item to be pulled */
queue->thrs_push++ ;
pthread_cleanup_push( fifo_cleanup_push, queue);
ret = pthread_cond_wait( &queue->cond_push, &queue->mtx );
pthread_cleanup_pop(0);
queue->thrs_push-- ;
ASSERT( ret == 0 );
}
}
/* Create a new list item */
CHECK_MALLOC( new = malloc (sizeof (struct fd_list)) );
CHECK_MALLOC_DO( new = malloc (sizeof (struct fd_list)) , {
pthread_mutex_unlock( &queue->mtx );
} );
fd_list_init(new, *item);
*item = NULL;
/* lock the queue */
CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
/* Add the new item at the end */
fd_list_insert_before( &queue->list, new);
queue->count++;
......@@ -327,7 +371,11 @@ int fd_fifo_post_int ( struct fifo * queue, void ** item )
/* Signal if threads are asleep */
if (queue->thrs > 0) {
CHECK_POSIX( pthread_cond_signal(&queue->cond) );
CHECK_POSIX( pthread_cond_signal(&queue->cond_pull) );
}
if (queue->thrs_push > 0) {
/* cascade */
CHECK_POSIX( pthread_cond_signal(&queue->cond_push) );
}
/* Unlock */
......@@ -354,6 +402,10 @@ static void * mq_pop(struct fifo * queue)
ret = li->o;
free(li);
if (queue->thrs_push) {
CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), );
}
return ret;
}
......@@ -387,10 +439,21 @@ int fd_fifo_tryget_int ( struct fifo * queue, void ** item )
/* Check queue status */
if (queue->count > 0) {
got_item:
/* There are elements in the queue, so pick the first one */
*item = mq_pop(queue);
call_cb = test_l_cb(queue);
} else {
if (queue->thrs_push > 0) {
/* A thread is trying to push something, let's give it a chance */
CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
CHECK_POSIX( pthread_cond_signal( &queue->cond_push ) );
usleep(1000);
CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
if (queue->count > 0)
goto got_item;
}
wouldblock = 1;
*item = NULL;
}
......@@ -456,9 +519,9 @@ awaken:
queue->thrs++ ;
pthread_cleanup_push( fifo_cleanup, queue);
if (istimed) {
ret = pthread_cond_timedwait( &queue->cond, &queue->mtx, abstime );
ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
} else {
ret = pthread_cond_wait( &queue->cond, &queue->mtx );
ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx );
}
pthread_cleanup_pop(0);
queue->thrs-- ;
......
......@@ -1498,7 +1498,7 @@ int main(int argc, char *argv[])
/* fd_cnx_recv_setaltfifo */
CHECK( 0, fd_cnx_send(client_side, cer_buf, cer_sz, 0));
CHECK( 0, fd_fifo_new(&myfifo) );
CHECK( 0, fd_fifo_new(&myfifo, 0) );
CHECK( 0, fd_cnx_recv_setaltfifo(server_side, myfifo) );
CHECK( 0, clock_gettime(CLOCK_REALTIME, &now) );
do {
......@@ -1590,7 +1590,7 @@ int main(int argc, char *argv[])
/* fd_cnx_recv_setaltfifo */
CHECK( 0, fd_cnx_send(client_side, cer_buf, cer_sz, 0));
CHECK( 0, fd_fifo_new(&myfifo) );
CHECK( 0, fd_fifo_new(&myfifo, 0) );
CHECK( 0, fd_cnx_recv_setaltfifo(server_side, myfifo) );
CHECK( 0, clock_gettime(CLOCK_REALTIME, &now) );
do {
......
......@@ -110,6 +110,25 @@ static void * test_fct(void * data)
return NULL;
}
/* The test function, to be threaded */
static int iter = 0;
static void * test_fct2(void * data)
{
int i;
int * item;
struct test_data * td = (struct test_data *) data;
for (i=0; i< td->nbr; i++) {
item = malloc(sizeof(int));
CHECK( 1, item ? 1 : 0 );
*item = i;
CHECK( 0, fd_fifo_post(td->queue, &item) );
iter++;
}
return NULL;
}
/* Main test routine */
int main(int argc, char *argv[])
......@@ -144,7 +163,7 @@ int main(int argc, char *argv[])
struct msg * msg = NULL;
/* Create the queue */
CHECK( 0, fd_fifo_new(&queue) );
CHECK( 0, fd_fifo_new(&queue, 0) );
/* Check the count is 0 */
CHECK( 0, fd_fifo_length(queue, &count) );
......@@ -232,7 +251,7 @@ int main(int argc, char *argv[])
}
/* Create the queue */
CHECK( 0, fd_fifo_new(&queue) );
CHECK( 0, fd_fifo_new(&queue, 0) );
/* Create the barrier */
CHECK( 0, pthread_barrier_init(&bar, NULL, nbr_threads * 2 + 1) );
......@@ -302,7 +321,7 @@ int main(int argc, char *argv[])
pthread_t th;
/* Create the queue */
CHECK( 0, fd_fifo_new(&queue) );
CHECK( 0, fd_fifo_new(&queue, 0) );
/* Create the barrier */
CHECK( 0, pthread_barrier_init(&bar, NULL, 2) );
......@@ -369,7 +388,7 @@ int main(int argc, char *argv[])
struct msg * msg = NULL;
/* Create the queue */
CHECK( 0, fd_fifo_new(&queue) );
CHECK( 0, fd_fifo_new(&queue, 0) );
/* Prepare the test data */
memset(&thrh_td, 0, sizeof(thrh_td));
......@@ -441,6 +460,57 @@ int main(int argc, char *argv[])
CHECK( 0, fd_fifo_del(&queue) );
}
/* Test max queue limit */
{
struct fifo *queue = NULL;
struct test_data td;
pthread_t th;
int * item, i;
/* Create the queue */
CHECK( 0, fd_fifo_new(&queue, 10) );
/* Initialize the test data structures */
td.queue = queue;
td.nbr = 15;
CHECK( 0, pthread_create( &th, NULL, test_fct2, &td ) );
usleep(1000); /* 1 millisec */
CHECK( 10, iter );
CHECK( 0, fd_fifo_tryget(queue, &item) );
CHECK( 0, *item);
free(item);
usleep(1000); /* 1 millisec */
CHECK( 11, iter );
for (i=1; i<4; i++) {
CHECK( 0, fd_fifo_get(queue, &item) );
CHECK( i, *item);
free(item);
}
usleep(1000); /* 1 millisec */
CHECK( 14, iter );
/* fd_fifo_dump(0, "test", queue, NULL); */
for (; i < td.nbr; i++) {
CHECK( 0, fd_fifo_tryget(queue, &item) );
CHECK( i, *item);
free(item);
}
CHECK( 0, pthread_join( th, NULL ) );
CHECK( 15, iter );
}
/* Delete the messages */
CHECK( 0, fd_msg_free( msg1 ) );
CHECK( 0, fd_msg_free( msg2 ) );
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment