Commit 352d9fb0 authored by Sebastien Decugis's avatar Sebastien Decugis
Browse files

Added a lot of TODOs :)

parent 56a5be0c
......@@ -20,6 +20,7 @@ SET(FD_COMMON_SRC
queues.c
peers.c
p_expiry.c
p_out.c
p_psm.c
server.c
tcp.c
......
......@@ -95,14 +95,8 @@ void fd_conf_dump()
fd_log_debug(" Local endpoints ........ : Default (use all available)\n");
} else {
struct fd_list * li = fd_g_config->cnf_endpoints.next;
fd_log_debug(" Local endpoints ........ : ");
while (li != &fd_g_config->cnf_endpoints) {
struct fd_endpoint * ep = (struct fd_endpoint *)li;
if (li != fd_g_config->cnf_endpoints.next) fd_log_debug(" ");
sSA_DUMP_NODE( &ep->sa, NI_NUMERICHOST );
fd_log_debug("\n");
li = li->next;
}
fd_log_debug(" Local endpoints ........ : \n");
fd_ep_dump( 29, &fd_g_config->cnf_endpoints );
}
if (FD_IS_LIST_EMPTY(&fd_g_config->cnf_apps)) {
fd_log_debug(" Local applications ..... : (none)\n");
......
......@@ -108,3 +108,30 @@ int fd_ep_clearflags( struct fd_list * list, uint32_t flags )
return 0;
}
void fd_ep_dump_one( char * prefix, struct fd_endpoint * ep, char * suffix )
{
if (prefix)
fd_log_debug("%s", prefix);
sSA_DUMP_NODE_SERV( &ep->sa, NI_NUMERICHOST | NI_NUMERICSERV );
fd_log_debug(" {%s%s%s%s}",
(ep->flags & EP_FL_CONF) ? "C" : "-",
(ep->flags & EP_FL_DISC) ? "D" : "-",
(ep->flags & EP_FL_ADV) ? "A" : "-",
(ep->flags & EP_FL_LL) ? "L" : "-",
(ep->flags & EP_FL_PRIMARY) ? "P" : "-");
if (suffix)
fd_log_debug("%s", suffix);
}
void fd_ep_dump( int indent, struct fd_list * eps )
{
struct fd_list * li;
for (li = eps->next; li != eps; li = li->next) {
struct fd_endpoint * ep = (struct fd_endpoint *)li;
fd_log_debug("%*s", indent, "");
fd_ep_dump_one( NULL, ep, "\n" );
}
}
......@@ -41,6 +41,11 @@
#include <freeDiameter/freeDiameter-host.h>
#include <freeDiameter/freeDiameter.h>
#ifdef DISABLE_SCTP
#undef IPPROTO_SCTP
#define IPPROTO_SCTP (2 = 4) /* some compilation error to spot the references */
#endif /* DISABLE_SCTP */
/* Timeout for establishing a connection */
#ifndef CNX_TIMEOUT
#define CNX_TIMEOUT 10 /* in seconds */
......@@ -101,7 +106,7 @@ struct fd_peer { /* The "real" definition of the peer structure */
char *p_dbgorig;
/* Chaining in peers sublists */
struct fd_list p_actives; /* list of peers in the STATE_OPEN state -- faster routing creation */
struct fd_list p_actives; /* list of peers in the STATE_OPEN state -- used by routing */
struct fd_list p_expiry; /* list of expiring peers, ordered by their timeout value */
struct timespec p_exp_timer; /* Timestamp where the peer will expire; updated each time activity is seen on the peer (except DW) */
......@@ -123,10 +128,6 @@ struct fd_peer { /* The "real" definition of the peer structure */
pthread_t p_psm;
struct timespec p_psm_timer;
/* Received message queue, and thread managing reception of messages */
struct fifo *p_recv;
pthread_t p_inthr;
/* Outgoing message queue, and thread managing sending the messages */
struct fifo *p_tosend;
pthread_t p_outthr;
......@@ -137,10 +138,13 @@ struct fd_peer { /* The "real" definition of the peer structure */
/* Sent requests (for fallback), list of struct sentreq ordered by hbh */
struct fd_list p_sentreq;
/* connection context: socket, callbacks and so on */
/* connection context: socket and related information */
struct cnxctx *p_cnxctx;
/* Callback on initial connection success / failure */
/* Callback for peer validation after the handshake */
int (*p_cb2)(struct peer_info *);
/* Callback on initial connection success / failure after the peer was added */
void (*p_cb)(struct peer_info *, void *);
void *p_cb_data;
......@@ -172,9 +176,25 @@ enum {
,FDEVP_PSM_TIMEOUT
};
const char * fd_pev_str(int event);
#define CHECK_EVENT( _e ) \
#define CHECK_PEVENT( _e ) \
(((int)(_e) >= FDEVP_DUMP_ALL) && ((int)(_e) <= FDEVP_PSM_TIMEOUT))
/* The following macro is actually called in p_psm.c -- another solution would be to declare it static inline */
#define DECLARE_PEV_STR() \
const char * fd_pev_str(int event) \
{ \
switch (event) { \
case_str(FDEVP_DUMP_ALL); \
case_str(FDEVP_TERMINATE); \
case_str(FDEVP_CNX_MSG_RECV); \
case_str(FDEVP_CNX_ERROR); \
case_str(FDEVP_CNX_EP_CHANGE); \
case_str(FDEVP_CNX_INCOMING); \
case_str(FDEVP_PSM_TIMEOUT); \
} \
TRACE_DEBUG(FULL, "Unknown event : %d", event); \
return "Unknown event"; \
}
const char * fd_pev_str(int event);
/* The data structure for FDEVP_CNX_INCOMING events */
struct cnx_incoming {
......@@ -211,6 +231,16 @@ int fd_psm_begin(struct fd_peer * peer );
int fd_psm_terminate(struct fd_peer * peer );
void fd_psm_abord(struct fd_peer * peer );
/* Peer out */
int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer);
int fd_out_start(struct fd_peer * peer);
int fd_out_stop(struct fd_peer * peer);
/* Active peers -- routing process should only ever take the read lock, the write lock is managed by PSMs */
extern struct fd_list fd_g_activ_peers;
extern pthread_rwlock_t fd_g_activ_peers_rw; /* protect the list */
/* Server sockets */
void fd_servers_dump();
int fd_servers_start();
......
/*********************************************************************************************************
* Software License Agreement (BSD License) *
* Author: Sebastien Decugis <sdecugis@nict.go.jp> *
* *
* Copyright (c) 2009, WIDE Project and NICT *
* All rights reserved. *
* *
* Redistribution and use of this software in source and binary forms, with or without modification, are *
* permitted provided that the following conditions are met: *
* *
* * Redistributions of source code must retain the above *
* copyright notice, this list of conditions and the *
* following disclaimer. *
* *
* * Redistributions in binary form must reproduce the above *
* copyright notice, this list of conditions and the *
* following disclaimer in the documentation and/or other *
* materials provided with the distribution. *
* *
* * Neither the name of the WIDE Project or NICT nor the *
* names of its contributors may be used to endorse or *
* promote products derived from this software without *
* specific prior written permission of WIDE Project and *
* NICT. *
* *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
*********************************************************************************************************/
#include "fD.h"
/* Alloc a new hbh for requests, bufferize the message and send on the connection, save in sentreq if provided */
static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struct fd_list * sentreq)
{
TRACE_ENTRY("%p %p %p %p", msg, cnx, hbh, sentreq);
TODO("If message is a request");
TODO("Alloc new *hbh");
TODO("Bufferize the message, send it");
TODO("Save in sentreq or free")
return ENOTSUP;
}
/* The code of the "out" thread */
static void * out_thr(void * arg)
{
TODO("Pick next message in peer->p_tosend");
TODO("do_send, log errors");
TODO("In case of cancellation, requeue the message");
return NULL;
error:
TODO(" Send an event to the peer ");
return NULL;
}
/* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided */
int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer)
{
TRACE_ENTRY("%p %p %p", msg, cnx, peer);
CHECK_PARAMS( msg && *msg && (cnx || (peer && peer->p_cnxctx)));
if (peer && (peer->p_hdr.info.pi_state == STATE_OPEN)) {
/* Normal case: just queue for the out thread to pick it up */
CHECK_FCT( fd_fifo_post(peer->p_tosend, msg) );
} else {
uint32_t *hbh = NULL;
/* In other cases, the thread is not running, so we handle the sending directly */
if (peer)
hbh = &peer->p_hbh;
if (!cnx)
cnx = peer->p_cnxctx;
/* Do send the message */
CHECK_FCT( do_send(msg, cnx, hbh, peer ? &peer->p_sentreq : NULL) );
}
return 0;
}
/* Start the "out" thread that picks messages in p_tosend and send them on p_cnxctx */
int fd_out_start(struct fd_peer * peer)
{
TRACE_ENTRY("%p", peer);
CHECK_PARAMS( CHECK_PEER(peer) && (peer->p_outthr == (pthread_t)NULL) );
CHECK_POSIX( pthread_create(&peer->p_outthr, NULL, out_thr, peer) );
return 0;
}
/* Stop that thread */
int fd_out_stop(struct fd_peer * peer)
{
TRACE_ENTRY("%p", peer);
CHECK_PARAMS( CHECK_PEER(peer) );
CHECK_FCT( fd_thr_term(&peer->p_outthr) );
return 0;
}
......@@ -35,40 +35,18 @@
#include "fD.h"
const char *peer_state_str[] = {
"STATE_NEW"
, "STATE_OPEN"
, "STATE_CLOSED"
, "STATE_CLOSING"
, "STATE_WAITCNXACK"
, "STATE_WAITCNXACK_ELEC"
, "STATE_WAITCEA"
, "STATE_OPEN_HANDSHAKE"
, "STATE_SUSPECT"
, "STATE_REOPEN"
, "STATE_ZOMBIE"
};
/* The actual declaration of peer_state_str */
DECLARE_STATE_STR();
const char * fd_pev_str(int event)
{
switch (event) {
#define case_str( _val )\
case _val : return #_val
case_str(FDEVP_DUMP_ALL);
case_str(FDEVP_TERMINATE);
case_str(FDEVP_CNX_MSG_RECV);
case_str(FDEVP_CNX_ERROR);
case_str(FDEVP_CNX_EP_CHANGE);
case_str(FDEVP_CNX_INCOMING);
case_str(FDEVP_PSM_TIMEOUT);
default:
TRACE_DEBUG(FULL, "Unknown event : %d", event);
return "Unknown event";
}
}
/* Helper for next macro */
#define case_str( _val ) \
case _val : return #_val
DECLARE_PEV_STR();
/************************************************************************/
/* Delayed startup */
/************************************************************************/
static int started = 0;
static pthread_mutex_t started_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t started_cnd = PTHREAD_COND_INITIALIZER;
......@@ -89,13 +67,78 @@ awake:
return 0;
}
/* Cancelation cleanup : set ZOMBIE state in the peer */
void cleanup_state(void * arg)
/* Allow the state machines to start */
int fd_psm_start()
{
struct fd_peer * peer = (struct fd_peer *)arg;
CHECK_PARAMS_DO( CHECK_PEER(peer), return );
peer->p_hdr.info.pi_state = STATE_ZOMBIE;
return;
TRACE_ENTRY("");
CHECK_POSIX( pthread_mutex_lock(&started_mtx) );
started = 1;
CHECK_POSIX( pthread_cond_broadcast(&started_cnd) );
CHECK_POSIX( pthread_mutex_unlock(&started_mtx) );
return 0;
}
/************************************************************************/
/* Manage the list of active peers */
/************************************************************************/
/* Enter/leave OPEN state */
static int enter_open_state(struct fd_peer * peer)
{
CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) );
TODO(" insert in fd_g_activ_peers ");
CHECK_POSIX( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
/* Start the thread to handle outgoing messages */
CHECK_FCT( fd_out_start(peer) );
return ENOTSUP;
}
static int leave_open_state(struct fd_peer * peer)
{
TODO("Remove from active list");
/* Stop the "out" thread */
CHECK_FCT( fd_out_stop(peer) );
TODO("Failover pending messages: requeue in global structures");
return ENOTSUP;
}
/************************************************************************/
/* Helpers for state changes */
/************************************************************************/
/* Change state */
static int change_state(struct fd_peer * peer, int new_state)
{
int old;
TRACE_ENTRY("%p %d(%s)", peer, new_state, STATE_STR(new_state));
CHECK_PARAMS( CHECK_PEER(peer) );
old = peer->p_hdr.info.pi_state;
if (old == new_state)
return 0;
TRACE_DEBUG(FULL, "'%s'\t-> '%s'\t'%s'",
STATE_STR(old),
STATE_STR(new_state),
peer->p_hdr.info.pi_diamid);
if (old == STATE_OPEN) {
CHECK_FCT( leave_open_state(peer) );
}
peer->p_hdr.info.pi_state = new_state;
if (new_state == STATE_OPEN) {
CHECK_FCT( enter_open_state(peer) );
}
return 0;
}
/* Set timeout timer of next event */
......@@ -127,6 +170,19 @@ static void psm_next_timeout(struct fd_peer * peer, int add_random, int delay)
#endif
}
/************************************************************************/
/* The PSM thread */
/************************************************************************/
/* Cancelation cleanup : set ZOMBIE state in the peer */
void cleanup_state(void * arg)
{
struct fd_peer * peer = (struct fd_peer *)arg;
CHECK_PARAMS_DO( CHECK_PEER(peer), return );
peer->p_hdr.info.pi_state = STATE_ZOMBIE;
return;
}
/* The state machine thread (controler) */
static void * p_psm_th( void * arg )
{
......@@ -170,16 +226,26 @@ psm_loop:
/* Now, the action depends on the current state and the incoming event */
/* The following two states are impossible */
/* The following states are impossible */
ASSERT( peer->p_hdr.info.pi_state != STATE_NEW );
ASSERT( peer->p_hdr.info.pi_state != STATE_ZOMBIE );
ASSERT( peer->p_hdr.info.pi_state != STATE_OPEN_HANDSHAKE ); /* because it exists only between two loops */
/* Purge invalid events */
if (!CHECK_EVENT(event)) {
if (!CHECK_PEVENT(event)) {
TRACE_DEBUG(INFO, "Invalid event received in PSM '%s' : %d", peer->p_hdr.info.pi_diamid, event);
goto psm_loop;
}
/* Call the extension callback if needed */
if (peer->p_cb) {
/* Check if we must call it */
/* */
/* OK */
TODO("Call CB");
TODO("Clear CB");
}
/* Handle the (easy) debug event now */
if (event == FDEVP_DUMP_ALL) {
fd_peer_dump(peer, ANNOYING);
......@@ -208,6 +274,42 @@ psm_loop:
}
}
/* A message was received */
if (event == FDEVP_CNX_MSG_RECV) {
TODO("Parse the buffer into a message");
/* parse_and_get_local_ccode */
TODO("Check if it is a local message (CER, DWR, ...)");
TODO("If not, check we are in OPEN state");
TODO("Update expiry timer if needed");
TODO("Handle the message");
}
/* The connection object is broken */
if (event == FDEVP_CNX_ERROR) {
TODO("Destroy the connection object");
TODO("Mark the error in the peer (pf_cnx_pb)");
TODO("Move to closed state, Requeue all messages to a different connection (failover)");
TODO("If pi_flags.exp, terminate the peer");
}
/* The connection notified a change in endpoints */
if (event == FDEVP_CNX_EP_CHANGE) {
/* Cleanup the remote LL and primary addresses */
CHECK_FCT_DO( fd_ep_filter( &peer->p_hdr.info.pi_endpoints, EP_FL_CONF | EP_FL_DISC | EP_FL_ADV ), /* ignore the error */);
CHECK_FCT_DO( fd_ep_clearflags( &peer->p_hdr.info.pi_endpoints, EP_FL_PRIMARY ), /* ignore the error */);
/* Get the new ones */
CHECK_FCT_DO( fd_cnx_getendpoints(peer->p_cnxctx, NULL, &peer->p_hdr.info.pi_endpoints), /* ignore the error */);
if (TRACE_BOOL(ANNOYING)) {
fd_log_debug("New remote endpoint(s):\n");
fd_ep_dump(6, &peer->p_hdr.info.pi_endpoints);
}
/* Done */
goto psm_loop;
}
/* A new connection was established and CER containing this peer id was received */
if (event == FDEVP_CNX_INCOMING) {
struct cnx_incoming * params = ev_data;
......@@ -234,10 +336,15 @@ psm_loop:
goto psm_loop;
}
/* MSG_RECEIVED: fd_p_expi_update(struct fd_peer * peer ) */
/* If timeout or OPEN : call cb if defined */
/* Default action : the handling has not yet been implemented. */
/* The timeout for the current state has been reached */
if (event == FDEVP_PSM_TIMEOUT) {
switch (peer->p_hdr.info.pi_state) {
}
}
/* Default action : the handling has not yet been implemented. [for debug only] */
TODO("Missing handler in PSM : '%s'\t<-- '%s'", STATE_STR(peer->p_hdr.info.pi_state), fd_pev_str(event));
if (event == FDEVP_PSM_TIMEOUT) {
/* We have not handled timeout in this state, let's postpone next alert */
......@@ -251,8 +358,12 @@ psm_end:
peer->p_psm = (pthread_t)NULL;
pthread_detach(pthread_self());
return NULL;
}
}
/************************************************************************/
/* Functions to control the PSM */
/************************************************************************/
/* Create the PSM thread of one peer structure */
int fd_psm_begin(struct fd_peer * peer )
{
......@@ -282,26 +393,16 @@ int fd_psm_terminate(struct fd_peer * peer )
return 0;
}
/* End the PSM violently */
/* End the PSM & cleanup the peer structure */
void fd_psm_abord(struct fd_peer * peer )
{
TRACE_ENTRY("%p", peer);
TODO("Cancel PSM thread");
TODO("Cancel IN thread");
TODO("Cancel OUT thread");
TODO("Cleanup the peer connection object");
TODO("Cleanup the message queues (requeue)");
TODO("Call p_cb with NULL parameter if needed");
return;
}
/* Allow the state machines to start */
int fd_psm_start()
{
TRACE_ENTRY("");
CHECK_POSIX( pthread_mutex_lock(&started_mtx) );
started = 1;
CHECK_POSIX( pthread_cond_broadcast(&started_cnd) );
CHECK_POSIX( pthread_mutex_unlock(&started_mtx) );
return 0;
}
......@@ -39,6 +39,14 @@
struct fd_list fd_g_peers = FD_LIST_INITIALIZER(fd_g_peers);
pthread_rwlock_t fd_g_peers_rw = PTHREAD_RWLOCK_INITIALIZER;
/* List of active peers */
struct fd_list fd_g_activ_peers = FD_LIST_INITIALIZER(fd_g_activ_peers); /* peers linked by their p_actives oredered by p_diamid */
pthread_rwlock_t fd_g_activ_peers_rw = PTHREAD_RWLOCK_INITIALIZER;
/* List of validation callbacks (registered with fd_peer_validate_register) */
static struct fd_list validators = FD_LIST_INITIALIZER(validators); /* list items are simple fd_list with "o" pointing to the callback */
static pthread_rwlock_t validators_rw = PTHREAD_RWLOCK_INITIALIZER;
/* Alloc / reinit a peer structure. if *ptr is not NULL, it must already point to a valid struct fd_peer. */
int fd_peer_alloc(struct fd_peer ** ptr)
......@@ -68,7 +76,6 @@ int fd_peer_alloc(struct fd_peer ** ptr)
fd_list_init(&p->p_actives, p);
p->p_hbh = lrand48();
CHECK_FCT( fd_fifo_new(&p->p_events) );
CHECK_FCT( fd_fifo_new(&p->p_recv) );
CHECK_FCT( fd_fifo_new(&p->p_tosend) );
fd_list_init(&p->p_sentreq, p);
......@@ -205,21 +212,12 @@ int fd_peer_free(struct fd_peer ** ptr)
}
CHECK_FCT( fd_fifo_del(&p->p_events) );
CHECK_FCT( fd_thr_term(&p->p_inthr) );
while ( fd_fifo_tryget(p->p_recv, &t) == 0 ) {
struct msg * m = t;
TRACE_DEBUG(FULL, "Found message %p in incoming queue of peer %p being destroyed", m, p);
/* We simply destroy, the remote peer will re-send to someone else...*/
CHECK_FCT(fd_msg_free(m));
}
CHECK_FCT( fd_fifo_del(&p->p_recv) );
CHECK_FCT( fd_thr_term(&p->p_outthr) );
while ( fd_fifo_tryget(p->p_tosend, &t) == 0 ) {
struct msg * m = t;
TRACE_DEBUG(FULL, "Found message %p in outgoing queue of peer %p being destroyed, requeue", m, p);
/* We simply requeue in global, the routing thread will re-handle it. */
CHECK_FCT(fd_fifo_post(fd_g_outgoing, &m));
}
CHECK_FCT( fd_fifo_del(&p->p_tosend) );
......@@ -318,6 +316,15 @@ int fd_peer_fini()
fd_peer_free(&peer);
}
/* Now empty the validators list */
CHECK_FCT_DO( pthread_rwlock_wrlock(&validators_rw), /* continue */ );
while (!FD_IS_LIST_EMPTY( &validators )) {
struct fd_list * v = validators.next;
fd_list_unlink(v);
free(v);
}
CHECK_FCT_DO( pthread_rwlock_unlock(&validators_rw), /* continue */ );
return 0;
}
......@@ -459,16 +466,46 @@ out:
/* Save a callback to accept / reject incoming unknown peers */
int fd_peer_validate_register ( int (*peer_validate)(struct peer_info * /* info */, int * /* auth */, int (**cb2)(struct peer_info *)) )
{
struct fd_list * v;
TRACE_ENTRY("%p", peer_validate);
CHECK_PARAMS(peer_validate);
TODO("...");
return ENOTSUP;
/* Alloc a new entry */
CHECK_MALLOC( v = malloc(sizeof(struct fd_list)) );
fd_list_init( v, peer_validate );
/* Add at the beginning of the list */
CHECK_FCT( pthread_rwlock_wrlock(&validators_rw) );
fd_list_insert_after(&validators, v);
CHECK_FCT( pthread_rwlock_unlock(&validators_rw));