Commit fb9b064d authored by Sebastien Decugis's avatar Sebastien Decugis
Browse files

Backup for the WE, some warnings remaining

parent 352d9fb0
......@@ -22,6 +22,7 @@ SET(FD_COMMON_SRC
p_expiry.c
p_out.c
p_psm.c
p_sr.c
server.c
tcp.c
)
......
......@@ -92,6 +92,12 @@ int fd_queues_fini(void);
/* Create all the dictionary objects defined in the Diameter base RFC. */
int fd_dict_base_protocol(struct dictionary * dict);
/* Sentinel for the sent requests list */
struct sr_list {
struct fd_list srs;
pthread_mutex_t mtx;
};
/* Peers */
struct fd_peer { /* The "real" definition of the peer structure */
......@@ -136,7 +142,7 @@ struct fd_peer { /* The "real" definition of the peer structure */
uint32_t p_hbh;
/* Sent requests (for fallback), list of struct sentreq ordered by hbh */
struct fd_list p_sentreq;
struct sr_list p_sr;
/* connection context: socket and related information */
struct cnxctx *p_cnxctx;
......@@ -203,12 +209,6 @@ struct cnx_incoming {
int validate; /* The peer is new, it must be validated (by an extension) or error CEA to be sent */
};
/* Structure to store a sent request */
struct sentreq {
struct fd_list chain; /* the "o" field points directly to the hop-by-hop of the request (uint32_t *) */
struct msg *req; /* A request that was sent and not yet answered. */
};
/* Functions */
int fd_peer_fini();
......@@ -219,6 +219,7 @@ int fd_peer_free(struct fd_peer ** ptr);
int fd_peer_handle_newCER( struct msg ** cer, struct cnxctx ** cnx );
/* fd_peer_add declared in freeDiameter.h */
int fd_peer_validate( struct fd_peer * peer );
void fd_peer_failover_msg(struct fd_peer * peer);
/* Peer expiry */
int fd_p_expi_init(void);
......@@ -236,6 +237,11 @@ int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer);
int fd_out_start(struct fd_peer * peer);
int fd_out_stop(struct fd_peer * peer);
/* Peer sent requests cache */
int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc);
int fd_p_sr_fetch(struct sr_list * srlist, uint32_t hbh, struct msg **req);
void fd_p_sr_failover(struct sr_list * srlist);
/* Active peers -- routing process should only ever take the read lock, the write lock is managed by PSMs */
extern struct fd_list fd_g_activ_peers;
extern pthread_rwlock_t fd_g_activ_peers_rw; /* protect the list */
......
......@@ -264,3 +264,43 @@ int fd_msg_send ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void
return 0;
}
/* Parse a message against our dictionary, and in case of error log and eventually build the error reply -- returns the parsing status */
int fd_msg_parse_or_error( struct msg ** msg )
{
int ret = 0;
struct msg * m;
struct msg_hdr * hdr = NULL;
struct fd_pei pei;
TRACE_ENTRY("%p", msg);
CHECK_PARAMS(msg && *msg);
m = *msg;
/* Parse the message against our dictionary */
ret = fd_msg_parse_rules ( m, fd_g_config->cnf_dict, &pei);
if (ret != EBADMSG)
return ret;
fd_log_debug("The following message does not comply to the dictionary and rules (%s):\n", pei.pei_errcode);
fd_msg_dump_walk(NONE, m);
/* Now create an answer error if the message is a query */
CHECK_FCT( fd_msg_hdr(m, &hdr) );
if (hdr->msg_flags & CMD_FLAG_REQUEST) {
/* Create the error message */
CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, msg, pei.pei_protoerr ? MSGFL_ANSW_ERROR : 0 ) );
/* Set the error code */
CHECK_FCT( fd_msg_rescode_set(*msg, pei.pei_errcode, pei.pei_message, pei.pei_avp, 1 ) );
} else {
/* Just discard */
CHECK_FCT( fd_msg_free( m ) );
*msg = NULL;
}
return ret;
}
......@@ -36,29 +36,100 @@
#include "fD.h"
/* Alloc a new hbh for requests, bufferize the message and send on the connection, save in sentreq if provided */
static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struct fd_list * sentreq)
static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl)
{
TRACE_ENTRY("%p %p %p %p", msg, cnx, hbh, sentreq);
struct msg_hdr * hdr;
int msg_is_a_req;
uint8_t * buf;
size_t sz;
int ret;
TODO("If message is a request");
TODO("Alloc new *hbh");
TRACE_ENTRY("%p %p %p %p", msg, cnx, hbh, srl);
TODO("Bufferize the message, send it");
/* Retrieve the message header */
CHECK_FCT( fd_msg_hdr(*msg, &hdr) );
TODO("Save in sentreq or free")
msg_is_a_req = (hdr->msg_flags & CMD_FLAG_REQUEST);
if (msg_is_a_req) {
CHECK_PARAMS(hbh && srl);
/* Alloc the hop-by-hop id and increment the value for next message */
hdr->msg_hbhid = *hbh;
*hbh = hdr->msg_hbhid + 1;
}
/* Create the message buffer */
CHECK_FCT(fd_msg_bufferize( *msg, &buf, &sz ));
/* Send the message */
pthread_cleanup_push( free, buf );
CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), { free(buf); return ret; } );
pthread_cleanup_pop(1);
/* Save a request */
if (msg_is_a_req) {
CHECK_FCT_DO( fd_p_sr_store(srl, msg, &hdr->msg_hbhid),
{
fd_log_debug("The following request was sent successfully but not saved locally:\n" );
fd_log_debug(" (as a result the matching answer will be discarded)\n" );
fd_msg_dump_walk(NONE, *msg);
} );
}
return ENOTSUP;
/* Free answers and unsaved requests */
if (*msg) {
CHECK_FCT( fd_msg_free(*msg) );
*msg = NULL;
}
return 0;
}
static void cleanup_requeue(void * arg)
{
struct msg *msg = arg;
CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg),
CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */));
}
/* The code of the "out" thread */
static void * out_thr(void * arg)
{
TODO("Pick next message in peer->p_tosend");
TODO("do_send, log errors");
TODO("In case of cancellation, requeue the message");
return NULL;
struct fd_peer * peer = arg;
ASSERT( CHECK_PEER(peer) );
/* Set the thread name */
{
char buf[48];
sprintf(buf, "OUT/%.*s", sizeof(buf) - 5, peer->p_hdr.info.pi_diamid);
fd_log_threadname ( buf );
}
/* Loop until cancelation */
while (1) {
struct msg * msg;
/* Retrieve next message to send */
CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error );
/* Now if we are cancelled, we requeue this message */
pthread_cleanup_push(cleanup_requeue, msg);
/* Send the message, log any error */
CHECK_FCT_DO( do_send(&msg, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr),
{
fd_log_debug("An error occurred while sending this message, it is lost:\n");
fd_msg_dump_walk(NONE, msg);
fd_msg_free(msg);
} );
/* Loop */
pthread_cleanup_pop(0);
}
error:
TODO(" Send an event to the peer ");
/* It is not really a connection error, but the effect is the same, we are not able to send anymore message */
CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
return NULL;
}
......@@ -83,7 +154,7 @@ int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer)
cnx = peer->p_cnxctx;
/* Do send the message */
CHECK_FCT( do_send(msg, cnx, hbh, peer ? &peer->p_sentreq : NULL) );
CHECK_FCT( do_send(msg, cnx, hbh, peer ? &peer->p_sr : NULL) );
}
return 0;
......
......@@ -83,32 +83,64 @@ int fd_psm_start()
/* Manage the list of active peers */
/************************************************************************/
/* Enter/leave OPEN state */
static int enter_open_state(struct fd_peer * peer)
{
struct fd_list * li;
CHECK_PARAMS( FD_IS_LIST_EMPTY(&peer->p_actives) );
/* Callback registered by the credential validator (fd_peer_validate_register) */
if (peer->p_cb2) {
CHECK_FCT_DO( (*peer->p_cb2)(&peer->p_hdr.info),
{
TRACE_DEBUG(FULL, "Validation failed, moving to state CLOSING");
peer->p_hdr.info.pi_state = STATE_CLOSING;
fd_psm_terminate(peer);
} );
peer->p_cb2 = NULL;
return 0;
}
/* Insert in the active peers list */
CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) );
TODO(" insert in fd_g_activ_peers ");
for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) {
struct fd_peer * next_p = (struct fd_peer *)li->o;
int cmp = strcmp(peer->p_hdr.info.pi_diamid, next_p->p_hdr.info.pi_diamid);
if (cmp < 0)
break;
}
fd_list_insert_before(li, &peer->p_actives);
CHECK_POSIX( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
/* Callback registered when the peer was added, by fd_peer_add */
if (peer->p_cb) {
TRACE_DEBUG(FULL, "Calling add callback for peer %s", peer->p_hdr.info.pi_diamid);
(*peer->p_cb)(&peer->p_hdr.info, peer->p_cb_data);
peer->p_cb = NULL;
peer->p_cb_data = NULL;
}
/* Start the thread to handle outgoing messages */
CHECK_FCT( fd_out_start(peer) );
return ENOTSUP;
return 0;
}
static int leave_open_state(struct fd_peer * peer)
{
TODO("Remove from active list");
/* Remove from active peers list */
CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) );
fd_list_unlink( &peer->p_actives );
CHECK_POSIX( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
/* Stop the "out" thread */
CHECK_FCT( fd_out_stop(peer) );
TODO("Failover pending messages: requeue in global structures");
/* Failover the messages */
fd_peer_failover_msg(peer);
return ENOTSUP;
return 0;
}
/************************************************************************/
/* Helpers for state changes */
/************************************************************************/
......@@ -164,7 +196,7 @@ static void psm_next_timeout(struct fd_peer * peer, int add_random, int delay)
peer->p_psm_timer.tv_sec += delay;
#if 0
#ifdef SLOW_PSM
/* temporary for debug */
peer->p_psm_timer.tv_sec += 10;
#endif
......@@ -187,7 +219,7 @@ void cleanup_state(void * arg)
static void * p_psm_th( void * arg )
{
struct fd_peer * peer = (struct fd_peer *)arg;
int created_started = started;
int created_started = started ? 1 : 0;
int event;
size_t ev_sz;
void * ev_data;
......@@ -213,7 +245,7 @@ static void * p_psm_th( void * arg )
if (peer->p_flags.pf_responder) {
psm_next_timeout(peer, 0, INCNX_TIMEOUT);
} else {
psm_next_timeout(peer, created_started ? 0 : 1, 0);
psm_next_timeout(peer, created_started, 0);
}
psm_loop:
......@@ -237,15 +269,6 @@ psm_loop:
goto psm_loop;
}
/* Call the extension callback if needed */
if (peer->p_cb) {
/* Check if we must call it */
/* */
/* OK */
TODO("Call CB");
TODO("Clear CB");
}
/* Handle the (easy) debug event now */
if (event == FDEVP_DUMP_ALL) {
fd_peer_dump(peer, ANNOYING);
......@@ -276,8 +299,66 @@ psm_loop:
/* A message was received */
if (event == FDEVP_CNX_MSG_RECV) {
TODO("Parse the buffer into a message");
/* parse_and_get_local_ccode */
struct msg * msg = NULL;
struct msg_hdr * hdr;
/* Parse the received buffer */
CHECK_FCT_DO( fd_msg_parse_buffer( (void *)&ev_data, ev_sz, &msg),
{
fd_log_debug("Received invalid data from peer '%s', closing the connection\n", peer->p_hdr.info.pi_diamid);
CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_end );
goto psm_loop;
} );
TRACE_DEBUG(FULL, "Received this message from '%s':", peer->p_hdr.info.pi_diamid);
fd_msg_dump_walk(FULL, msg);
/* Extract the header */
CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto psm_end );
/* If it is an answer, associate with the request */
if (!(hdr->msg_flags & CMD_FLAG_REQUEST)) {
struct msg * req;
/* Search matching request (same hbhid) */
CHECK_FCT_DO( fd_p_sr_fetch(&peer->p_sr, hdr->msg_hbhid, &req), goto psm_end );
if (req == NULL) {
fd_log_debug("Received a Diameter answer message with no corresponding sent request, discarding...\n");
fd_msg_dump_walk(NONE, msg);
fd_msg_free(msg);
goto psm_loop;
}
/* Associate */
CHECK_FCT_DO( fd_msg_answ_associate( msg, req ), goto psm_end );
}
/* We received a valid message, update the expiry timer */
CHECK_FCT_DO( fd_p_expi_update(peer), goto psm_end );
/* Now handle non-link-local messages */
if (fd_msg_is_routable(msg)) {
/* If we are not in OPEN state, discard the message */
if (peer->p_hdr.info.pi_state != STATE_OPEN) {
fd_log_debug("Received a routable message while not in OPEN state from peer '%s', discarded.\n", peer->p_hdr.info.pi_diamid);
fd_msg_dump_walk(NONE, msg);
fd_msg_free(msg);
} else {
/* Set the message source and add the Route-Record */
CHECK_FCT_DO( fd_msg_source_set( msg, peer->p_hdr.info.pi_diamid, 1, fd_g_config->cnf_dict ), goto psm_end);
/* Requeue to the global incoming queue */
CHECK_FCT_DO(fd_fifo_post(fd_g_incoming, &msg), goto psm_end );
/* Update the peer timer */
if (!peer->p_flags.pf_dw_pending) {
psm_next_timeout(peer, 1, peer->p_hdr.info.pi_twtimer ?: fd_g_config->cnf_timer_tw);
}
}
goto psm_loop;
}
/* Link-local message: They must be understood by our dictionary */
TODO("Check if it is a local message (CER, DWR, ...)");
TODO("If not, check we are in OPEN state");
TODO("Update expiry timer if needed");
......@@ -318,6 +399,7 @@ psm_loop:
switch (peer->p_hdr.info.pi_state) {
case STATE_CLOSED:
TODO("Handle the CER, validate the peer if needed (and set expiry), set the alt_fifo in the connection, reply a CEA, eventually handshake, move to OPEN or REOPEN state");
/* In case of error : DIAMETER_UNKNOWN_PEER */
break;
case STATE_WAITCNXACK:
......@@ -352,7 +434,7 @@ psm_loop:
}
goto psm_loop;
psm_end:
pthread_cleanup_pop(1); /* set STATE_ZOMBIE */
peer->p_psm = (pthread_t)NULL;
......@@ -397,12 +479,25 @@ int fd_psm_terminate(struct fd_peer * peer )
void fd_psm_abord(struct fd_peer * peer )
{
TRACE_ENTRY("%p", peer);
TODO("Cancel PSM thread");
TODO("Cancel OUT thread");
TODO("Cleanup the peer connection object");
TODO("Cleanup the message queues (requeue)");
TODO("Call p_cb with NULL parameter if needed");
/* Cancel PSM thread */
CHECK_FCT_DO( fd_thr_term(&peer->p_psm), /* continue */ );
/* Cancel the OUT thread */
CHECK_FCT_DO( fd_out_stop(peer), /* continue */ );
/* Cleanup the connection */
if (peer->p_cnxctx) {
fd_cnx_destroy(peer->p_cnxctx);
}
/* Failover the messages */
fd_peer_failover_msg(peer);
/* Empty the events list, this might leak some memory, but we only do it on exit, so... */
fd_event_destroy(&peer->p_events, free);
/* More cleanups are performed in fd_peer_free */
return;
}
/*********************************************************************************************************
* Software License Agreement (BSD License) *
* Author: Sebastien Decugis <sdecugis@nict.go.jp> *
* *
* Copyright (c) 2009, WIDE Project and NICT *
* All rights reserved. *
* *
* Redistribution and use of this software in source and binary forms, with or without modification, are *
* permitted provided that the following conditions are met: *
* *
* * Redistributions of source code must retain the above *
* copyright notice, this list of conditions and the *
* following disclaimer. *
* *
* * Redistributions in binary form must reproduce the above *
* copyright notice, this list of conditions and the *
* following disclaimer in the documentation and/or other *
* materials provided with the distribution. *
* *
* * Neither the name of the WIDE Project or NICT nor the *
* names of its contributors may be used to endorse or *
* promote products derived from this software without *
* specific prior written permission of WIDE Project and *
* NICT. *
* *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
*********************************************************************************************************/
#include "fD.h"
/* Structure to store a sent request */
struct sentreq {
struct fd_list chain; /* the "o" field points directly to the hop-by-hop of the request (uint32_t *) */
struct msg *req; /* A request that was sent and not yet answered. */
};
/* Find an element in the list, or the following one */
static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match)
{
struct fd_list * li;
*match = 0;
for (li = srlist->next; li != srlist; li = li->next) {
uint32_t * nexthbh = li->o;
if (*nexthbh < hbh)
continue;
if (*nexthbh == hbh)
*match = 1;
break;
}
return li;
}
/* Store a new sent request */
int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc)
{
struct sentreq * sr;
struct fd_list * next;
int match;
TRACE_ENTRY("%p %p %p", srlist, req, hbhloc);
CHECK_PARAMS(srlist && req && *req && hbhloc);
CHECK_MALLOC( sr = malloc(sizeof(struct sentreq)) );
memset(sr, 0, sizeof(struct sentreq));
fd_list_init(&sr->chain, hbhloc);
sr->req = *req;
/* Search the place in the list */
CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
next = find_or_next(&srlist->srs, *hbhloc, &match);
if (match) {
TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id was already sent: error");
free(sr);
CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* ignore */ );
return EINVAL;
}
/* Save in the list */
*req = NULL;
fd_list_insert_before(next, &sr->chain);
CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) );
return 0;
}
/* Fetch a request by hbh */
int fd_p_sr_fetch(struct sr_list * srlist, uint32_t hbh, struct msg **req)
{
struct sentreq * sr;
int match;
TRACE_ENTRY("%p %x %p", srlist, hbh, req);
CHECK_PARAMS(srlist && req);
/* Search the request in the list */
CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
sr = (struct sentreq *)find_or_next(&srlist->srs, hbh, &match);
if (!match) {
TRACE_DEBUG(INFO, "There is no saved request with this hop-by-hop id");
*req = NULL;
} else {
/* Unlink */
fd_list_unlink(&sr->chain);
*req = sr->req;
free(sr);
}
CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) );
/* Done */
return 0;
}
/* Failover requests (free or requeue routables) */
void fd_p_sr_failover(struct sr_list * srlist)
{
CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx), /* continue anyway */ );
while (!FD_IS_LIST_EMPTY(&srlist->srs)) {
struct sentreq * sr = (struct sentreq *)(srlist->srs.next);
fd_list_unlink(&sr->chain);
if (fd_msg_is_routable(sr->req)) {
struct msg_hdr * hdr = NULL;
/* Set the 'T' flag */
CHECK_FCT_DO(fd_msg_hdr(sr->req, &hdr), /* continue */);
if (hdr)
hdr->msg_flags |= CMD_FLAG_RETRANSMIT;
/* Requeue for sending to another peer */
CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &sr->req),
CHECK_FCT_DO(fd_msg_free(sr->req), /* What can we do more? */));
} else {
/* Just free the request... */
CHECK_FCT_DO(fd_msg_free(sr->req), /* Ignore */);
}
free(sr);
}
CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* continue anyway */ );
}
......@@ -77,7 +77,8 @@ int fd_peer_alloc(struct fd_peer ** ptr)
p->p_hbh = lrand48();
CHECK_FCT( fd_fifo_new(&p->p_events) );
CHECK_FCT( fd_fifo_new(&p->p_tosend) );
fd_list_init(&p->p_sentreq, p);
fd_list_init(&p->p_sr.srs, p);
CHECK_POSIX( pthread_mutex_init(&p->p_sr.mtx, NULL) );
return 0;
}
......@@ -179,6 +180,27 @@ out:
free(__li); \
}
/* Empty the lists of p_tosend and p_sentreq messages */
void fd_peer_failover_msg(struct fd_peer * peer)
{
struct msg *m;
TRACE_ENTRY("%p", peer);
CHECK_PARAMS_DO(CHECK_PEER(peer), return);
/* Requeue all messages in the "out" queue */