Commit 09e8a4e4 authored by Sebastien Decugis

Cleanups in the failover path to avoid deadlocks and corrupted message ids. Tested OK now.

parent 51ea5276
Pipeline #1673 skipped
......@@ -183,6 +183,7 @@ struct fd_peer { /* The "real" definition of the peer structure */
/* Sent requests (for fallback), list of struct sentreq ordered by hbh */
struct sr_list p_sr;
struct fifo *p_tofailover;
/* Pending received requests not yet answered (count only) */
long p_reqin_count; /* We use p_state_mtx to protect this value */
......
......@@ -74,8 +74,13 @@ static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struc
/* Log the message */
fd_hook_call(HOOK_MESSAGE_SENT, cpy_for_logs_only, peer, NULL, fd_msg_pmdl_get(cpy_for_logs_only));
pthread_cleanup_push((void *)fd_msg_free, *msg /* might be NULL, no problem */);
/* Send the message */
CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), );
pthread_cleanup_pop(0);
out:
;
pthread_cleanup_pop(1);
......@@ -92,20 +97,12 @@ out:
return 0;
}
/*
 * Cleanup handler with the void (*)(void *) signature expected by pthread_cleanup_push.
 * Attempts to hand the message received in arg back to the global fd_g_outgoing queue.
 * The braced block given to CHECK_FCT_DO is the fallback: it reports the message
 * through HOOK_MESSAGE_DROPPED and then frees it.
 * NOTE(review): CHECK_FCT_DO is defined elsewhere; presumably the fallback runs only
 * when fd_fifo_post returns an error -- confirm against the macro definition.
 */
static void cleanup_requeue(void * arg)
{
struct msg *msg = arg;
/* Pass the address of the local pointer, as fd_fifo_post takes a pointer to the item */
CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg),
{
fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "An error occurred while attempting to requeue this message during cancellation of the sending function", fd_msg_pmdl_get(msg));
CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */);
} );
}
/* The code of the "out" thread */
static void * out_thr(void * arg)
{
struct fd_peer * peer = arg;
int stop = 0;
struct msg * msg;
ASSERT( CHECK_PEER(peer) );
/* Set the thread name */
......@@ -116,16 +113,12 @@ static void * out_thr(void * arg)
}
/* Loop until cancelation */
while (1) {
struct msg * msg;
while (!stop) {
int ret;
/* Retrieve next message to send */
CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error );
/* Now if we are cancelled, we requeue this message */
pthread_cleanup_push(cleanup_requeue, msg);
/* Send the message, log any error */
CHECK_FCT_DO( ret = do_send(&msg, peer->p_cnxctx, &peer->p_hbh, peer),
{
......@@ -135,12 +128,30 @@ static void * out_thr(void * arg)
fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg));
fd_msg_free(msg);
}
stop = 1;
} );
/* Loop */
pthread_cleanup_pop(0);
}
/* If we're here it means there was an error on the socket. We need to keep purging the fifo until we are canceled */
CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
/* Move all routable messages to the peer's "failover" queue (fd_peer_failover_msg later requeues them in the global "out" queue), until we are canceled once the PSM deals with the CNX_ERROR sent above */
while ( fd_fifo_get(peer->p_tosend, &msg) == 0 ) {
if (fd_msg_is_routable(msg)) {
CHECK_FCT_DO(fd_fifo_post_noblock(peer->p_tofailover, (void *)&msg),
{
/* fallback: destroy the message */
fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(msg));
CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */)
} );
} else {
/* Just free it */
/* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */
CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */)
}
}
error:
/* It is not really a connection error, but the effect is the same: we are no longer able to send any message */
CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
......
......@@ -37,9 +37,9 @@
/* Structure to store a sent request */
struct sentreq {
struct fd_list chain; /* the "o" field points directly to the hop-by-hop of the request (uint32_t *) */
struct fd_list chain; /* the "o" field points directly to the (new) hop-by-hop of the request (uint32_t *) */
struct msg *req; /* A request that was sent and not yet answered. */
uint32_t prevhbh;/* The value to set in the hbh header when the message is retrieved */
uint32_t prevhbh;/* The value to set back in the hbh header when the message is retrieved */
struct fd_list expire; /* the list of expiring requests */
struct timespec added_on; /* the time the request was added */
};
......@@ -65,10 +65,7 @@ static void srl_dump(const char * text, struct fd_list * srlist)
struct fd_list * li;
struct timespec now;
if (!TRACE_BOOL(ANNOYING))
return;
fd_log_debug("%sSentReq list @%p:", text, srlist);
LOG_D("%sSentReq list @%p:", text, srlist);
CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), );
......@@ -76,7 +73,7 @@ static void srl_dump(const char * text, struct fd_list * srlist)
struct sentreq * sr = (struct sentreq *)li;
uint32_t * nexthbh = li->o;
fd_log_debug(" - Next req (hbh:%x): [since %ld.%06ld sec]", *nexthbh,
LOG_D(" - Next req (hbh:0x%x, prev:0x%x): [since %ld.%06ld sec]", *nexthbh, sr->prevhbh,
(long)((now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1)),
(long)((now.tv_nsec >= sr->added_on.tv_nsec) ? ((now.tv_nsec - sr->added_on.tv_nsec) / 1000) : ((now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000)));
}
......@@ -224,8 +221,9 @@ int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, u
CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
next = find_or_next(&srlist->srs, *hbhloc, &match);
if (match) {
TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id was already sent: error");
TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id (0x%x) was already sent: error", *hbhloc);
free(sr);
srl_dump("Current list of SR: ", &srlist->srs);
CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* ignore */ );
return EINVAL;
}
......@@ -234,7 +232,6 @@ int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, u
*req = NULL;
fd_list_insert_before(next, &sr->chain);
srlist->cnt++;
srl_dump("Saved new request, ", &srlist->srs);
/* In case of request with a timeout, also store in the timeout list */
ts = fd_msg_anscb_gettimeout( sr->req );
......@@ -279,10 +276,10 @@ int fd_p_sr_fetch(struct sr_list * srlist, uint32_t hbh, struct msg **req)
/* Search the request in the list */
CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
srl_dump("Fetching a request, ", &srlist->srs);
sr = (struct sentreq *)find_or_next(&srlist->srs, hbh, &match);
if (!match) {
TRACE_DEBUG(INFO, "There is no saved request with this hop-by-hop id (%x)", hbh);
srl_dump("Current list of SR: ", &srlist->srs);
*req = NULL;
} else {
/* Restore hop-by-hop id */
......
......@@ -77,6 +77,7 @@ int fd_peer_alloc(struct fd_peer ** ptr)
fd_list_init(&p->p_actives, p);
fd_list_init(&p->p_expiry, p);
CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) );
CHECK_FCT( fd_fifo_new(&p->p_tofailover, 0) );
p->p_hbh = lrand48();
fd_list_init(&p->p_sr.srs, p);
......@@ -232,7 +233,7 @@ int fd_peer_getbyid( DiamId_t diamid, size_t diamidlen, int igncase, struct peer
free(__li); \
}
/* Empty the lists of p_tosend and p_sentreq messages */
/* Empty the lists of p_tosend, p_tofailover, and p_sentreq messages */
void fd_peer_failover_msg(struct fd_peer * peer)
{
struct msg *m;
......@@ -257,6 +258,17 @@ void fd_peer_failover_msg(struct fd_peer * peer)
}
}
/* Requeue all messages in the "failover" queue */
while ( fd_fifo_tryget(peer->p_tofailover, &m) == 0 ) {
fd_hook_call(HOOK_MESSAGE_FAILOVER, m, peer, NULL, fd_msg_pmdl_get(m));
CHECK_FCT_DO(fd_fifo_post_noblock(fd_g_outgoing, (void *)&m),
{
/* fallback: destroy the message */
fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(m));
CHECK_FCT_DO(fd_msg_free(m), /* What can we do more? */)
} );
}
/* Requeue all routable sent requests */
fd_p_sr_failover(&peer->p_sr);
......@@ -334,6 +346,7 @@ int fd_peer_free(struct fd_peer ** ptr)
fd_list_unlink(&p->p_actives);
CHECK_FCT_DO( fd_fifo_del(&p->p_tosend), /* continue */ );
CHECK_FCT_DO( fd_fifo_del(&p->p_tofailover), /* continue */ );
CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_state_mtx), /* continue */);
CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_sr.mtx), /* continue */);
CHECK_POSIX_DO( pthread_cond_destroy(&p->p_sr.cnd), /* continue */);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment