From f3c8ea84c09f2027abde6aca2de74e9807295875 Mon Sep 17 00:00:00 2001 From: Lionel Gauthier <lionel.gauthier@eurecom.fr> Date: Tue, 4 Nov 2014 15:22:34 +0000 Subject: [PATCH] sync ENB source code (debug session on PDCP GTP UDP) git-svn-id: http://svn.eurecom.fr/openair4G/trunk@5965 818b1a75-f10b-46b9-bf7c-635c3b92a50f --- openair-cn/GTPV1-U/gtpv1u_eNB.c | 6 +- openair-cn/GTPV1-U/gtpv1u_task.c | 6 +- openair-cn/SGI/sgi_egress.c | 16 +- openair-cn/SGI/sgi_socket.c | 1 + openair-cn/UDP/udp_eNB_task.c | 3 +- openair-cn/UDP/udp_primitives_server.c | 269 +++++++++++++++---------- 6 files changed, 188 insertions(+), 113 deletions(-) diff --git a/openair-cn/GTPV1-U/gtpv1u_eNB.c b/openair-cn/GTPV1-U/gtpv1u_eNB.c index dc42d0738d..d0b735b613 100644 --- a/openair-cn/GTPV1-U/gtpv1u_eNB.c +++ b/openair-cn/GTPV1-U/gtpv1u_eNB.c @@ -858,7 +858,8 @@ void *gtpv1u_eNB_task(void *args) udp_data_ind_p->buffer_length, udp_data_ind_p->peer_port, udp_data_ind_p->peer_address); - free(udp_data_ind_p->buffer); + itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), udp_data_ind_p->buffer); + udp_data_ind_p->buffer = NULL; } break; @@ -914,7 +915,8 @@ void *gtpv1u_eNB_task(void *args) } } /* Buffer is no longer needed, free it */ - free(data_req_p->buffer); + itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), data_req_p->buffer); + data_req_p->buffer = NULL; } break; diff --git a/openair-cn/GTPV1-U/gtpv1u_task.c b/openair-cn/GTPV1-U/gtpv1u_task.c index e4b8fba0ec..c668aca153 100644 --- a/openair-cn/GTPV1-U/gtpv1u_task.c +++ b/openair-cn/GTPV1-U/gtpv1u_task.c @@ -387,7 +387,8 @@ static void *gtpv1u_thread(void *args) udp_data_ind_p->buffer_length, udp_data_ind_p->peer_port, udp_data_ind_p->peer_address); - free(udp_data_ind_p->buffer); + itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), udp_data_ind_p->buffer); + udp_data_ind_p->buffer = NULL; } break; @@ -453,7 +454,8 @@ static void *gtpv1u_thread(void *args) } } /* Buffer is no longer needed, free it */ - free(data_req_p->buffer); + itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), data_req_p->buffer); + data_req_p->buffer = NULL; } break; case TERMINATE_MESSAGE: { diff --git a/openair-cn/SGI/sgi_egress.c b/openair-cn/SGI/sgi_egress.c index fe9e15696d..dd719eb945 100755 --- a/openair-cn/SGI/sgi_egress.c +++ b/openair-cn/SGI/sgi_egress.c @@ -40,6 +40,7 @@ #include "sgi.h" #include "intertask_interface.h" +#include "assertions.h" #include <netinet/ip6.h> #include <netinet/ip.h> @@ -187,7 +188,7 @@ void sgi_process_raw_packet(sgi_data_t *sgi_data_pP, unsigned char* data_pP, int SGI_IF_ERROR("%s OUT OF MEMORY DROP EGRESS PACKET\n", __FUNCTION__); return; } - message_payload_p = malloc(packet_sizeP); + message_payload_p = itti_malloc(TASK_FW_IP, TASK_GTPV1_U, packet_sizeP); if (message_payload_p == NULL) { SGI_IF_ERROR("%s OUT OF MEMORY DROP EGRESS PACKET\n", __FUNCTION__); return; @@ -201,7 +202,7 @@ void sgi_process_raw_packet(sgi_data_t *sgi_data_pP, unsigned char* data_pP, int gtpv1u_tunnel_data_req_p->local_S1u_teid = addr_mapping_p->sgw_S1U_teid; gtpv1u_tunnel_data_req_p->length = packet_sizeP; gtpv1u_tunnel_data_req_p->buffer = message_payload_p; - SGI_IF_DEBUG("%s send GTPV1U_TUNNEL_DATA_REQ to GTPV1U S1u_enb_teid %u local_S1u_teid %u size %u\n", __FUNCTION__, gtpv1u_tunnel_data_req_p->S1u_enb_teid, gtpv1u_tunnel_data_req_p->local_S1u_teid, packet_sizeP); + SGI_IF_DEBUG("%s ETHER send GTPV1U_TUNNEL_DATA_REQ to GTPV1U S1u_enb_teid %u local_S1u_teid %u size %u\n", __FUNCTION__, gtpv1u_tunnel_data_req_p->S1u_enb_teid, gtpv1u_tunnel_data_req_p->local_S1u_teid, packet_sizeP); itti_send_msg_to_task(TASK_GTPV1_U, INSTANCE_DEFAULT, message_p); @@ -319,7 +320,8 @@ void sgi_process_raw_packet(sgi_data_t *sgi_data_pP, unsigned char* data_pP, int SGI_IF_ERROR("%s OUT OF MEMORY DROP EGRESS PACKET\n", __FUNCTION__); return; } - message_payload_p = malloc(packet_sizeP - sizeof(sgi_data_pP->eh)); + AssertFatal((packet_sizeP - sizeof(sgi_data_pP->eh)) > 20, "BAD IP PACKET SIZE"); + message_payload_p = itti_malloc(TASK_FW_IP, TASK_GTPV1_U, packet_sizeP - sizeof(sgi_data_pP->eh)); if (message_payload_p == NULL) { SGI_IF_ERROR("%s OUT OF MEMORY DROP EGRESS PACKET\n", __FUNCTION__); return; @@ -331,9 +333,13 @@ void sgi_process_raw_packet(sgi_data_t *sgi_data_pP, unsigned char* data_pP, int //#warning forced S1u_enb_teid to 1 for testing, waiting for MODIFY_BEARER REQUEST // gtpv1u_tunnel_data_req_p->S1u_enb_teid = 1; gtpv1u_tunnel_data_req_p->local_S1u_teid = addr_mapping_p->sgw_S1U_teid; - gtpv1u_tunnel_data_req_p->length = packet_sizeP; + gtpv1u_tunnel_data_req_p->length = packet_sizeP - sizeof(sgi_data_pP->eh); gtpv1u_tunnel_data_req_p->buffer = message_payload_p; - SGI_IF_DEBUG("%s send GTPV1U_TUNNEL_DATA_REQ to GTPV1U S1u_enb_teid %u local_S1u_teid %u size %u\n", __FUNCTION__, gtpv1u_tunnel_data_req_p->S1u_enb_teid, gtpv1u_tunnel_data_req_p->local_S1u_teid, packet_sizeP); + SGI_IF_DEBUG("%s send GTPV1U_TUNNEL_DATA_REQ to GTPV1U S1u_enb_teid %u local_S1u_teid %u size %u\n", + __FUNCTION__, + gtpv1u_tunnel_data_req_p->S1u_enb_teid, + gtpv1u_tunnel_data_req_p->local_S1u_teid, + gtpv1u_tunnel_data_req_p->length); itti_send_msg_to_task(TASK_GTPV1_U, INSTANCE_DEFAULT, message_p); diff --git a/openair-cn/SGI/sgi_socket.c b/openair-cn/SGI/sgi_socket.c index 0528469af4..dfcad2a646 100644 --- a/openair-cn/SGI/sgi_socket.c +++ b/openair-cn/SGI/sgi_socket.c @@ -835,6 +835,7 @@ void* sgi_sock_raw_fw_2_gtpv1u_thread(void* args_p) while (1) { num_bytes = recvfrom(sgi_data_p->sd[socket_index], &sgi_data_p->recv_buffer[0][socket_index], SGI_BUFFER_RECV_LEN, 0, NULL, NULL); if (num_bytes > 0) { + SGI_IF_DEBUG("recvfrom bearer id %d %d bytes\n", socket_index + SGI_MIN_EPS_BEARER_ID, num_bytes); sgi_process_raw_packet(sgi_data_p, &sgi_data_p->recv_buffer[0][socket_index], num_bytes); } else { SGI_IF_DEBUG("recvfrom bearer id %d %d (%s:%d)\n", socket_index + SGI_MIN_EPS_BEARER_ID, num_bytes, strerror(errno), errno); diff --git a/openair-cn/UDP/udp_eNB_task.c b/openair-cn/UDP/udp_eNB_task.c index 99ccc9ae3d..0bef5cdf7e 100644 --- a/openair-cn/UDP/udp_eNB_task.c +++ b/openair-cn/UDP/udp_eNB_task.c @@ -247,7 +247,8 @@ void udp_eNB_receiver(struct udp_socket_desc_s *udp_sock_pP) LOG_W(UDP_, "Recvfrom returned 0\n"); return; } else{ - forwarded_buffer = calloc(n, sizeof(uint8_t)); + forwarded_buffer = itti_malloc(TASK_UDP, udp_sock_pP->task_id, n*sizeof(uint8_t)); + DevAssert(forwarded_buffer != NULL); memcpy(forwarded_buffer, l_buffer, n); message_p = itti_alloc_new_message(TASK_UDP, UDP_DATA_IND); DevAssert(message_p != NULL); diff --git a/openair-cn/UDP/udp_primitives_server.c b/openair-cn/UDP/udp_primitives_server.c index 4570298484..1088947e6a 100644 --- a/openair-cn/UDP/udp_primitives_server.c +++ b/openair-cn/UDP/udp_primitives_server.c @@ -42,6 +42,8 @@ #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> +#include <fcntl.h> + #include <pthread.h> @@ -54,9 +56,10 @@ #define UDP_DEBUG(x, args...) do { fprintf(stdout, "[UDP] [D]"x, ##args); } while(0) #define UDP_ERROR(x, args...) do { fprintf(stderr, "[UDP] [E]"x, ##args); } while(0) -void *udp_receiver_thread(void *args_p); +//void *udp_receiver_thread(void *args_p); struct udp_socket_desc_s { + uint8_t buffer[4096]; int sd; /* Socket descriptor to use */ pthread_t listener_thread; /* Thread affected to recv */ @@ -72,10 +75,13 @@ struct udp_socket_desc_s { static STAILQ_HEAD(udp_socket_list_s, udp_socket_desc_s) udp_socket_list; static pthread_mutex_t udp_socket_list_mutex = PTHREAD_MUTEX_INITIALIZER; +static void udp_server_receive_and_process(struct udp_socket_desc_s *udp_sock_pP); + + /* @brief Retrieve the descriptor associated with the task_id */ static -struct udp_socket_desc_s *udp_get_socket_desc(task_id_t task_id) +struct udp_socket_desc_s *udp_server_get_socket_desc(task_id_t task_id) { struct udp_socket_desc_s *udp_sock_p = NULL; @@ -89,14 +95,29 @@ struct udp_socket_desc_s *udp_get_socket_desc(task_id_t task_id) } return udp_sock_p; } +static +struct udp_socket_desc_s *udp_server_get_socket_desc_by_sd(int sdP) +{ + struct udp_socket_desc_s *udp_sock_p = NULL; + + UDP_DEBUG("Looking for sd %d\n", sdP); + + STAILQ_FOREACH(udp_sock_p, &udp_socket_list, entries) { + if (udp_sock_p->sd == sdP) { + UDP_DEBUG("Found matching task desc\n"); + break; + } + } + return udp_sock_p; +} static -int udp_create_socket(int port, char *address, task_id_t task_id) +int udp_server_create_socket(int port, char *address, task_id_t task_id) { struct sockaddr_in addr; int sd; - struct udp_socket_desc_s *thread_arg = NULL; + struct udp_socket_desc_s *socket_desc_p = NULL; UDP_DEBUG("Creating new listen socket on address "IPV4_ADDR" and port %u\n", IPV4_ADDR_FORMAT(inet_addr(address)), port); @@ -120,54 +141,84 @@ int udp_create_socket(int port, char *address, task_id_t task_id) return -1; } - thread_arg = calloc(1, sizeof(struct udp_socket_desc_s)); + /* Add the socket to list of fd monitored by ITTI */ + /* Mark the socket as non-blocking */ + if (fcntl(sd, F_SETFL, O_NONBLOCK) < 0) { + UDP_ERROR("fcntl F_SETFL O_NONBLOCK failed: %s\n", + strerror(errno)); + close(sd); + return -1; + } - DevAssert(thread_arg != NULL); + socket_desc_p = calloc(1, sizeof(struct udp_socket_desc_s)); + DevAssert(socket_desc_p != NULL); + socket_desc_p->sd = sd; + socket_desc_p->local_address = address; + socket_desc_p->local_port = port; + socket_desc_p->task_id = task_id; + UDP_DEBUG("Inserting new descriptor for task %d, sd %d\n", + socket_desc_p->task_id, socket_desc_p->sd); + pthread_mutex_lock(&udp_socket_list_mutex); + STAILQ_INSERT_TAIL(&udp_socket_list, socket_desc_p, entries); + pthread_mutex_unlock(&udp_socket_list_mutex); - thread_arg->sd = sd; - thread_arg->local_address = address; - thread_arg->local_port = port; - thread_arg->task_id = task_id; + itti_subscribe_event_fd(TASK_UDP, sd); - if (pthread_create(&thread_arg->listener_thread, NULL, - &udp_receiver_thread, (void *)thread_arg) < 0) { - UDP_ERROR("Pthred_create failed (%s)\n", strerror(errno)); - return -1; - } return sd; } -void *udp_receiver_thread(void *arg_p) +static void udp_server_flush_sockets(struct epoll_event *events, int nb_events) { - uint8_t buffer[2048]; + int event; + struct udp_socket_desc_s *udp_sock_p = NULL; + + UDP_DEBUG("Received %d events\n", nb_events); - struct udp_socket_desc_s *udp_sock_p = (struct udp_socket_desc_s *)arg_p; + for (event = 0; event < nb_events; event++) { + if (events[event].events != 0) { + /* If the event has not been yet been processed (not an itti message) */ + pthread_mutex_lock(&udp_socket_list_mutex); + udp_sock_p = udp_server_get_socket_desc_by_sd(events[event].data.fd); + if (udp_sock_p != NULL) { + udp_server_receive_and_process(udp_sock_p); + } else { + UDP_ERROR("Failed to retrieve the udp socket descriptor %d", + events[event].data.fd); + } + pthread_mutex_unlock(&udp_socket_list_mutex); + } + } +} + +static void udp_server_receive_and_process(struct udp_socket_desc_s *udp_sock_pP) +{ UDP_DEBUG("Inserting new descriptor for task %d, sd %d\n", - udp_sock_p->task_id, udp_sock_p->sd); - pthread_mutex_lock(&udp_socket_list_mutex); - STAILQ_INSERT_TAIL(&udp_socket_list, udp_sock_p, entries); - pthread_mutex_unlock(&udp_socket_list_mutex); + udp_sock_pP->task_id, udp_sock_pP->sd); - while (1) { - int n; + { + int bytes_received = 0; socklen_t from_len; struct sockaddr_in addr; from_len = (socklen_t)sizeof(struct sockaddr_in); - if ((n = recvfrom(udp_sock_p->sd, buffer, sizeof(buffer), 0, - (struct sockaddr *)&addr, &from_len)) < 0) { + if ((bytes_received = recvfrom(udp_sock_pP->sd, udp_sock_pP->buffer, sizeof(udp_sock_pP->buffer), 0, + (struct sockaddr *)&addr, &from_len)) <= 0) { UDP_ERROR("Recvfrom failed %s\n", strerror(errno)); - break; + //break; } else { MessageDef *message_p = NULL; udp_data_ind_t *udp_data_ind_p; uint8_t *forwarded_buffer = NULL; - forwarded_buffer = calloc(n, sizeof(uint8_t)); + AssertFatal(sizeof(udp_sock_pP->buffer) >= bytes_received, "UDP BUFFER OVERFLOW"); + + forwarded_buffer = itti_malloc(TASK_UDP, udp_sock_pP->task_id, bytes_received); - memcpy(forwarded_buffer, buffer, n); + DevAssert(forwarded_buffer != NULL); + + memcpy(forwarded_buffer, udp_sock_pP->buffer, bytes_received); message_p = itti_alloc_new_message(TASK_UDP, UDP_DATA_IND); @@ -176,110 +227,122 @@ void *udp_receiver_thread(void *arg_p) udp_data_ind_p = &message_p->ittiMsg.udp_data_ind; udp_data_ind_p->buffer = forwarded_buffer; - udp_data_ind_p->buffer_length = n; + udp_data_ind_p->buffer_length = bytes_received; udp_data_ind_p->peer_port = htons(addr.sin_port); udp_data_ind_p->peer_address = addr.sin_addr.s_addr; UDP_DEBUG("Msg of length %d received from %s:%u\n", - n, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); - if (itti_send_msg_to_task(udp_sock_p->task_id, INSTANCE_DEFAULT, message_p) < 0) { + bytes_received, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); + if (itti_send_msg_to_task(udp_sock_pP->task_id, INSTANCE_DEFAULT, message_p) < 0) { UDP_DEBUG("Failed to send message %d to task %d\n", - UDP_DATA_IND, udp_sock_p->task_id); - break; + UDP_DATA_IND, udp_sock_pP->task_id); + //break; } } } - close(udp_sock_p->sd); - udp_sock_p->sd = -1; + //close(udp_sock_pP->sd); + //udp_sock_pP->sd = -1; - pthread_mutex_lock(&udp_socket_list_mutex); - STAILQ_REMOVE(&udp_socket_list, udp_sock_p, udp_socket_desc_s, entries); - pthread_mutex_unlock(&udp_socket_list_mutex); + //pthread_mutex_lock(&udp_socket_list_mutex); + //STAILQ_REMOVE(&udp_socket_list, udp_sock_pP, udp_socket_desc_s, entries); + //pthread_mutex_unlock(&udp_socket_list_mutex); - return NULL; + //return NULL; } + static void *udp_intertask_interface(void *args_p) { + int rc = 0; + int nb_events = 0; + struct epoll_event *events = NULL; + itti_mark_task_ready(TASK_UDP); while(1) { MessageDef *received_message_p = NULL; itti_receive_msg(TASK_UDP, &received_message_p); - DevAssert(received_message_p != NULL); - - switch (ITTI_MSG_ID(received_message_p)) - { - case UDP_INIT: { - udp_init_t *udp_init_p; - udp_init_p = &received_message_p->ittiMsg.udp_init; - udp_create_socket( - udp_init_p->port, - udp_init_p->address, - ITTI_MSG_ORIGIN_ID(received_message_p)); - } break; - case UDP_DATA_REQ: { - int udp_sd = -1; - ssize_t bytes_written; - - struct udp_socket_desc_s *udp_sock_p = NULL; - udp_data_req_t *udp_data_req_p; - struct sockaddr_in peer_addr; - - udp_data_req_p = &received_message_p->ittiMsg.udp_data_req; - - memset(&peer_addr, 0, sizeof(struct sockaddr_in)); - - peer_addr.sin_family = AF_INET; - peer_addr.sin_port = htons(udp_data_req_p->peer_port); - peer_addr.sin_addr.s_addr = udp_data_req_p->peer_address; - - pthread_mutex_lock(&udp_socket_list_mutex); - udp_sock_p = udp_get_socket_desc(ITTI_MSG_ORIGIN_ID(received_message_p)); - - if (udp_sock_p == NULL) { - UDP_ERROR("Failed to retrieve the udp socket descriptor " - "associated with task %d\n", ITTI_MSG_ORIGIN_ID(received_message_p)); - pthread_mutex_unlock(&udp_socket_list_mutex); - if (udp_data_req_p->buffer) { - free(udp_data_req_p->buffer); + if (received_message_p != NULL) { + switch (ITTI_MSG_ID(received_message_p)) + { + case UDP_INIT: { + udp_init_t *udp_init_p; + udp_init_p = &received_message_p->ittiMsg.udp_init; + rc = udp_server_create_socket( + udp_init_p->port, + udp_init_p->address, + ITTI_MSG_ORIGIN_ID(received_message_p)); + } break; + + case UDP_DATA_REQ: { + int udp_sd = -1; + ssize_t bytes_written; + + struct udp_socket_desc_s *udp_sock_p = NULL; + udp_data_req_t *udp_data_req_p; + struct sockaddr_in peer_addr; + + udp_data_req_p = &received_message_p->ittiMsg.udp_data_req; + + memset(&peer_addr, 0, sizeof(struct sockaddr_in)); + + peer_addr.sin_family = AF_INET; + peer_addr.sin_port = htons(udp_data_req_p->peer_port); + peer_addr.sin_addr.s_addr = udp_data_req_p->peer_address; + + pthread_mutex_lock(&udp_socket_list_mutex); + udp_sock_p = udp_server_get_socket_desc(ITTI_MSG_ORIGIN_ID(received_message_p)); + + if (udp_sock_p == NULL) { + UDP_ERROR("Failed to retrieve the udp socket descriptor " + "associated with task %d\n", ITTI_MSG_ORIGIN_ID(received_message_p)); + pthread_mutex_unlock(&udp_socket_list_mutex); + if (udp_data_req_p->buffer) { + free(udp_data_req_p->buffer); + } + goto on_error; } - goto on_error; - } - udp_sd = udp_sock_p->sd; - pthread_mutex_unlock(&udp_socket_list_mutex); + udp_sd = udp_sock_p->sd; + pthread_mutex_unlock(&udp_socket_list_mutex); - UDP_DEBUG("[%d] Sending message of size %u to "IPV4_ADDR" and port %u\n", - udp_sd, udp_data_req_p->buffer_length, - IPV4_ADDR_FORMAT(udp_data_req_p->peer_address), - udp_data_req_p->peer_port); + UDP_DEBUG("[%d] Sending message of size %u to "IPV4_ADDR" and port %u\n", + udp_sd, udp_data_req_p->buffer_length, + IPV4_ADDR_FORMAT(udp_data_req_p->peer_address), + udp_data_req_p->peer_port); - bytes_written = sendto(udp_sd, udp_data_req_p->buffer, + bytes_written = sendto(udp_sd, udp_data_req_p->buffer, udp_data_req_p->buffer_length, 0, (struct sockaddr *)&peer_addr, sizeof(struct sockaddr_in)); - if (bytes_written != udp_data_req_p->buffer_length) { - UDP_ERROR("There was an error while writing to socket " - "(%d:%s)\n", errno, strerror(errno)); - } - } break; + if (bytes_written != udp_data_req_p->buffer_length) { + UDP_ERROR("There was an error while writing to socket " + "(%d:%s)\n", errno, strerror(errno)); + } + } break; - case TERMINATE_MESSAGE: { - itti_exit_task(); - } break; + case TERMINATE_MESSAGE: { + itti_exit_task(); + } break; - case MESSAGE_TEST: { - } break; + case MESSAGE_TEST: { + } break; - default: { - UDP_DEBUG("Unkwnon message ID %d:%s\n", - ITTI_MSG_ID(received_message_p), ITTI_MSG_NAME(received_message_p)); - } break; - } + default: { + UDP_DEBUG("Unkwnon message ID %d:%s\n", + ITTI_MSG_ID(received_message_p), ITTI_MSG_NAME(received_message_p)); + } break; + } on_error: - itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), received_message_p); - received_message_p = NULL; + rc = itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), received_message_p); + AssertFatal(rc == EXIT_SUCCESS, "Failed to free memory (%d)!\n", rc); + received_message_p = NULL; + } + nb_events = itti_get_events(TASK_UDP, &events); + if ((nb_events > 0) && (events != NULL)) { + /* Now handle notifications for other sockets */ + udp_server_flush_sockets(events, nb_events); + } } return NULL; } -- GitLab