diff --git a/common/utils/itti/intertask_interface.c b/common/utils/itti/intertask_interface.c index d46adcc42142cc2a95935405f2318addd73a58d2..07a60d5968678850cb019f1b9056a8296a8528b6 100644 --- a/common/utils/itti/intertask_interface.c +++ b/common/utils/itti/intertask_interface.c @@ -601,6 +601,9 @@ void itti_mark_task_ready(task_id_t task_id) { DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); + /* Register the thread in itti dump */ + itti_dump_thread_use_ring_buffer(); + #if !defined(ENABLE_EVENT_FD) // Lock the mutex to get exclusive access to the list pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex); @@ -638,6 +641,9 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i ITTI_DEBUG("Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max); #if !defined(RTAI) + /* SR: disable signals module for RTAI (need to harmonize management + * between softmodem and oaisim). + */ CHECK_INIT_RETURN(signal_init()); #endif diff --git a/common/utils/itti/intertask_interface_dump.c b/common/utils/itti/intertask_interface_dump.c index f725c418b55a04fbaa5d917d372fe7dd4d44ff04..763b1e0a84c6e47a873159cdf9cb3600cd763315 100644 --- a/common/utils/itti/intertask_interface_dump.c +++ b/common/utils/itti/intertask_interface_dump.c @@ -42,6 +42,7 @@ #include <fcntl.h> #include <errno.h> #include <error.h> +#include <sched.h> #include <sys/ioctl.h> #include <sys/socket.h> @@ -49,61 +50,65 @@ #include <sys/types.h> #include <arpa/inet.h> +#include <sys/eventfd.h> + #include "assertions.h" -#include "queue.h" +#include "liblfds611.h" #include "intertask_interface.h" #include "intertask_interface_dump.h" #define SIGNAL_NAME_LENGTH 48 -/* Declared in intertask_interface.c */ -extern int itti_debug; +static const int itti_dump_debug = 0; -#define ITTI_DEBUG(x, args...) do { if (itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); } \ +#define ITTI_DUMP_DEBUG(x, args...) do { if (itti_dump_debug) fprintf(stdout, "[ITTI][D]"x, ##args); } \ while(0) -#define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); } \ +#define ITTI_DUMP_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); } \ while(0) /* Message sent is an intertask dump type */ #define ITTI_DUMP_MESSAGE_TYPE 0x1 #define ITTI_STATISTIC_MESSAGE_TYPE 0x2 #define ITTI_DUMP_XML_DEFINITION 0x3 +/* This signal is not meant to be used by remote analyzer */ +#define ITTI_DUMP_EXIT_SIGNAL 0x4 -typedef struct itti_queue_item_s { - STAILQ_ENTRY(itti_queue_item_s) entry; +typedef struct itti_dump_queue_item_s { void *data; uint32_t data_size; uint32_t message_number; char message_name[SIGNAL_NAME_LENGTH]; -} itti_queue_item_t; + uint32_t message_type; + uint32_t message_size; +} itti_dump_queue_item_t; typedef struct { int sd; uint32_t last_message_number; - - pthread_mutex_t client_lock; } itti_client_desc_t; typedef struct itti_desc_s { - /* The acceptor thread */ - pthread_t itti_acceptor_thread; - pthread_t itti_write_thread; - - /* Protect the circular queue */ - pthread_mutex_t queue_mutex; + /* Asynchronous thread that write to file/accept new clients */ + pthread_t itti_acceptor_thread; + pthread_attr_t attr; /* List of messages to dump. * NOTE: we limit the size of this queue to retain only the last exchanged - * messages. The size can be increased by setting up the ITTI_QUEUE_SIZE_MAX + * messages. The size can be increased by setting up the ITTI_QUEUE_MAX_ELEMENTS * in mme_default_values.h or by putting a custom in the configuration file. */ - STAILQ_HEAD(itti_queue_s, itti_queue_item_s) itti_message_queue; - struct itti_queue_item_s *itti_queue_last; + struct lfds611_ringbuffer_state *itti_message_queue; + uint32_t queue_size; int nb_connected; + /* Event fd used to notify new messages (semaphore) */ + int event_fd; + + int itti_listen_socket; + itti_client_desc_t itti_clients[ITTI_DUMP_MAX_CON]; } itti_desc_t; @@ -127,17 +132,17 @@ typedef struct { } itti_statistic_message_t; -static itti_desc_t itti_queue; +static itti_desc_t itti_dump_queue; static FILE *dump_file; -static int itti_dump_send_message(int sd, itti_queue_item_t *message); +static int itti_dump_send_message(int sd, itti_dump_queue_item_t *message); static int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t xml_definition_length); static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml, const uint32_t message_definition_xml_length); static -int itti_dump_send_message(int sd, itti_queue_item_t *message) +int itti_dump_send_message(int sd, itti_dump_queue_item_t *message) { itti_dump_message_t *new_message; ssize_t bytes_sent = 0, total_sent = 0; @@ -167,7 +172,7 @@ int itti_dump_send_message(int sd, itti_queue_item_t *message) do { bytes_sent = send(sd, &data_ptr[total_sent], size - total_sent, 0); if (bytes_sent < 0) { - ITTI_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n", + ITTI_DUMP_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n", sd, size, errno, strerror(errno)); free(new_message); return -1; @@ -179,6 +184,22 @@ int itti_dump_send_message(int sd, itti_queue_item_t *message) return total_sent; } +static void itti_dump_fwrite_message(itti_dump_queue_item_t *message) +{ + itti_socket_header_t header; + + if (dump_file != NULL && message) { + + header.message_size = message->message_size + sizeof(itti_dump_message_t); + header.message_type = message->message_type; + + fwrite (&header, sizeof(itti_socket_header_t), 1, dump_file); + fwrite (&message->message_number, sizeof(message->message_number), 1, dump_file); + fwrite (message->message_name, sizeof(message->message_name), 1, dump_file); + fwrite (message->data, message->data_size, 1, dump_file); + } +} + static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml, const uint32_t message_definition_xml_length) { @@ -195,7 +216,7 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin itti_dump_message = calloc(1, itti_dump_message_size); - ITTI_DEBUG("[%d] Sending XML definition message of size %zu to observer peer\n", + ITTI_DUMP_DEBUG("[%d] Sending XML definition message of size %zu to observer peer\n", sd, itti_dump_message_size); itti_dump_message->message_size = itti_dump_message_size; @@ -209,7 +230,7 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin do { bytes_sent = send(sd, &data_ptr[total_sent], itti_dump_message_size - total_sent, 0); if (bytes_sent < 0) { - ITTI_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n", + ITTI_DUMP_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n", sd, itti_dump_message_size, errno, strerror(errno)); free(itti_dump_message); return -1; @@ -222,53 +243,30 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin return 0; } -static int itti_enqueue_message(itti_queue_item_t *new, uint32_t message_size, +static int itti_dump_enqueue_message(itti_dump_queue_item_t *new, uint32_t message_size, uint32_t message_type) { - itti_queue_item_t *head = NULL; - - DevAssert(new != NULL); + ssize_t write_ret; + uint64_t sem_counter = 1; - /* Lock the queue mutex for writing to insert the new element */ - pthread_mutex_lock(&itti_queue.queue_mutex); + struct lfds611_freelist_element *new_queue_element = NULL; - /* We reached the maximum size for the queue of messages -> remove the head */ - if (itti_queue.queue_size + message_size > ITTI_QUEUE_SIZE_MAX) { - head = STAILQ_FIRST(&itti_queue.itti_message_queue); - /* Remove the head */ - STAILQ_REMOVE_HEAD(&itti_queue.itti_message_queue, entry); - } else { - itti_queue.queue_size += message_size; - } - /* Insert the packet at tail */ - STAILQ_INSERT_TAIL(&itti_queue.itti_message_queue, new, entry); - itti_queue.itti_queue_last = new; + DevAssert(new != NULL); - if (dump_file != NULL) - { - itti_socket_header_t header; + new->message_type = message_type; + new->message_size = message_size; - header.message_size = message_size + sizeof(itti_dump_message_t); - header.message_type = message_type; + new_queue_element = lfds611_ringbuffer_get_write_element( + itti_dump_queue.itti_message_queue, &new_queue_element, NULL); - fwrite (&header, sizeof(itti_socket_header_t), 1, dump_file); - fwrite (&new->message_number, sizeof(new->message_number), 1, dump_file); - fwrite (new->message_name, sizeof(new->message_name), 1, dump_file); - fwrite (new->data, new->data_size, 1, dump_file); - fflush (dump_file); - } + lfds611_freelist_set_user_data_in_element(new_queue_element, (void *)new); - /* Release the mutex */ - pthread_mutex_unlock(&itti_queue.queue_mutex); + lfds611_ringbuffer_put_write_element(itti_dump_queue.itti_message_queue, + new_queue_element); - /* No need to have the mutex locked to free data as at this point the message - * is no more in the list. - */ - if (head) { - free(head->data); - free(head); - head = NULL; - } + /* Call to write for an event fd must be of 8 bytes */ + write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter)); + DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, 0); return 0; } @@ -278,17 +276,17 @@ int itti_dump_queue_message(message_number_t message_number, const char *message_name, const uint32_t message_size) { - itti_queue_item_t *new; + itti_dump_queue_item_t *new; size_t message_name_length; int i; DevAssert(message_name != NULL); DevAssert(message_p != NULL); - new = calloc(1, sizeof(itti_queue_item_t)); + new = calloc(1, sizeof(itti_dump_queue_item_t)); if (new == NULL) { - ITTI_ERROR("Failed to allocate memory (%s:%d)\n", + ITTI_DUMP_ERROR("Failed to allocate memory (%s:%d)\n", __FILE__, __LINE__); return -1; } @@ -296,7 +294,7 @@ int itti_dump_queue_message(message_number_t message_number, new->data = malloc(message_size); if (new->data == NULL) { - ITTI_ERROR("Failed to allocate memory (%s:%d)\n", + ITTI_DUMP_ERROR("Failed to allocate memory (%s:%d)\n", __FILE__, __LINE__); return -1; } @@ -309,17 +307,12 @@ int itti_dump_queue_message(message_number_t message_number, SIGNAL_NAME_LENGTH, 0); memcpy(new->message_name, message_name, message_name_length); - itti_enqueue_message(new, message_size, ITTI_DUMP_MESSAGE_TYPE); + itti_dump_enqueue_message(new, message_size, ITTI_DUMP_MESSAGE_TYPE); for (i = 0; i < ITTI_DUMP_MAX_CON; i++) { - if (pthread_mutex_trylock(&itti_queue.itti_clients[i].client_lock) == 0) { - pthread_mutex_unlock(&itti_queue.itti_clients[i].client_lock); - } else { - continue; - } - if (itti_queue.itti_clients[i].sd == -1) + if (itti_dump_queue.itti_clients[i].sd == -1) continue; - itti_dump_send_message(itti_queue.itti_clients[i].sd, new); + itti_dump_send_message(itti_dump_queue.itti_clients[i].sd, new); } return 0; @@ -332,10 +325,10 @@ static void *itti_dump_socket(void *arg_p) int rc; int itti_listen_socket, max_sd; int on = 1; - fd_set master_set, working_set; + fd_set read_set, working_set; struct sockaddr_in servaddr; /* socket address structure */ - ITTI_DEBUG("Creating TCP dump socket on port %u\n", ITTI_PORT); + ITTI_DUMP_DEBUG("Creating TCP dump socket on port %u\n", ITTI_PORT); message_definition_xml = (char *)arg_p; DevAssert(message_definition_xml != NULL); @@ -343,7 +336,7 @@ static void *itti_dump_socket(void *arg_p) message_definition_xml_length = strlen(message_definition_xml) + 1; if ((itti_listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { - ITTI_ERROR("Socket creation failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR("Socket creation failed (%d:%s)\n", errno, strerror(errno)); pthread_exit(NULL); } @@ -351,7 +344,7 @@ static void *itti_dump_socket(void *arg_p) rc = setsockopt(itti_listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on)); if (rc < 0) { - ITTI_ERROR("setsockopt SO_REUSEADDR failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR("setsockopt SO_REUSEADDR failed (%d:%s)\n", errno, strerror(errno)); close(itti_listen_socket); pthread_exit(NULL); } @@ -361,7 +354,7 @@ static void *itti_dump_socket(void *arg_p) */ rc = ioctl(itti_listen_socket, FIONBIO, (char *)&on); if (rc < 0) { - ITTI_ERROR("ioctl FIONBIO (non-blocking) failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR("ioctl FIONBIO (non-blocking) failed (%d:%s)\n", errno, strerror(errno)); close(itti_listen_socket); pthread_exit(NULL); } @@ -373,17 +366,26 @@ static void *itti_dump_socket(void *arg_p) if (bind(itti_listen_socket, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) { - ITTI_ERROR("Bind failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR("Bind failed (%d:%s)\n", errno, strerror(errno)); pthread_exit(NULL); } if (listen(itti_listen_socket, 5) < 0) { - ITTI_ERROR("Listen failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR("Listen failed (%d:%s)\n", errno, strerror(errno)); pthread_exit(NULL); } - FD_ZERO(&master_set); - max_sd = itti_listen_socket; - FD_SET(itti_listen_socket, &master_set); + FD_ZERO(&read_set); + + /* Add the listener */ + FD_SET(itti_listen_socket, &read_set); + + /* Add the event fd */ + FD_SET(itti_dump_queue.event_fd, &read_set); + + /* Max of both sd */ + max_sd = itti_listen_socket > itti_dump_queue.event_fd ? itti_listen_socket : itti_dump_queue.event_fd; + + itti_dump_queue.itti_listen_socket = itti_listen_socket; /* Loop waiting for incoming connects or for incoming data * on any of the connected sockets. @@ -393,9 +395,7 @@ static void *itti_dump_socket(void *arg_p) int client_socket = -1; int i; - memcpy(&working_set, &master_set, sizeof(master_set)); - -// ITTI_DEBUG("Stuck on select\n"); + memcpy(&working_set, &read_set, sizeof(read_set)); /* No timeout: select blocks till a new event has to be handled * on sd's. @@ -403,37 +403,88 @@ static void *itti_dump_socket(void *arg_p) rc = select(max_sd + 1, &working_set, NULL, NULL, NULL); if (rc < 0) { - ITTI_ERROR("select failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR("select failed (%d:%s)\n", errno, strerror(errno)); pthread_exit(NULL); } desc_ready = rc; - for (i = 0; i <= max_sd && desc_ready > 0; i++) { - if (FD_ISSET(i, &working_set)) { - ITTI_DEBUG("Handling socket %d\n", i); + for (i = 0; i <= max_sd && desc_ready > 0; i++) + { + if (FD_ISSET(i, &working_set)) + { desc_ready -= 1; - /* Check if the socket where data available is the listening - * socket. - */ - if (i == itti_listen_socket) { + + if (i == itti_dump_queue.event_fd) { + /* Notification of new element to dump from other tasks */ + uint64_t sem_counter; + ssize_t read_ret; + void *user_data; + int j; + + struct lfds611_freelist_element *element; + + /* Read will always return 1 */ + read_ret = read (itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter)); + if (read_ret < 0) { + ITTI_DUMP_ERROR("Failed read for semaphore: %s\n", strerror(errno)); + pthread_exit(NULL); + } + DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0); + + /* Acquire the ring element */ + lfds611_ringbuffer_get_read_element(itti_dump_queue.itti_message_queue, &element); + + DevAssert(element != NULL); + + /* Retrieve user part of the message */ + lfds611_freelist_get_user_data_from_element(element, &user_data); + + if (((itti_dump_queue_item_t *)user_data)->message_type == ITTI_DUMP_EXIT_SIGNAL) + { + close(itti_dump_queue.event_fd); + close(itti_dump_queue.itti_listen_socket); + + lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element); + + /* Leave the thread as we detected end signal */ + pthread_exit(NULL); + } + + /* Write message to file */ + itti_dump_fwrite_message((itti_dump_queue_item_t *)user_data); + + /* Send message to remote analyzer */ + for (j = 0; j < ITTI_DUMP_MAX_CON; j++) { + if (itti_dump_queue.itti_clients[i].sd > 0) { + itti_dump_send_message(itti_dump_queue.itti_clients[i].sd, + (itti_dump_queue_item_t *)user_data); + } + } + + /* We have finished with this element, reinsert it in the ring buffer */ + lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element); + + ITTI_DUMP_DEBUG("Write element to file\n"); + } else if (i == itti_listen_socket) { do { client_socket = accept(itti_listen_socket, NULL, NULL); if (client_socket < 0) { if (errno == EWOULDBLOCK || errno == EAGAIN) { /* No more new connection */ - ITTI_DEBUG("No more new connection\n"); + ITTI_DUMP_DEBUG("No more new connection\n"); continue; } else { - ITTI_ERROR("accept failed (%d:%s)\n", errno, strerror(errno)); + ITTI_DUMP_ERROR("accept failed (%d:%s)\n", errno, strerror(errno)); pthread_exit(NULL); } } if (itti_dump_handle_new_connection(client_socket, message_definition_xml, - message_definition_xml_length) == 0) { + message_definition_xml_length) == 0) + { /* The socket has been accepted. * We have to update the set to include this new sd. */ - FD_SET(client_socket, &master_set); + FD_SET(client_socket, &read_set); if (client_socket > max_sd) max_sd = client_socket; } @@ -444,13 +495,13 @@ static void *itti_dump_socket(void *arg_p) */ uint8_t j; - ITTI_DEBUG("Socket %d disconnected\n", i); + ITTI_DUMP_DEBUG("Socket %d disconnected\n", i); /* Close the socket and update info related to this connection */ close(i); for (j = 0; j < ITTI_DUMP_MAX_CON; j++) { - if (itti_queue.itti_clients[j].sd == i) + if (itti_dump_queue.itti_clients[j].sd == i) break; } @@ -462,19 +513,19 @@ static void *itti_dump_socket(void *arg_p) /* Re-initialize the socket to -1 so we can accept new * incoming connections. */ - itti_queue.itti_clients[j].sd = -1; - itti_queue.itti_clients[j].last_message_number = 0; - itti_queue.nb_connected--; + itti_dump_queue.itti_clients[j].sd = -1; + itti_dump_queue.itti_clients[j].last_message_number = 0; + itti_dump_queue.nb_connected--; /* Remove the socket from the FD set and update the max sd */ - FD_CLR(i, &master_set); + FD_CLR(i, &read_set); if (i == max_sd) { - if (itti_queue.nb_connected == 0) { + if (itti_dump_queue.nb_connected == 0) { /* No more new connection max_sd = itti_listen_socket */ max_sd = itti_listen_socket; } else { - while (FD_ISSET(max_sd, &master_set) == 0) { + while (FD_ISSET(max_sd, &read_set) == 0) { max_sd -= 1; } } @@ -489,44 +540,33 @@ static void *itti_dump_socket(void *arg_p) static int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t xml_definition_length) { - if (itti_queue.nb_connected < ITTI_DUMP_MAX_CON) { - itti_queue_item_t *item; + if (itti_dump_queue.nb_connected < ITTI_DUMP_MAX_CON) { uint8_t i; for (i = 0; i < ITTI_DUMP_MAX_CON; i++) { /* Let's find a place to store the new client */ - if (itti_queue.itti_clients[i].sd == -1) { + if (itti_dump_queue.itti_clients[i].sd == -1) { break; } } - ITTI_DEBUG("Found place to store new connection: %d\n", i); + ITTI_DUMP_DEBUG("Found place to store new connection: %d\n", i); DevCheck(i < ITTI_DUMP_MAX_CON, i, ITTI_DUMP_MAX_CON, sd); - pthread_mutex_lock(&itti_queue.itti_clients[i].client_lock); - - itti_queue.itti_clients[i].sd = sd; - itti_queue.nb_connected++; - - ITTI_DEBUG("Socket %d accepted\n", sd); + ITTI_DUMP_DEBUG("Socket %d accepted\n", sd); /* Send the XML message definition */ if (itti_dump_send_xml_definition(sd, xml_definition, xml_definition_length) < 0) { - ITTI_ERROR("Failed to send XML definition\n"); + ITTI_DUMP_ERROR("Failed to send XML definition\n"); close (sd); return -1; } - /* At this point we have to dump the complete list */ - pthread_mutex_lock(&itti_queue.queue_mutex); - STAILQ_FOREACH(item, &itti_queue.itti_message_queue, entry) { - itti_dump_send_message(sd, item); - } - pthread_mutex_unlock(&itti_queue.queue_mutex); - pthread_mutex_unlock(&itti_queue.itti_clients[i].client_lock); + itti_dump_queue.itti_clients[i].sd = sd; + itti_dump_queue.nb_connected++; } else { - ITTI_DEBUG("Socket %d rejected\n", sd); + ITTI_DUMP_DEBUG("Socket %d rejected\n", sd); /* We have reached max number of users connected... * Reject the connection. */ @@ -537,9 +577,20 @@ int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t return 0; } +int itti_dump_user_data_init_function(void **user_data, void *user_state) +{ + return 0; +} + +/* This function should be called by each thread that will use the ring buffer */ +void itti_dump_thread_use_ring_buffer(void) +{ + lfds611_ringbuffer_use(itti_dump_queue.itti_message_queue); +} + int itti_dump_init(const char * const messages_definition_xml, const char * const dump_file_name) { - int i; + int i, ret; if (dump_file_name != NULL) { @@ -547,10 +598,11 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons if (dump_file == NULL) { - ITTI_ERROR("can not open dump file \"%s\" (%d:%s)\n", dump_file_name, errno, strerror(errno)); + ITTI_DUMP_ERROR("can not open dump file \"%s\" (%d:%s)\n", dump_file_name, errno, strerror(errno)); } else { + /* Output the XML to file */ uint32_t message_size = strlen(messages_definition_xml) + 1; itti_socket_header_t header; @@ -563,34 +615,97 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons } } - memset(&itti_queue, 0, sizeof(itti_desc_t)); + memset(&itti_dump_queue, 0, sizeof(itti_desc_t)); - pthread_mutex_init(&itti_queue.queue_mutex, NULL); - STAILQ_INIT(&itti_queue.itti_message_queue); - itti_queue.queue_size = 0; - itti_queue.nb_connected = 0; + ITTI_DUMP_DEBUG("Creating new ring buffer for itti dump of %u elements\n", + ITTI_QUEUE_MAX_ELEMENTS); + + if (lfds611_ringbuffer_new(&itti_dump_queue.itti_message_queue, + ITTI_QUEUE_MAX_ELEMENTS, + NULL, + NULL) != 1) + { + ITTI_DUMP_ERROR("Failed to create ring buffer...\n"); + /* Always assert on this condition */ + DevAssert(0 == 1); + } + + itti_dump_queue.event_fd = eventfd(0, EFD_SEMAPHORE); + if (itti_dump_queue.event_fd == -1) + { + ITTI_DUMP_ERROR("eventfd failed: %s\n", strerror(errno)); + /* Always assert on this condition */ + DevAssert(0 == 1); + } + + itti_dump_queue.queue_size = 0; + itti_dump_queue.nb_connected = 0; for(i = 0; i < ITTI_DUMP_MAX_CON; i++) { - itti_queue.itti_clients[i].sd = -1; - itti_queue.itti_clients[i].last_message_number = 0; + itti_dump_queue.itti_clients[i].sd = -1; + itti_dump_queue.itti_clients[i].last_message_number = 0; + } + + /* initialized with default attributes */ + ret = pthread_attr_init(&itti_dump_queue.attr); + if (ret < 0) { + ITTI_DUMP_ERROR("pthread_attr_init failed (%d:%s)\n", errno, strerror(errno)); + return -1; + } - /* Init per user lock */ - pthread_mutex_init(&itti_queue.itti_clients[i].client_lock, NULL); + ret = pthread_attr_setschedpolicy(&itti_dump_queue.attr, SCHED_RR); + if (ret < 0) { + ITTI_DUMP_ERROR("pthread_attr_setschedpolicy (SCHED_IDLE) failed (%d:%s)\n", errno, strerror(errno)); + return -1; } - if (pthread_create(&itti_queue.itti_acceptor_thread, NULL, &itti_dump_socket, - (void *)messages_definition_xml) < 0) { - ITTI_ERROR("pthread_create failed (%d:%s)\n", errno, strerror(errno)); + + ret = pthread_create(&itti_dump_queue.itti_acceptor_thread, &itti_dump_queue.attr, + &itti_dump_socket, (void *)messages_definition_xml); + if (ret < 0) { + ITTI_DUMP_ERROR("pthread_create failed (%d:%s)\n", errno, strerror(errno)); return -1; } + return 0; } +void itti_dump_user_data_delete_function(void *user_data, void *user_state) +{ + if (user_data != NULL) + { + itti_dump_queue_item_t *item; + + item = (itti_dump_queue_item_t *)user_data; + free(item->data); + free(item); + } +} + void itti_dump_exit(void) { + void *arg; + itti_dump_queue_item_t new; + + /* Send the exit signal to other thread */ + itti_dump_enqueue_message(&new, 0, ITTI_DUMP_EXIT_SIGNAL); + + ITTI_DUMP_DEBUG("waiting for dumper thread to finish\n"); + + /* wait for the thread to terminate */ + pthread_join(itti_dump_queue.itti_acceptor_thread, &arg); + + ITTI_DUMP_DEBUG("dumper thread correctly exited\n"); + if (dump_file != NULL) { + /* Synchronise file and then close it */ fclose(dump_file); dump_file = NULL; } -} + if (itti_dump_queue.itti_message_queue) + { + lfds611_ringbuffer_delete(itti_dump_queue.itti_message_queue, + itti_dump_user_data_delete_function, NULL); + } +} diff --git a/common/utils/itti/intertask_interface_dump.h b/common/utils/itti/intertask_interface_dump.h index 58bf5e205286850b00cac9dbadd4b90fbd60b3f1..9c070b86337b81feb7a8c4c3dbf1bd259909ad27 100644 --- a/common/utils/itti/intertask_interface_dump.h +++ b/common/utils/itti/intertask_interface_dump.h @@ -42,4 +42,6 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons void itti_dump_exit(void); +void itti_dump_thread_use_ring_buffer(void); + #endif /* INTERTASK_INTERFACE_DUMP_H_ */ diff --git a/openair-cn/COMMON/intertask_interface_conf.h b/openair-cn/COMMON/intertask_interface_conf.h index b58cc716a5b2d2d509d8cfaf53cb0682eceb8b73..dc25a7d63392df2b7d90d1e611be5b4547948383 100644 --- a/openair-cn/COMMON/intertask_interface_conf.h +++ b/openair-cn/COMMON/intertask_interface_conf.h @@ -45,7 +45,7 @@ #define ITTI_PORT (10007) /* This is the queue size for signal dumper */ -#define ITTI_QUEUE_SIZE_MAX (1 * 1024 * 1024) /* 1 MBytes */ +#define ITTI_QUEUE_MAX_ELEMENTS (200 * 1024) #define ITTI_DUMP_MAX_CON (5) /* Max connections in parallel */ #endif /* INTERTASK_INTERFACE_CONF_H_ */ diff --git a/openair2/COMMON/intertask_interface_conf.h b/openair2/COMMON/intertask_interface_conf.h index 230e8c90c96cbb0c221c07f505e12c1e89711a2f..df701a4d05ada3d604c807c16499e2c0ad0ada89 100644 --- a/openair2/COMMON/intertask_interface_conf.h +++ b/openair2/COMMON/intertask_interface_conf.h @@ -45,7 +45,7 @@ #define ITTI_PORT (10006) /* This is the queue size for signal dumper */ -#define ITTI_QUEUE_SIZE_MAX (1 * 1024 * 1024) /* 1 MBytes */ +#define ITTI_QUEUE_MAX_ELEMENTS (200 * 1000) #define ITTI_DUMP_MAX_CON (5) /* Max connections in parallel */ #endif /* INTERTASK_INTERFACE_CONF_H_ */