From cd22e69f5e61033aa1087ea703ed0a1242f7f2a3 Mon Sep 17 00:00:00 2001 From: winckel <winckel@eurecom.fr> Date: Fri, 29 Nov 2013 16:06:30 +0000 Subject: [PATCH] Removed non ENABLE_EVENT_FD option in ITTI. Modified message processing for RT task when RTAI is enabled. Clean-up logs format in RRC. Fixed some warnings. pre-ci Ok. git-svn-id: http://svn.eurecom.fr/openair4G/trunk@4550 818b1a75-f10b-46b9-bf7c-635c3b92a50f --- common/utils/itti/intertask_interface.c | 399 ++++++++++-------------- common/utils/itti/intertask_interface.h | 13 +- common/utils/itti/timer.h | 2 +- openair2/RRC/LITE/rrc_UE.c | 20 +- openair2/RRC/LITE/rrc_eNB.c | 18 +- openair2/RRC/LITE/rrc_eNB_S1AP.c | 4 +- targets/RTAI/USER/lte-softmodem.c | 57 ++-- 7 files changed, 224 insertions(+), 289 deletions(-) diff --git a/common/utils/itti/intertask_interface.c b/common/utils/itti/intertask_interface.c index 945bdcbb51..0c1e03c409 100644 --- a/common/utils/itti/intertask_interface.c +++ b/common/utils/itti/intertask_interface.c @@ -41,14 +41,11 @@ # include <rtai_fifos.h> #endif -#include "queue.h" #include "assertions.h" -#if defined(ENABLE_EVENT_FD) -# include <sys/epoll.h> -# include <sys/eventfd.h> -# include "liblfds611.h" -#endif +#include <sys/epoll.h> +#include <sys/eventfd.h> +#include "liblfds611.h" #include "intertask_interface.h" #include "intertask_interface_dump.h" @@ -92,10 +89,6 @@ typedef enum task_state_s { /* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */ typedef struct message_list_s { -#if !defined(ENABLE_EVENT_FD) - STAILQ_ENTRY(message_list_s) next_element; -#endif - MessageDef *msg; ///< Pointer to the message message_number_t message_number; ///< Unique message number @@ -108,26 +101,11 @@ typedef struct thread_desc_s { /* State of the thread */ volatile task_state_t task_state; -} thread_desc_t; - -typedef struct task_desc_s { - /* Queue of messages belonging to the task */ -#if !defined(ENABLE_EVENT_FD) - STAILQ_HEAD(message_queue_head, message_list_s) message_queue; - - /* Number of messages in the queue */ - volatile uint32_t message_in_queue; - /* Mutex for the message queue */ - pthread_mutex_t message_queue_mutex; - /* Conditional var for message queue and task synchro */ - pthread_cond_t message_queue_cond_var; -#else - struct lfds611_queue_state *message_queue; /* This fd is used internally by ITTI. */ int epoll_fd; - /* The task fd */ + /* The thread fd */ int task_event_fd; /* Number of events to monitor */ @@ -141,7 +119,19 @@ typedef struct task_desc_s { struct epoll_event *events; int epoll_nb_events; + +#ifdef RTAI + /* Flag to mark real time thread */ + unsigned real_time; + + /* Counter to indicate from RTAI threads that messages are pending for the thread */ + unsigned messages_pending; #endif +} thread_desc_t; + +typedef struct task_desc_s { + /* Queue of messages belonging to the task */ + struct lfds611_queue_state *message_queue; } task_desc_t; typedef struct itti_desc_s { @@ -161,6 +151,11 @@ typedef struct itti_desc_s { const message_info_t *messages_info; itti_lte_time_t lte_time; + + int running; +#ifdef RTAI + pthread_t rt_relay_thread; +#endif } itti_desc_t; static itti_desc_t itti_desc; @@ -192,7 +187,7 @@ const char *itti_get_task_name(task_id_t task_id) return (itti_desc.tasks_info[task_id].name); } -static task_id_t itti_get_current_task_id() +static task_id_t itti_get_current_task_id(void) { task_id_t task_id; thread_id_t thread_id; @@ -280,9 +275,9 @@ inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size); } -int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message) +int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message) { - thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); + thread_id_t destination_thread_id; thread_id_t origin_task_id; message_list_t *new; uint32_t priority; @@ -290,9 +285,10 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me uint32_t message_id; DevAssert(message != NULL); - DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); + DevCheck(destination_task_id < itti_desc.task_max, destination_task_id, itti_desc.task_max, 0); - message->ittiMsgHeader.destinationTaskId = task_id; + destination_thread_id = TASK_GET_THREAD_ID(destination_task_id); + message->ittiMsgHeader.destinationTaskId = destination_task_id; message->ittiMsgHeader.instance = instance; message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame; message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot; @@ -303,7 +299,7 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me #if defined(OAI_EMU) || defined(RTAI) vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG, - task_id); + destination_task_id); #endif priority = itti_get_message_priority (message_id); @@ -311,37 +307,23 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me /* Increment the global message number */ message_number = itti_increment_message_number (); -#ifdef RTAI +/* + * + #ifdef RTAI if ((pthread_self() == itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].task_thread) || (task_id == TASK_UNKNOWN) || ((TASK_GET_PARENT_TASK_ID(origin_task_id) != TASK_UNKNOWN) && (pthread_self() == itti_desc.threads[TASK_GET_PARENT_TASK_ID(origin_task_id)].task_thread))) #endif - itti_dump_queue_message (message_number, message, itti_desc.messages_info[message_id].name, - sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize); - - ITTI_DEBUG("Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n", - itti_desc.messages_info[message_id].name, - message_number, - priority, - itti_get_task_name(origin_task_id), - task_id, - itti_get_task_name(task_id)); - - if (task_id != TASK_UNKNOWN) +*/ + itti_dump_queue_message (message_number, message, itti_desc.messages_info[message_id].name, + sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize); + + if (destination_task_id != TASK_UNKNOWN) { /* We cannot send a message if the task is not running */ - DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_READY, itti_desc.threads[thread_id].task_state, - TASK_STATE_READY, thread_id); - -#if !defined(ENABLE_EVENT_FD) - /* Lock the mutex to get exclusive access to the list */ - pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex); - - /* Check the number of messages in the queue */ - DevCheck(itti_desc.tasks[task_id].message_in_queue < itti_desc.tasks_info[task_id].queue_size, - task_id, itti_desc.tasks[task_id].message_in_queue, itti_desc.tasks_info[task_id].queue_size); -#endif + DevCheck(itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY, itti_desc.threads[destination_thread_id].task_state, + TASK_STATE_READY, destination_thread_id); /* Allocate new list element */ new = (message_list_t *) malloc (sizeof(struct message_list_s)); @@ -352,101 +334,68 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me new->message_number = message_number; new->message_priority = priority; -#if defined(ENABLE_EVENT_FD) -# ifdef RTAI - if ((pthread_self() != itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].task_thread) && - (TASK_GET_PARENT_TASK_ID(origin_task_id) != TASK_UNKNOWN)) - { - /* This is the RT task, -> enqueue in the parent thread */ - lfds611_queue_enqueue(itti_desc.tasks[TASK_GET_PARENT_TASK_ID(origin_task_id)].message_queue, new); + /* Enqueue message in destination task queue */ + lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new); - /* Signal from RT thread */ -// rtf_sem_post(56); - } else -# endif - /* No need to use a event fd for subtasks */ - if (TASK_GET_PARENT_TASK_ID(task_id) == TASK_UNKNOWN) +#ifdef RTAI + if (itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].real_time) { - ssize_t write_ret; - uint64_t sem_counter = 1; - - lfds611_queue_enqueue(itti_desc.tasks[task_id].message_queue, new); - - /* Call to write for an event fd must be of 8 bytes */ - write_ret = write(itti_desc.tasks[task_id].task_event_fd, &sem_counter, sizeof(sem_counter)); - DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, task_id); - } else { - lfds611_queue_enqueue(itti_desc.tasks[task_id].message_queue, new); - } -#else - if (STAILQ_EMPTY (&itti_desc.tasks[task_id].message_queue)) { - STAILQ_INSERT_HEAD (&itti_desc.tasks[task_id].message_queue, new, next_element); + /* This is a RT task, increase destination task messages pending counter */ + __sync_fetch_and_add (&itti_desc.threads[destination_thread_id].messages_pending, 1); } - else { -// struct message_list_s *insert_after = NULL; -// struct message_list_s *temp; -// -// /* This method is inefficient... */ -// STAILQ_FOREACH(temp, &itti_desc.tasks[task_id].message_queue, next_element) { -// struct message_list_s *next; -// next = STAILQ_NEXT(temp, next_element); -// /* Increment message priority to create a sort of -// * priority based scheduler */ -// // if (temp->message_priority < TASK_PRIORITY_MAX) { -// // temp->message_priority++; -// // } -// if (next && next->message_priority < priority) { -// insert_after = temp; -// break; -// } -// } -// if (insert_after == NULL) { - STAILQ_INSERT_TAIL (&itti_desc.tasks[task_id].message_queue, new, next_element); -// } else { -// STAILQ_INSERT_AFTER(&itti_desc.tasks[task_id].message_queue, insert_after, new, -// next_element); -// } + else +#endif + { + /* Only use event fd for tasks, subtasks will pool the queue */ + if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN) + { + ssize_t write_ret; + uint64_t sem_counter = 1; + + /* Call to write for an event fd must be of 8 bytes */ + write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter)); + DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, destination_thread_id); + } } - /* Update the number of messages in the queue */ - itti_desc.tasks[task_id].message_in_queue++; - if (itti_desc.tasks[task_id].message_in_queue == 1) { - /* Emit a signal to wake up target task thread */ - pthread_cond_signal (&itti_desc.tasks[task_id].message_queue_cond_var); - } - /* Release the mutex */ - pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex); -#endif + ITTI_DEBUG("Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n", + itti_desc.messages_info[message_id].name, + message_number, + priority, + itti_get_task_name(origin_task_id), + destination_task_id, + itti_get_task_name(destination_task_id)); } #if defined(OAI_EMU) || defined(RTAI) vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG_END, - task_id); + destination_task_id); #endif return 0; } -#if defined(ENABLE_EVENT_FD) void itti_subscribe_event_fd(task_id_t task_id, int fd) { + thread_id_t thread_id; struct epoll_event event; DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); DevCheck(fd >= 0, fd, 0, 0); - itti_desc.tasks[task_id].nb_events++; + thread_id = TASK_GET_THREAD_ID(task_id); + itti_desc.threads[thread_id].nb_events++; /* Reallocate the events */ - itti_desc.tasks[task_id].events = realloc( - itti_desc.tasks[task_id].events, - itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event)); + itti_desc.threads[thread_id].events = realloc( + itti_desc.threads[thread_id].events, + itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event)); event.events = EPOLLIN | EPOLLERR; event.data.fd = fd; /* Add the event fd to the list of monitored events */ - if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD, fd, + if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd, &event) != 0) { ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n", @@ -460,11 +409,14 @@ void itti_subscribe_event_fd(task_id_t task_id, int fd) void itti_unsubscribe_event_fd(task_id_t task_id, int fd) { + thread_id_t thread_id; + DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); DevCheck(fd >= 0, fd, 0, 0); + thread_id = TASK_GET_THREAD_ID(task_id); /* Add the event fd to the list of monitored events */ - if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0) + if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0) { ITTI_ERROR("epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n", itti_get_task_name(task_id), fd, strerror(errno)); @@ -472,23 +424,27 @@ void itti_unsubscribe_event_fd(task_id_t task_id, int fd) DevAssert(0 == 1); } - itti_desc.tasks[task_id].nb_events--; - itti_desc.tasks[task_id].events = realloc( - itti_desc.tasks[task_id].events, - itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event)); + itti_desc.threads[thread_id].nb_events--; + itti_desc.threads[thread_id].events = realloc( + itti_desc.threads[thread_id].events, + itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event)); } int itti_get_events(task_id_t task_id, struct epoll_event **events) { + thread_id_t thread_id; + DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); - *events = itti_desc.tasks[task_id].events; + thread_id = TASK_GET_THREAD_ID(task_id); + *events = itti_desc.threads[thread_id].events; - return itti_desc.tasks[task_id].epoll_nb_events; + return itti_desc.threads[thread_id].epoll_nb_events; } static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg) { + thread_id_t thread_id; int epoll_ret = 0; int epoll_timeout = 0; int i; @@ -496,6 +452,7 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); DevAssert(received_msg != NULL); + thread_id = TASK_GET_THREAD_ID(task_id); *received_msg = NULL; if (polling) { @@ -510,9 +467,9 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t } do { - epoll_ret = epoll_wait(itti_desc.tasks[task_id].epoll_fd, - itti_desc.tasks[task_id].events, - itti_desc.tasks[task_id].nb_events, + epoll_ret = epoll_wait(itti_desc.threads[thread_id].epoll_fd, + itti_desc.threads[thread_id].events, + itti_desc.threads[thread_id].nb_events, epoll_timeout); } while (epoll_ret < 0 && errno == EINTR); @@ -526,19 +483,19 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t return; } - itti_desc.tasks[task_id].epoll_nb_events = epoll_ret; + itti_desc.threads[thread_id].epoll_nb_events = epoll_ret; for (i = 0; i < epoll_ret; i++) { /* Check if there is an event for ITTI for the event fd */ - if ((itti_desc.tasks[task_id].events[i].events & EPOLLIN) && - (itti_desc.tasks[task_id].events[i].data.fd == itti_desc.tasks[task_id].task_event_fd)) + if ((itti_desc.threads[thread_id].events[i].events & EPOLLIN) && + (itti_desc.threads[thread_id].events[i].data.fd == itti_desc.threads[thread_id].task_event_fd)) { struct message_list_s *message; uint64_t sem_counter; ssize_t read_ret; /* Read will always return 1 */ - read_ret = read (itti_desc.tasks[task_id].task_event_fd, &sem_counter, sizeof(sem_counter)); + read_ret = read (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter)); DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0); if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) { @@ -551,7 +508,6 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t } } } -#endif void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) { @@ -559,39 +515,9 @@ void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG, task_id); #endif -#if defined(ENABLE_EVENT_FD) itti_receive_msg_internal_event_fd(task_id, 0, received_msg); -#else - DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0); - DevAssert(received_msg != NULL); - - // Lock the mutex to get exclusive access to the list - pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex); - - if (itti_desc.tasks[task_id].message_in_queue == 0) { - ITTI_DEBUG("Message in queue[(%u:%s)] == 0, waiting\n", task_id, itti_get_task_name(task_id)); - // Wait while list == 0 - pthread_cond_wait (&itti_desc.tasks[task_id].message_queue_cond_var, - &itti_desc.tasks[task_id].message_queue_mutex); - ITTI_DEBUG("Receiver queue[(%u:%s)] got new message notification\n", - task_id, itti_get_task_name(task_id)); - } - if (!STAILQ_EMPTY (&itti_desc.tasks[task_id].message_queue)) { - message_list_t *temp = STAILQ_FIRST (&itti_desc.tasks[task_id].message_queue); - - /* Update received_msg reference */ - *received_msg = temp->msg; - - /* Remove message from queue */ - STAILQ_REMOVE_HEAD (&itti_desc.tasks[task_id].message_queue, next_element); - free (temp); - itti_desc.tasks[task_id].message_in_queue--; - } - // Release the mutex - pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex); -#endif -#if defined(OAI_EMU) || defined(RTAI) + #if defined(OAI_EMU) || defined(RTAI) vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG_END, task_id); #endif @@ -608,7 +534,6 @@ void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) { task_id); #endif -#if defined(ENABLE_EVENT_FD) { struct message_list_s *message; @@ -618,33 +543,6 @@ void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) { free (message); } } -#else - if (itti_desc.tasks[task_id].message_in_queue != 0) { - message_list_t *temp; - - // Lock the mutex to get exclusive access to the list - pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex); - - STAILQ_FOREACH (temp, &itti_desc.tasks[task_id].message_queue, next_element) - { - /* Update received_msg reference */ - *received_msg = temp->msg; - - /* Remove message from queue */ - STAILQ_REMOVE (&itti_desc.tasks[task_id].message_queue, temp, message_list_s, next_element); - free (temp); - itti_desc.tasks[task_id].message_in_queue--; - - ITTI_DEBUG( - "Receiver queue[(%u:%s)] got new message %s, number %lu\n", - task_id, itti_get_task_name(task_id), itti_desc.messages_info[temp->msg->ittiMsgHeader.messageId].name, temp->message_number); - break; - } - - // Release the mutex - pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex); - } -#endif if ((itti_debug_poll) && (*received_msg == NULL)) { ITTI_DEBUG("No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id)); @@ -678,6 +576,17 @@ int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *ar return 0; } +#ifdef RTAI +void itti_set_task_real_time(task_id_t task_id) +{ + thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); + + DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); + + itti_desc.threads[thread_id].real_time = TRUE; +} +#endif + void itti_mark_task_ready(task_id_t task_id) { thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); @@ -696,20 +605,10 @@ void itti_mark_task_ready(task_id_t task_id) /* Register the thread in itti dump */ itti_dump_thread_use_ring_buffer(); -#if defined(ENABLE_EVENT_FD) /* Mark the thread as using LFDS queue */ lfds611_queue_use(itti_desc.tasks[task_id].message_queue); -#else - // Lock the mutex to get exclusive access to the list - pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex); -#endif itti_desc.threads[thread_id].task_state = TASK_STATE_READY; - -#if !defined(ENABLE_EVENT_FD) - // Release the mutex - pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex); -#endif } void itti_exit_task(void) { @@ -727,6 +626,40 @@ void itti_terminate_tasks(task_id_t task_id) { pthread_exit (NULL); } +#ifdef RTAI +static void *itti_rt_relay_thread(void *arg) +{ + thread_id_t thread_id; + unsigned pending_messages; + + while (itti_desc.running) + { + usleep (100); + + /* Checks for all non real time tasks if they have pending messages */ + for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) + { + if ((itti_desc.threads[thread_id].task_state == TASK_STATE_READY) + && (itti_desc.threads[thread_id].real_time == FALSE)) + { + pending_messages = __sync_fetch_and_and (&itti_desc.threads[thread_id].messages_pending, 0); + + if (pending_messages > 0) + { + ssize_t write_ret; + uint64_t sem_counter = pending_messages; + + /* Call to write for an event fd must be of 8 bytes */ + write_ret = write (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter)); + DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, thread_id); + } + } + } + } + return NULL; +} +#endif + int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info, const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) { task_id_t task_id; @@ -768,7 +701,6 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : ""); -#if defined(ENABLE_EVENT_FD) ITTI_DEBUG("Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size); ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size); @@ -784,32 +716,38 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i ret = rtf_sem_init(56, 0); } # endif + } + + /* Initializing each thread */ + for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) + { + itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED; - itti_desc.tasks[task_id].epoll_fd = epoll_create1(0); - if (itti_desc.tasks[task_id].epoll_fd == -1) { + itti_desc.threads[thread_id].epoll_fd = epoll_create1(0); + if (itti_desc.threads[thread_id].epoll_fd == -1) { ITTI_ERROR("Failed to create new epoll fd: %s\n", strerror(errno)); /* Always assert on this condition */ DevAssert(0 == 1); } - itti_desc.tasks[task_id].task_event_fd = eventfd(0, EFD_SEMAPHORE); - if (itti_desc.tasks[task_id].task_event_fd == -1) + itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE); + if (itti_desc.threads[thread_id].task_event_fd == -1) { ITTI_ERROR("eventfd failed: %s\n", strerror(errno)); /* Always assert on this condition */ DevAssert(0 == 1); } - itti_desc.tasks[task_id].nb_events = 1; + itti_desc.threads[thread_id].nb_events = 1; - itti_desc.tasks[task_id].events = calloc(1, sizeof(struct epoll_event)); + itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event)); - itti_desc.tasks[task_id].events->events = EPOLLIN | EPOLLERR; - itti_desc.tasks[task_id].events->data.fd = itti_desc.tasks[task_id].task_event_fd; + itti_desc.threads[thread_id].events->events = EPOLLIN | EPOLLERR; + itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd; /* Add the event fd to the list of monitored events */ - if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD, - itti_desc.tasks[task_id].task_event_fd, itti_desc.tasks[task_id].events) != 0) + if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, + itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0) { ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno)); /* Always assert on this condition */ @@ -817,24 +755,19 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i } ITTI_DEBUG("Successfully subscribed fd %d for task %s\n", - itti_desc.tasks[task_id].task_event_fd, itti_get_task_name(task_id)); -#else - STAILQ_INIT (&itti_desc.tasks[task_id].message_queue); - itti_desc.tasks[task_id].message_in_queue = 0; + itti_desc.threads[thread_id].task_event_fd, itti_get_task_name(task_id)); - // Initialize mutexes - pthread_mutex_init (&itti_desc.tasks[task_id].message_queue_mutex, NULL); - - // Initialize Cond vars - pthread_cond_init (&itti_desc.tasks[task_id].message_queue_cond_var, NULL); +#ifdef RTAI + itti_desc.threads[thread_id].real_time = FALSE; + itti_desc.threads[thread_id].messages_pending = 0; #endif } - /* Initializing each thread */ - for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) - { - itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED; - } + itti_desc.running = TRUE; +#ifdef RTAI + /* Start RT relay thread */ + DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0); +#endif itti_dump_init (messages_definition_xml, dump_file_name); @@ -891,6 +824,8 @@ void itti_wait_tasks_end(void) { } } while ((ready_tasks > 0) && (retries--)); + itti_desc.running = FALSE; + if (ready_tasks > 0) { ITTI_DEBUG("Some threads are still running, force exit\n"); exit (0); diff --git a/common/utils/itti/intertask_interface.h b/common/utils/itti/intertask_interface.h index 3e83e27902..02d5cda60c 100644 --- a/common/utils/itti/intertask_interface.h +++ b/common/utils/itti/intertask_interface.h @@ -34,9 +34,7 @@ * @{ */ -#if defined(ENABLE_EVENT_FD) -# include <sys/epoll.h> -#endif +#include <sys/epoll.h> #ifdef RTAI # include <rtai_sem.h> @@ -123,7 +121,6 @@ int itti_send_broadcast_message(MessageDef *message_p); **/ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message); -#if defined(ENABLE_EVENT_FD) /** \brief Add a new fd to monitor. * NOTE: it is up to the user to read data associated with the fd * \param task_id Task ID of the receiving task @@ -143,7 +140,6 @@ void itti_unsubscribe_event_fd(task_id_t task_id, int fd); * @returns number of events to handle **/ int itti_get_events(task_id_t task_id, struct epoll_event **events); -#endif /** \brief Retrieves a message in the queue associated to task_id. * If the queue is empty, the thread is blocked till a new message arrives. @@ -168,6 +164,13 @@ int itti_create_task(task_id_t task_id, void *(*start_routine) (void *), void *args_p); +#ifdef RTAI +/** \brief Mark the task as a real time task + * \param task_id task to mark as real time + **/ +void itti_set_task_real_time(task_id_t task_id); +#endif + /** \brief Mark the task as in ready state * \param task_id task to mark as ready **/ diff --git a/common/utils/itti/timer.h b/common/utils/itti/timer.h index c4504a8a71..1a0e8eb6fb 100644 --- a/common/utils/itti/timer.h +++ b/common/utils/itti/timer.h @@ -69,6 +69,6 @@ int timer_remove(long timer_id); * \param mme_config MME common configuration * @returns -1 on failure, 0 otherwise **/ -int timer_init(); +int timer_init(void); #endif diff --git a/openair2/RRC/LITE/rrc_UE.c b/openair2/RRC/LITE/rrc_UE.c index 3c276e8cf2..404f5d0aac 100644 --- a/openair2/RRC/LITE/rrc_UE.c +++ b/openair2/RRC/LITE/rrc_UE.c @@ -2355,12 +2355,12 @@ void *rrc_ue_task(void *args_p) { break; case MESSAGE_TEST: - LOG_I(RRC, "Received %s\n", msg_name); + LOG_I(RRC, "[UE %d] Received %s\n", Mod_id, msg_name); break; /* MAC messages */ case RRC_MAC_IN_SYNC_IND: - LOG_I(RRC, "Received %s: instance %d, frame %d, eNB %d\n", msg_name, instance, + LOG_I(RRC, "[UE %d] Received %s: frame %d, eNB %d\n", Mod_id, msg_name, RRC_MAC_IN_SYNC_IND (msg_p).frame, RRC_MAC_IN_SYNC_IND (msg_p).enb_index); UE_rrc_inst[Mod_id].Info[RRC_MAC_IN_SYNC_IND (msg_p).enb_index].N310_cnt = 0; @@ -2369,14 +2369,14 @@ void *rrc_ue_task(void *args_p) { break; case RRC_MAC_OUT_OF_SYNC_IND: - LOG_I(RRC, "Received %s: instance %d, frame %d, eNB %d\n", msg_name, instance, + LOG_I(RRC, "[UE %d] Received %s: frame %d, eNB %d\n", Mod_id, msg_name, RRC_MAC_OUT_OF_SYNC_IND (msg_p).frame, RRC_MAC_OUT_OF_SYNC_IND (msg_p).enb_index); UE_rrc_inst[Mod_id].Info[RRC_MAC_OUT_OF_SYNC_IND (msg_p).enb_index].N310_cnt ++; break; case RRC_MAC_BCCH_DATA_IND: - LOG_I(RRC, "Received %s: instance %d, frame %d, eNB %d\n", msg_name, instance, + LOG_I(RRC, "[UE %d] Received %s: frame %d, eNB %d\n", Mod_id, msg_name, RRC_MAC_BCCH_DATA_IND (msg_p).frame, RRC_MAC_BCCH_DATA_IND (msg_p).enb_index); decode_BCCH_DLSCH_Message (Mod_id, RRC_MAC_BCCH_DATA_IND (msg_p).frame, @@ -2385,7 +2385,7 @@ void *rrc_ue_task(void *args_p) { break; case RRC_MAC_CCCH_DATA_CNF: - LOG_I(RRC, "Received %s: instance %d, eNB %d\n", msg_name, instance, + LOG_I(RRC, "[UE %d] Received %s: eNB %d\n", Mod_id, msg_name, RRC_MAC_CCCH_DATA_CNF (msg_p).enb_index); // reset the tx buffer to indicate RRC that ccch was successfully transmitted (for example if contention resolution succeeds) @@ -2393,7 +2393,7 @@ void *rrc_ue_task(void *args_p) { break; case RRC_MAC_CCCH_DATA_IND: - LOG_I(RRC, "Received %s: instance %d, frame %d, eNB %d\n", msg_name, instance, + LOG_I(RRC, "[UE %d] Received %s: frame %d, eNB %d\n", Mod_id, msg_name, RRC_MAC_CCCH_DATA_IND (msg_p).frame, RRC_MAC_CCCH_DATA_IND (msg_p).enb_index); srb_info_p = &UE_rrc_inst[Mod_id].Srb0[RRC_MAC_CCCH_DATA_IND (msg_p).enb_index]; @@ -2407,7 +2407,7 @@ void *rrc_ue_task(void *args_p) { #ifdef Rel10 case RRC_MAC_MCCH_DATA_IND: - LOG_I(RRC, "Received %s: instance %d, frame %d, eNB %d, mbsfn SA %d\n", msg_name, instance, + LOG_I(RRC, "[UE %d] Received %s: frame %d, eNB %d, mbsfn SA %d\n", Mod_id, msg_name, RRC_MAC_MCCH_DATA_IND (msg_p).frame, RRC_MAC_MCCH_DATA_IND (msg_p).enb_index, RRC_MAC_MCCH_DATA_IND (msg_p).mbsfn_sync_area); decode_MCCH_Message (Mod_id, RRC_MAC_MCCH_DATA_IND (msg_p).frame, RRC_MAC_MCCH_DATA_IND (msg_p).enb_index, @@ -2418,7 +2418,7 @@ void *rrc_ue_task(void *args_p) { /* PDCP messages */ case RRC_DCCH_DATA_IND: - LOG_I(RRC, "Received %s: instance %d, frame %d, DCCH %d, UE %d\n", msg_name, instance, + LOG_I(RRC, "[UE %d] Received %s: frame %d, DCCH %d, UE %d\n", Mod_id, msg_name, RRC_DCCH_DATA_IND (msg_p).frame, RRC_DCCH_DATA_IND (msg_p).dcch_index, RRC_DCCH_DATA_IND (msg_p).ue_index); rrc_ue_decode_dcch (Mod_id, RRC_DCCH_DATA_IND (msg_p).frame, @@ -2435,7 +2435,7 @@ void *rrc_ue_task(void *args_p) { uint32_t length; uint8_t *buffer; - LOG_I(RRC, "Received %s: instance %d, UEid %d\n", msg_name, instance, NAS_UPLINK_DATA_REQ (msg_p).UEid); + LOG_I(RRC, "[UE %d] Received %s: UEid %d\n", Mod_id, msg_name, NAS_UPLINK_DATA_REQ (msg_p).UEid); /* Create message for PDCP (ULInformationTransfer_t) */ length = do_ULInformationTransfer(&buffer, NAS_UPLINK_DATA_REQ (msg_p).nasMsg.length, NAS_UPLINK_DATA_REQ (msg_p).nasMsg.data); @@ -2446,7 +2446,7 @@ void *rrc_ue_task(void *args_p) { } default: - LOG_E(RRC, "Received unexpected message %s\n", msg_name); + LOG_E(RRC, "[UE %d] Received unexpected message %s\n", Mod_id, msg_name); break; } diff --git a/openair2/RRC/LITE/rrc_eNB.c b/openair2/RRC/LITE/rrc_eNB.c index 836aaf1479..57f7372160 100644 --- a/openair2/RRC/LITE/rrc_eNB.c +++ b/openair2/RRC/LITE/rrc_eNB.c @@ -459,7 +459,7 @@ static void rrc_lite_eNB_init_security(u8 Mod_id, u8 UE_index) } ascii_buffer[2 * i] = '\0'; - LOG_T(RRC, "[OSA][MOD %02d][UE %02d] kenb = %s\n", Mod_id, UE_index, ascii_buffer); + LOG_T(RRC, "[OSA][eNB %d][UE %d] kenb = %s\n", Mod_id, UE_index, ascii_buffer); #endif } @@ -494,7 +494,7 @@ static uint8_t rrc_eNB_get_next_free_UE_index (uint8_t Mod_id, uint8_t *UE_ident } if (reg == 0) { - LOG_I(RRC, "Adding UE %d\n", first_index); + LOG_I(RRC, "[eNB %d] Adding UE %d\n", Mod_id, first_index); return (first_index); } else { @@ -507,7 +507,7 @@ void rrc_eNB_free_UE_index (uint8_t Mod_id, uint8_t UE_id) DevCheck(Mod_id < NB_eNB_INST, Mod_id, UE_id, NB_eNB_INST); DevCheck(UE_id < NUMBER_OF_UE_MAX, Mod_id, UE_id, NUMBER_OF_UE_MAX); - LOG_I (RRC, "Removing UE %d 0x%" PRIx64 "\n", UE_id, eNB_rrc_inst[Mod_id].Info.UE_list[UE_id]); + LOG_I (RRC, "[eNB %d] Removing UE %d rv 0x%" PRIx64 "\n", Mod_id, UE_id, eNB_rrc_inst[Mod_id].Info.UE_list[UE_id]); eNB_rrc_inst[Mod_id].Info.UE[UE_id].Status = RRC_IDLE; eNB_rrc_inst[Mod_id].Info.UE_list[UE_id] = 0; } @@ -3030,12 +3030,12 @@ void *rrc_enb_task(void *args_p) { break; case MESSAGE_TEST: - LOG_I(RRC, "Received %s\n", msg_name); + LOG_I(RRC, "[eNB %d] Received %s\n", instance, msg_name); break; /* Messages from MAC */ case RRC_MAC_CCCH_DATA_IND: - LOG_I(RRC, "Received %s: instance %d, frame %d,\n", msg_name, instance, + LOG_I(RRC, "[eNB %d] Received %s: instance %d, frame %d,\n", instance, msg_name, RRC_MAC_CCCH_DATA_IND (msg_p).frame); srb_info_p = &eNB_rrc_inst[instance].Srb0; @@ -3048,8 +3048,8 @@ void *rrc_enb_task(void *args_p) { /* Messages from PDCP */ case RRC_DCCH_DATA_IND: - LOG_I(RRC, "Received %s: instance %d, frame %d, DCCH %d, UE %d\n", msg_name, instance, - RRC_DCCH_DATA_IND (msg_p).frame, RRC_DCCH_DATA_IND (msg_p).dcch_index, RRC_DCCH_DATA_IND (msg_p).ue_index); + LOG_I(RRC, "[eNB %d][UE %d] Received %s: instance %d, frame %d, DCCH %d\n", instance, RRC_DCCH_DATA_IND (msg_p).ue_index, msg_name, + RRC_DCCH_DATA_IND (msg_p).frame, RRC_DCCH_DATA_IND (msg_p).dcch_index); rrc_eNB_decode_dcch (instance, RRC_DCCH_DATA_IND (msg_p).frame, RRC_DCCH_DATA_IND (msg_p).dcch_index, RRC_DCCH_DATA_IND (msg_p).ue_index, RRC_DCCH_DATA_IND (msg_p).sdu_p, @@ -3074,7 +3074,7 @@ void *rrc_enb_task(void *args_p) { break; case S1AP_PAGING_IND: - LOG_E(RRC, "Received not yet implemented message %s\n", msg_name); + LOG_E(RRC, "[eNB %d] Received not yet implemented message %s\n", instance, msg_name); break; case S1AP_UE_CONTEXT_RELEASE_REQ: @@ -3083,7 +3083,7 @@ void *rrc_enb_task(void *args_p) { #endif default: - LOG_E(RRC, "Received unexpected message %s\n", msg_name); + LOG_E(RRC, "[eNB %d] Received unexpected message %s\n", instance, msg_name); break; } diff --git a/openair2/RRC/LITE/rrc_eNB_S1AP.c b/openair2/RRC/LITE/rrc_eNB_S1AP.c index 81904b5be4..7ebd60e951 100644 --- a/openair2/RRC/LITE/rrc_eNB_S1AP.c +++ b/openair2/RRC/LITE/rrc_eNB_S1AP.c @@ -102,7 +102,7 @@ static uint8_t get_UE_index_from_initial_id(uint8_t mod_id, uint16_t ue_initial_ for (ue_index = 0; ue_index < NUMBER_OF_UE_MAX; ue_index++) { /* Check if this UE is in use */ - LOG_D(RRC, "[eNB %d][UE %d] 0x%" PRIx64 " %d\n", mod_id, ue_index, + LOG_D(RRC, "[eNB %d][UE %d] UE rv 0x%" PRIx64 " %d\n", mod_id, ue_index, eNB_rrc_inst[mod_id].Info.UE_list[ue_index], eNB_rrc_inst[mod_id].Info.UE[ue_index].ue_initial_id); if (eNB_rrc_inst[mod_id].Info.UE_list[ue_index] != 0) { @@ -129,7 +129,7 @@ static uint8_t get_UE_index_from_eNB_ue_s1ap_id(uint8_t mod_id, uint32_t eNB_ue_ for (ue_index = 0; ue_index < NUMBER_OF_UE_MAX; ue_index++) { /* Check if this UE is in use */ - LOG_D(RRC, "[eNB %d][UE %d] 0x%" PRIx64 " %d\n", mod_id, ue_index, + LOG_D(RRC, "[eNB %d][UE %d] UE rv 0x%" PRIx64 " %d\n", mod_id, ue_index, eNB_rrc_inst[mod_id].Info.UE_list[ue_index], eNB_rrc_inst[mod_id].Info.UE[ue_index].eNB_ue_s1ap_id); if (eNB_rrc_inst[mod_id].Info.UE_list[ue_index] != 0) { diff --git a/targets/RTAI/USER/lte-softmodem.c b/targets/RTAI/USER/lte-softmodem.c index 4f188dc627..1d4077045d 100644 --- a/targets/RTAI/USER/lte-softmodem.c +++ b/targets/RTAI/USER/lte-softmodem.c @@ -460,24 +460,12 @@ void *emos_thread (void *arg) #if defined(ENABLE_ITTI) void *dummy_l2l1_task(void *arg) { - ssize_t ret = 0; - MessageDef *received_msg; - + itti_set_task_real_time(TASK_L2L1); itti_mark_task_ready(TASK_L2L1); while (!oai_exit) { - usleep(100); - - do { - itti_poll_msg(TASK_L2L1, &received_msg); - - if (received_msg != NULL) { - itti_send_msg_to_task(ITTI_MSG_DESTINATION_ID(received_msg), - ITTI_MSG_INSTANCE(received_msg), - received_msg); - } - } while(received_msg != NULL); + usleep(500000); } return NULL; } @@ -534,20 +522,20 @@ static void *eNB_thread(void *arg) diff = mbox_target - mbox_current; if (((slot%2==0) && (diff < (-14))) || ((slot%2==1) && (diff < (-7)))) { - // at the eNB, even slots have double as much time since most of the processing is done here and almost nothing in odd slots - LOG_D(HW,"eNB Frame %d, time %llu: missed slot, proceeding with next one (slot %d, hw_slot %d, diff %d)\n",frame, rt_get_time_ns(), slot, hw_slot, diff); - slot++; - if (frame>0) { - oai_exit=1; + // at the eNB, even slots have double as much time since most of the processing is done here and almost nothing in odd slots + LOG_D(HW,"eNB Frame %d, time %llu: missed slot, proceeding with next one (slot %d, hw_slot %d, diff %d)\n",frame, rt_get_time_ns(), slot, hw_slot, diff); + slot++; + if (frame > 0) { + oai_exit = 1; #if defined(ENABLE_ITTI) - itti_send_terminate_message(TASK_L2L1); + itti_send_terminate_message (TASK_L2L1); #endif - } - if (slot==20){ - slot=0; - frame++; - } - continue; + } + if (slot==20){ + slot=0; + frame++; + } + continue; } if (diff>8) LOG_D(HW,"eNB Frame %d, time %llu: skipped slot, waiting for hw to catch up (slot %d, hw_slot %d, mbox_current %d, mbox_target %d, diff %d)\n",frame, rt_get_time_ns(), slot, hw_slot, mbox_current, mbox_target, diff); @@ -555,9 +543,9 @@ static void *eNB_thread(void *arg) delay_cnt = 0; while ((diff>0) && (!oai_exit)) { - time_in = rt_get_time_ns(); - //LOG_D(HW,"eNB Frame %d delaycnt %d : hw_slot %d (%d), slot %d, (slot+1)*15=%d, diff %d, time %llu\n",frame,delay_cnt,hw_slot,((unsigned int *)DAQ_MBOX)[0],slot,(((slot+1)*15)>>1),diff,time_in); - //LOG_D(HW,"eNB Frame %d, time %llu: sleeping for %llu (slot %d, hw_slot %d, diff %d, mbox %d, delay_cnt %d)\n", frame, time_in, diff*DAQ_PERIOD,slot,hw_slot,diff,((volatile unsigned int *)DAQ_MBOX)[0],delay_cnt); + time_in = rt_get_time_ns(); + //LOG_D(HW,"eNB Frame %d delaycnt %d : hw_slot %d (%d), slot %d, (slot+1)*15=%d, diff %d, time %llu\n",frame,delay_cnt,hw_slot,((unsigned int *)DAQ_MBOX)[0],slot,(((slot+1)*15)>>1),diff,time_in); + //LOG_D(HW,"eNB Frame %d, time %llu: sleeping for %llu (slot %d, hw_slot %d, diff %d, mbox %d, delay_cnt %d)\n", frame, time_in, diff*DAQ_PERIOD,slot,hw_slot,diff,((volatile unsigned int *)DAQ_MBOX)[0],delay_cnt); ret = rt_sleep_ns(diff*DAQ_PERIOD); if (ret) LOG_D(HW,"eNB Frame %d, time %llu: rt_sleep_ns returned %d\n",frame, time_in); @@ -618,7 +606,7 @@ static void *eNB_thread(void *arg) if (fs4_test==0) { - phy_procedures_eNB_lte (last_slot, next_slot, PHY_vars_eNB_g[0], 0, 0,NULL); + phy_procedures_eNB_lte (last_slot, next_slot, PHY_vars_eNB_g[0], 0, no_relay,NULL); #ifndef IFFT_FPGA slot_offset_F = (next_slot)* (PHY_vars_eNB_g[0]->lte_frame_parms.ofdm_symbol_size)* @@ -1234,6 +1222,15 @@ int main(int argc, char **argv) { vcd_signal_dumper_init("/tmp/openair_dump_eNB.vcd"); } +#if defined(ENABLE_ITTI) + if (UE_flag == 1) { + log_set_instance_type (LOG_INSTANCE_UE); + } + else { + log_set_instance_type (LOG_INSTANCE_ENB); + } +#endif + #if defined(ENABLE_ITTI) itti_init(TASK_MAX, THREAD_MAX, MESSAGES_ID_MAX, tasks_info, messages_info, messages_definition_xml, itti_dump_file); -- GitLab