Commit 511a7a5a authored by Sebastien Decugis's avatar Sebastien Decugis
Browse files

Some progress on peers module

parent 384d3a67
......@@ -49,15 +49,7 @@ struct fd_ext_info {
};
/* list of extensions */
static struct fd_list ext_list;
/* Initialize the module */
int fd_ext_init()
{
TRACE_ENTRY();
fd_list_init(&ext_list, NULL);
return 0;
}
static struct fd_list ext_list = FD_LIST_INITIALIZER(ext_list);
/* Add new extension */
int fd_ext_add( char * filename, char * conffile )
......
......@@ -41,6 +41,26 @@
#include <freeDiameter/freeDiameter-host.h>
#include <freeDiameter/freeDiameter.h>
/* Timeout for establishing a connection */
#ifndef CNX_TIMEOUT
#define CNX_TIMEOUT 10 /* in seconds */
#endif /* CNX_TIMEOUT */
/* Timeout for receiving a CER after incoming connection is established */
#ifndef INCNX_TIMEOUT
#define INCNX_TIMEOUT 20 /* in seconds */
#endif /* INCNX_TIMEOUT */
/* Timeout for receiving a CEA after CER is sent */
#ifndef CEA_TIMEOUT
#define CEA_TIMEOUT 10 /* in seconds */
#endif /* CEA_TIMEOUT */
/* The timeout value to wait for answer to a DPR */
#ifndef DPR_TIMEOUT
#define DPR_TIMEOUT 15 /* in seconds */
#endif /* DPR_TIMEOUT */
/* Configuration */
int fd_conf_init();
void fd_conf_dump();
......@@ -48,7 +68,6 @@ int fd_conf_parse();
int fddparse(struct fd_config * conf); /* yacc generated */
/* Extensions */
int fd_ext_init();
int fd_ext_add( char * filename, char * conffile );
int fd_ext_load();
void fd_ext_dump(void);
......@@ -81,18 +100,10 @@ struct fd_peer { /* The "real" definition of the peer structure */
/* Origin of this peer object, for debug */
char *p_dbgorig;
/* Mutex that protect this peer structure */
pthread_mutex_t p_mtx;
/* Reference counter -- freed only when this reaches 0 */
unsigned p_refcount;
/* Chaining in peers sublists */
struct fd_list p_expiry; /* list of expiring peers, ordered by their timeout value */
struct fd_list p_actives; /* list of peers in the STATE_OPEN state -- faster routing creation */
/* The next hop-by-hop id value for the link */
uint32_t p_hbh;
struct fd_list p_expiry; /* list of expiring peers, ordered by their timeout value */
struct timespec p_exp_timer; /* Timestamp where the peer will expire; updated each time activity is seen on the peer (except DW) */
/* Some flags influencing the peer state machine */
struct {
......@@ -108,7 +119,7 @@ struct fd_peer { /* The "real" definition of the peer structure */
} p_flags;
/* The events queue, peer state machine thread, timer for states timeouts */
struct fifo *p_events;
struct fifo *p_events; /* The mutex of this FIFO list protects also the state and timer information */
pthread_t p_psm;
struct timespec p_psm_timer;
......@@ -120,10 +131,13 @@ struct fd_peer { /* The "real" definition of the peer structure */
struct fifo *p_tosend;
pthread_t p_outthr;
/* The next hop-by-hop id value for the link, only read & modified by p_outthr */
uint32_t p_hbh;
/* Sent requests (for fallback), list of struct sentreq ordered by hbh */
struct fd_list p_sentreq;
/* connection context: socket & other metadata */
/* connection context: socket, callbacks and so on */
struct cnxctx *p_cnxctx;
/* Callback on initial connection success / failure */
......@@ -144,7 +158,11 @@ enum {
/* A message was received in the peer */
,FDEVP_MSG_INCOMING
/* The PSM state is expired */
,FDEVP_PSM_TIMEOUT
};
const char * fd_pev_str(int event);
/* Structure to store a sent request */
struct sentreq {
......@@ -153,17 +171,14 @@ struct sentreq {
};
/* Functions */
int fd_peer_init();
int fd_peer_fini();
void fd_peer_dump_list(int details);
/* fd_peer_add declared in freeDiameter.h */
int fd_peer_rc_decr(struct fd_peer **ptr, int locked);
/* Peer expiry */
int fd_p_expi_init(void);
int fd_p_expi_fini(void);
int fd_p_expi_update(struct fd_peer * peer, int locked );
int fd_p_expi_unlink(struct fd_peer * peer, int locked );
int fd_p_expi_update(struct fd_peer * peer );
/* Peer state machine */
int fd_psm_start();
......
......@@ -314,6 +314,8 @@ extconf: /* empty */
connpeer: {
memset(&fddpi, 0, sizeof(fddpi));
fd_list_init( &fddpi.pi_endpoints, NULL );
fd_list_init( &fddpi.pi_apps, NULL );
}
CONNPEER '=' QSTRING peerinfo ';'
{
......
......@@ -41,6 +41,8 @@
/* forward declarations */
static void * sig_hdl(void * arg);
static int main_cmdline(int argc, char *argv[]);
static void main_version(void);
static void main_help( void );
/* The static configuration structure */
static struct fd_config conf;
......@@ -76,10 +78,9 @@ int main(int argc, char * argv[])
CHECK_FCT( fd_dict_base_protocol(fd_g_config->cnf_dict) );
/* Initialize other modules */
CHECK_FCT( fd_ext_init() );
CHECK_FCT( fd_queues_init() );
CHECK_FCT( fd_msg_init() );
CHECK_FCT( fd_peer_init() );
CHECK_FCT( fd_p_expi_init() );
/* Parse the configuration file */
CHECK_FCT( fd_conf_parse() );
......@@ -132,58 +133,33 @@ end:
TRACE_DEBUG(INFO, FD_PROJECT_BINARY " daemon is stopping...");
/* cleanups */
CHECK_FCT_DO( fd_ext_fini(), /* continue */ );
TODO("Stop dispatch thread(s) properly (no cancel yet)");
CHECK_FCT_DO( fd_peer_fini(), /* Stop all connections */ );
TODO("Stop dispatch & routing threads");
CHECK_FCT_DO( fd_ext_fini(), /* Cleaup all extensions */ );
TODO("Cleanup queues (dump all remaining messages ?)");
CHECK_FCT_DO( fd_thr_term(&sig_th), /* continue */ );
return ret;
}
/* Display package version */
static void main_version_core(void)
{
printf("%s, version %d.%d.%d"
#ifdef HG_VERSION
" (r%s"
# ifdef PACKAGE_HG_REVISION
"/%s"
# endif /* PACKAGE_HG_VERSION */
")"
#endif /* HG_VERSION */
"\n",
FD_PROJECT_NAME, FD_PROJECT_VERSION_MAJOR, FD_PROJECT_VERSION_MINOR, FD_PROJECT_VERSION_REV
#ifdef HG_VERSION
, HG_VERSION
# ifdef PACKAGE_HG_REVISION
, PACKAGE_HG_REVISION
# endif /* PACKAGE_HG_VERSION */
#endif /* HG_VERSION */
);
}
/* Display package version and general info */
static void main_version(void)
const char * fd_ev_str(int event)
{
main_version_core();
printf( "%s\n", FD_PROJECT_COPYRIGHT);
printf( "\nSee " FD_PROJECT_NAME " homepage at http://aaa.koganei.wide.ad.jp/\n"
" for information, updates and bug reports on this software.\n");
}
/* Print command-line options */
static void main_help( void )
{
main_version_core();
printf( " This daemon is an implementation of the Diameter protocol\n"
" used for Authentication, Authorization, and Accounting (AAA).\n");
printf("\nUsage: " FD_PROJECT_BINARY " [OPTIONS]...\n");
printf( " -h, --help Print help and exit\n"
" -V, --version Print version and exit\n"
" -c, --config=filename Read configuration from this file instead of the \n"
" default location (%s).\n", DEFAULT_CONF_FILE);
printf( "\nDebug:\n"
" These options are mostly useful for developers\n"
" -d, --debug Increase verbosity of debug messages\n"
" -q, --quiet Decrease verbosity then remove debug messages\n");
switch (event) {
#define case_str( _val )\
case _val : return #_val
case_str(FDEV_TERMINATE);
case_str(FDEV_DUMP_DICT);
case_str(FDEV_DUMP_EXT);
case_str(FDEV_DUMP_QUEUES);
case_str(FDEV_DUMP_CONFIG);
case_str(FDEV_DUMP_PEERS);
default:
TRACE_DEBUG(FULL, "Unknown event : %d", event);
return "Unknown event";
}
}
/* Parse the command-line */
......@@ -244,7 +220,54 @@ static int main_cmdline(int argc, char *argv[])
}
return 0;
}
/* Display package version */
static void main_version_core(void)
{
printf("%s, version %d.%d.%d"
#ifdef HG_VERSION
" (r%s"
# ifdef PACKAGE_HG_REVISION
"/%s"
# endif /* PACKAGE_HG_VERSION */
")"
#endif /* HG_VERSION */
"\n",
FD_PROJECT_NAME, FD_PROJECT_VERSION_MAJOR, FD_PROJECT_VERSION_MINOR, FD_PROJECT_VERSION_REV
#ifdef HG_VERSION
, HG_VERSION
# ifdef PACKAGE_HG_REVISION
, PACKAGE_HG_REVISION
# endif /* PACKAGE_HG_VERSION */
#endif /* HG_VERSION */
);
}
/* Display package version and general info */
static void main_version(void)
{
main_version_core();
printf( "%s\n", FD_PROJECT_COPYRIGHT);
printf( "\nSee " FD_PROJECT_NAME " homepage at http://aaa.koganei.wide.ad.jp/\n"
" for information, updates and bug reports on this software.\n");
}
/* Print command-line options */
static void main_help( void )
{
main_version_core();
printf( " This daemon is an implementation of the Diameter protocol\n"
" used for Authentication, Authorization, and Accounting (AAA).\n");
printf("\nUsage: " FD_PROJECT_BINARY " [OPTIONS]...\n");
printf( " -h, --help Print help and exit\n"
" -V, --version Print version and exit\n"
" -c, --config=filename Read configuration from this file instead of the \n"
" default location (%s).\n", DEFAULT_CONF_FILE);
printf( "\nDebug:\n"
" These options are mostly useful for developers\n"
" -d, --debug Increase verbosity of debug messages\n"
" -q, --quiet Decrease verbosity then remove debug messages\n");
}
#ifdef HAVE_SIGNALENT_H
......
......@@ -35,44 +35,120 @@
#include "fD.h"
static pthread_t exp_thr;
static struct fd_list exp_list = FD_LIST_INITIALIZER( exp_list );
static pthread_cond_t exp_cnd = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t exp_mtx = PTHREAD_MUTEX_INITIALIZER;
static void * exp_th_fct(void * arg)
{
fd_log_threadname ( "Peers/expire" );
TRACE_ENTRY( "" );
CHECK_POSIX_DO( pthread_mutex_lock(&exp_mtx), goto error );
pthread_cleanup_push( fd_cleanup_mutex, &exp_mtx );
do {
struct timespec now;
struct fd_peer * first;
/* Check if there are expiring sessions available */
if (FD_IS_LIST_EMPTY(&exp_list)) {
/* Just wait for a change or cancelation */
CHECK_POSIX_DO( pthread_cond_wait( &exp_cnd, &exp_mtx ), goto error );
/* Restart the loop on wakeup */
continue;
}
/* Get the pointer to the peer that expires first */
first = (struct fd_peer *)(exp_list.next->o);
ASSERT( CHECK_PEER(first) );
/* Get the current time */
CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto error );
/* If first peer is not expired, we just wait until it happens */
if ( TS_IS_INFERIOR( &now, &first->p_exp_timer ) ) {
CHECK_POSIX_DO2( pthread_cond_timedwait( &exp_cnd, &exp_mtx, &first->p_exp_timer ),
ETIMEDOUT, /* ETIMEDOUT is a normal error, continue */,
/* on other error, */ goto error );
/* on wakeup, loop */
continue;
}
/* Now, the first peer in the list is expired; signal it */
fd_list_unlink( &first->p_expiry );
CHECK_FCT_DO( fd_event_send(first->p_events, FDEVP_TERMINATE, NULL), goto error );
} while (1);
pthread_cleanup_pop( 1 );
error:
TRACE_DEBUG(INFO, "An error occurred in peers module! Expiry thread is terminating...");
ASSERT(0);
CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, NULL), );
return NULL;
}
/* Initialize peers expiry mechanism */
int fd_p_expi_init(void)
{
TODO("");
return ENOTSUP;
TRACE_ENTRY();
CHECK_FCT( pthread_create( &exp_thr, NULL, exp_th_fct, NULL ) );
return 0;
}
/* Finish peers expiry mechanism */
int fd_p_expi_fini(void)
{
TODO("");
return ENOTSUP;
CHECK_FCT_DO( fd_thr_term(&exp_thr), );
CHECK_POSIX( pthread_mutex_lock(&exp_mtx) );
while (!FD_IS_LIST_EMPTY(&exp_list)) {
struct fd_peer * peer = (struct fd_peer *)(exp_list.next->o);
fd_list_unlink(&peer->p_expiry );
}
CHECK_POSIX( pthread_mutex_unlock(&exp_mtx) );
return 0;
}
/* Add a peer in the expiry list if needed */
int fd_p_expi_update(struct fd_peer * peer, int locked )
/* Add / requeue a peer in the expiry list */
int fd_p_expi_update(struct fd_peer * peer )
{
TODO("");
TRACE_ENTRY("%p", peer);
CHECK_PARAMS( CHECK_PEER(peer) );
CHECK_POSIX( pthread_mutex_lock(&exp_mtx) );
fd_list_unlink(&peer->p_expiry );
/* if peer expires */
/* add to the expiry list in appropriate position */
/* increment peer refcount */
if (peer->p_hdr.info.pi_flags.exp) {
struct fd_list * li;
/* update the p_exp_timer value */
CHECK_SYS( clock_gettime(CLOCK_REALTIME, &peer->p_exp_timer) );
peer->p_exp_timer.tv_sec += peer->p_hdr.info.pi_lft;
/* add to the expiry list in appropriate position (probably around the end) */
for (li = exp_list.prev; li != &exp_list; li = li->prev) {
struct fd_peer * p = (struct fd_peer *)(li->o);
if (TS_IS_INFERIOR( &p->p_exp_timer, &peer->p_exp_timer ) )
break;
}
fd_list_insert_after(li, &peer->p_expiry);
/* signal the expiry thread if we added in first position */
if (li == &exp_list) {
CHECK_POSIX( pthread_cond_signal(&exp_cnd) );
}
}
return ENOTSUP;
CHECK_POSIX( pthread_mutex_unlock(&exp_mtx) );
return 0;
}
/* Remove a peer from expiry list if needed */
int fd_p_expi_unlink(struct fd_peer * peer, int locked )
{
TODO("");
/* if peer is in expiry list */
/* remove from the list */
/* decrement peer refcount */
/* no need to signal the expiry thread ... */
return ENOTSUP;
}
......@@ -35,6 +35,35 @@
#include "fD.h"
const char *peer_state_str[] = {
"STATE_ZOMBIE"
, "STATE_OPEN"
, "STATE_CLOSED"
, "STATE_CLOSING"
, "STATE_WAITCNXACK"
, "STATE_WAITCNXACK_ELEC"
, "STATE_WAITCEA"
, "STATE_SUSPECT"
, "STATE_REOPEN"
};
const char * fd_pev_str(int event)
{
switch (event) {
#define case_str( _val )\
case _val : return #_val
case_str(FDEVP_TERMINATE);
case_str(FDEVP_DUMP_ALL);
case_str(FDEVP_MSG_INCOMING);
case_str(FDEVP_PSM_TIMEOUT);
default:
TRACE_DEBUG(FULL, "Unknown event : %d", event);
return "Unknown event";
}
}
static int started = 0;
static pthread_mutex_t started_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t started_cnd = PTHREAD_COND_INITIALIZER;
......@@ -55,17 +84,122 @@ awake:
return 0;
}
/* Allow the state machines to start */
int fd_psm_start()
/* Cancelation cleanup : set ZOMBIE state in the peer */
void cleanup_state(void * arg)
{
TRACE_ENTRY("");
CHECK_POSIX( pthread_mutex_lock(&started_mtx) );
started = 1;
CHECK_POSIX( pthread_cond_broadcast(&started_cnd) );
CHECK_POSIX( pthread_mutex_unlock(&started_mtx) );
return 0;
struct fd_peer * peer = (struct fd_peer *)arg;
CHECK_PARAMS_DO( CHECK_PEER(peer), return );
peer->p_hdr.info.pi_state = STATE_ZOMBIE;
return;
}
/* Set timeout timer of next event */
static void psm_next_timeout(struct fd_peer * peer, int add_random, int delay)
{
/* Initialize the timer */
CHECK_POSIX_DO( clock_gettime( CLOCK_REALTIME, &peer->p_psm_timer ), ASSERT(0) );
if (add_random) {
if (delay > 2)
delay -= 2;
else
delay = 0;
/* Add a random value between 0 and 4sec */
peer->p_psm_timer.tv_sec += random() % 4;
peer->p_psm_timer.tv_nsec+= random() % 1000000000L;
if (peer->p_psm_timer.tv_nsec > 1000000000L) {
peer->p_psm_timer.tv_nsec -= 1000000000L;
peer->p_psm_timer.tv_sec ++;
}
}
peer->p_psm_timer.tv_sec += delay;
#if 0
/* temporary for debug */
peer->p_psm_timer.tv_sec += 10;
#endif
}
static int psm_ev_timedget(struct fd_peer * peer, int *code, void ** data)
{
struct fd_event * ev;
int ret = 0;
TRACE_ENTRY("%p %p %p", peer, code, data);
ret = fd_fifo_timedget(peer->p_events, &ev, &peer->p_psm_timer);
if (ret == ETIMEDOUT) {
*code = FDEVP_PSM_TIMEOUT;
*data = NULL;
} else {
CHECK_FCT( ret );
*code = ev->code;
*data = ev->data;
free(ev);
}
return 0;
}
/* The state machine thread */
static void * p_psm_th( void * arg )
{
struct fd_peer * peer = (struct fd_peer *)arg;
int created_started = started;
CHECK_PARAMS_DO( CHECK_PEER(peer), ASSERT(0) );
pthread_cleanup_push( cleanup_state, arg );
/* Set the thread name */
{
char buf[48];
sprintf(buf, "PSM/%.*s", sizeof(buf) - 5, peer->p_hdr.info.pi_diamid);
fd_log_threadname ( buf );
}
/* Wait that the PSM are authorized to start in the daemon */
CHECK_FCT_DO( fd_psm_waitstart(), goto end );
/* The state machine starts in CLOSED state */
peer->p_hdr.info.pi_state = STATE_CLOSED;
/* Initialize the timer */
if (peer->p_flags.pf_responder) {
psm_next_timeout(peer, 0, INCNX_TIMEOUT);
} else {
psm_next_timeout(peer, created_started ? 0 : 1, 0);
}
psm:
do {
int event;
void * ev_data;
/* Get next event */
CHECK_FCT_DO( psm_ev_timedget(peer, &event, &ev_data), goto end );
TRACE_DEBUG(FULL, "'%s'\t<-- '%s'\t(%p)\t'%s'",
STATE_STR(peer->p_hdr.info.pi_state),
fd_pev_str(event), ev_data,
peer->p_hdr.info.pi_diamid);
/* Now, the action depends on the current state and the incoming event */
} while (1);
end:
/* set STATE_ZOMBIE */
pthread_cleanup_pop(1);
return NULL;
}
/* Create the PSM thread of one peer structure */
int fd_psm_begin(struct fd_peer * peer )
{
......@@ -78,15 +212,30 @@ int fd_psm_begin(struct fd_peer * peer )
int fd_psm_terminate(struct fd_peer * peer )
{
TRACE_ENTRY("%p", peer);
TODO("");
return ENOTSUP;
CHECK_PARAMS( CHECK_PEER(peer) );
CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, NULL) );
return 0;
}
/* End the PSM violently */
void fd_psm_abord(struct fd_peer * peer )
{
TRACE_ENTRY("%p", peer);
TODO("");
TODO("Cancel PSM thread");
TODO("Cancel IN thread");
TODO("Cancel OUT thread");
TODO("Cleanup the connection");
return;
}
/* Allow the state machines to start */
int fd_psm_start()
{
TRACE_ENTRY("");
CHECK_POSIX( pthread_mutex_lock(&started_mtx) );
started = 1;
CHECK_POSIX( pthread_cond_broadcast(&started_cnd) );
CHECK_POSIX( pthread_mutex_unlock(&started_mtx) );
return 0;