Commit 2e0e60e1 authored by Sebastien Decugis's avatar Sebastien Decugis
Browse files

Broadcast CEA over all streams to avoid possible race condition

parent 477ff48d
......@@ -1367,9 +1367,9 @@ static int send_simple(struct cnxctx * conn, unsigned char * buf, size_t len)
}
/* Send a message -- this is synchronous -- and we assume it's never called by several threads at the same time, so we don't protect. */
int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, int ordered)
int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, uint32_t flags)
{
TRACE_ENTRY("%p %p %zd %i", conn, buf, len, ordered);
TRACE_ENTRY("%p %p %zd %x", conn, buf, len, flags);
CHECK_PARAMS(conn && (conn->cc_socket > 0) && (! (conn->cc_status & CC_STATUS_ERROR)) && buf && len);
......@@ -1382,32 +1382,64 @@ int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, int order
#ifndef DISABLE_SCTP
case IPPROTO_SCTP: {
int multistr = 0;
if ((!ordered) && (conn->cc_sctp_para.str_out > 1) && ((! (conn->cc_status & CC_STATUS_TLS)) || (conn->cc_sctp_para.pairs > 1))) {
/* Update the id of the stream we will send this message on */
conn->cc_sctp_para.next += 1;
conn->cc_sctp_para.next %= ((conn->cc_status & CC_STATUS_TLS) ? conn->cc_sctp_para.pairs : conn->cc_sctp_para.str_out);
multistr = 1;
if (flags & FD_CNX_BROADCAST) {
/* Send the buffer over all other streams */
uint16_t str;
if (conn->cc_status & CC_STATUS_TLS) {
for ( str=1; str < conn->cc_sctp_para.pairs; str++) {
ssize_t ret;
size_t sent = 0;
do {
CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_sctps_data.array[str].session, buf + sent, len - sent), );
if (ret <= 0)
return ENOTCONN;
sent += ret;
} while ( sent < len );
}
} else {
for ( str=1; str < conn->cc_sctp_para.str_out; str++) {
CHECK_FCT_DO( fd_sctp_sendstr(conn->cc_socket, str, buf, len, &conn->cc_status), { fd_cnx_markerror(conn); return ENOTCONN; } );
}
}
/* Set the ORDERED flag also so that it is sent over stream 0 as well */
flags &= FD_CNX_ORDERED;
}
if ((!multistr) || (conn->cc_sctp_para.next == 0)) {
if (flags & FD_CNX_ORDERED) {
/* We send over stream #0 */
CHECK_FCT( send_simple(conn, buf, len) );
} else {
if (!(conn->cc_status & CC_STATUS_TLS)) {
CHECK_FCT_DO( fd_sctp_sendstr(conn->cc_socket, conn->cc_sctp_para.next, buf, len, &conn->cc_status), { fd_cnx_markerror(conn); return ENOTCONN; } );
/* Default case : no flag specified */
int another_str = 0; /* do we send over stream #0 ? */
if ((conn->cc_sctp_para.str_out > 1) && ((! (conn->cc_status & CC_STATUS_TLS)) || (conn->cc_sctp_para.pairs > 1))) {
/* Update the id of the stream we will send this message over */
conn->cc_sctp_para.next += 1;
conn->cc_sctp_para.next %= ((conn->cc_status & CC_STATUS_TLS) ? conn->cc_sctp_para.pairs : conn->cc_sctp_para.str_out);
another_str = (conn->cc_sctp_para.next ? 1 : 0);
}
if ( ! another_str ) {
CHECK_FCT( send_simple(conn, buf, len) );
} else {
/* push the record to the appropriate session */
ssize_t ret;
size_t sent = 0;
ASSERT(conn->cc_sctps_data.array != NULL);
do {
CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_sctps_data.array[conn->cc_sctp_para.next].session, buf + sent, len - sent), );
if (ret <= 0)
return ENOTCONN;
sent += ret;
} while ( sent < len );
if (!(conn->cc_status & CC_STATUS_TLS)) {
CHECK_FCT_DO( fd_sctp_sendstr(conn->cc_socket, conn->cc_sctp_para.next, buf, len, &conn->cc_status), { fd_cnx_markerror(conn); return ENOTCONN; } );
} else {
/* push the record to the appropriate session */
ssize_t ret;
size_t sent = 0;
ASSERT(conn->cc_sctps_data.array != NULL);
do {
CHECK_GNUTLS_DO( ret = fd_tls_send_handle_error(conn, conn->cc_sctps_data.array[conn->cc_sctp_para.next].session, buf + sent, len - sent), );
if (ret <= 0)
return ENOTCONN;
sent += ret;
} while ( sent < len );
}
}
}
}
......
......@@ -239,7 +239,7 @@ const char * fd_pev_str(int event) \
}
const char * fd_pev_str(int event);
/* The data structure for FDEVP_CNX_INCOMING events */
/* The data structure for FDEVP_CNX_INCOMING event */
struct cnx_incoming {
struct msg * cer; /* the CER message received on this connection */
struct cnxctx * cnx; /* The connection context */
......@@ -273,7 +273,7 @@ int fd_psm_change_state(struct fd_peer * peer, int new_state);
void fd_psm_cleanup(struct fd_peer * peer, int terminate);
/* Peer out */
int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer);
int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer, uint32_t flags);
int fd_out_start(struct fd_peer * peer);
int fd_out_stop(struct fd_peer * peer);
......@@ -326,8 +326,11 @@ int fd_cnx_getendpoints(struct cnxctx * conn, struct fd_list * local
char * fd_cnx_getremoteid(struct cnxctx * conn);
int fd_cnx_receive(struct cnxctx * conn, struct timespec * timeout, unsigned char **buf, size_t * len);
int fd_cnx_recv_setaltfifo(struct cnxctx * conn, struct fifo * alt_fifo); /* send FDEVP_CNX_MSG_RECV event to the fifo list */
int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, int ordered);
int fd_cnx_send(struct cnxctx * conn, unsigned char * buf, size_t len, uint32_t flags);
void fd_cnx_destroy(struct cnxctx * conn);
/* Flags for the fd_cnx_send function : */
#define FD_CNX_ORDERED (1 << 0) /* All messages sent with this flag set will be delivered in the same order. No guarantee on other messages */
#define FD_CNX_BROADCAST (1 << 1) /* The message is sent over all stream pairs, in case of SCTP. No effect on TCP */
#endif /* _FD_H */
......@@ -587,7 +587,7 @@ static void receiver_reject(struct cnxctx * recv_cnx, struct msg ** cer, char *
/* Create and send the CEA with appropriate error code */
CHECK_FCT_DO( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, cer, MSGFL_ANSW_ERROR ), goto destroy );
CHECK_FCT_DO( fd_msg_rescode_set(*cer, rescode, errormsg, NULL, 1 ), goto destroy );
CHECK_FCT_DO( fd_out_send(cer, recv_cnx, NULL), goto destroy );
CHECK_FCT_DO( fd_out_send(cer, recv_cnx, NULL, FD_CNX_ORDERED), goto destroy );
/* And now destroy this connection */
destroy:
......@@ -605,7 +605,7 @@ int fd_p_ce_handle_newcnx(struct fd_peer * peer, struct cnxctx * initiator)
/* Send CER on the new connection */
CHECK_FCT( create_CER(peer, initiator, &cer) );
CHECK_FCT( fd_out_send(&cer, initiator, peer) );
CHECK_FCT( fd_out_send(&cer, initiator, peer, FD_CNX_ORDERED) );
/* Are we doing an election ? */
if (peer->p_hdr.info.runtime.pir_state == STATE_WAITCNXACK_ELEC) {
......@@ -652,7 +652,7 @@ int fd_p_ce_msgrcv(struct msg ** msg, int req, struct fd_peer * peer)
CHECK_FCT( fd_msg_rescode_set(*msg, "DIAMETER_COMMAND_UNSUPPORTED", "No CER allowed in current state", NULL, 1 ) );
/* msg now contains an answer message to send back */
CHECK_FCT_DO( fd_out_send(msg, NULL, peer), /* In case of error the message has already been dumped */ );
CHECK_FCT_DO( fd_out_send(msg, NULL, peer, FD_CNX_ORDERED), /* In case of error the message has already been dumped */ );
}
/* If the state is not WAITCEA, just discard the message */
......@@ -812,8 +812,7 @@ int fd_p_ce_process_receiver(struct fd_peer * peer)
CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, 0 ) );
CHECK_FCT( fd_msg_rescode_set(msg, "DIAMETER_SUCCESS", NULL, NULL, 0 ) );
CHECK_FCT( add_CE_info(msg, peer->p_cnxctx, isi & PI_SEC_TLS_OLD, isi & PI_SEC_NONE) );
CHECK_FCT( fd_out_send(&msg, peer->p_cnxctx, peer) );
TODO("In case of SCTP, broadcast the CEA over all streams so that further messages cannot be delivered before the CEA?");
CHECK_FCT( fd_out_send(&msg, peer->p_cnxctx, peer, FD_CNX_BROADCAST) ); /* Broadcast in order to avoid further messages sent over a different stream be delivered first... */
/* Handshake if needed */
if (isi & PI_SEC_TLS_OLD) {
......@@ -866,7 +865,7 @@ error_abort:
CHECK_FCT( fd_msg_rescode_set(msg, ec, NULL, NULL, 1 ) );
/* msg now contains an answer message to send back */
CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer), /* In case of error the message has already been dumped */ );
CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer, FD_CNX_ORDERED), /* In case of error the message has already been dumped */ );
}
cleanup:
......
......@@ -86,12 +86,12 @@ int fd_p_dp_handle(struct msg ** msg, int req, struct fd_peer * peer)
CHECK_FCT( fd_dict_search( fd_g_config->cnf_dict, DICT_ENUMVAL, ENUMVAL_BY_STRUCT, &er, &dictobj, 0 ) );
if (dictobj) {
CHECK_FCT( fd_dict_getval( dictobj, &er.search ) );
fd_log_debug("Peer '%s' sent a DPR with cause: %s\n", peer->p_hdr.info.pi_diamid, er.search.enum_name);
TRACE_DEBUG(INFO, "Peer '%s' sent a DPR with cause: %s\n", peer->p_hdr.info.pi_diamid, er.search.enum_name);
} else {
fd_log_debug("Peer '%s' sent a DPR with unknown cause: %u\n", peer->p_hdr.info.pi_diamid, peer->p_hdr.info.runtime.pir_lastDC);
TRACE_DEBUG(INFO, "Peer '%s' sent a DPR with unknown cause: %u\n", peer->p_hdr.info.pi_diamid, peer->p_hdr.info.runtime.pir_lastDC);
}
} else {
fd_log_debug("Peer '%s' sent a DPR without Disconnect-Cause AVP\n", peer->p_hdr.info.pi_diamid);
TRACE_DEBUG(INFO, "Peer '%s' sent a DPR without Disconnect-Cause AVP\n", peer->p_hdr.info.pi_diamid);
}
}
......@@ -103,7 +103,7 @@ int fd_p_dp_handle(struct msg ** msg, int req, struct fd_peer * peer)
CHECK_FCT( fd_psm_change_state(peer, STATE_CLOSING) );
/* Now send the DPA */
CHECK_FCT( fd_out_send( msg, NULL, peer) );
CHECK_FCT( fd_out_send( msg, NULL, peer, FD_CNX_ORDERED) );
/* Move to CLOSED state */
fd_psm_cleanup(peer, 0);
......@@ -114,7 +114,7 @@ int fd_p_dp_handle(struct msg ** msg, int req, struct fd_peer * peer)
} else {
/* We received a DPA */
if (peer->p_hdr.info.runtime.pir_state != STATE_CLOSING) {
TRACE_DEBUG(INFO, "Ignore DPA received in state %s", STATE_STR(peer->p_hdr.info.runtime.pir_state));
TRACE_DEBUG(INFO, "Ignoring DPA received in state %s", STATE_STR(peer->p_hdr.info.runtime.pir_state));
}
/* In theory, we should control the Result-Code AVP. But since we will not go back to OPEN state here anyway, let's skip it */
......@@ -167,7 +167,7 @@ int fd_p_dp_initiate(struct fd_peer * peer, char * reason)
fd_psm_next_timeout(peer, 0, DPR_TIMEOUT);
/* Now send the DPR message */
CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), /* ignore since we are on timeout anyway */ );
CHECK_FCT_DO( fd_out_send(&msg, NULL, peer, FD_CNX_ORDERED), /* ignore since we are on timeout anyway */ );
return 0;
}
......@@ -75,7 +75,7 @@ static int send_DWR(struct fd_peer * peer)
CHECK_FCT( fd_msg_add_origin ( msg, 1 ) );
/* Now send this message */
CHECK_FCT( fd_out_send(&msg, NULL, peer) );
CHECK_FCT( fd_out_send(&msg, NULL, peer, FD_CNX_ORDERED) );
/* And mark the pending DW */
peer->p_flags.pf_dw_pending = 1;
......@@ -98,7 +98,7 @@ int fd_p_dw_handle(struct msg ** msg, int req, struct fd_peer * peer)
CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, msg, 0 ) );
CHECK_FCT( fd_msg_rescode_set( *msg, "DIAMETER_SUCCESS", NULL, NULL, 0 ) );
CHECK_FCT( fd_msg_add_origin ( *msg, 1 ) );
CHECK_FCT( fd_out_send( msg, peer->p_cnxctx, peer) );
CHECK_FCT( fd_out_send( msg, peer->p_cnxctx, peer, FD_CNX_ORDERED) );
} else {
/* Just discard the DWA */
......
......@@ -36,16 +36,16 @@
#include "fD.h"
/* Alloc a new hbh for requests, bufferize the message and send on the connection, save in sentreq if provided */
static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl)
static int do_send(struct msg ** msg, uint32_t flags, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl)
{
struct msg_hdr * hdr;
int msg_is_a_req, msg_is_appl;
int msg_is_a_req;
uint8_t * buf;
size_t sz;
int ret;
uint32_t bkp_hbh = 0;
TRACE_ENTRY("%p %p %p %p", msg, cnx, hbh, srl);
TRACE_ENTRY("%p %x %p %p %p", msg, flags, cnx, hbh, srl);
/* Retrieve the message header */
CHECK_FCT( fd_msg_hdr(*msg, &hdr) );
......@@ -59,8 +59,6 @@ static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struc
*hbh = hdr->msg_hbhid + 1;
}
msg_is_appl = fd_msg_is_routable(*msg);
/* Log the message */
if (TRACE_BOOL(FULL)) {
CHECK_FCT_DO( fd_msg_update_length(*msg), /* continue */ );
......@@ -78,7 +76,7 @@ static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struc
}
/* Send the message */
CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz, !msg_is_appl), { free(buf); return ret; } );
CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz, flags), { free(buf); return ret; } );
pthread_cleanup_pop(1);
/* Free remaining messages (i.e. answers) */
......@@ -121,7 +119,7 @@ static void * out_thr(void * arg)
pthread_cleanup_push(cleanup_requeue, msg);
/* Send the message, log any error */
CHECK_FCT_DO( do_send(&msg, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr),
CHECK_FCT_DO( do_send(&msg, 0, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr),
{
fd_log_debug("An error occurred while sending this message, it is lost:\n");
fd_msg_dump_walk(NONE, msg);
......@@ -138,10 +136,10 @@ error:
return NULL;
}
/* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided */
int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer)
/* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided. Flags are valid only for direct sending, not through thread (unused) */
int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer, uint32_t flags)
{
TRACE_ENTRY("%p %p %p", msg, cnx, peer);
TRACE_ENTRY("%p %p %p %x", msg, cnx, peer, flags);
CHECK_PARAMS( msg && *msg && (cnx || (peer && peer->p_cnxctx)));
if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) {
......@@ -159,7 +157,7 @@ int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer)
cnx = peer->p_cnxctx;
/* Do send the message */
CHECK_FCT_DO( do_send(msg, cnx, hbh, peer ? &peer->p_sr : NULL),
CHECK_FCT_DO( do_send(msg, flags, cnx, hbh, peer ? &peer->p_sr : NULL),
{
fd_log_debug("An error occurred while sending this message, it is lost:\n");
fd_msg_dump_walk(NONE, *msg);
......
......@@ -458,7 +458,7 @@ psm_loop:
} else {
if (msg) {
/* Send the error back to the peer */
CHECK_FCT_DO( fd_out_send(&msg, NULL, peer), /* In case of error the message has already been dumped */ );
CHECK_FCT_DO( fd_out_send(&msg, NULL, peer, FD_CNX_ORDERED), /* In case of error the message has already been dumped */ );
if (msg) {
CHECK_FCT_DO( fd_msg_free(msg), goto psm_end);
}
......@@ -499,7 +499,7 @@ psm_loop:
CHECK_FCT_DO( fd_msg_rescode_set(msg, "DIAMETER_INVALID_HDR_BITS", NULL, NULL, 1 ), break );
/* Send the answer */
CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer), break );
CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer, FD_CNX_ORDERED), break );
} while (0);
} else {
/* We did ASK for it ??? */
......
......@@ -425,7 +425,7 @@ static int return_error(struct msg ** pmsg, char * error_code, char * error_mess
if (is_loc) {
CHECK_FCT( fd_fifo_post(fd_g_incoming, pmsg) );
} else {
CHECK_FCT( fd_out_send(pmsg, NULL, peer) );
CHECK_FCT( fd_out_send(pmsg, NULL, peer, 0) );
}
/* Done */
......@@ -825,7 +825,7 @@ static int msg_rt_out(struct msg ** pmsg)
hdr->msg_hbhid = qry_hdr->msg_hbhid;
/* Push the message into this peer */
CHECK_FCT( fd_out_send(pmsg, NULL, peer) );
CHECK_FCT( fd_out_send(pmsg, NULL, peer, 0) );
/* We're done with this answer */
return 0;
......@@ -933,7 +933,7 @@ static int msg_rt_out(struct msg ** pmsg)
if (peer && (peer->p_hdr.info.runtime.pir_state == STATE_OPEN)) {
/* Send to this one */
CHECK_FCT_DO( fd_out_send(pmsg, NULL, peer), continue );
CHECK_FCT_DO( fd_out_send(pmsg, NULL, peer, 0), continue );
/* If the sending was successful */
break;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment