Commit 734ae79d authored by Sebastien Decugis's avatar Sebastien Decugis
Browse files

Added the dispatch thread code

parent f558d5d2
......@@ -66,7 +66,185 @@ int fd_disp_app_support ( struct dict_object * app, struct dict_object * vendor,
}
/* Note: if the message is for local delivery, we should test for duplicate
(draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */
/* The dispatching thread(s) */
enum thread_state { INITIAL = 0, RUNNING = 1, TERMINATED = 2 };
static void cleanup_state(void * state_loc)
{
if (state_loc)
*(enum thread_state *)state_loc = TERMINATED;
}
static pthread_mutex_t order_lock = PTHREAD_MUTEX_INITIALIZER;
static enum { RUN = 0, STOP = 1 } order_val;
static void * dispatch_thread(void * arg)
{
TRACE_ENTRY("%p", arg);
/* Set the thread name */
{
char buf[48];
snprintf(buf, sizeof(buf), "Dispatch %p", arg);
fd_log_threadname ( buf );
}
pthread_cleanup_push( cleanup_state, arg );
/* Mark the thread running */
*(enum thread_state *)arg = RUNNING;
do {
struct msg * msg;
struct msg_hdr * hdr;
int is_req = 0;
struct session * sess;
enum disp_action action;
const char * ec = NULL;
const char * em = NULL;
/* Test the environment */
{
int must_stop;
CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */
must_stop = (order_val == STOP);
CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end );
if (must_stop)
goto end;
pthread_testcancel();
}
/* Ok, we are allowed to run */
/* Get the next message from the queue */
CHECK_FCT_DO( fd_fifo_get ( fd_g_local, &msg ), goto fatal_error );
if (TRACE_BOOL(FULL)) {
TRACE_DEBUG(FULL, "Picked next message:");
fd_msg_dump_one(FULL, msg);
}
/* Read the message header */
CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto fatal_error );
is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
/* Note: if the message is for local delivery, we should test for duplicate
(draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */
/* At this point, we probably need to understand the message content, so parse the message */
CHECK_FCT_DO( fd_msg_parse_dict ( msg, fd_g_config->cnf_dict ), /* Ignore error */);
/* First, if the original request was registered with a callback and we receive the answer, call it. */
if ( ! is_req ) {
struct msg * qry;
void (*anscb)(void *, struct msg **) = NULL;
void * data = NULL;
/* Retrieve the corresponding query */
CHECK_FCT_DO( fd_msg_answ_getq( msg, &qry ), goto fatal_error );
/* Retrieve any registered handler */
CHECK_FCT_DO( fd_msg_anscb_get( qry, &anscb, &data ), goto fatal_error );
/* If a callback was registered, pass the message to it */
if (anscb != NULL) {
TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data);
(*anscb)(data, &msg);
if (msg == NULL) {
/* Ok, the message is now handled, we can skip to the next one */
continue;
}
}
}
/* Retrieve the session of the message */
CHECK_FCT_DO( fd_msg_sess_get(fd_g_config->cnf_dict, msg, &sess, NULL), goto fatal_error );
/* Now, call any callback registered for the message */
CHECK_FCT_DO( fd_msg_dispatch ( &msg, sess, &action, &ec), goto fatal_error );
/* Now, act depending on msg and action and ec */
if (!msg)
continue;
switch ( action ) {
case DISP_ACT_CONT:
/* No callback has handled the message, let's reply with a generic error */
em = "The message was not handled by any extension callback";
ec = "DIAMETER_COMMAND_UNSUPPORTED";
case DISP_ACT_ERROR:
/* We have a problem with delivering the message */
if (ec == NULL) {
ec = "DIAMETER_UNABLE_TO_COMPLY";
}
if (!is_req) {
TRACE_DEBUG(INFO, "Received an answer to a localy issued query, but no handler processed this answer!");
fd_msg_dump_walk(INFO, msg);
fd_msg_free(msg);
msg = NULL;
break;
}
/* Create an answer with the error code and message */
CHECK_FCT_DO( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, MSGFL_ANSW_ERROR ), goto fatal_error );
CHECK_FCT_DO( fd_msg_rescode_set(msg, (char *)ec, (char *)em, NULL, 1 ), goto fatal_error );
case DISP_ACT_SEND:
/* Now, send the message */
CHECK_FCT_DO( fd_fifo_post(fd_g_outgoing, &msg), goto fatal_error );
}
/* We're done with this message */
} while (1);
fatal_error:
TRACE_DEBUG(INFO, "An error occurred in dispatch module! Thread is terminating...");
CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
end:
/* Mark the thread as terminated */
pthread_cleanup_pop(1);
return NULL;
}
/* Later (same as routing): TODO("Set threshold on local queue"); */
static pthread_t my_dispatch = (pthread_t)NULL;
static enum thread_state my_state = INITIAL;
/* Initialize the Dispatch module */
int fd_disp_init(void)
{
order_val = RUN;
CHECK_POSIX( pthread_create( &my_dispatch, NULL, dispatch_thread, &my_state ) );
return 0;
}
int fd_disp_cleanstop(void)
{
CHECK_POSIX( pthread_mutex_lock(&order_lock) );
order_val = STOP;
CHECK_POSIX( pthread_mutex_unlock(&order_lock) );
return 0;
}
int fd_disp_fini(void)
{
/* Wait for a few milliseconds for the thread to complete, by monitoring my_state */
/* Then if needed, cancel the thread */
/* Remove all remaining handlers */
fd_disp_unregister_all();
return ENOTSUP;
}
......@@ -99,6 +99,7 @@ int main(int argc, char * argv[])
CHECK_FCT( fd_queues_init() );
CHECK_FCT( fd_msg_init() );
CHECK_FCT( fd_p_expi_init() );
CHECK_FCT( fd_disp_init() );
CHECK_FCT( fd_rt_init() );
/* Parse the configuration file */
......@@ -161,9 +162,11 @@ end:
/* cleanups */
CHECK_FCT_DO( fd_servers_stop(), /* Stop accepting new connections */ );
TODO("Stop dispatch thread(s) properly (no cancel yet)");
CHECK_FCT_DO( fd_disp_cleanstop(), /* Stop dispatch thread(s) properly (no cancel yet) */ );
CHECK_FCT_DO( fd_peer_fini(), /* Stop all connections */ );
TODO("Stop dispatch & routing threads");
CHECK_FCT_DO( fd_rt_fini(), /* Stop routing threads */ );
CHECK_FCT_DO( fd_disp_fini(), /* Stop dispatch thread */ );
CHECK_FCT_DO( fd_ext_fini(), /* Cleaup all extensions */ );
TODO("Cleanup queues (dump all remaining messages ?)");
......
......@@ -862,6 +862,14 @@ int fd_rt_fini(void)
{
CHECK_FCT_DO( fd_thr_term(&rt_in ), /* continue */);
CHECK_FCT_DO( fd_thr_term(&rt_out), /* continue */);
/* Cleanup all remaining handlers */
while (!FD_IS_LIST_EMPTY(&rt_fwd_list)) {
CHECK_FCT_DO( fd_rt_fwd_unregister ( (void *)rt_fwd_list.next, NULL ), /* continue */ );
}
while (!FD_IS_LIST_EMPTY(&rt_out_list)) {
CHECK_FCT_DO( fd_rt_out_unregister ( (void *)rt_out_list.next, NULL ), /* continue */ );
}
return 0;
}
......
......@@ -105,6 +105,7 @@ int main(int argc, char *argv[])
enum disp_action action;
struct disp_hdl * hdl[NB_CB];
struct disp_when when;
const char * ec;
/* First, initialize the daemon modules */
INIT_FD();
......@@ -149,7 +150,7 @@ int main(int argc, char *argv[])
/* Check this handler is called for a message */
msg = new_msg( 0, cmd1, avp1, NULL, 0 );
memset(cbcalled, 0, sizeof(cbcalled));
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( DISP_ACT_CONT, action );
......@@ -173,7 +174,7 @@ int main(int argc, char *argv[])
/* Check the callbacks are called as appropriate */
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 0, cmd1, avp1, NULL, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 0, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -184,7 +185,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd1, avp1, NULL, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -195,7 +196,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 2, cmd1, avp1, NULL, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 0, cbcalled[1] );
CHECK( 1, cbcalled[2] );
......@@ -229,7 +230,7 @@ int main(int argc, char *argv[])
/* Check the callbacks are called as appropriate */
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 0, cmd1, avp1, NULL, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -239,7 +240,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 2, cmd1, avp1, NULL, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[2] );
......@@ -249,7 +250,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 2, cmd2, avp1, NULL, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 0, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -259,7 +260,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd2, NULL, avp2, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 0, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -309,7 +310,7 @@ int main(int argc, char *argv[])
/* Check the callbacks are called as appropriate */
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 0, cmd1, NULL, NULL, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 0, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -321,7 +322,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 0, cmd1, avp1, NULL, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[2] );
......@@ -333,7 +334,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd2, avp1, NULL, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -345,7 +346,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd1, avp1, NULL, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[2] );
......@@ -357,7 +358,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd1, avp1, avp2, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[2] );
......@@ -370,7 +371,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd1, NULL, avp2, 1 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 0, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -383,7 +384,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd1, NULL, avp2, 2 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 0, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -430,7 +431,7 @@ int main(int argc, char *argv[])
/* Check the callbacks are called as appropriate */
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 0, cmd1, avp1, NULL, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 0, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -440,7 +441,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd2, avp1, avp2, 0 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 0, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -450,7 +451,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd2, avp1, avp2, 1 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -460,7 +461,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd2, avp1, avp2, 2 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 0, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -470,7 +471,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd1, avp1, avp2, 1 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[2] );
......@@ -488,7 +489,7 @@ int main(int argc, char *argv[])
CHECK( 0, fd_msg_avp_setvalue ( avp, &value ) );
CHECK( 0, fd_msg_avp_add ( msg, MSG_BRW_LAST_CHILD, avp ) );
}
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -512,7 +513,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd1, avp1, avp2, 1 );
CHECK( 12345, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 12345, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[6] );
......@@ -534,7 +535,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd1, avp1, avp2, 1 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[8] );
......@@ -556,7 +557,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 1, cmd1, avp1, avp2, 1 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[9] );
......@@ -587,7 +588,7 @@ int main(int argc, char *argv[])
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 2, cmd2, avp1, avp2, 2 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[2] );
......@@ -599,7 +600,7 @@ int main(int argc, char *argv[])
CHECK( 0, fd_disp_register( cb_9, DISP_HOW_ANY, &when, &hdl[5] ) );
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 2, cmd2, avp1, avp2, 2 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 0, cbcalled[1] );
CHECK( 0, cbcalled[2] );
......@@ -612,7 +613,7 @@ int main(int argc, char *argv[])
CHECK( 0, fd_disp_register( cb_9, DISP_HOW_AVP_ENUMVAL, &when, &hdl[5] ) );
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 2, cmd2, avp1, avp2, 2 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[2] );
......@@ -625,7 +626,7 @@ int main(int argc, char *argv[])
CHECK( 0, fd_disp_register( cb_9, DISP_HOW_AVP, &when, &hdl[5] ) );
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 2, cmd2, avp1, avp2, 2 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[2] );
......@@ -638,7 +639,7 @@ int main(int argc, char *argv[])
CHECK( 0, fd_disp_register( cb_9, DISP_HOW_CC, &when, &hdl[5] ) );
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 2, cmd2, avp1, avp2, 2 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[2] );
......@@ -651,7 +652,7 @@ int main(int argc, char *argv[])
CHECK( 0, fd_disp_register( cb_9, DISP_HOW_APPID, &when, &hdl[5] ) );
memset(cbcalled, 0, sizeof(cbcalled));
msg = new_msg( 2, cmd2, avp1, avp2, 2 );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
CHECK( 1, cbcalled[0] );
CHECK( 1, cbcalled[1] );
CHECK( 1, cbcalled[2] );
......
......@@ -2352,7 +2352,8 @@ struct disp_when {
enum disp_action {
DISP_ACT_CONT, /* The next handler should be called, unless *msg == NULL. */
DISP_ACT_SEND /* The updated message must be sent. No further callback is called. */
DISP_ACT_SEND, /* The updated message must be sent. No further callback is called. */
DISP_ACT_ERROR /* An error must be created and sent as a reply -- not valid for callbacks, only for fd_msg_dispatch. */
};
/* The callbacks that are registered have the following prototype:
* int dispatch_callback( struct msg ** msg, struct avp * avp, struct session * session, enum disp_action * action );
......@@ -2423,6 +2424,9 @@ int fd_disp_register ( int (*cb)( struct msg **, struct avp *, struct session *,
*/
int fd_disp_unregister ( struct disp_hdl ** handle );
/* Destroy all handlers */
void fd_disp_unregister_all ( void );
/*
* FUNCTION: fd_msg_dispatch
*
......@@ -2442,7 +2446,7 @@ int fd_disp_unregister ( struct disp_hdl ** handle );
* EINVAL : A parameter is invalid.
* (other errors)
*/
int fd_msg_dispatch ( struct msg ** msg, struct session * session, enum disp_action *action );
int fd_msg_dispatch ( struct msg ** msg, struct session * session, enum disp_action *action, const char ** error_code );
......
......@@ -199,3 +199,12 @@ int fd_disp_unregister ( struct disp_hdl ** handle )
return 0;
}
/* Delete all handlers */
void fd_disp_unregister_all ( void )
{
TRACE_ENTRY("");
while (!FD_IS_LIST_EMPTY(&all_handlers)) {
CHECK_FCT_DO( fd_disp_unregister(all_handlers.next->o), /* continue */ );
}
return;
}
......@@ -2232,7 +2232,7 @@ int fd_msg_update_length ( msg_or_avp * object )
goto no_error;
/* Call all dispatch callbacks for a given message */
int fd_msg_dispatch ( struct msg ** msg, struct session * session, enum disp_action *action)
int fd_msg_dispatch ( struct msg ** msg, struct session * session, enum disp_action *action, const char ** error_code)
{
struct dictionary * dict;
struct dict_object * app;
......@@ -2241,9 +2241,12 @@ int fd_msg_dispatch ( struct msg ** msg, struct session * session, enum disp_act
struct fd_list * cb_list;
int ret = 0;
TRACE_ENTRY("%p %p %p", msg, session, action);
TRACE_ENTRY("%p %p %p %p", msg, session, action, error_code);
CHECK_PARAMS( msg && CHECK_MSG(*msg) && action);
if (error_code)
*error_code = NULL;
/* Take the dispatch lock */
CHECK_FCT( pthread_rwlock_rdlock(&fd_disp_lock) );
pthread_cleanup_push( fd_cleanup_rwlock, &fd_disp_lock );
......@@ -2261,8 +2264,17 @@ int fd_msg_dispatch ( struct msg ** msg, struct session * session, enum disp_act
CHECK_FCT_DO( ret = fd_dict_search( dict, DICT_APPLICATION, APPLICATION_BY_ID, &(*msg)->msg_public.msg_appl, &app, 0 ), goto error );
if (app == NULL) {
/* In that case, maybe we should answer a DIAMETER_APPLICATION_UNSUPPORTED error ? Do we do this here ? */
TODO("Reply DIAMETER_APPLICATION_UNSUPPORTED if it's a request ?");
if ((*msg)->msg_public.msg_flags & CMD_FLAG_REQUEST) {
if (error_code)
*error_code = "DIAMETER_APPLICATION_UNSUPPORTED";
*action = DISP_ACT_ERROR;
} else {
TRACE_DEBUG(INFO, "Received an answer to a local query with an unsupported application %d, discarding...", (*msg)->msg_public.msg_appl);
fd_msg_dump_walk(INFO, *msg);
fd_msg_free(*msg);
*msg = NULL;
}
goto no_error;
}
/* So start browsing the message */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment