From aa65305f462eb10ee613081e5a67cf8c13ba8076 Mon Sep 17 00:00:00 2001 From: winckel <winckel@eurecom.fr> Date: Wed, 18 Dec 2013 14:08:05 +0000 Subject: [PATCH] Integrated memory pools into ITTI for OAISIM and lte-softmodem targets. git-svn-id: http://svn.eurecom.fr/openair4G/trunk@4761 818b1a75-f10b-46b9-bf7c-635c3b92a50f --- common/utils/itti/intertask_interface.c | 78 +++- common/utils/itti/intertask_interface.h | 2 +- common/utils/itti/intertask_interface_dump.c | 463 +++++++++++-------- openair-cn/S1AP/s1ap_eNB.c | 2 +- openair-cn/SCTP/sctp_eNB_itti_messaging.c | 2 +- openair-cn/SCTP/sctp_eNB_task.c | 3 +- openair2/COMMON/intertask_interface_conf.h | 2 +- openair2/RRC/LITE/L2_interface.c | 6 +- 8 files changed, 344 insertions(+), 214 deletions(-) diff --git a/common/utils/itti/intertask_interface.c b/common/utils/itti/intertask_interface.c index 1545b62bee..322a032234 100644 --- a/common/utils/itti/intertask_interface.c +++ b/common/utils/itti/intertask_interface.c @@ -37,20 +37,21 @@ #include <errno.h> #include <signal.h> -#include "assertions.h" - #include <sys/epoll.h> #include <sys/eventfd.h> -#include "liblfds611.h" - -#include "intertask_interface.h" -#include "intertask_interface_dump.h" #ifdef RTAI # include <rtai_shm.h> #endif +#include "liblfds611.h" + +#include "assertions.h" +#include "intertask_interface.h" +#include "intertask_interface_dump.h" + #if defined(OAI_EMU) || defined(RTAI) +# include "memory_pools.h" # include "vcd_signal_dumper.h" #endif @@ -175,6 +176,8 @@ typedef struct itti_desc_s { #endif #if defined(OAI_EMU) || defined(RTAI) + memory_pools_handle_t memory_pools_handle; + uint64_t vcd_poll_msg; uint64_t vcd_receive_msg; uint64_t vcd_send_msg; @@ -183,18 +186,26 @@ typedef struct itti_desc_s { static itti_desc_t itti_desc; -void *itti_malloc(task_id_t task_id, ssize_t size) +void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size) { void *ptr = NULL; -#ifdef RTAI -// ptr = rt_malloc(size); - ptr = malloc(size); +#if defined(OAI_EMU) || defined(RTAI) + ptr = memory_pools_allocate (itti_desc.memory_pools_handle, size, origin_task_id, destination_task_id); #else - ptr = malloc(size); + ptr = malloc (size); #endif - DevCheck(ptr != NULL, size, task_id, 0); + DevCheck(ptr != NULL, size, origin_task_id, destination_task_id); +#if defined(OAI_EMU) || defined(RTAI) + if (ptr == NULL) + { + char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle); + + ITTI_ERROR ("\n%s", statistics); + free (statistics); + } +#endif return ptr; } @@ -202,10 +213,11 @@ void *itti_malloc(task_id_t task_id, ssize_t size) void itti_free(task_id_t task_id, void *ptr) { DevAssert(ptr != NULL); -#ifdef RTAI - free(ptr); + +#if defined(OAI_EMU) || defined(RTAI) + memory_pools_free (itti_desc.memory_pools_handle, ptr, task_id); #else - free(ptr); + free (ptr); #endif } @@ -285,7 +297,7 @@ int itti_send_broadcast_message(MessageDef *message_p) { if (thread_id != origin_thread_id) { /* Skip tasks which are not running */ if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) { - new_message_p = itti_malloc (origin_task_id, sizeof(MessageDef)); + new_message_p = itti_malloc (origin_task_id, destination_task_id, sizeof(MessageDef)); DevAssert(message_p != NULL); memcpy (new_message_p, message_p, sizeof(MessageDef)); @@ -294,7 +306,7 @@ int itti_send_broadcast_message(MessageDef *message_p) { } } } - free (message_p); + itti_free (ITTI_MSG_ORIGIN_ID(message_p), message_p); return ret; } @@ -315,7 +327,7 @@ inline MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, Messag origin_task_id = itti_get_current_task_id(); } - temp = itti_malloc (origin_task_id, sizeof(MessageHeader) + size); + temp = itti_malloc (origin_task_id, TASK_UNKNOWN, sizeof(MessageHeader) + size); DevAssert(temp != NULL); temp->ittiMsgHeader.messageId = message_id; @@ -392,7 +404,7 @@ int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, Me itti_desc.threads[destination_thread_id].task_state, message_id); /* Allocate new list element */ - new = (message_list_t *) itti_malloc (origin_task_id, sizeof(struct message_list_s)); + new = (message_list_t *) itti_malloc (origin_task_id, destination_task_id, sizeof(struct message_list_s)); DevAssert(new != NULL); /* Fill in members */ @@ -610,7 +622,7 @@ void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) } DevAssert(message != NULL); *received_msg = message->msg; - free (message); + itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message); itti_desc.threads[task_id].sem_counter--; } else @@ -640,7 +652,7 @@ void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) { if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1) { *received_msg = message->msg; - free (message); + itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message); } } @@ -907,6 +919,21 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i rt_global_heap_open(); #endif +#if defined(OAI_EMU) || defined(RTAI) + itti_desc.memory_pools_handle = memory_pools_create (4); + memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + ITTI_QUEUE_MAX_ELEMENTS, 50); + memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + (2 * ITTI_QUEUE_MAX_ELEMENTS), 100); + memory_pools_add_pool (itti_desc.memory_pools_handle, 1000, 1000); + memory_pools_add_pool (itti_desc.memory_pools_handle, 1000, 10000); + + { + char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle); + + printf ("%s", statistics); + free (statistics); + } +#endif + #if defined(OAI_EMU) || defined(RTAI) itti_desc.vcd_poll_msg = 0; itti_desc.vcd_receive_msg = 0; @@ -968,6 +995,15 @@ void itti_wait_tasks_end(void) { itti_desc.running = 0; +#if defined(OAI_EMU) || defined(RTAI) + { + char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle); + + printf ("%s", statistics); + free (statistics); + } +#endif + if (ready_tasks > 0) { ITTI_DEBUG(" Some threads are still running, force exit\n"); exit (0); diff --git a/common/utils/itti/intertask_interface.h b/common/utils/itti/intertask_interface.h index a1d6eb1c0b..56f3ac428c 100644 --- a/common/utils/itti/intertask_interface.h +++ b/common/utils/itti/intertask_interface.h @@ -230,7 +230,7 @@ void itti_wait_tasks_end(void); **/ void itti_send_terminate_message(task_id_t task_id); -void *itti_malloc(task_id_t task_id, ssize_t size); +void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size); void itti_free(task_id_t task_id, void *ptr); diff --git a/common/utils/itti/intertask_interface_dump.c b/common/utils/itti/intertask_interface_dump.c index 527f2baac9..4954ebafbd 100644 --- a/common/utils/itti/intertask_interface_dump.c +++ b/common/utils/itti/intertask_interface_dump.c @@ -64,17 +64,17 @@ #define SIGNAL_NAME_LENGTH 48 -static const int itti_dump_debug = 0; +static const int itti_dump_debug = 0; // 0x8 | 0x4 | 0x2; #ifdef RTAI -# define ITTI_DUMP_DEBUG(x, args...) do { if (itti_dump_debug) rt_printk("[ITTI][D]"x, ##args); } \ +# define ITTI_DUMP_DEBUG(m, x, args...) do { if (m & itti_dump_debug) rt_printk("[ITTI_DUMP][D]"x, ##args); } \ while(0) -# define ITTI_DUMP_ERROR(x, args...) do { rt_printk("[ITTI][E]"x, ##args); } \ +# define ITTI_DUMP_ERROR(x, args...) do { rt_printk("[ITTI_DUMP][E]"x, ##args); } \ while(0) #else -# define ITTI_DUMP_DEBUG(x, args...) do { if (itti_dump_debug) fprintf(stdout, "[ITTI][D]"x, ##args); } \ +# define ITTI_DUMP_DEBUG(m, x, args...) do { if (m & itti_dump_debug) fprintf(stdout, "[ITTI_DUMP][D]"x, ##args); } \ while(0) -# define ITTI_DUMP_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); } \ +# define ITTI_DUMP_ERROR(x, args...) do { fprintf(stdout, "[ITTI_DUMP][E]"x, ##args); } \ while(0) #endif @@ -153,14 +153,10 @@ static itti_desc_t itti_dump_queue; static FILE *dump_file = NULL; static int itti_dump_running = 1; -static int itti_dump_send_message(int sd, itti_dump_queue_item_t *message); -static int itti_dump_handle_new_connection(int sd, const char *xml_definition, - uint32_t xml_definition_length); -static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml, - const uint32_t message_definition_xml_length); +static volatile uint32_t pending_messages = 0; -static -int itti_dump_send_message(int sd, itti_dump_queue_item_t *message) +/*------------------------------------------------------------------------------*/ +static int itti_dump_send_message(int sd, itti_dump_queue_item_t *message) { itti_dump_message_t *new_message; ssize_t bytes_sent = 0, total_sent = 0; @@ -202,11 +198,11 @@ int itti_dump_send_message(int sd, itti_dump_queue_item_t *message) return total_sent; } -static void itti_dump_fwrite_message(itti_dump_queue_item_t *message) +static int itti_dump_fwrite_message(itti_dump_queue_item_t *message) { itti_socket_header_t header; - if (dump_file != NULL && message) { + if ((dump_file != NULL) && (message != NULL)) { header.message_size = message->message_size + sizeof(itti_dump_message_t); header.message_type = message->message_type; @@ -218,7 +214,9 @@ static void itti_dump_fwrite_message(itti_dump_queue_item_t *message) // #if !defined(RTAI) fflush (dump_file); // #endif + return (1); } + return (0); } static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml, @@ -237,7 +235,7 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin itti_dump_message = calloc(1, itti_dump_message_size); - ITTI_DUMP_DEBUG("[%d] Sending XML definition message of size %zu to observer peer\n", + ITTI_DUMP_DEBUG(0x2, "[%d] Sending XML definition message of size %zu to observer peer\n", sd, itti_dump_message_size); itti_dump_message->message_size = itti_dump_message_size; @@ -264,11 +262,33 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin return 0; } +static void itti_dump_user_data_delete_function(void *user_data, void *user_state) +{ + if (user_data != NULL) + { + itti_dump_queue_item_t *item; + task_id_t task_id; + + item = (itti_dump_queue_item_t *)user_data; + + if (item->data != NULL) + { + task_id = ITTI_MSG_ORIGIN_ID(item->data); + itti_free(task_id, item->data); + } + else + { + task_id = TASK_UNKNOWN; + } + itti_free(task_id, item); + } +} + static int itti_dump_enqueue_message(itti_dump_queue_item_t *new, uint32_t message_size, uint32_t message_type) { struct lfds611_freelist_element *new_queue_element = NULL; - + int overwrite_flag; DevAssert(new != NULL); #if defined(OAI_EMU) || defined(RTAI) @@ -278,26 +298,39 @@ static int itti_dump_enqueue_message(itti_dump_queue_item_t *new, uint32_t messa new->message_type = message_type; new->message_size = message_size; - new_queue_element = lfds611_ringbuffer_get_write_element( - itti_dump_queue.itti_message_queue, &new_queue_element, NULL); + ITTI_DUMP_DEBUG (0x1, " itti_dump_enqueue_message: lfds611_ringbuffer_get_write_element\n"); + new_queue_element = lfds611_ringbuffer_get_write_element (itti_dump_queue.itti_message_queue, &new_queue_element, &overwrite_flag); + + if (overwrite_flag != 0) + { + void *old = NULL; - lfds611_freelist_set_user_data_in_element(new_queue_element, (void *)new); + lfds611_freelist_get_user_data_from_element(new_queue_element, &old); + ITTI_DUMP_DEBUG (0x4, " overwrite_flag set, freeing old data %p %p\n", new_queue_element, old); + itti_dump_user_data_delete_function (old, NULL); + } - lfds611_ringbuffer_put_write_element(itti_dump_queue.itti_message_queue, - new_queue_element); + lfds611_freelist_set_user_data_in_element(new_queue_element, (void *) new); + lfds611_ringbuffer_put_write_element(itti_dump_queue.itti_message_queue, new_queue_element); + if (overwrite_flag == 0) + { #ifdef RTAI - __sync_fetch_and_add (&itti_dump_queue.messages_in_queue, 1); + __sync_fetch_and_add (&itti_dump_queue.messages_in_queue, 1); #else - { - ssize_t write_ret; - eventfd_t sem_counter = 1; + { + ssize_t write_ret; + eventfd_t sem_counter = 1; - /* Call to write for an event fd must be of 8 bytes */ - write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter)); - DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, 0); - } + /* Call to write for an event fd must be of 8 bytes */ + write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter)); + DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, 0); + } #endif + __sync_fetch_and_add (&pending_messages, 1); + } + + ITTI_DUMP_DEBUG (0x2, " Added element to queue %p %p, pending %u, type %u\n", new_queue_element, new, pending_messages, message_type); #if defined(OAI_EMU) || defined(RTAI) vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT); @@ -306,111 +339,153 @@ static int itti_dump_enqueue_message(itti_dump_queue_item_t *new, uint32_t messa return 0; } -int itti_dump_queue_message(task_id_t sender_task, - message_number_t message_number, - MessageDef *message_p, - const char *message_name, - const uint32_t message_size) +static void itti_dump_socket_exit(void) { - if (itti_dump_running) - { - itti_dump_queue_item_t *new; - size_t message_name_length; - - DevAssert(message_name != NULL); - DevAssert(message_p != NULL); - -#if defined(OAI_EMU) || defined(RTAI) - vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN); -#endif - new = itti_malloc(sender_task, sizeof(itti_dump_queue_item_t)); -#if defined(OAI_EMU) || defined(RTAI) - vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT); -#endif - -#if defined(OAI_EMU) || defined(RTAI) - vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN); -#endif - new->data = itti_malloc(sender_task, message_size); -#if defined(OAI_EMU) || defined(RTAI) - vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT); +#ifndef RTAI + close(itti_dump_queue.event_fd); #endif + close(itti_dump_queue.itti_listen_socket); - memcpy(new->data, message_p, message_size); - new->data_size = message_size; - new->message_number = message_number; - - message_name_length = strlen(message_name) + 1; - DevCheck(message_name_length <= SIGNAL_NAME_LENGTH, message_name_length, - SIGNAL_NAME_LENGTH, 0); - memcpy(new->message_name, message_name, message_name_length); - - itti_dump_enqueue_message(new, message_size, ITTI_DUMP_MESSAGE_TYPE); - } - - return 0; + /* Leave the thread as we detected end signal */ + pthread_exit(NULL); } -static void itti_dump_flush_ring_buffer(int flush_all) +static int itti_dump_flush_ring_buffer(int flush_all) { struct lfds611_freelist_element *element = NULL; - void *user_data; - int j; + void *user_data; + int j; + int consumer; #ifdef RTAI unsigned long number_of_messages; +#endif - number_of_messages = itti_dump_queue.messages_in_queue; + /* Check if there is a least one consumer */ + consumer = 0; + if (dump_file != NULL) + { + consumer = 1; + } + else + { + for (j = 0; j < ITTI_DUMP_MAX_CON; j++) { + if (itti_dump_queue.itti_clients[j].sd > 0) { + consumer = 1; + break; + } + } + } - ITTI_DUMP_DEBUG("%lu elements in queue\n", number_of_messages); + if (consumer > 0) + { +#ifdef RTAI + number_of_messages = itti_dump_queue.messages_in_queue; - if (number_of_messages == 0) { - return; - } + ITTI_DUMP_DEBUG(0x4, "%lu elements in queue\n", number_of_messages); + + if (number_of_messages == 0) { + return (consumer); + } - __sync_sub_and_fetch(&itti_dump_queue.messages_in_queue, number_of_messages); + __sync_sub_and_fetch(&itti_dump_queue.messages_in_queue, number_of_messages); #endif - do { - /* Acquire the ring element */ - lfds611_ringbuffer_get_read_element(itti_dump_queue.itti_message_queue, &element); + do { + /* Acquire the ring element */ + lfds611_ringbuffer_get_read_element(itti_dump_queue.itti_message_queue, &element); - DevAssert(element != NULL); + __sync_fetch_and_sub (&pending_messages, 1); - /* Retrieve user part of the message */ - lfds611_freelist_get_user_data_from_element(element, &user_data); + if (element == NULL) + { + if (flush_all != 0) + { + flush_all = 0; + } + else + { + ITTI_DUMP_DEBUG(0x8, " Dump event with no data\n"); + DevMessage("Dump event with no data\n"); + } + } + else + { + /* Retrieve user part of the message */ + lfds611_freelist_get_user_data_from_element(element, &user_data); - if (((itti_dump_queue_item_t *)user_data)->message_type == ITTI_DUMP_EXIT_SIGNAL) - { -#ifndef RTAI - close(itti_dump_queue.event_fd); -#endif - close(itti_dump_queue.itti_listen_socket); + ITTI_DUMP_DEBUG (0x2, " removed element from queue %p %p, pending %u\n", element, user_data, pending_messages); - lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element); + if (((itti_dump_queue_item_t *)user_data)->message_type == ITTI_DUMP_EXIT_SIGNAL) + { + lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element); + itti_dump_socket_exit(); + } - /* Leave the thread as we detected end signal */ - pthread_exit(NULL); - } + /* Write message to file */ + itti_dump_fwrite_message((itti_dump_queue_item_t *)user_data); - /* Write message to file */ - itti_dump_fwrite_message((itti_dump_queue_item_t *)user_data); + /* Send message to remote analyzer */ + for (j = 0; j < ITTI_DUMP_MAX_CON; j++) { + if (itti_dump_queue.itti_clients[j].sd > 0) { + itti_dump_send_message(itti_dump_queue.itti_clients[j].sd, + (itti_dump_queue_item_t *)user_data); + } + } - /* Send message to remote analyzer */ - for (j = 0; j < ITTI_DUMP_MAX_CON; j++) { - if (itti_dump_queue.itti_clients[j].sd > 0) { - itti_dump_send_message(itti_dump_queue.itti_clients[j].sd, - (itti_dump_queue_item_t *)user_data); + itti_dump_user_data_delete_function (user_data, NULL); + lfds611_freelist_set_user_data_in_element(element, NULL); + + /* We have finished with this element, reinsert it in the ring buffer */ + lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element); + } + } while(flush_all + #ifdef RTAI + && --number_of_messages + #endif + ); + } + + return (consumer); +} + +static int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t xml_definition_length) +{ + if (itti_dump_queue.nb_connected < ITTI_DUMP_MAX_CON) { + uint8_t i; + + for (i = 0; i < ITTI_DUMP_MAX_CON; i++) { + /* Let's find a place to store the new client */ + if (itti_dump_queue.itti_clients[i].sd == -1) { + break; } } - /* We have finished with this element, reinsert it in the ring buffer */ - lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element); - } while(flush_all -#ifdef RTAI - && --number_of_messages -#endif - ); + ITTI_DUMP_DEBUG(0x2, " Found place to store new connection: %d\n", i); + + DevCheck(i < ITTI_DUMP_MAX_CON, i, ITTI_DUMP_MAX_CON, sd); + + ITTI_DUMP_DEBUG(0x2, " Socket %d accepted\n", sd); + + /* Send the XML message definition */ + if (itti_dump_send_xml_definition(sd, xml_definition, xml_definition_length) < 0) { + ITTI_DUMP_ERROR(" Failed to send XML definition\n"); + close (sd); + return -1; + } + + itti_dump_queue.itti_clients[i].sd = sd; + itti_dump_queue.nb_connected++; + } else { + ITTI_DUMP_DEBUG(0x2, " Socket %d rejected\n", sd); + /* We have reached max number of users connected... + * Reject the connection. + */ + close (sd); + return -1; + } + + return 0; } static void *itti_dump_socket(void *arg_p) @@ -428,7 +503,7 @@ static void *itti_dump_socket(void *arg_p) struct timeval timeout; #endif - ITTI_DUMP_DEBUG("Creating TCP dump socket on port %u\n", ITTI_PORT); + ITTI_DUMP_DEBUG(0x2, " Creating TCP dump socket on port %u\n", ITTI_PORT); message_definition_xml = (char *)arg_p; DevAssert(message_definition_xml != NULL); @@ -436,7 +511,7 @@ static void *itti_dump_socket(void *arg_p) message_definition_xml_length = strlen(message_definition_xml) + 1; if ((itti_listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { - ITTI_DUMP_ERROR("Socket creation failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR(" ocket creation failed (%d:%s)\n", errno, strerror(errno)); pthread_exit(NULL); } @@ -444,7 +519,7 @@ static void *itti_dump_socket(void *arg_p) rc = setsockopt(itti_listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on)); if (rc < 0) { - ITTI_DUMP_ERROR("setsockopt SO_REUSEADDR failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR(" setsockopt SO_REUSEADDR failed (%d:%s)\n", errno, strerror(errno)); close(itti_listen_socket); pthread_exit(NULL); } @@ -454,7 +529,7 @@ static void *itti_dump_socket(void *arg_p) */ rc = ioctl(itti_listen_socket, FIONBIO, (char *)&on); if (rc < 0) { - ITTI_DUMP_ERROR("ioctl FIONBIO (non-blocking) failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR(" ioctl FIONBIO (non-blocking) failed (%d:%s)\n", errno, strerror(errno)); close(itti_listen_socket); pthread_exit(NULL); } @@ -466,11 +541,11 @@ static void *itti_dump_socket(void *arg_p) if (bind(itti_listen_socket, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) { - ITTI_DUMP_ERROR("Bind failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR(" Bind failed (%d:%s)\n", errno, strerror(errno)); pthread_exit(NULL); } if (listen(itti_listen_socket, 5) < 0) { - ITTI_DUMP_ERROR("Listen failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR(" Listen failed (%d:%s)\n", errno, strerror(errno)); pthread_exit(NULL); } @@ -515,11 +590,22 @@ static void *itti_dump_socket(void *arg_p) rc = select(max_sd + 1, &working_set, NULL, NULL, timeout_p); if (rc < 0) { - ITTI_DUMP_ERROR("select failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR(" select failed (%d:%s)\n", errno, strerror(errno)); pthread_exit(NULL); } else if (rc == 0) { /* Timeout */ - itti_dump_flush_ring_buffer(1); + if (itti_dump_flush_ring_buffer(1) == 0) + { + if (itti_dump_running) + { + ITTI_DUMP_DEBUG (0x4, " No messages consumers, waiting ...\n"); + usleep(100 * 1000); + } + else + { + itti_dump_socket_exit(); + } + } } desc_ready = rc; @@ -538,16 +624,40 @@ static void *itti_dump_socket(void *arg_p) /* Read will always return 1 for kernel versions > 2.6.30 */ read_ret = read (itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter)); if (read_ret < 0) { - ITTI_DUMP_ERROR("Failed read for semaphore: %s\n", strerror(errno)); + ITTI_DUMP_ERROR(" Failed read for semaphore: %s\n", strerror(errno)); pthread_exit(NULL); } DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0); #if defined(KERNEL_VERSION_PRE_2_6_30) - itti_dump_flush_ring_buffer(1); + if (itti_dump_flush_ring_buffer(1) == 0) #else - itti_dump_flush_ring_buffer(0); + if (itti_dump_flush_ring_buffer(0) == 0) #endif - ITTI_DUMP_DEBUG("Write element to file\n"); + { + if (itti_dump_running) + { + ITTI_DUMP_DEBUG (0x4, " No messages consumers, waiting ...\n"); + usleep(100 * 1000); +#ifndef RTAI + { + ssize_t write_ret; + + sem_counter = 1; + /* Call to write for an event fd must be of 8 bytes */ + write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter)); + DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, 0); + } +#endif + } + else + { + itti_dump_socket_exit(); + } + } + else + { + ITTI_DUMP_DEBUG(0x1, " Write element to file\n"); + } } else #endif if (i == itti_listen_socket) { @@ -556,10 +666,10 @@ static void *itti_dump_socket(void *arg_p) if (client_socket < 0) { if (errno == EWOULDBLOCK || errno == EAGAIN) { /* No more new connection */ - ITTI_DUMP_DEBUG("No more new connection\n"); + ITTI_DUMP_DEBUG(0x2, " No more new connection\n"); continue; } else { - ITTI_DUMP_ERROR("accept failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR(" accept failed (%d:%s)\n", errno, strerror(errno)); pthread_exit(NULL); } } @@ -580,7 +690,7 @@ static void *itti_dump_socket(void *arg_p) */ uint8_t j; - ITTI_DUMP_DEBUG("Socket %d disconnected\n", i); + ITTI_DUMP_DEBUG(0x2, " Socket %d disconnected\n", i); /* Close the socket and update info related to this connection */ close(i); @@ -622,41 +732,47 @@ static void *itti_dump_socket(void *arg_p) return NULL; } -static -int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t xml_definition_length) +/*------------------------------------------------------------------------------*/ +int itti_dump_queue_message(task_id_t sender_task, + message_number_t message_number, + MessageDef *message_p, + const char *message_name, + const uint32_t message_size) { - if (itti_dump_queue.nb_connected < ITTI_DUMP_MAX_CON) { - uint8_t i; + if (itti_dump_running) + { + itti_dump_queue_item_t *new; + size_t message_name_length; - for (i = 0; i < ITTI_DUMP_MAX_CON; i++) { - /* Let's find a place to store the new client */ - if (itti_dump_queue.itti_clients[i].sd == -1) { - break; - } - } + DevAssert(message_name != NULL); + DevAssert(message_p != NULL); - ITTI_DUMP_DEBUG("Found place to store new connection: %d\n", i); +#if defined(OAI_EMU) || defined(RTAI) + vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN); +#endif + new = itti_malloc(sender_task, TASK_MAX + 100, sizeof(itti_dump_queue_item_t)); +#if defined(OAI_EMU) || defined(RTAI) + vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT); +#endif - DevCheck(i < ITTI_DUMP_MAX_CON, i, ITTI_DUMP_MAX_CON, sd); +#if defined(OAI_EMU) || defined(RTAI) + vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN); +#endif + new->data = itti_malloc(sender_task, TASK_MAX + 100, message_size); +#if defined(OAI_EMU) || defined(RTAI) + vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT); +#endif - ITTI_DUMP_DEBUG("Socket %d accepted\n", sd); + memcpy(new->data, message_p, message_size); + new->data_size = message_size; + new->message_number = message_number; - /* Send the XML message definition */ - if (itti_dump_send_xml_definition(sd, xml_definition, xml_definition_length) < 0) { - ITTI_DUMP_ERROR("Failed to send XML definition\n"); - close (sd); - return -1; - } + message_name_length = strlen(message_name) + 1; + DevCheck(message_name_length <= SIGNAL_NAME_LENGTH, message_name_length, + SIGNAL_NAME_LENGTH, 0); + memcpy(new->message_name, message_name, message_name_length); - itti_dump_queue.itti_clients[i].sd = sd; - itti_dump_queue.nb_connected++; - } else { - ITTI_DUMP_DEBUG("Socket %d rejected\n", sd); - /* We have reached max number of users connected... - * Reject the connection. - */ - close (sd); - return -1; + itti_dump_enqueue_message(new, message_size, ITTI_DUMP_MESSAGE_TYPE); } return 0; @@ -681,7 +797,7 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons if (dump_file == NULL) { - ITTI_DUMP_ERROR("can not open dump file \"%s\" (%d:%s)\n", dump_file_name, errno, strerror(errno)); + ITTI_DUMP_ERROR(" can not open dump file \"%s\" (%d:%s)\n", dump_file_name, errno, strerror(errno)); } else { @@ -700,7 +816,7 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons memset(&itti_dump_queue, 0, sizeof(itti_desc_t)); - ITTI_DUMP_DEBUG("Creating new ring buffer for itti dump of %u elements\n", + ITTI_DUMP_DEBUG(0x2, " Creating new ring buffer for itti dump of %u elements\n", ITTI_QUEUE_MAX_ELEMENTS); if (lfds611_ringbuffer_new(&itti_dump_queue.itti_message_queue, @@ -708,7 +824,7 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons NULL, NULL) != 1) { - ITTI_DUMP_ERROR("Failed to create ring buffer...\n"); + ITTI_DUMP_ERROR(" Failed to create ring buffer...\n"); /* Always assert on this condition */ DevAssert(0 == 1); } @@ -723,7 +839,7 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons # endif if (itti_dump_queue.event_fd == -1) { - ITTI_DUMP_ERROR("eventfd failed: %s\n", strerror(errno)); + ITTI_DUMP_ERROR(" eventfd failed: %s\n", strerror(errno)); /* Always assert on this condition */ DevAssert(0 == 1); } @@ -739,59 +855,38 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons /* initialized with default attributes */ ret = pthread_attr_init(&itti_dump_queue.attr); if (ret < 0) { - ITTI_DUMP_ERROR("pthread_attr_init failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR(" pthread_attr_init failed (%d:%s)\n", errno, strerror(errno)); DevAssert(0 == 1); } ret = pthread_attr_setschedpolicy(&itti_dump_queue.attr, SCHED_FIFO); if (ret < 0) { - ITTI_DUMP_ERROR("pthread_attr_setschedpolicy (SCHED_IDLE) failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR(" pthread_attr_setschedpolicy (SCHED_IDLE) failed (%d:%s)\n", errno, strerror(errno)); DevAssert(0 == 1); } ret = pthread_attr_setschedparam(&itti_dump_queue.attr, &scheduler_param); if (ret < 0) { - ITTI_DUMP_ERROR("pthread_attr_setschedparam failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR(" pthread_attr_setschedparam failed (%d:%s)\n", errno, strerror(errno)); DevAssert(0 == 1); } ret = pthread_create(&itti_dump_queue.itti_acceptor_thread, &itti_dump_queue.attr, &itti_dump_socket, (void *)messages_definition_xml); if (ret < 0) { - ITTI_DUMP_ERROR("pthread_create failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR(" pthread_create failed (%d:%s)\n", errno, strerror(errno)); DevAssert(0 == 1); } return 0; } -static void itti_dump_user_data_delete_function(void *user_data, void *user_state) -{ - if (user_data != NULL) - { - itti_dump_queue_item_t *item; - task_id_t task_id; - - item = (itti_dump_queue_item_t *)user_data; - - if (item->data != NULL) - { - task_id = ITTI_MSG_ORIGIN_ID(item->data); - itti_free(task_id, item->data); - } - else - { - task_id = TASK_UNKNOWN; - } - itti_free(task_id, item); - } -} - void itti_dump_exit(void) { void *arg; itti_dump_queue_item_t *new; - new = calloc(1, sizeof(itti_dump_queue_item_t)); + new = itti_malloc(TASK_UNKNOWN, TASK_UNKNOWN, sizeof(itti_dump_queue_item_t)); + memset(new, 0, sizeof(itti_dump_queue_item_t)); /* Set a flag to stop recording message */ itti_dump_running = 0; @@ -799,12 +894,12 @@ void itti_dump_exit(void) /* Send the exit signal to other thread */ itti_dump_enqueue_message(new, 0, ITTI_DUMP_EXIT_SIGNAL); - ITTI_DUMP_DEBUG("waiting for dumper thread to finish\n"); + ITTI_DUMP_DEBUG(0x2, " waiting for dumper thread to finish\n"); /* wait for the thread to terminate */ pthread_join(itti_dump_queue.itti_acceptor_thread, &arg); - ITTI_DUMP_DEBUG("dumper thread correctly exited\n"); + ITTI_DUMP_DEBUG(0x2, " dumper thread correctly exited\n"); if (dump_file != NULL) { diff --git a/openair-cn/S1AP/s1ap_eNB.c b/openair-cn/S1AP/s1ap_eNB.c index 1e4564ce4b..6f8c4153d2 100644 --- a/openair-cn/S1AP/s1ap_eNB.c +++ b/openair-cn/S1AP/s1ap_eNB.c @@ -230,7 +230,7 @@ void s1ap_eNB_handle_sctp_data_ind(sctp_data_ind_t *sctp_data_ind) s1ap_eNB_handle_message(sctp_data_ind->assoc_id, sctp_data_ind->stream, sctp_data_ind->buffer, sctp_data_ind->buffer_length); - free(sctp_data_ind->buffer); + itti_free(TASK_UNKNOWN, sctp_data_ind->buffer); } void *s1ap_eNB_task(void *arg) diff --git a/openair-cn/SCTP/sctp_eNB_itti_messaging.c b/openair-cn/SCTP/sctp_eNB_itti_messaging.c index 7d7a7e57ab..03a656e150 100644 --- a/openair-cn/SCTP/sctp_eNB_itti_messaging.c +++ b/openair-cn/SCTP/sctp_eNB_itti_messaging.c @@ -13,7 +13,7 @@ int sctp_itti_send_new_message_ind(task_id_t task_id, uint32_t assoc_id, uint8_t sctp_data_ind_p = &message_p->ittiMsg.sctp_data_ind; - sctp_data_ind_p->buffer = malloc(sizeof(uint8_t) * buffer_length); + sctp_data_ind_p->buffer = itti_malloc(TASK_SCTP, task_id, sizeof(uint8_t) * buffer_length); /* Copy the buffer */ memcpy((void *)sctp_data_ind_p->buffer, (void *)buffer, buffer_length); diff --git a/openair-cn/SCTP/sctp_eNB_task.c b/openair-cn/SCTP/sctp_eNB_task.c index 612a3dc687..a778115589 100644 --- a/openair-cn/SCTP/sctp_eNB_task.c +++ b/openair-cn/SCTP/sctp_eNB_task.c @@ -560,8 +560,7 @@ void *sctp_eNB_task(void *arg) ITTI_MSG_ID(received_msg), ITTI_MSG_NAME(received_msg)); break; } - - itti_free(TASK_SCTP, received_msg); + itti_free(ITTI_MSG_ORIGIN_ID(received_msg), received_msg); received_msg = NULL; } diff --git a/openair2/COMMON/intertask_interface_conf.h b/openair2/COMMON/intertask_interface_conf.h index df701a4d05..01b2eadd96 100644 --- a/openair2/COMMON/intertask_interface_conf.h +++ b/openair2/COMMON/intertask_interface_conf.h @@ -45,7 +45,7 @@ #define ITTI_PORT (10006) /* This is the queue size for signal dumper */ -#define ITTI_QUEUE_MAX_ELEMENTS (200 * 1000) +#define ITTI_QUEUE_MAX_ELEMENTS (10 * 1000) #define ITTI_DUMP_MAX_CON (5) /* Max connections in parallel */ #endif /* INTERTASK_INTERFACE_CONF_H_ */ diff --git a/openair2/RRC/LITE/L2_interface.c b/openair2/RRC/LITE/L2_interface.c index 5bb7476868..e3a16910d3 100644 --- a/openair2/RRC/LITE/L2_interface.c +++ b/openair2/RRC/LITE/L2_interface.c @@ -485,7 +485,7 @@ u8 rrc_lite_data_req(u8 eNB_id, u8 UE_id, u32 frame, u8 eNB_flag, unsigned int r // Uses a new buffer to avoid issue with PDCP buffer content that could be changed by PDCP (asynchronous message handling). u8 *message_buffer; - message_buffer = itti_malloc (eNB_flag ? TASK_RRC_ENB : TASK_RRC_UE, sdu_size); + message_buffer = itti_malloc (eNB_flag ? TASK_RRC_ENB : TASK_RRC_UE, eNB_flag ? TASK_PDCP_ENB : TASK_PDCP_UE, sdu_size); memcpy (message_buffer, Buffer, sdu_size); message_p = itti_alloc_new_message (eNB_flag ? TASK_RRC_ENB : TASK_RRC_UE, RRC_DCCH_DATA_REQ); @@ -531,7 +531,7 @@ void rrc_lite_data_ind(u8_t eNB_id, u8_t UE_id, u32 frame, u8 eNB_flag,u32 Srb_i // Uses a new buffer to avoid issue with PDCP buffer content that could be changed by PDCP (asynchronous message handling). u8 *message_buffer; - message_buffer = itti_malloc (eNB_flag ? TASK_PDCP_ENB : TASK_PDCP_UE, sdu_size); + message_buffer = itti_malloc (eNB_flag ? TASK_PDCP_ENB : TASK_PDCP_UE, eNB_flag ? TASK_RRC_ENB : TASK_RRC_UE, sdu_size); memcpy (message_buffer, Buffer, sdu_size); message_p = itti_alloc_new_message (eNB_flag ? TASK_PDCP_ENB : TASK_PDCP_UE, RRC_DCCH_DATA_IND); @@ -542,7 +542,7 @@ void rrc_lite_data_ind(u8_t eNB_id, u8_t UE_id, u32 frame, u8 eNB_flag,u32 Srb_i RRC_DCCH_DATA_IND (message_p).ue_index = UE_id; RRC_DCCH_DATA_IND (message_p).eNB_index = eNB_id; - itti_send_msg_to_task ((eNB_flag == 1) ? TASK_RRC_ENB : TASK_RRC_UE, Mod_id, message_p); + itti_send_msg_to_task (eNB_flag ? TASK_RRC_ENB : TASK_RRC_UE, Mod_id, message_p); } #else if (eNB_flag ==1) { -- GitLab