Commit 181138fc authored by Sebastien Decugis's avatar Sebastien Decugis
Browse files

Small hack which might spare some concurrency problems and is quite harmless

parent fd11dda9
......@@ -463,6 +463,7 @@ void fd_cnx_sethostname(struct cnxctx * conn, char * hn)
int fd_cnx_getTLS(struct cnxctx * conn)
{
CHECK_PARAMS_DO( conn, return 0 );
fd_cpu_flush_cache();
return conn->cc_status & CC_STATUS_TLS;
}
......@@ -544,6 +545,7 @@ void fd_cnx_markerror(struct cnxctx * conn)
TRACE_DEBUG(FULL, "Error flag set for socket %d (%s / %s)", conn->cc_socket, conn->cc_remid, conn->cc_id);
/* Mark the error */
fd_cpu_flush_cache();
conn->cc_status |= CC_STATUS_ERROR;
/* Report the error if not reported yet, and not closing */
......@@ -552,7 +554,7 @@ void fd_cnx_markerror(struct cnxctx * conn)
CHECK_FCT_DO( fd_event_send( Target_Queue(conn), FDEVP_CNX_ERROR, 0, NULL), goto fatal);
conn->cc_status |= CC_STATUS_SIGNALED;
}
fd_cpu_flush_cache();
return;
fatal:
/* An unrecoverable error occurred, stop the daemon */
......@@ -580,6 +582,7 @@ again:
ret = recv(conn->cc_socket, buffer, length, 0);
/* Handle special case of timeout */
if ((ret < 0) && (errno == EAGAIN)) {
fd_cpu_flush_cache();
if (! (conn->cc_status & CC_STATUS_CLOSING))
goto again; /* don't care, just ignore */
if (!timedout) {
......@@ -606,6 +609,7 @@ again:
ret = send(conn->cc_socket, buffer, length, 0);
/* Handle special case of timeout */
if ((ret < 0) && (errno == EAGAIN)) {
fd_cpu_flush_cache();
if (! (conn->cc_status & CC_STATUS_CLOSING))
goto again; /* don't care, just ignore */
if (!timedout) {
......@@ -724,6 +728,7 @@ static void * rcvthr_notls_sctp(void * arg)
ASSERT( Target_Queue(conn) );
do {
fd_cpu_flush_cache();
CHECK_FCT_DO( fd_sctp_recvmeta(conn->cc_socket, NULL, &buf, &bufsz, &event, &conn->cc_status), goto fatal );
if (event == FDEVP_CNX_ERROR) {
fd_cnx_markerror(conn);
......@@ -795,6 +800,7 @@ again:
{
switch (ret) {
case GNUTLS_E_REHANDSHAKE:
fd_cpu_flush_cache();
if (!(conn->cc_status & CC_STATUS_CLOSING))
CHECK_GNUTLS_DO( ret = gnutls_handshake(session),
{
......@@ -806,6 +812,7 @@ again:
case GNUTLS_E_AGAIN:
case GNUTLS_E_INTERRUPTED:
fd_cpu_flush_cache();
if (!(conn->cc_status & CC_STATUS_CLOSING))
goto again;
TRACE_DEBUG(FULL, "Connection is closing, so abord gnutls_record_recv now.");
......@@ -839,6 +846,7 @@ again:
{
switch (ret) {
case GNUTLS_E_REHANDSHAKE:
fd_cpu_flush_cache();
if (!(conn->cc_status & CC_STATUS_CLOSING))
CHECK_GNUTLS_DO( ret = gnutls_handshake(session),
{
......@@ -850,6 +858,7 @@ again:
case GNUTLS_E_AGAIN:
case GNUTLS_E_INTERRUPTED:
fd_cpu_flush_cache();
if (!(conn->cc_status & CC_STATUS_CLOSING))
goto again;
TRACE_DEBUG(INFO, "Connection is closing, so abord gnutls_record_send now.");
......@@ -1234,6 +1243,7 @@ int fd_cnx_handshake(struct cnxctx * conn, int mode, char * priority, void * alt
}
/* Mark the connection as protected from here, so that the gnutls credentials will be freed */
fd_cpu_flush_cache();
conn->cc_status |= CC_STATUS_TLS;
/* Handshake master session */
......@@ -1358,6 +1368,7 @@ static int send_simple(struct cnxctx * conn, unsigned char * buf, size_t len)
size_t sent = 0;
TRACE_ENTRY("%p %p %zd", conn, buf, len);
do {
fd_cpu_flush_cache();
if (conn->cc_status & CC_STATUS_TLS) {
CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_tls_para.session, buf + sent, len - sent), );
} else {
......@@ -1391,6 +1402,7 @@ int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, uint32_t
if (flags & FD_CNX_BROADCAST) {
/* Send the buffer over all other streams */
uint16_t str;
fd_cpu_flush_cache();
if (conn->cc_status & CC_STATUS_TLS) {
for ( str=1; str < conn->cc_sctp_para.pairs; str++) {
ssize_t ret;
......@@ -1473,6 +1485,7 @@ void fd_cnx_destroy(struct cnxctx * conn)
CHECK_PARAMS_DO(conn, return);
fd_cpu_flush_cache();
conn->cc_status |= CC_STATUS_CLOSING;
/* Initiate shutdown of the TLS session(s): call gnutls_bye(WR), then read until error */
......
......@@ -612,6 +612,7 @@ int fd_p_ce_handle_newcnx(struct fd_peer * peer, struct cnxctx * initiator)
CHECK_FCT( fd_out_send(&cer, initiator, peer, FD_CNX_ORDERED) );
/* Are we doing an election ? */
fd_cpu_flush_cache();
if (peer->p_hdr.info.runtime.pir_state == STATE_WAITCNXACK_ELEC) {
if (election_result(peer)) {
/* Close initiator connection */
......@@ -660,6 +661,7 @@ int fd_p_ce_msgrcv(struct msg ** msg, int req, struct fd_peer * peer)
}
/* If the state is not WAITCEA, just discard the message */
fd_cpu_flush_cache();
if (req || (peer->p_hdr.info.runtime.pir_state != STATE_WAITCEA)) {
if (*msg) {
fd_log_debug("Received CER/CEA message while in state '%s', discarded.\n", STATE_STR(peer->p_hdr.info.runtime.pir_state));
......@@ -921,6 +923,7 @@ cleanup:
/* We have received a CER on a new connection for this peer */
int fd_p_ce_handle_newCER(struct msg ** msg, struct fd_peer * peer, struct cnxctx ** cnx, int valid)
{
fd_cpu_flush_cache();
switch (peer->p_hdr.info.runtime.pir_state) {
case STATE_CLOSED:
peer->p_receiver = *cnx;
......
......@@ -113,6 +113,7 @@ int fd_p_dp_handle(struct msg ** msg, int req, struct fd_peer * peer)
} else {
/* We received a DPA */
fd_cpu_flush_cache();
if (peer->p_hdr.info.runtime.pir_state != STATE_CLOSING) {
TRACE_DEBUG(INFO, "Ignoring DPA received in state %s", STATE_STR(peer->p_hdr.info.runtime.pir_state));
}
......
......@@ -122,6 +122,7 @@ int fd_p_dw_handle(struct msg ** msg, int req, struct fd_peer * peer)
}
/* If we are in REOPEN state, increment the counter */
fd_cpu_flush_cache();
if (peer->p_hdr.info.runtime.pir_state == STATE_REOPEN) {
peer->p_flags.pf_reopen_cnt += 1;
......
......@@ -60,6 +60,7 @@ static void * gc_th_fct(void * arg)
for (li = fd_g_peers.next; li != &fd_g_peers; li = li->next) {
struct fd_peer * peer = (struct fd_peer *)li;
fd_cpu_flush_cache();
if (peer->p_hdr.info.runtime.pir_state != STATE_ZOMBIE)
continue;
......
......@@ -142,6 +142,7 @@ int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer, u
TRACE_ENTRY("%p %p %p %x", msg, cnx, peer, flags);
CHECK_PARAMS( msg && *msg && (cnx || (peer && peer->p_cnxctx)));
fd_cpu_flush_cache();
if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) {
/* Normal case: just queue for the out thread to pick it up */
CHECK_FCT( fd_fifo_post(peer->p_tosend, msg) );
......
......@@ -183,6 +183,7 @@ int fd_psm_change_state(struct fd_peer * peer, int new_state)
TRACE_ENTRY("%p %d(%s)", peer, new_state, STATE_STR(new_state));
CHECK_PARAMS( CHECK_PEER(peer) );
fd_cpu_flush_cache();
old = peer->p_hdr.info.runtime.pir_state;
if (old == new_state)
return 0;
......@@ -193,6 +194,7 @@ int fd_psm_change_state(struct fd_peer * peer, int new_state)
peer->p_hdr.info.pi_diamid);
peer->p_hdr.info.runtime.pir_state = new_state;
fd_cpu_flush_cache();
if (old == STATE_OPEN) {
CHECK_FCT( leave_open_state(peer) );
......@@ -250,6 +252,7 @@ void fd_psm_next_timeout(struct fd_peer * peer, int add_random, int delay)
void fd_psm_cleanup(struct fd_peer * peer, int terminate)
{
/* Move to CLOSED state: failover messages, stop OUT thread, unlink peer from active list */
fd_cpu_flush_cache();
if (peer->p_hdr.info.runtime.pir_state != STATE_ZOMBIE) {
CHECK_FCT_DO( fd_psm_change_state(peer, STATE_CLOSED), /* continue */ );
}
......@@ -280,6 +283,7 @@ void cleanup_setstate(void * arg)
struct fd_peer * peer = (struct fd_peer *)arg;
CHECK_PARAMS_DO( CHECK_PEER(peer), return );
peer->p_hdr.info.runtime.pir_state = STATE_ZOMBIE;
fd_cpu_flush_cache();
return;
}
......@@ -305,7 +309,7 @@ static void * p_psm_th( void * arg )
/* The state machine starts in CLOSED state */
peer->p_hdr.info.runtime.pir_state = STATE_CLOSED;
/* Wait that the PSM are authorized to start in the daemon */
CHECK_FCT_DO( fd_psm_waitstart(), goto psm_end );
......@@ -708,6 +712,7 @@ psm_end:
STATE_STR(peer->p_hdr.info.runtime.pir_state),
peer->p_hdr.info.pi_diamid);
pthread_cleanup_pop(1); /* set STATE_ZOMBIE */
fd_cpu_flush_cache();
peer->p_psm = (pthread_t)NULL;
pthread_detach(pthread_self());
return NULL;
......@@ -741,6 +746,7 @@ int fd_psm_terminate(struct fd_peer * peer, char * reason )
TRACE_ENTRY("%p", peer);
CHECK_PARAMS( CHECK_PEER(peer) );
fd_cpu_flush_cache();
if (peer->p_hdr.info.runtime.pir_state != STATE_ZOMBIE) {
CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, 0, reason) );
} else {
......
......@@ -278,6 +278,7 @@ int fd_peer_fini()
for (li = fd_g_peers.next; li != &fd_g_peers; li = li->next) {
struct fd_peer * peer = (struct fd_peer *)li;
fd_cpu_flush_cache();
if (peer->p_hdr.info.runtime.pir_state != STATE_ZOMBIE) {
CHECK_FCT_DO( fd_psm_terminate(peer, "REBOOTING"), /* continue */ );
} else {
......@@ -305,6 +306,7 @@ int fd_peer_fini()
CHECK_FCT_DO( pthread_rwlock_wrlock(&fd_g_peers_rw), /* continue */ );
for (li = fd_g_peers.next; li != &fd_g_peers; li = li->next) {
struct fd_peer * peer = (struct fd_peer *)li;
fd_cpu_flush_cache();
if (peer->p_hdr.info.runtime.pir_state == STATE_ZOMBIE) {
li = li->prev; /* to avoid breaking the loop */
fd_list_unlink(&peer->p_hdr.chain);
......@@ -461,6 +463,7 @@ int fd_peer_handle_newCER( struct msg ** cer, struct cnxctx ** cnx )
CHECK_FCT_DO( ret = fd_psm_begin(peer), goto out );
} else {
/* Check if the peer is in zombie state */
fd_cpu_flush_cache();
if (peer->p_hdr.info.runtime.pir_state == STATE_ZOMBIE) {
/* Re-activate the peer */
if (peer->p_hdr.info.config.pic_flags.exp)
......
......@@ -814,6 +814,7 @@ static int msg_rt_out(struct msg ** pmsg)
/* Find the peer corresponding to this name */
CHECK_FCT( fd_peer_getbyid( qry_src, (void *) &peer ) );
fd_cpu_flush_cache();
if ((!peer) || (peer->p_hdr.info.runtime.pir_state != STATE_OPEN)) {
TRACE_DEBUG(INFO, "Unable to forward answer message to peer '%s', deleted or not in OPEN state.", qry_src);
fd_msg_dump_walk(INFO, *pmsg);
......@@ -933,6 +934,7 @@ static int msg_rt_out(struct msg ** pmsg)
/* Search for the peer */
CHECK_FCT( fd_peer_getbyid( c->diamid, (void *)&peer ) );
fd_cpu_flush_cache();
if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) {
/* Send to this one */
CHECK_FCT_DO( fd_out_send(pmsg, NULL, peer, 0), continue );
......
......@@ -86,6 +86,7 @@ static void * demuxer(void * arg)
ASSERT( conn->cc_sctps_data.array );
do {
fd_cpu_flush_cache();
CHECK_FCT_DO( fd_sctp_recvmeta(conn->cc_socket, &strid, &buf, &bufsz, &event, &conn->cc_status), goto fatal );
switch (event) {
case FDEVP_CNX_MSG_RECV:
......@@ -170,6 +171,7 @@ static ssize_t sctps_push(gnutls_transport_ptr_t tr, const void * data, size_t l
TRACE_ENTRY("%p %p %zd", tr, data, len);
CHECK_PARAMS_DO( tr && data, { errno = EINVAL; return -1; } );
fd_cpu_flush_cache();
CHECK_FCT_DO( fd_sctp_sendstr(ctx->parent->cc_socket, ctx->strid, (uint8_t *)data, len, &ctx->parent->cc_status), /* errno is already set */ return -1 );
return len;
......@@ -619,6 +621,7 @@ void fd_sctps_bye(struct cnxctx * conn)
/* End all TLS sessions, in series (not as efficient as paralel, but simpler) */
for (i = 1; i < conn->cc_sctp_para.pairs; i++) {
fd_cpu_flush_cache();
if ( ! (conn->cc_status & CC_STATUS_ERROR)) {
CHECK_GNUTLS_DO( gnutls_bye(conn->cc_sctps_data.array[i].session, GNUTLS_SHUT_WR), fd_cnx_markerror(conn) );
}
......
......@@ -227,7 +227,7 @@ struct peer_info {
struct {
enum peer_state pir_state; /* Current state of the peer in the state machine */
enum peer_state pir_state; /* Current state of the peer in the state machine. fd_cpu_flush_cache() might be useful before reading. */
char * pir_realm; /* The received realm in CER/CEA. */
......@@ -252,6 +252,7 @@ struct peer_info {
struct fd_list pi_endpoints; /* Endpoint(s) of the remote peer (configured, discovered, or advertized). list of struct fd_endpoint. DNS resolved if empty. */
};
struct peer_hdr {
struct fd_list chain; /* List of all the peers, ordered by their Diameter Id */
struct peer_info info; /* The public data */
......
......@@ -568,6 +568,14 @@ static __inline__ int fd_thr_term(pthread_t * th)
return 0;
}
/* Force flushing the cache of a CPU before reading a shared memory area (use only for atomic reads such as int and void*) */
extern pthread_mutex_t fd_cpu_mtx_dummy; /* only for the macro below, so that we have a reasonably fresh pir_state value when needed */
/* NOTE: despite its name, this does not flush any hardware cache; the
 * lock/unlock pair acts as a pair of memory barriers (POSIX mandates memory
 * synchronization for pthread_mutex_lock/unlock), which is sufficient for
 * single-word reads of flags such as cc_status and pir_state.
 * Wrapped in do { } while (0) so it expands safely as a single statement,
 * e.g. inside an unbraced if/else. */
#define fd_cpu_flush_cache() do {					\
	(void)pthread_mutex_lock(&fd_cpu_mtx_dummy);			\
	(void)pthread_mutex_unlock(&fd_cpu_mtx_dummy);			\
} while (0)
/*************
Cancelation cleanup handlers for common objects
*************/
......
......@@ -35,6 +35,9 @@
#include "libfD.h"
/* Only for CPU cache flush: dummy mutex backing the fd_cpu_flush_cache()
 * macro, which locks then immediately unlocks it so that the caller sees a
 * reasonably fresh value of shared flags (e.g. pir_state, cc_status).
 * Never held across any other operation. */
pthread_mutex_t fd_cpu_mtx_dummy = PTHREAD_MUTEX_INITIALIZER;
/* Initialize library variables and threads */
int fd_lib_init(int support_signals)
{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment