diff --git a/common/utils/itti/intertask_interface.c b/common/utils/itti/intertask_interface.c index 0f58e2fb32ebba34620b6821fa83e306fcff84b3..d09110d442877599085f7a919543a2ff94ef83aa 100644 --- a/common/utils/itti/intertask_interface.c +++ b/common/utils/itti/intertask_interface.c @@ -85,6 +85,13 @@ struct message_list_s { uint32_t message_priority; ///< Message priority }; +typedef struct thread_desc_s { + /* pthread associated with the thread */ + pthread_t task_thread; + /* State of the thread */ + volatile task_state_t task_state; +} thread_desc_t; + typedef struct task_desc_s { /* Queue of messages belonging to the task */ #if !defined(ENABLE_EVENT_FD) @@ -117,23 +124,22 @@ typedef struct task_desc_s { int epoll_nb_events; #endif - /* pthread associated with the task */ - pthread_t task_thread; - /* State of the task */ - volatile task_state_t task_state; } task_desc_t; struct itti_desc_s { + thread_desc_t *threads; task_desc_t *tasks; + /* Current message number. Incremented every call to send_msg_to_task */ message_number_t message_number __attribute__((aligned(8))); thread_id_t thread_max; + task_id_t task_max; MessagesIds messages_id_max; pthread_t thread_handling_signals; - const char * const *threads_name; + const task_info_t *tasks_info; const message_info_t *messages_info; }; @@ -161,16 +167,15 @@ const char *itti_get_message_name(MessagesIds message_id) { const char *itti_get_task_name(task_id_t task_id) { - thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); + DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); - DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); - - return (itti_desc.threads_name[thread_id]); + return (itti_desc.tasks_info[task_id].name); } int itti_send_broadcast_message(MessageDef *message_p) { + task_id_t destination_task_id; thread_id_t origin_thread_id; - uint32_t i; + uint32_t thread_id; int ret = 0; int result; @@ -178,19 +183,24 @@ int itti_send_broadcast_message(MessageDef *message_p) { origin_thread_id = TASK_GET_THREAD_ID(message_p->header.originTaskId); - for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) { + destination_task_id = TASK_FIRST; + for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) { MessageDef *new_message_p; + while (thread_id != TASK_GET_THREAD_ID(destination_task_id)) + { + destination_task_id++; + } /* Skip task that broadcast the message */ - if (i != origin_thread_id) { + if (thread_id != origin_thread_id) { /* Skip tasks which are not running */ - if (itti_desc.tasks[i].task_state == TASK_STATE_READY) { + if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) { new_message_p = malloc (sizeof(MessageDef)); DevAssert(message_p != NULL); memcpy (new_message_p, message_p, sizeof(MessageDef)); - result = itti_send_msg_to_task (TASK_SHIFT_THREAD_ID(i), INSTANCE_DEFAULT, new_message_p); - DevCheck(result >= 0, message_p->header.messageId, i, 0); + result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p); + DevCheck(result >= 0, message_p->header.messageId, thread_id, destination_task_id); } } } @@ -222,7 +232,7 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me uint32_t message_id; DevAssert(message != NULL); - DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); + DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); message->header.destinationTaskId = task_id; message->header.instance = instance; @@ -232,17 +242,17 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me priority = itti_get_message_priority (message_id); /* We cannot send a message if the task is not running */ - DevCheck(itti_desc.tasks[thread_id].task_state == TASK_STATE_READY, itti_desc.tasks[thread_id].task_state, + DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_READY, itti_desc.threads[thread_id].task_state, TASK_STATE_READY, thread_id); #if !defined(ENABLE_EVENT_FD) /* Lock the mutex to get exclusive access to the list */ - pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex); + pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex); /* Check the number of messages in the queue */ - DevCheck((itti_desc.tasks[thread_id].message_in_queue * sizeof(MessageDef)) < ITTI_QUEUE_SIZE_PER_TASK, - (itti_desc.tasks[thread_id].message_in_queue * sizeof(MessageDef)), ITTI_QUEUE_SIZE_PER_TASK, - itti_desc.tasks[thread_id].message_in_queue); + DevCheck((itti_desc.tasks[task_id].message_in_queue * sizeof(MessageDef)) < ITTI_QUEUE_SIZE_PER_TASK, + (itti_desc.tasks[task_id].message_in_queue * sizeof(MessageDef)), ITTI_QUEUE_SIZE_PER_TASK, + itti_desc.tasks[task_id].message_in_queue); #endif /* Allocate new list element */ @@ -264,21 +274,21 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me { uint64_t sem_counter = 1; - lfds611_queue_enqueue(itti_desc.tasks[thread_id].message_queue, new); + lfds611_queue_enqueue(itti_desc.tasks[task_id].message_queue, new); /* Call to write for an event fd must be of 8 bytes */ - write(itti_desc.tasks[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter)); + write(itti_desc.tasks[task_id].task_event_fd, &sem_counter, sizeof(sem_counter)); } #else - if (STAILQ_EMPTY (&itti_desc.tasks[thread_id].message_queue)) { - STAILQ_INSERT_HEAD (&itti_desc.tasks[thread_id].message_queue, new, next_element); + if (STAILQ_EMPTY (&itti_desc.tasks[task_id].message_queue)) { + STAILQ_INSERT_HEAD (&itti_desc.tasks[task_id].message_queue, new, next_element); } else { // struct message_list_s *insert_after = NULL; // struct message_list_s *temp; // // /* This method is inefficient... */ -// STAILQ_FOREACH(temp, &itti_desc.tasks[thread_id].message_queue, next_element) { +// STAILQ_FOREACH(temp, &itti_desc.tasks[task_id].message_queue, next_element) { // struct message_list_s *next; // next = STAILQ_NEXT(temp, next_element); // /* Increment message priority to create a sort of @@ -292,26 +302,26 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me // } // } // if (insert_after == NULL) { - STAILQ_INSERT_TAIL (&itti_desc.tasks[thread_id].message_queue, new, next_element); + STAILQ_INSERT_TAIL (&itti_desc.tasks[task_id].message_queue, new, next_element); // } else { -// STAILQ_INSERT_AFTER(&itti_desc.tasks[thread_id].message_queue, insert_after, new, +// STAILQ_INSERT_AFTER(&itti_desc.tasks[task_id].message_queue, insert_after, new, // next_element); // } } /* Update the number of messages in the queue */ - itti_desc.tasks[thread_id].message_in_queue++; - if (itti_desc.tasks[thread_id].message_in_queue == 1) { + itti_desc.tasks[task_id].message_in_queue++; + if (itti_desc.tasks[task_id].message_in_queue == 1) { /* Emit a signal to wake up target task thread */ - pthread_cond_signal (&itti_desc.tasks[thread_id].message_queue_cond_var); + pthread_cond_signal (&itti_desc.tasks[task_id].message_queue_cond_var); } /* Release the mutex */ - pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex); + pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex); #endif ITTI_DEBUG( "Message %s, number %lu with priority %d successfully sent to queue (%u:%s)\n", - itti_desc.messages_info[message_id].name, message_number, priority, thread_id, itti_desc.threads_name[thread_id]); + itti_desc.messages_info[message_id].name, message_number, priority, task_id, itti_get_task_name(task_id)); return 0; } @@ -319,23 +329,22 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me void itti_subscribe_event_fd(task_id_t task_id, int fd) { struct epoll_event event; - thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); - DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); + DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); DevCheck(fd >= 0, fd, 0, 0); - itti_desc.tasks[thread_id].nb_events++; + itti_desc.tasks[task_id].nb_events++; /* Reallocate the events */ - itti_desc.tasks[thread_id].events = realloc( - itti_desc.tasks[thread_id].events, - itti_desc.tasks[thread_id].nb_events * sizeof(struct epoll_event)); + itti_desc.tasks[task_id].events = realloc( + itti_desc.tasks[task_id].events, + itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event)); event.events = EPOLLIN; event.data.fd = fd; /* Add the event fd to the list of monitored events */ - if (epoll_ctl(itti_desc.tasks[thread_id].epoll_fd, EPOLL_CTL_ADD, fd, + if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD, fd, &event) != 0) { ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n", @@ -347,13 +356,11 @@ void itti_subscribe_event_fd(task_id_t task_id, int fd) void itti_unsubscribe_event_fd(task_id_t task_id, int fd) { - thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); - - DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); + DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); DevCheck(fd >= 0, fd, 0, 0); /* Add the event fd to the list of monitored events */ - if (epoll_ctl(itti_desc.tasks[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0) + if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0) { ITTI_ERROR("epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n", itti_get_task_name(task_id), fd, strerror(errno)); @@ -361,21 +368,19 @@ void itti_unsubscribe_event_fd(task_id_t task_id, int fd) DevAssert(0 == 1); } - itti_desc.tasks[thread_id].nb_events--; - itti_desc.tasks[thread_id].events = realloc( - itti_desc.tasks[thread_id].events, - itti_desc.tasks[thread_id].nb_events * sizeof(struct epoll_event)); + itti_desc.tasks[task_id].nb_events--; + itti_desc.tasks[task_id].events = realloc( + itti_desc.tasks[task_id].events, + itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event)); } int itti_get_events(task_id_t task_id, struct epoll_event **events) { - thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); - - DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); + DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); - *events = itti_desc.tasks[thread_id].events; + *events = itti_desc.tasks[task_id].events; - return itti_desc.tasks[thread_id].epoll_nb_events; + return itti_desc.tasks[task_id].epoll_nb_events; } static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg) @@ -384,9 +389,7 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t int epoll_timeout = 0; int i; - thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); - - DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); + DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); DevAssert(received_msg != NULL); *received_msg = NULL; @@ -403,9 +406,9 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t } do { - epoll_ret = epoll_wait(itti_desc.tasks[thread_id].epoll_fd, - itti_desc.tasks[thread_id].events, - itti_desc.tasks[thread_id].nb_events, + epoll_ret = epoll_wait(itti_desc.tasks[task_id].epoll_fd, + itti_desc.tasks[task_id].events, + itti_desc.tasks[task_id].nb_events, epoll_timeout); } while (epoll_ret < 0 && errno == EINTR); @@ -419,22 +422,22 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t return; } - itti_desc.tasks[thread_id].epoll_nb_events = epoll_ret; + itti_desc.tasks[task_id].epoll_nb_events = epoll_ret; for (i = 0; i < epoll_ret; i++) { /* Check if there is an event for ITTI for the event fd */ - if ((itti_desc.tasks[thread_id].events[i].events & EPOLLIN) && - (itti_desc.tasks[thread_id].events[i].data.fd == itti_desc.tasks[thread_id].task_event_fd)) + if ((itti_desc.tasks[task_id].events[i].events & EPOLLIN) && + (itti_desc.tasks[task_id].events[i].data.fd == itti_desc.tasks[task_id].task_event_fd)) { struct message_list_s *message; uint64_t sem_counter; /* Read will always return 1 */ - read (itti_desc.tasks[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter)); + read (itti_desc.tasks[task_id].task_event_fd, &sem_counter, sizeof(sem_counter)); - if (lfds611_queue_dequeue (itti_desc.tasks[thread_id].message_queue, (void **) &message) == 0) { + if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) { /* No element in list -> this should not happen */ - DevParam(thread_id, task_id, epoll_ret); + DevParam(task_id, epoll_ret, 0); } *received_msg = message->msg; free (message); @@ -449,43 +452,39 @@ void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) #if defined(ENABLE_EVENT_FD) itti_receive_msg_internal_event_fd(task_id, 0, received_msg); #else - thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); - - DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); + DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); DevAssert(received_msg != NULL); // Lock the mutex to get exclusive access to the list - pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex); + pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex); - if (itti_desc.tasks[thread_id].message_in_queue == 0) { - ITTI_DEBUG("Message in queue[(%u:%s)] == 0, waiting\n", thread_id, itti_desc.threads_name[thread_id]); + if (itti_desc.tasks[task_id].message_in_queue == 0) { + ITTI_DEBUG("Message in queue[(%u:%s)] == 0, waiting\n", task_id, itti_get_task_name(task_id)); // Wait while list == 0 - pthread_cond_wait (&itti_desc.tasks[thread_id].message_queue_cond_var, - &itti_desc.tasks[thread_id].message_queue_mutex); - ITTI_DEBUG("Receiver queue[(%u:%s)] got new message notification for task %x\n", - thread_id, itti_desc.threads_name[thread_id], task_id); + pthread_cond_wait (&itti_desc.tasks[task_id].message_queue_cond_var, + &itti_desc.tasks[task_id].message_queue_mutex); + ITTI_DEBUG("Receiver queue[(%u:%s)] got new message notification\n", + task_id, itti_get_task_name(task_id)); } - if (!STAILQ_EMPTY (&itti_desc.tasks[thread_id].message_queue)) { - struct message_list_s *temp = STAILQ_FIRST (&itti_desc.tasks[thread_id].message_queue); + if (!STAILQ_EMPTY (&itti_desc.tasks[task_id].message_queue)) { + struct message_list_s *temp = STAILQ_FIRST (&itti_desc.tasks[task_id].message_queue); /* Update received_msg reference */ *received_msg = temp->msg; /* Remove message from queue */ - STAILQ_REMOVE_HEAD (&itti_desc.tasks[thread_id].message_queue, next_element); + STAILQ_REMOVE_HEAD (&itti_desc.tasks[task_id].message_queue, next_element); free (temp); - itti_desc.tasks[thread_id].message_in_queue--; + itti_desc.tasks[task_id].message_in_queue--; } // Release the mutex - pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex); + pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex); #endif } -void itti_poll_msg(task_id_t task_id, instance_t instance, MessageDef **received_msg) { - thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); - - DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); +void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) { + DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); DevAssert(received_msg != NULL); *received_msg = NULL; @@ -493,38 +492,35 @@ void itti_poll_msg(task_id_t task_id, instance_t instance, MessageDef **received #if defined(ENABLE_EVENT_FD) itti_receive_msg_internal_event_fd(task_id, 1, received_msg); #else - if (itti_desc.tasks[thread_id].message_in_queue != 0) { + if (itti_desc.tasks[task_id].message_in_queue != 0) { struct message_list_s *temp; // Lock the mutex to get exclusive access to the list - pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex); + pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex); - STAILQ_FOREACH (temp, &itti_desc.tasks[thread_id].message_queue, next_element) + STAILQ_FOREACH (temp, &itti_desc.tasks[task_id].message_queue, next_element) { - if ((temp->msg->header.destinationTaskId == task_id) - && ((instance == INSTANCE_ALL) || (temp->msg->header.instance == instance))) { - /* Update received_msg reference */ - *received_msg = temp->msg; - - /* Remove message from queue */ - STAILQ_REMOVE (&itti_desc.tasks[thread_id].message_queue, temp, message_list_s, next_element); - free (temp); - itti_desc.tasks[thread_id].message_in_queue--; - - ITTI_DEBUG( - "Receiver queue[(%u:%s)] got new message %s, number %lu for task %x\n", - thread_id, itti_desc.threads_name[thread_id], itti_desc.messages_info[temp->msg->header.messageId].name, temp->message_number, task_id); - break; - } + /* Update received_msg reference */ + *received_msg = temp->msg; + + /* Remove message from queue */ + STAILQ_REMOVE (&itti_desc.tasks[task_id].message_queue, temp, message_list_s, next_element); + free (temp); + itti_desc.tasks[task_id].message_in_queue--; + + ITTI_DEBUG( + "Receiver queue[(%u:%s)] got new message %s, number %lu\n", + task_id, itti_get_task_name(task_id), itti_desc.messages_info[temp->msg->header.messageId].name, temp->message_number); + break; } // Release the mutex - pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex); + pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex); } #endif if (*received_msg == NULL) { - ITTI_DEBUG("No message in queue[(%u:%s)] for task %x\n", thread_id, itti_desc.threads_name[thread_id], task_id); + ITTI_DEBUG("No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id)); } } @@ -534,16 +530,16 @@ int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *ar DevAssert(start_routine != NULL); DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); - DevCheck(itti_desc.tasks[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, task_id, thread_id, - itti_desc.tasks[thread_id].task_state); + DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, task_id, thread_id, + itti_desc.threads[thread_id].task_state); - itti_desc.tasks[thread_id].task_state = TASK_STATE_STARTING; + itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING; - result = pthread_create (&itti_desc.tasks[thread_id].task_thread, NULL, start_routine, args_p); + result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p); DevCheck(result>= 0, task_id, thread_id, result); /* Wait till the thread is completely ready */ - while (itti_desc.tasks[thread_id].task_state != TASK_STATE_READY) + while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY) ; return 0; } @@ -555,14 +551,14 @@ void itti_mark_task_ready(task_id_t task_id) { #if !defined(ENABLE_EVENT_FD) // Lock the mutex to get exclusive access to the list - pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex); + pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex); #endif - itti_desc.tasks[thread_id].task_state = TASK_STATE_READY; + itti_desc.threads[thread_id].task_state = TASK_STATE_READY; #if !defined(ENABLE_EVENT_FD) // Release the mutex - pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex); + pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex); #endif } @@ -581,27 +577,31 @@ void itti_terminate_tasks(task_id_t task_id) { pthread_exit (NULL); } -int itti_init(thread_id_t thread_max, MessagesIds messages_id_max, const char * const *threads_name, +int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info, const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) { int i; itti_desc.message_number = 1; - ITTI_DEBUG("Init: %d threads, %d messages\n", thread_max, messages_id_max); + ITTI_DEBUG("Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max); CHECK_INIT_RETURN(signal_init()); /* Saves threads and messages max values */ + itti_desc.task_max = task_max; itti_desc.thread_max = thread_max; itti_desc.messages_id_max = messages_id_max; itti_desc.thread_handling_signals = -1; - itti_desc.threads_name = threads_name; + itti_desc.tasks_info = tasks_info; itti_desc.messages_info = messages_info; /* Allocates memory for tasks info */ - itti_desc.tasks = calloc (itti_desc.thread_max, sizeof(task_desc_t)); + itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t)); + + /* Allocates memory for threads info */ + itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t)); /* Initializing each queue and related stuff */ - for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) + for (i = TASK_FIRST; i < itti_desc.task_max; i++) { #if defined(ENABLE_EVENT_FD) ITTI_DEBUG("Creating queue of message of size %u\n", @@ -652,9 +652,14 @@ int itti_init(thread_id_t thread_max, MessagesIds messages_id_max, const char * // Initialize Cond vars pthread_cond_init (&itti_desc.tasks[i].message_queue_cond_var, NULL); #endif + } - itti_desc.tasks[i].task_state = TASK_STATE_NOT_CONFIGURED; + /* Initializing each thread */ + for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) + { + itti_desc.threads[i].task_state = TASK_STATE_NOT_CONFIGURED; } + itti_dump_init (messages_definition_xml, dump_file_name); CHECK_INIT_RETURN(timer_init ()); @@ -664,7 +669,8 @@ int itti_init(thread_id_t thread_max, MessagesIds messages_id_max, const char * void itti_wait_tasks_end(void) { int end = 0; - int i; + int thread_id; + task_id_t task_id; int ready_tasks; int result; int retries = 10; @@ -679,17 +685,22 @@ void itti_wait_tasks_end(void) { do { ready_tasks = 0; - for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) { + task_id = TASK_FIRST; + for (thread_id = THREAD_FIRST; thread_id < itti_desc.task_max; thread_id++) { /* Skip tasks which are not running */ - if (itti_desc.tasks[i].task_state == TASK_STATE_READY) { + if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) { + while (thread_id != TASK_GET_THREAD_ID(task_id)) + { + task_id++; + } - result = pthread_tryjoin_np (itti_desc.tasks[i].task_thread, NULL); + result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL); - ITTI_DEBUG("Thread %s join status %d\n", itti_desc.threads_name[i], result); + ITTI_DEBUG("Thread %s join status %d\n", itti_get_task_name(task_id), result); if (result == 0) { /* Thread has terminated */ - itti_desc.tasks[i].task_state = TASK_STATE_ENDED; + itti_desc.threads[thread_id].task_state = TASK_STATE_ENDED; } else { /* Thread is still running, count it */ diff --git a/common/utils/itti/intertask_interface.h b/common/utils/itti/intertask_interface.h index 7ee768937520a9513d3ebbe739ea299005b7ca5d..f046fc106049f597a255ba52408e13439d9b4db6 100644 --- a/common/utils/itti/intertask_interface.h +++ b/common/utils/itti/intertask_interface.h @@ -86,6 +86,12 @@ enum task_priorities { TASK_PRIORITY_MIN = 10, }; +typedef struct task_info_s { + thread_id_t thread; + /* Printable name */ + const char * const name; +} task_info_t; + /** \brief Send a broadcast message to every task \param message_p Pointer to the message to send @returns < 0 on failure, 0 otherwise @@ -129,12 +135,11 @@ int itti_get_events(task_id_t task_id, struct epoll_event **events); **/ void itti_receive_msg(task_id_t task_id, MessageDef **received_msg); -/** \brief Try to retrieves a message in the queue associated to task_id and matching requested instance. +/** \brief Try to retrieves a message in the queue associated to task_id. \param task_id Task ID of the receiving task - \param instance Instance of the task used for virtualization \param received_msg Pointer to the allocated message **/ -void itti_poll_msg(task_id_t task_id, instance_t instance, MessageDef **received_msg); +void itti_poll_msg(task_id_t task_id, MessageDef **received_msg); /** \brief Start thread associated to the task * \param task_id task to start diff --git a/common/utils/itti/intertask_interface_dump.c b/common/utils/itti/intertask_interface_dump.c index a18e7fb9dfa8fbde9ef1636549c530ed45f1928c..b7e71edd0535d1c1d8d268b47ea5c17554524018 100644 --- a/common/utils/itti/intertask_interface_dump.c +++ b/common/utils/itti/intertask_interface_dump.c @@ -29,7 +29,7 @@ *******************************************************************************/ /** @brief Intertask Interface Signal Dumper - * Allows users to connect their itti_debugger to this process and dump + * Allows users to connect their itti_analyzer to this process and dump * signals exchanged between tasks. * @author Sebastien Roux <sebastien.roux@eurecom.fr> */ diff --git a/common/utils/itti/intertask_interface_init.h b/common/utils/itti/intertask_interface_init.h index 26bbcb3b5605eee6dd9d85969bc1448c2ca7e927..70a88ad3e2a7386723fc2c1dc6210ca325cc54b8 100644 --- a/common/utils/itti/intertask_interface_init.h +++ b/common/utils/itti/intertask_interface_init.h @@ -54,11 +54,11 @@ const char * const messages_definition_xml = { #include "messages_xml.h" }; -/* Map thread id to printable name. */ -const char * const threads_name[] = { - "unused", -#define TASK_DEF(tHREADiD, pRIO) #tHREADiD, -#define SUB_TASK_DEF(tHREADiD, sUBtASKiD) +/* Map task id to printable name. */ +const task_info_t tasks_info[] = { + {0, "TASK_UNKNOWN"}, +#define TASK_DEF(tHREADiD, pRIO) {tHREADiD##_THREAD, #tHREADiD}, +#define SUB_TASK_DEF(tHREADiD, sUBtASKiD) {sUBtASKiD##_THREAD, #sUBtASKiD}, #include <tasks_def.h> #undef SUB_TASK_DEF #undef TASK_DEF @@ -79,7 +79,7 @@ const message_info_t messages_info[] = { * \param threads_name Pointer on the threads name information as created by this include file * \param messages_info Pointer on messages information as created by this include file **/ -int itti_init(thread_id_t thread_max, MessagesIds messages_id_max, const char * const *threads_name, +int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info, const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name); diff --git a/common/utils/itti/intertask_interface_types.h b/common/utils/itti/intertask_interface_types.h index 1968557cd71b77aab61829c8fde9add6b614f2b3..7a22dfd4c4fa6f242e405867772fc9e12f2e6cde 100644 --- a/common/utils/itti/intertask_interface_types.h +++ b/common/utils/itti/intertask_interface_types.h @@ -56,12 +56,7 @@ #define TASK_SUB_TASK_ID_LENGTH 8 /* Defines to extract task ID fields */ -#define TASK_GET_THREAD_ID(tASKiD) UL_FIELD_EXTRACT(tASKiD, TASK_THREAD_ID_OFFSET, TASK_THREAD_ID_LENGTH) -#define TASK_GET_SUB_TASK_ID(tASKiD) UL_FIELD_EXTRACT(tASKiD, TASK_SUB_TASK_ID_OFFSET, TASK_SUB_TASK_ID_LENGTH) - -/* Defines to shift task ID fields */ -#define TASK_SHIFT_THREAD_ID(tHREADiD) UL_BIT_SHIFT(tHREADiD, TASK_THREAD_ID_OFFSET) -#define TASK_SHIFT_SUB_TASK_ID(sUBtASKiD) UL_BIT_SHIFT(sUBtASKiD, TASK_SUB_TASK_ID_OFFSET) +#define TASK_GET_THREAD_ID(tASKiD) (itti_desc.tasks_info[tASKiD].thread) #include <messages_types.h> @@ -86,29 +81,31 @@ typedef enum #undef SUB_TASK_DEF #undef TASK_DEF - THREAD_MAX, THREAD_END = THREAD_MAX, + THREAD_MAX, } thread_id_t; //! Sub-tasks id, to defined offset form thread id typedef enum { -#define TASK_DEF(tHREADiD, pRIO) SUB_TASK_INIT_##tHREADiD = 0, -#define SUB_TASK_DEF(tHREADiD, sUBtASKiD) SUB_TASK_OFFSET_##sUBtASKiD, +#define TASK_DEF(tHREADiD, pRIO) tHREADiD##_THREAD = THREAD_##tHREADiD, +#define SUB_TASK_DEF(tHREADiD, sUBtASKiD) sUBtASKiD##_THREAD = THREAD_##tHREADiD, #include <tasks_def.h> #undef SUB_TASK_DEF #undef TASK_DEF -} sub_task_id_t; +} task_thread_id_t; //! Tasks id of each task typedef enum { -#define TASK_DEF(tHREADiD, pRIO) tHREADiD = TASK_SHIFT_THREAD_ID(THREAD_##tHREADiD), -#define SUB_TASK_DEF(tHREADiD, sUBtASKiD) sUBtASKiD = (TASK_SHIFT_THREAD_ID(THREAD_##tHREADiD) | TASK_SHIFT_SUB_TASK_ID(SUB_TASK_OFFSET_##sUBtASKiD)), + TASK_FIRST = 1, TASK_UNKNOWN = 0, + +#define TASK_DEF(tHREADiD, pRIO) tHREADiD, +#define SUB_TASK_DEF(tHREADiD, sUBtASKiD) sUBtASKiD, #include <tasks_def.h> #undef SUB_TASK_DEF #undef TASK_DEF - TASK_UNKNOWN = 0xFFFF, + TASK_MAX, } task_id_t; typedef union msg_s diff --git a/openair-cn/OAISIM_MME/oaisim_mme.c b/openair-cn/OAISIM_MME/oaisim_mme.c index ba81970559557df7e65db1c4d3fe9a7633f71a8e..1c5a5b3cc4620b04f15194c66c842898c3f5a2c2 100644 --- a/openair-cn/OAISIM_MME/oaisim_mme.c +++ b/openair-cn/OAISIM_MME/oaisim_mme.c @@ -68,7 +68,7 @@ int main(int argc, char *argv[]) /* Calling each layer init function */ CHECK_INIT_RETURN(log_init(&mme_config, oai_mme_log_specific)); - CHECK_INIT_RETURN(itti_init(THREAD_MAX, MESSAGES_ID_MAX, threads_name, messages_info, messages_definition_xml, NULL)); + CHECK_INIT_RETURN(itti_init(TASK_MAX, THREAD_MAX, MESSAGES_ID_MAX, tasks_info, messages_info, messages_definition_xml, NULL)); CHECK_INIT_RETURN(nas_init(&mme_config)); CHECK_INIT_RETURN(sctp_init(&mme_config)); diff --git a/openair-cn/OAI_EPC/oai_epc.c b/openair-cn/OAI_EPC/oai_epc.c index 0d44426893668893304ebdac981f847f7e3c5e88..b8eff1ca8c233520612857339e304362a8fdf964 100644 --- a/openair-cn/OAI_EPC/oai_epc.c +++ b/openair-cn/OAI_EPC/oai_epc.c @@ -74,7 +74,7 @@ int main(int argc, char *argv[]) /* Calling each layer init function */ CHECK_INIT_RETURN(log_init(&mme_config, oai_epc_log_specific)); - CHECK_INIT_RETURN(itti_init(THREAD_MAX, MESSAGES_ID_MAX, threads_name, messages_info, messages_definition_xml, NULL)); + CHECK_INIT_RETURN(itti_init(TASK_MAX, THREAD_MAX, MESSAGES_ID_MAX, tasks_info, messages_info, messages_definition_xml, NULL)); CHECK_INIT_RETURN(nas_init(&mme_config)); CHECK_INIT_RETURN(sctp_init(&mme_config)); diff --git a/openair-cn/OAI_SGW/oai_sgw.c b/openair-cn/OAI_SGW/oai_sgw.c index 76dd4955ee6c5cdfa0412a0ea39cbc5fddd64b02..df9efade2ea75bf430e52628e3990cb73e535349 100644 --- a/openair-cn/OAI_SGW/oai_sgw.c +++ b/openair-cn/OAI_SGW/oai_sgw.c @@ -60,7 +60,7 @@ int main(int argc, char *argv[]) /* Calling each layer init function */ CHECK_INIT_RETURN(log_init(&mme_config, oai_sgw_log_specific)); - CHECK_INIT_RETURN(itti_init(THREAD_MAX, MESSAGES_ID_MAX, threads_name, messages_info, messages_definition_xml, NULL)); + CHECK_INIT_RETURN(itti_init(TASK_MAX, THREAD_MAX, MESSAGES_ID_MAX, tasks_info, messages_info, messages_definition_xml, NULL)); CHECK_INIT_RETURN(udp_init(&mme_config)); CHECK_INIT_RETURN(s11_sgw_init(&mme_config)); diff --git a/openair-cn/TEST/oaisim_mme_itti_test.c b/openair-cn/TEST/oaisim_mme_itti_test.c index f72bbc075ab86531de55da3d87a5b4b7969f4faa..f268609b83d57befed7271bbe2aeae5f2db50a92 100644 --- a/openair-cn/TEST/oaisim_mme_itti_test.c +++ b/openair-cn/TEST/oaisim_mme_itti_test.c @@ -78,7 +78,7 @@ int main(int argc, char *argv[]) /* Calling each layer init function */ log_init(&mme_config); - itti_init(THREAD_MAX, MESSAGES_ID_MAX, threads_name, messages_info, messages_definition_xml, NULL); + itti_init(TASK_MAX, THREAD_MAX, MESSAGES_ID_MAX, tasks_info, messages_info, messages_definition_xml, NULL); sctp_init(&mme_config); udp_init(&mme_config); s1ap_mme_init(&mme_config); diff --git a/openair2/LAYER2/MAC/eNB_scheduler.c b/openair2/LAYER2/MAC/eNB_scheduler.c index 16d5a343afd67fb73791f5afdfee7539addee034..e73b709b2b2a945e3c9977481fec34b35a407402 100644 --- a/openair2/LAYER2/MAC/eNB_scheduler.c +++ b/openair2/LAYER2/MAC/eNB_scheduler.c @@ -4212,7 +4212,7 @@ void eNB_dlsch_ulsch_scheduler(u8 Mod_id,u8 cooperation_flag, u32 frame, u8 subf #if defined(ENABLE_ITTI) do { // Checks if a message has been sent to MAC sub-task - itti_poll_msg (TASK_MAC_ENB, INSTANCE_ALL, &msg_p); + itti_poll_msg (TASK_MAC_ENB, &msg_p); if (msg_p != NULL) { msg_name = ITTI_MSG_NAME (msg_p); diff --git a/openair2/LAYER2/MAC/ue_procedures.c b/openair2/LAYER2/MAC/ue_procedures.c index e06aa130de5e540b4e2535f071c222c30ef6214e..52e63ee973b17c2c3e5882d53d33add9870197d0 100644 --- a/openair2/LAYER2/MAC/ue_procedures.c +++ b/openair2/LAYER2/MAC/ue_procedures.c @@ -1289,7 +1289,7 @@ UE_L2_STATE_t ue_scheduler(u8 Mod_id,u32 frame, u8 subframe, lte_subframe_t dire #if defined(ENABLE_ITTI) do { // Checks if a message has been sent to MAC sub-task - itti_poll_msg (TASK_MAC_UE, INSTANCE_ALL, &msg_p); + itti_poll_msg (TASK_MAC_UE, &msg_p); if (msg_p != NULL) { msg_name = ITTI_MSG_NAME (msg_p); diff --git a/openair2/LAYER2/PDCP_v10.1.0/pdcp.c b/openair2/LAYER2/PDCP_v10.1.0/pdcp.c index 2d971e90c99e00874b0559b978a8191c41db639c..c8f2f936cd7f7c1603cfa40e9fb29ce0b2f11c7a 100755 --- a/openair2/LAYER2/PDCP_v10.1.0/pdcp.c +++ b/openair2/LAYER2/PDCP_v10.1.0/pdcp.c @@ -500,7 +500,7 @@ void pdcp_run (u32_t frame, u8 eNB_flag, u8 UE_index, u8 eNB_index) { #if defined(ENABLE_ITTI) do { // Checks if a message has been sent to PDCP sub-task - itti_poll_msg (eNB_flag ? TASK_PDCP_ENB : TASK_PDCP_UE, INSTANCE_ALL, &msg_p); + itti_poll_msg (eNB_flag ? TASK_PDCP_ENB : TASK_PDCP_UE, &msg_p); if (msg_p != NULL) { msg_name = ITTI_MSG_NAME (msg_p); diff --git a/targets/SIMU/USER/oaisim.c b/targets/SIMU/USER/oaisim.c index 8e63b41fbc4f78554dfa630f9db4caab94316063..a712e5640e5137ace21c95a809a8efac2999ff46 100644 --- a/targets/SIMU/USER/oaisim.c +++ b/targets/SIMU/USER/oaisim.c @@ -430,7 +430,7 @@ void *l2l1_task(void *args_p) { #if defined(ENABLE_ITTI) do { // Checks if a message has been sent to L2L1 task - itti_poll_msg (TASK_L2L1, INSTANCE_ALL, &message_p); + itti_poll_msg (TASK_L2L1, &message_p); if (message_p != NULL) { switch (message_p->header.messageId) { @@ -936,7 +936,7 @@ int main(int argc, char **argv) { } #if defined(ENABLE_ITTI) - itti_init(THREAD_MAX, MESSAGES_ID_MAX, threads_name, messages_info, messages_definition_xml, oai_emulation.info.itti_dump_file); + itti_init(TASK_MAX, THREAD_MAX, MESSAGES_ID_MAX, tasks_info, messages_info, messages_definition_xml, oai_emulation.info.itti_dump_file); #endif #ifdef OPENAIR2