Commit b45af4a3 authored by winckel's avatar winckel

Moved files related to assertions, backtrace and signals handling into ITTI directory.

Renamed all ITTI functions, to make them start with module name ("itti_").
Added generic signals handling support in ITTI.
Changed ITTI log port for oaisim to 10006.
Introduce an option to use ITTI in oaisim and reoragized code.

git-svn-id: http://svn.eurecom.fr/openair4G/trunk@4267 818b1a75-f10b-46b9-bf7c-635c3b92a50f
parent 228aa778
......@@ -2,6 +2,8 @@ ITTI_DIR = $(COMMON_UTILS_DIR)/itti
ITTI_OBJS = $(ITTI_DIR)/intertask_interface.o
ITTI_OBJS += $(ITTI_DIR)/intertask_interface_dump.o
ITTI_OBJS += $(ITTI_DIR)/backtrace.o
ITTI_OBJS += $(ITTI_DIR)/signals.o
ITTI_OBJS += $(ITTI_DIR)/timer.o
UTILS_OBJS = $(ITTI_OBJS)
......
......@@ -17,4 +17,7 @@ libitti_la_SOURCES = \
udp_message_def.h udp_messages_types.h \
intertask_interface.c intertask_interface.h \
intertask_interface_dump.c intertask_interface_dump.h \
assertions.h \
backtrace.c backtrace.h \
signals.c signals.h \
timer.c timer.h
\ No newline at end of file
/*******************************************************************************
Eurecom OpenAirInterface
Copyright(c) 1999 - 2012 Eurecom
Eurecom OpenAirInterface
Copyright(c) 1999 - 2012 Eurecom
This program is free software; you can redistribute it and/or modify it
under the terms and conditions of the GNU General Public License,
version 2, as published by the Free Software Foundation.
This program is free software; you can redistribute it and/or modify it
under the terms and conditions of the GNU General Public License,
version 2, as published by the Free Software Foundation.
This program is distributed in the hope it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
more details.
This program is distributed in the hope it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
The full GNU General Public License is included in this distribution in
the file called "COPYING".
The full GNU General Public License is included in this distribution in
the file called "COPYING".
Contact Information
Openair Admin: openair_admin@eurecom.fr
Openair Tech : openair_tech@eurecom.fr
Forums : http://forums.eurecom.fr/openairinterface
Address : EURECOM, Campus SophiaTech, 450 Route des Chappes
06410 Biot FRANCE
Contact Information
Openair Admin: openair_admin@eurecom.fr
Openair Tech : openair_tech@eurecom.fr
Forums : http://forums.eurecom.fr/openairinterface
Address : EURECOM, Campus SophiaTech, 450 Route des Chappes
06410 Biot FRANCE
*******************************************************************************/
*******************************************************************************/
#include <pthread.h>
#include <stdio.h>
......@@ -34,6 +34,7 @@
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include "queue.h"
#include "assertions.h"
......@@ -45,6 +46,7 @@
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY
#include "signals.h"
#include "timer.h"
int itti_debug = 1;
......@@ -58,125 +60,124 @@ int itti_debug = 1;
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)
typedef enum task_state_s {
TASK_STATE_NOT_CONFIGURED,
TASK_STATE_STARTING,
TASK_STATE_READY,
TASK_STATE_MAX,
TASK_STATE_NOT_CONFIGURED, TASK_STATE_STARTING, TASK_STATE_READY, TASK_STATE_ENDED, TASK_STATE_MAX,
} task_state_t;
/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
struct message_list_s {
STAILQ_ENTRY(message_list_s) next_element;
MessageDef *msg; ///< Pointer to the message
MessageDef *msg; ///< Pointer to the message
message_number_t message_number; ///< Unique message number
uint32_t message_priority; ///< Message priority
message_number_t message_number; ///< Unique message number
uint32_t message_priority; ///< Message priority
};
typedef struct task_desc_s {
/* Queue of messages belonging to the task */
STAILQ_HEAD(message_queue_head, message_list_s) message_queue;
STAILQ_HEAD(message_queue_head, message_list_s)
message_queue;
/* Number of messages in the queue */
volatile uint32_t message_in_queue;
volatile uint32_t message_in_queue;
/* Mutex for the message queue */
pthread_mutex_t message_queue_mutex;
pthread_mutex_t message_queue_mutex;
/* Conditional var for message queue and task synchro */
pthread_cond_t message_queue_cond_var;
pthread_t task_thread;
pthread_cond_t message_queue_cond_var;
pthread_t task_thread;
volatile task_state_t task_state;
} task_desc_t;
struct itti_desc_s {
task_desc_t *tasks;
/* Current message number. Incremented every call to send_msg_to_task */
message_number_t message_number __attribute__((aligned(8)));
message_number_t message_number __attribute__((aligned(8)));
thread_id_t thread_max;
MessagesIds messages_id_max;
pthread_t thread_handling_signals;
const char * const *threads_name;
const message_info_t *messages_info;
};
static struct itti_desc_s itti_desc;
static inline
message_number_t increment_message_number(void)
{
static inline message_number_t itti_increment_message_number(void) {
/* Atomic operation supported by GCC: returns the current message number
* and then increment it by 1.
* This can be done without mutex.
*/
return __sync_fetch_and_add(&itti_desc.message_number, 1);
return __sync_fetch_and_add (&itti_desc.message_number, 1);
}
static inline uint32_t get_message_priority(MessagesIds message_id)
{
static inline uint32_t itti_get_message_priority(MessagesIds message_id) {
DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);
return (itti_desc.messages_info[message_id].priority);
}
char *get_message_name(MessagesIds message_id)
{
char *itti_get_message_name(MessagesIds message_id) {
DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);
return (itti_desc.messages_info[message_id].name);
}
int send_broadcast_message(MessageDef *message_p)
{
int itti_send_broadcast_message(MessageDef *message_p) {
thread_id_t origin_thread_id;
uint32_t i;
int ret = 0;
int temp;
int result;
for (i = THREAD_FIRST; i < itti_desc.thread_max; i++)
{
MessageDef *new_message_p;
if (message_p == NULL) {
ITTI_ERROR("Message to broadcast is NULL (%s:%d)\n", __FILE__, __LINE__);
return -1;
}
/* Skip tasks which are not running */
if (itti_desc.tasks[i].task_state == TASK_STATE_READY)
{
new_message_p = malloc(sizeof(MessageDef));
origin_thread_id = TASK_GET_THREAD_ID(message_p->header.originTaskId);
if (new_message_p == NULL) {
ITTI_ERROR("Failed to allocate memory (%s:%d)\n",
__FILE__, __LINE__);
return -1;
}
memcpy(new_message_p, message_p, sizeof(MessageDef));
temp = send_msg_to_task(TASK_SHIFT_THREAD_ID(i), INSTANCE_DEFAULT, new_message_p);
if (temp < 0) {
ITTI_ERROR("Failed to send broadcast message (%s) to queue (%u:%s)\n",
itti_desc.messages_info[message_p->header.messageId].name, i, itti_desc.threads_name[i]);
ret = temp;
free(new_message_p);
for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) {
MessageDef *new_message_p;
/* Skip task that broadcast the message */
if (i != origin_thread_id) {
/* Skip tasks which are not running */
if (itti_desc.tasks[i].task_state == TASK_STATE_READY) {
new_message_p = malloc (sizeof(MessageDef));
if (new_message_p == NULL) {
ITTI_ERROR("Failed to allocate memory (%s:%d)\n", __FILE__, __LINE__);
return -1;
}
memcpy (new_message_p, message_p, sizeof(MessageDef));
result = itti_send_msg_to_task (TASK_SHIFT_THREAD_ID(i), INSTANCE_DEFAULT, new_message_p);
if (result < 0) {
ITTI_ERROR("Failed to send broadcast message (%s) to queue (%u:%s)\n",
itti_desc.messages_info[message_p->header.messageId].name, i, itti_desc.threads_name[i]);
ret = result;
free (new_message_p);
}
}
}
}
free (message_p);
return ret;
}
inline MessageDef *alloc_new_message(
task_id_t origin_task_id,
MessagesIds message_id)
{
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id) {
MessageDef *temp = NULL;
if ((TASK_GET_THREAD_ID(origin_task_id) >= itti_desc.thread_max) ||
(message_id >= itti_desc.messages_id_max))
{
if (message_id >= itti_desc.messages_id_max) {
ITTI_ERROR("Invalid message id %d (%s:%d)\n", message_id, __FILE__, __LINE__);
return NULL;
}
temp = calloc(1, MESSAGE_SIZE(message_id));
temp = calloc (1, MESSAGE_SIZE(message_id));
if (temp == NULL) {
ITTI_ERROR("Cannot allocate memory for new message (%s:%d)\n",
__FILE__, __LINE__);
ITTI_ERROR("Cannot allocate memory for new message (%s:%d)\n", __FILE__, __LINE__);
return NULL;
}
......@@ -187,16 +188,14 @@ inline MessageDef *alloc_new_message(
return temp;
}
int send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message)
{
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
struct message_list_s *new;
uint32_t priority;
message_number_t message_number;
uint32_t message_id;
int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message) {
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
struct message_list_s *new;
uint32_t priority;
message_number_t message_number;
uint32_t message_id;
if (thread_id >= itti_desc.thread_max)
{
if (thread_id >= itti_desc.thread_max) {
return -1;
}
......@@ -208,43 +207,41 @@ int send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message
DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0);
DevCheck(message_id < itti_desc.messages_id_max, itti_desc.messages_id_max, message_id, 0);
priority = get_message_priority(message_id);
priority = itti_get_message_priority (message_id);
/* Lock the mutex to get exclusive access to the list */
pthread_mutex_lock(&itti_desc.tasks[thread_id].message_queue_mutex);
pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);
/* We cannot send a message if the task is not running */
DevCheck(itti_desc.tasks[thread_id].task_state == TASK_STATE_READY,
itti_desc.tasks[thread_id].task_state, TASK_STATE_READY, thread_id);
DevCheck(itti_desc.tasks[thread_id].task_state == TASK_STATE_READY, itti_desc.tasks[thread_id].task_state,
TASK_STATE_READY, thread_id);
/* Check the number of messages in the queue */
DevCheck((itti_desc.tasks[thread_id].message_in_queue * sizeof(MessageDef))
< ITTI_QUEUE_SIZE_PER_TASK,
(itti_desc.tasks[thread_id].message_in_queue * sizeof(MessageDef)),
ITTI_QUEUE_SIZE_PER_TASK,
DevCheck((itti_desc.tasks[thread_id].message_in_queue * sizeof(MessageDef)) < ITTI_QUEUE_SIZE_PER_TASK,
(itti_desc.tasks[thread_id].message_in_queue * sizeof(MessageDef)), ITTI_QUEUE_SIZE_PER_TASK,
itti_desc.tasks[thread_id].message_in_queue);
/* Allocate new list element */
if ((new = (struct message_list_s *)malloc(sizeof(struct message_list_s))) == NULL)
{
ITTI_ERROR("Cannot allocate memory for new message (%s:%d)\n",
__FILE__, __LINE__);
if ((new = (struct message_list_s *) malloc (sizeof(struct message_list_s))) == NULL) {
ITTI_ERROR("Cannot allocate memory for new message (%s:%d)\n", __FILE__, __LINE__);
return -1;
}
/* Increment the global message number */
message_number = increment_message_number();
message_number = itti_increment_message_number ();
/* Fill in members */
new->msg = message;
new->message_number = message_number;
new->msg = message;
new->message_number = message_number;
new->message_priority = priority;
itti_dump_queue_message(message_number, message, itti_desc.messages_info[message_id].name, MESSAGE_SIZE(message_id));
itti_dump_queue_message (message_number, message, itti_desc.messages_info[message_id].name,
MESSAGE_SIZE(message_id));
if (STAILQ_EMPTY(&itti_desc.tasks[thread_id].message_queue)) {
STAILQ_INSERT_HEAD(&itti_desc.tasks[thread_id].message_queue, new, next_element);
} else {
if (STAILQ_EMPTY (&itti_desc.tasks[thread_id].message_queue)) {
STAILQ_INSERT_HEAD (&itti_desc.tasks[thread_id].message_queue, new, next_element);
}
else {
// struct message_list_s *insert_after = NULL;
// struct message_list_s *temp;
//
......@@ -263,7 +260,7 @@ int send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message
// }
// }
// if (insert_after == NULL) {
STAILQ_INSERT_TAIL(&itti_desc.tasks[thread_id].message_queue, new, next_element);
STAILQ_INSERT_TAIL (&itti_desc.tasks[thread_id].message_queue, new, next_element);
// } else {
// STAILQ_INSERT_AFTER(&itti_desc.tasks[thread_id].message_queue, insert_after, new,
// next_element);
......@@ -274,84 +271,78 @@ int send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message
itti_desc.tasks[thread_id].message_in_queue++;
if (itti_desc.tasks[thread_id].message_in_queue == 1) {
/* Emit a signal to wake up target task thread */
pthread_cond_signal(&itti_desc.tasks[thread_id].message_queue_cond_var);
pthread_cond_signal (&itti_desc.tasks[thread_id].message_queue_cond_var);
}
/* Release the mutex */
pthread_mutex_unlock(&itti_desc.tasks[thread_id].message_queue_mutex);
ITTI_DEBUG("Message %s, number %lu with priority %d successfully sent to queue (%u:%s)\n",
itti_desc.messages_info[message_id].name, message_number,
priority, thread_id, itti_desc.threads_name[thread_id]);
pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
ITTI_DEBUG(
"Message %s, number %lu with priority %d successfully sent to queue (%u:%s)\n",
itti_desc.messages_info[message_id].name, message_number, priority, thread_id, itti_desc.threads_name[thread_id]);
return 0;
}
void receive_msg(task_id_t task_id, MessageDef **received_msg)
{
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) {
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0);
DevAssert(received_msg != NULL);
// Lock the mutex to get exclusive access to the list
pthread_mutex_lock(&itti_desc.tasks[thread_id].message_queue_mutex);
pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);
if (itti_desc.tasks[thread_id].message_in_queue == 0) {
ITTI_DEBUG("Message in queue[(%u:%s)] == 0, waiting\n",
thread_id, itti_desc.threads_name[thread_id]);
ITTI_DEBUG("Message in queue[(%u:%s)] == 0, waiting\n", thread_id, itti_desc.threads_name[thread_id]);
// Wait while list == 0
pthread_cond_wait(&itti_desc.tasks[thread_id].message_queue_cond_var,
&itti_desc.tasks[thread_id].message_queue_mutex);
pthread_cond_wait (&itti_desc.tasks[thread_id].message_queue_cond_var,
&itti_desc.tasks[thread_id].message_queue_mutex);
ITTI_DEBUG("Receiver queue[(%u:%s)] got new message notification for task %x\n",
thread_id, itti_desc.threads_name[thread_id], task_id);
}
if (!STAILQ_EMPTY(&itti_desc.tasks[thread_id].message_queue)) {
struct message_list_s *temp = STAILQ_FIRST(
&itti_desc.tasks[thread_id].message_queue);
if (!STAILQ_EMPTY (&itti_desc.tasks[thread_id].message_queue)) {
struct message_list_s *temp = STAILQ_FIRST (&itti_desc.tasks[thread_id].message_queue);
/* Update received_msg reference */
*received_msg = temp->msg;
/* Remove message from queue */
STAILQ_REMOVE_HEAD(&itti_desc.tasks[thread_id].message_queue, next_element);
free(temp);
STAILQ_REMOVE_HEAD (&itti_desc.tasks[thread_id].message_queue, next_element);
free (temp);
itti_desc.tasks[thread_id].message_in_queue--;
}
// Release the mutex
pthread_mutex_unlock(&itti_desc.tasks[thread_id].message_queue_mutex);
pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
}
void poll_msg(task_id_t task_id, instance_t instance, MessageDef **received_msg)
{
void itti_poll_msg(task_id_t task_id, instance_t instance, MessageDef **received_msg) {
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
DevCheck (thread_id < itti_desc.thread_max, thread_id, 0, 0);
DevAssert (received_msg != NULL);
DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0);
DevAssert(received_msg != NULL);
*received_msg = NULL;
if (itti_desc.tasks[thread_id].message_in_queue != 0)
{
if (itti_desc.tasks[thread_id].message_in_queue != 0) {
struct message_list_s *temp;
// Lock the mutex to get exclusive access to the list
pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);
STAILQ_FOREACH(temp, &itti_desc.tasks[thread_id].message_queue, next_element)
STAILQ_FOREACH (temp, &itti_desc.tasks[thread_id].message_queue, next_element)
{
if ((temp->msg->header.destinationTaskId == task_id)
&& ((instance == INSTANCE_ALL) || (temp->msg->header.instance == instance)))
{
&& ((instance == INSTANCE_ALL) || (temp->msg->header.instance == instance))) {
/* Update received_msg reference */
*received_msg = temp->msg;
/* Remove message from queue */
STAILQ_REMOVE(&itti_desc.tasks[thread_id].message_queue, temp, message_list_s, next_element);
STAILQ_REMOVE (&itti_desc.tasks[thread_id].message_queue, temp, message_list_s, next_element);
free (temp);
itti_desc.tasks[thread_id].message_in_queue--;
ITTI_DEBUG("Receiver queue[(%u:%s)] got new message %s, number %lu for task %x\n",
thread_id, itti_desc.threads_name[thread_id], itti_desc.messages_info[temp->msg->header.messageId].name,
temp->message_number, task_id);
ITTI_DEBUG(
"Receiver queue[(%u:%s)] got new message %s, number %lu for task %x\n",
thread_id, itti_desc.threads_name[thread_id], itti_desc.messages_info[temp->msg->header.messageId].name, temp->message_number, task_id);
break;
}
}
......@@ -360,16 +351,12 @@ void poll_msg(task_id_t task_id, instance_t instance, MessageDef **received_msg)
pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
}
if (*received_msg == NULL)
{
if (*received_msg == NULL) {
ITTI_DEBUG("No message in queue[(%u:%s)] for task %x\n", thread_id, itti_desc.threads_name[thread_id], task_id);
}
}
int intertask_interface_create_task(task_id_t task_id,
void *(*start_routine) (void *),
void *args_p)
{
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
DevAssert(start_routine != NULL);
......@@ -377,76 +364,107 @@ int intertask_interface_create_task(task_id_t task_id,
if (itti_desc.tasks[thread_id].task_state != TASK_STATE_NOT_CONFIGURED) {
ITTI_ERROR("You are attempting to start an already configured thread"
" for %s thread\n", itti_desc.threads_name[thread_id]);
" for %s thread\n",
itti_desc.threads_name[thread_id]);
return -1;
}
itti_desc.tasks[thread_id].task_state = TASK_STATE_STARTING;
if (pthread_create(&itti_desc.tasks[thread_id].task_thread, NULL, start_routine,
args_p) < 0) {
ITTI_ERROR("Failed to initialize %s thread: %s:%d\n", itti_desc.threads_name[thread_id],
strerror(errno),
errno);
if (pthread_create (&itti_desc.tasks[thread_id].task_thread, NULL, start_routine, args_p) < 0) {
ITTI_ERROR("Failed to initialize %s thread: %s:%d\n",
itti_desc.threads_name[thread_id], strerror(errno), errno);
return -1;
}
/* Wait till the thread is completely ready */
while (itti_desc.tasks[thread_id].task_state != TASK_STATE_READY);
while (itti_desc.tasks[thread_id].task_state != TASK_STATE_READY)
;
return 0;
}
void intertask_interface_mark_task_ready(task_id_t task_id)
{
void itti_mark_task_ready(task_id_t task_id) {
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0);
// Lock the mutex to get exclusive access to the list
pthread_mutex_lock(&itti_desc.tasks[thread_id].message_queue_mutex);
pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);
itti_desc.tasks[thread_id].task_state = TASK_STATE_READY;
// Release the mutex
pthread_mutex_unlock(&itti_desc.tasks[thread_id].message_queue_mutex);
pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
}
void itti_terminate_tasks(task_id_t task_id) {
// Sends Terminate signals to all tasks.
itti_send_terminate_message (task_id);
if (itti_desc.thread_handling_signals >= 0) {
pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
}
pthread_exit (NULL);
}
int intertask_interface_init(thread_id_t thread_max, MessagesIds messages_id_max,
const char * const *threads_name,
const message_info_t *messages_info,
const char * const messages_definition_xml)
{
int itti_init(thread_id_t thread_max, MessagesIds messages_id_max, const char * const *threads_name,
const message_info_t *messages_info, const char * const messages_definition_xml) {
int i;
itti_desc.message_number = 0;
CHECK_INIT_RETURN(signal_init());
/* Saves threads and messages max values */
itti_desc.thread_max = thread_max;
itti_desc.messages_id_max = messages_id_max;
itti_desc.thread_handling_signals = -1;
itti_desc.threads_name = threads_name;
itti_desc.messages_info = messages_info;
/* Allocates memory for tasks info */
itti_desc.tasks = calloc(itti_desc.thread_max, sizeof(task_desc_t));
itti_desc.tasks = calloc (itti_desc.thread_max, sizeof(task_desc_t));
/* Initializing each queue and related stuff */
for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) {
STAILQ_INIT(&itti_desc.tasks[i].message_queue);
STAILQ_INIT (&itti_desc.tasks[i].message_queue);
itti_desc.tasks[i].message_in_queue = 0;
// Initialize mutexes
pthread_mutex_init(&itti_desc.tasks[i].message_queue_mutex, NULL);
pthread_mutex_init (&itti_desc.tasks[i].message_queue_mutex, NULL);
// Initialize Cond vars
pthread_cond_init(&itti_desc.tasks[i].message_queue_cond_var, NULL);
pthread_cond_init (&itti_desc.tasks[i].message_queue_cond_var, NULL);
itti_desc.tasks[i].task_state = TASK_STATE_NOT_CONFIGURED;
}
itti_dump_init(messages_definition_xml);
itti_dump_init (messages_definition_xml);
timer_init();
CHECK_INIT_RETURN(timer_init ());
return 0;
}
void intertask_interface_send_quit_signal(void)
{
void itti_wait_tasks_end(void) {
int end = 0;
int i;
itti_desc.thread_handling_signals = pthread_self ();
/* Handle signals here */
while (end == 0) {
signal_handle (&end);
}
for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) {
/* Skip tasks which are not running */
if (itti_desc.tasks[i].task_state == TASK_STATE_READY) {
ITTI_DEBUG("Waiting end of thread %s\n", itti_desc.threads_name[i]);
pthread_join (itti_desc.tasks[i].task_thread, NULL);
itti_desc.tasks[i].task_state = TASK_STATE_ENDED;
}
}
}
void itti_send_terminate_message(task_id_t task_id) {
MessageDef *terminate_message_p;
terminate_message_p = alloc_new_message(TASK_UNKNOWN, TERMINATE_MESSAGE);
terminate_message_p = itti_alloc_new_message (task_id, TERMINATE_MESSAGE);
send_broadcast_message(terminate_message_p);
itti_send_broadcast_message (terminate_message_p);
}
......@@ -81,7 +81,7 @@ enum task_priorities {
\param message_p Pointer to the message to send
@returns < 0 on failure, 0 otherwise
**/
int send_broadcast_message(MessageDef *message_p);
int itti_send_broadcast_message(MessageDef *message_p);
/** \brief Send a message to a task (could be itself)
\param task_id Task ID
......@@ -89,21 +89,21 @@ int send_broadcast_message(MessageDef *message_p);
\param message Pointer to the message to send
@returns -1 on failure, 0 otherwise
**/
int send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message);
int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message);
/** \brief Retrieves a message in the queue associated to task_id.
* If the queue is empty, the thread is blocked till a new message arrives.
\param task_id Task ID of the receiving task
\param received_msg Pointer to the allocated message
**/
void receive_msg(task_id_t task_id, MessageDef **received_msg);
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg);
/** \brief Try to retrieves a message in the queue associated to task_id and matching requested instance.
\param task_id Task ID of the receiving task
\param instance Instance of the task used for virtualization
\param received_msg Pointer to the allocated message
**/
void poll_msg(task_id_t task_id, instance_t instance, MessageDef **received_msg);
void itti_poll_msg(task_id_t task_id, instance_t instance, MessageDef **received_msg);
/** \brief Start thread associated to the task
* \param task_id task to start
......@@ -111,30 +111,43 @@ void poll_msg(task_id_t task_id, instance_t instance, MessageDef **received_msg)
* \param args_p Optional argument to pass to the start routine
* @returns -1 on failure, 0 otherwise
**/
int intertask_interface_create_task(task_id_t task_id,
int itti_create_task(task_id_t task_id,
void *(*start_routine) (void *),
void *args_p);
/** \brief Mark the task as in ready state
* \param task_id task to mark as ready
**/
void intertask_interface_mark_task_ready(task_id_t task_id);
void itti_mark_task_ready(task_id_t task_id);
/** \brief Indicate that the task is completed and initiate termination of all tasks.
* \param task_id task that is completed
**/
void itti_terminate_tasks(task_id_t task_id);
/** \brief Return the printable string associated with the message
* \param message_id Id of the message
**/
char *get_message_name(MessagesIds message_id);
char *itti_get_message_name(MessagesIds message_id);
/** \brief Alloc and memset(0) a new itti message.
\param origin_task_id Task ID of the sending task
\param message_id Message ID
@returns NULL in case of failure or newly allocated mesage ref
* \param origin_task_id Task ID of the sending task
* \param message_id Message ID
* @returns NULL in case of failure or newly allocated mesage ref