Newer
Older
/*******************************************************************************
OpenAirInterface is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenAirInterface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OpenAirInterface.The full GNU General Public License is
included in this distribution in the file called "COPYING". If not,
see <http://www.gnu.org/licenses/>.
Contact Information
OpenAirInterface Admin: openair_admin@eurecom.fr
OpenAirInterface Tech : openair_tech@eurecom.fr
OpenAirInterface Dev : openair4g-devel@eurecom.fr
Address : Eurecom, Compus SophiaTech 450, route des chappes, 06451 Biot, France.
*******************************************************************************/
/*! \file udp_eNB_task.c
* \brief
* \author Sebastien ROUX, Lionel Gauthier
* \company Eurecom
* \email: lionel.gauthier@eurecom.fr
*/
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include "queue.h"
#include "intertask_interface.h"
#include "assertions.h"
#include "udp_eNB_task.h"
#include "UTIL/LOG/log.h"
#define IPV4_ADDR "%u.%u.%u.%u"
#define IPV4_ADDR_FORMAT(aDDRESS) \
(uint8_t)((aDDRESS) & 0x000000ff), \
(uint8_t)(((aDDRESS) & 0x0000ff00) >> 8 ), \
(uint8_t)(((aDDRESS) & 0x00ff0000) >> 16), \
(uint8_t)(((aDDRESS) & 0xff000000) >> 24)
struct udp_socket_desc_s {
int sd; /* Socket descriptor to use */
pthread_t listener_thread; /* Thread affected to recv */
char *local_address; /* Local ipv4 address to use */
uint16_t local_port; /* Local port to use */
task_id_t task_id; /* Task who has requested the new endpoint */
STAILQ_ENTRY(udp_socket_desc_s) entries;
};
static STAILQ_HEAD(udp_socket_list_s, udp_socket_desc_s) udp_socket_list;
static pthread_mutex_t udp_socket_list_mutex = PTHREAD_MUTEX_INITIALIZER;
static
struct udp_socket_desc_s *
udp_eNB_get_socket_desc(task_id_t task_id);
void udp_eNB_process_file_descriptors(
struct epoll_event *events,
int nb_events);
static
int
udp_eNB_create_socket(
int port,
char *ip_addr,
task_id_t task_id);
int
udp_eNB_send_to(
int sd,
uint16_t port,
uint32_t address,
const uint8_t *buffer,
uint32_t length);
void udp_eNB_receiver(struct udp_socket_desc_s *udp_sock_pP);
void *udp_eNB_task(void *args_p);
int udp_enb_init(const Enb_properties_t *enb_config_p);
/* @brief Retrieve the descriptor associated with the task_id
*/
static
struct udp_socket_desc_s *udp_eNB_get_socket_desc(task_id_t task_id)
{
struct udp_socket_desc_s *udp_sock_p = NULL;
LOG_I(UDP_, "Looking for task %d\n", task_id);
STAILQ_FOREACH(udp_sock_p, &udp_socket_list, entries) {
if (udp_sock_p->task_id == task_id) {
void udp_eNB_process_file_descriptors(struct epoll_event *events, int nb_events)
{
int i;
struct udp_socket_desc_s *udp_sock_p = NULL;
if (events == NULL) {
return;
}
for (i = 0; i < nb_events; i++) {
STAILQ_FOREACH(udp_sock_p, &udp_socket_list, entries) {
if (udp_sock_p->sd == events[i].data.fd) {
LOG_D(UDP_, "Found matching task desc\n");
udp_eNB_receiver(udp_sock_p);
break;
}
}
}
}
int udp_eNB_create_socket(int port, char *ip_addr, task_id_t task_id)
struct udp_socket_desc_s *udp_socket_desc_p = NULL;
int sd, rc;
struct sockaddr_in sin;
LOG_I(UDP_, "Initializing UDP for local address %s with port %d\n", ip_addr, port);
sd = socket(AF_INET, SOCK_DGRAM, 0);
AssertFatal(sd > 0, "UDP: Failed to create new socket: (%s:%d)\n", strerror(errno), errno);
memset(&sin, 0, sizeof(struct sockaddr_in));
sin.sin_family = AF_INET;
sin.sin_port = htons(port);
if (ip_addr == NULL) {
sin.sin_addr.s_addr = inet_addr(INADDR_ANY);
} else {
sin.sin_addr.s_addr = inet_addr(ip_addr);
}
if ((rc = bind(sd, (struct sockaddr *)&sin, sizeof(struct sockaddr_in))) < 0) {
close(sd);
AssertFatal(rc >= 0, "UDP: Failed to bind socket: (%s:%d)\n\n", strerror(errno), errno);
}
/* Create a new descriptor for this connection */
udp_socket_desc_p = calloc(1, sizeof(struct udp_socket_desc_s));
DevAssert(udp_socket_desc_p != NULL);
udp_socket_desc_p->sd = sd;
udp_socket_desc_p->local_address = ip_addr;
udp_socket_desc_p->local_port = port;
udp_socket_desc_p->task_id = task_id;
LOG_I(UDP_, "Inserting new descriptor for task %d, sd %d\n", udp_socket_desc_p->task_id, udp_socket_desc_p->sd);
pthread_mutex_lock(&udp_socket_list_mutex);
STAILQ_INSERT_TAIL(&udp_socket_list, udp_socket_desc_p, entries);
pthread_mutex_unlock(&udp_socket_list_mutex);
itti_subscribe_event_fd(TASK_UDP, sd);
LOG_I(UDP_, "Initializing UDP for local address %s with port %d: DONE\n", ip_addr, port);
return sd;
}
int
udp_eNB_send_to(
int sd,
uint16_t port,
uint32_t address,
const uint8_t *buffer,
uint32_t length)
struct sockaddr_in to;
socklen_t to_length;
if (sd <= 0 || ((buffer == NULL) && (length > 0))) {
LOG_E(UDP_, "udp_send_to: bad param\n");
return -1;
}
memset(&to, 0, sizeof(struct sockaddr_in));
to_length = sizeof(to);
to.sin_family = AF_INET;
to.sin_port = htons(port);
to.sin_addr.s_addr = address;
if (sendto(sd, (void *)buffer, (size_t)length, 0, (struct sockaddr *)&to,
to_length) < 0) {
LOG_E(UDP_,
"[SD %d] Failed to send data to "IPV4_ADDR" on port %d, buffer size %u\n",
sd, IPV4_ADDR_FORMAT(address), port, length);
return -1;
}
LOG_I(UDP_, "[SD %d] Successfully sent to "IPV4_ADDR
" on port %d, buffer size %u, buffer address %x\n",
sd, IPV4_ADDR_FORMAT(address), port, length, buffer);
return 0;
}
void udp_eNB_receiver(struct udp_socket_desc_s *udp_sock_pP)
{
uint8_t l_buffer[2048];
int n;
socklen_t from_len;
struct sockaddr_in addr;
MessageDef *message_p = NULL;
udp_data_ind_t *udp_data_ind_p = NULL;
uint8_t *forwarded_buffer = NULL;
if ((n = recvfrom(udp_sock_pP->sd, l_buffer, sizeof(l_buffer), 0,
(struct sockaddr *)&addr, &from_len)) < 0) {
LOG_E(UDP_, "Recvfrom failed %s\n", strerror(errno));
forwarded_buffer = itti_malloc(TASK_UDP, udp_sock_pP->task_id, n*sizeof(uint8_t));
DevAssert(forwarded_buffer != NULL);
memcpy(forwarded_buffer, l_buffer, n);
message_p = itti_alloc_new_message(TASK_UDP, UDP_DATA_IND);
DevAssert(message_p != NULL);
udp_data_ind_p = &message_p->ittiMsg.udp_data_ind;
udp_data_ind_p->buffer = forwarded_buffer;
udp_data_ind_p->buffer_length = n;
udp_data_ind_p->peer_port = htons(addr.sin_port);
udp_data_ind_p->peer_address = addr.sin_addr.s_addr;
LOG_I(UDP_, "Msg of length %d received from %s:%u\n",
if (itti_send_msg_to_task(udp_sock_pP->task_id, INSTANCE_DEFAULT, message_p) < 0) {
LOG_I(UDP_, "Failed to send message %d to task %d\n",
UDP_DATA_IND,
udp_sock_pP->task_id);
//close(udp_sock_p->sd);
//udp_sock_p->sd = -1;
//pthread_mutex_lock(&udp_socket_list_mutex);
//STAILQ_REMOVE(&udp_socket_list, udp_sock_p, udp_socket_desc_s, entries);
//pthread_mutex_unlock(&udp_socket_list_mutex);
int nb_events;
struct epoll_event *events;
MessageDef *received_message_p = NULL;
const char *msg_name = NULL;
instance_t instance = 0;
udp_enb_init(NULL);
itti_mark_task_ready(TASK_UDP);
while(1) {
itti_receive_msg(TASK_UDP, &received_message_p);
LOG_D(UDP_, "Got message %p\n", &received_message_p);
if (received_message_p != NULL) {
msg_name = ITTI_MSG_NAME (received_message_p);
instance = ITTI_MSG_INSTANCE (received_message_p);
switch (ITTI_MSG_ID(received_message_p))
{
case UDP_INIT: {
udp_init_t *udp_init_p;
udp_init_p = &received_message_p->ittiMsg.udp_init;
udp_eNB_create_socket(
udp_init_p->port,
udp_init_p->address,
ITTI_MSG_ORIGIN_ID(received_message_p));
struct udp_socket_desc_s *udp_sock_p = NULL;
udp_data_req_t *udp_data_req_p;
struct sockaddr_in peer_addr;
udp_data_req_p = &received_message_p->ittiMsg.udp_data_req;
memset(&peer_addr, 0, sizeof(struct sockaddr_in));
peer_addr.sin_family = AF_INET;
peer_addr.sin_port = htons(udp_data_req_p->peer_port);
peer_addr.sin_addr.s_addr = udp_data_req_p->peer_address;
udp_sock_p = udp_eNB_get_socket_desc(ITTI_MSG_ORIGIN_ID(received_message_p));
LOG_E(UDP_,
"Failed to retrieve the udp socket descriptor "
"associated with task %d\n",
ITTI_MSG_ORIGIN_ID(received_message_p));
pthread_mutex_unlock(&udp_socket_list_mutex);
if (udp_data_req_p->buffer) {
udp_sd = udp_sock_p->sd;
pthread_mutex_unlock(&udp_socket_list_mutex);
LOG_D(UDP_, "[%d] Sending message of size %u to "IPV4_ADDR" and port %u\n",
udp_sd,
udp_data_req_p->buffer_length,
IPV4_ADDR_FORMAT(udp_data_req_p->peer_address),
udp_data_req_p->peer_port);
udp_data_req_p->buffer_length,
0,
(struct sockaddr *)&peer_addr,
sizeof(struct sockaddr_in));
itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), udp_data_req_p->buffer);
if (bytes_written != udp_data_req_p->buffer_length) {
LOG_E(UDP_, "There was an error while writing to socket "
"(%d:%s)\n", errno, strerror(errno));
}
} break;
case TERMINATE_MESSAGE: {
itti_exit_task();
} break;
default: {
LOG_W(UDP_, "Unkwnon message ID %d:%s\n",
ITTI_MSG_ID(received_message_p),
ITTI_MSG_NAME(received_message_p));
itti_free (ITTI_MSG_ORIGIN_ID(received_message_p), received_message_p);
}
nb_events = itti_get_events(TASK_UDP, &events);
/* Now handle notifications for other sockets */
if (nb_events > 0) {
LOG_D(UDP_, "UDP task Process %d events\n",nb_events);
udp_eNB_process_file_descriptors(events, nb_events);
}
return NULL;
}
int udp_enb_init(const Enb_properties_t *enb_config_p)
{
LOG_I(UDP_, "Initializing UDP task interface\n");
LOG_I(UDP_, "Initializing UDP task interface: DONE\n");