udp_eNB_task.c 12.8 KB
Newer Older
gauthier's avatar
Task  
gauthier committed
1
/*******************************************************************************
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
Eurecom OpenAirInterface core network
Copyright(c) 1999 - 2014 Eurecom

This program is free software; you can redistribute it and/or modify it
under the terms and conditions of the GNU General Public License,
version 2, as published by the Free Software Foundation.

This program is distributed in the hope it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.

The full GNU General Public License is included in this distribution in
the file called "COPYING".

Contact Information
Openair Admin: openair_admin@eurecom.fr
Openair Tech : openair_tech@eurecom.fr
Forums       : http://forums.eurecom.fsr/openairinterface
Address      : EURECOM,
               Campus SophiaTech,
               450 Route des Chappes,
               CS 50193
               06904 Biot Sophia Antipolis cedex,
               FRANCE
gauthier's avatar
Task  
gauthier committed
31
*******************************************************************************/
32 33 34 35 36 37
/*! \file udp_eNB_task.c
* \brief
* \author Sebastien ROUX, Lionel Gauthier
* \company Eurecom
* \email: lionel.gauthier@eurecom.fr
*/
gauthier's avatar
Task  
gauthier committed
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <pthread.h>

#include "queue.h"
#include "intertask_interface.h"
#include "assertions.h"
#include "udp_eNB_task.h"

#include "UTIL/LOG/log.h"

#define IPV4_ADDR    "%u.%u.%u.%u"
#define IPV4_ADDR_FORMAT(aDDRESS)               \
    (uint8_t)((aDDRESS)  & 0x000000ff),         \
    (uint8_t)(((aDDRESS) & 0x0000ff00) >> 8 ),  \
    (uint8_t)(((aDDRESS) & 0x00ff0000) >> 16),  \
    (uint8_t)(((aDDRESS) & 0xff000000) >> 24)


struct udp_socket_desc_s {
    int       sd;              /* Socket descriptor to use */

    pthread_t listener_thread; /* Thread affected to recv */

    char     *local_address;   /* Local ipv4 address to use */
    uint16_t  local_port;      /* Local port to use */

    task_id_t task_id;         /* Task who has requested the new endpoint */

    STAILQ_ENTRY(udp_socket_desc_s) entries;
};

static STAILQ_HEAD(udp_socket_list_s, udp_socket_desc_s) udp_socket_list;
static pthread_mutex_t udp_socket_list_mutex = PTHREAD_MUTEX_INITIALIZER;


82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
static
struct udp_socket_desc_s *
udp_eNB_get_socket_desc(task_id_t task_id);

void udp_eNB_process_file_descriptors(
        struct epoll_event *events,
        int nb_events);

static
int
udp_eNB_create_socket(
        int port,
        char *ip_addr,
        task_id_t task_id);

int
udp_eNB_send_to(
    int sd,
    uint16_t port,
    uint32_t address,
    const uint8_t *buffer,
    uint32_t length);

void udp_eNB_receiver(struct udp_socket_desc_s *udp_sock_pP);

void *udp_eNB_task(void *args_p);

int udp_enb_init(const Enb_properties_t *enb_config_p);
gauthier's avatar
Task  
gauthier committed
110 111 112
/* @brief Retrieve the descriptor associated with the task_id
 */
static
113
struct udp_socket_desc_s *udp_eNB_get_socket_desc(task_id_t task_id)
gauthier's avatar
Task  
gauthier committed
114 115 116 117 118 119 120 121 122 123 124 125 126 127
{
    struct udp_socket_desc_s *udp_sock_p = NULL;

    LOG_I(UDP_, "Looking for task %d\n", task_id);

    STAILQ_FOREACH(udp_sock_p, &udp_socket_list, entries) {
        if (udp_sock_p->task_id == task_id) {
            LOG_D(UDP_, "Found matching task desc\n");
            break;
        }
    }
    return udp_sock_p;
}

128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
void udp_eNB_process_file_descriptors(struct epoll_event *events, int nb_events)
{
    int                       i;
    struct udp_socket_desc_s *udp_sock_p = NULL;

    if (events == NULL) {
        return;
    }

    for (i = 0; i < nb_events; i++) {
        STAILQ_FOREACH(udp_sock_p, &udp_socket_list, entries) {
            if (udp_sock_p->sd == events[i].data.fd) {
                LOG_D(UDP_, "Found matching task desc\n");
                udp_eNB_receiver(udp_sock_p);
                break;
            }
        }
    }
}

gauthier's avatar
Task  
gauthier committed
148
static
149
int udp_eNB_create_socket(int port, char *ip_addr, task_id_t task_id)
gauthier's avatar
Task  
gauthier committed
150 151
{

152
    struct udp_socket_desc_s  *udp_socket_desc_p = NULL;
gauthier's avatar
Task  
gauthier committed
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
    int                       sd, rc;
    struct sockaddr_in        sin;

    LOG_I(UDP_, "Initializing UDP for local address %s with port %d\n", ip_addr, port);

    sd = socket(AF_INET, SOCK_DGRAM, 0);
    AssertFatal(sd > 0, "UDP: Failed to create new socket: (%s:%d)\n", strerror(errno), errno);

    memset(&sin, 0, sizeof(struct sockaddr_in));
    sin.sin_family      = AF_INET;
    sin.sin_port        = htons(port);
    if (ip_addr == NULL) {
        sin.sin_addr.s_addr = inet_addr(INADDR_ANY);
    } else {
        sin.sin_addr.s_addr = inet_addr(ip_addr);
    }

    if ((rc = bind(sd, (struct sockaddr *)&sin, sizeof(struct sockaddr_in))) < 0) {
        close(sd);
        AssertFatal(rc >= 0, "UDP: Failed to bind socket: (%s:%d)\n\n", strerror(errno), errno);
    }

    /* Create a new descriptor for this connection */
176
    udp_socket_desc_p = calloc(1, sizeof(struct udp_socket_desc_s));
gauthier's avatar
Task  
gauthier committed
177

178
    DevAssert(udp_socket_desc_p != NULL);
gauthier's avatar
Task  
gauthier committed
179

180 181 182 183
    udp_socket_desc_p->sd            = sd;
    udp_socket_desc_p->local_address = ip_addr;
    udp_socket_desc_p->local_port    = port;
    udp_socket_desc_p->task_id       = task_id;
gauthier's avatar
Task  
gauthier committed
184

185 186 187 188 189 190
    LOG_I(UDP_, "Inserting new descriptor for task %d, sd %d\n", udp_socket_desc_p->task_id, udp_socket_desc_p->sd);
    pthread_mutex_lock(&udp_socket_list_mutex);
    STAILQ_INSERT_TAIL(&udp_socket_list, udp_socket_desc_p, entries);
    pthread_mutex_unlock(&udp_socket_list_mutex);

    itti_subscribe_event_fd(TASK_UDP, sd);
gauthier's avatar
Task  
gauthier committed
191 192 193 194
    LOG_I(UDP_, "Initializing UDP for local address %s with port %d: DONE\n", ip_addr, port);
    return sd;
}

195 196 197 198 199 200 201
int
udp_eNB_send_to(
    int sd,
    uint16_t port,
    uint32_t address,
    const uint8_t *buffer,
    uint32_t length)
gauthier's avatar
Task  
gauthier committed
202
{
203 204
    struct sockaddr_in to;
    socklen_t          to_length;
gauthier's avatar
Task  
gauthier committed
205

206 207 208 209
    if (sd <= 0 || ((buffer == NULL) && (length > 0))) {
        LOG_E(UDP_, "udp_send_to: bad param\n");
        return -1;
    }
gauthier's avatar
Task  
gauthier committed
210

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
    memset(&to, 0, sizeof(struct sockaddr_in));
    to_length = sizeof(to);

    to.sin_family      = AF_INET;
    to.sin_port        = htons(port);
    to.sin_addr.s_addr = address;

    if (sendto(sd, (void *)buffer, (size_t)length, 0, (struct sockaddr *)&to,
               to_length) < 0) {
        LOG_E(UDP_,
              "[SD %d] Failed to send data to "IPV4_ADDR" on port %d, buffer size %u\n",
              sd, IPV4_ADDR_FORMAT(address), port, length);
        return -1;
    }
    LOG_I(UDP_, "[SD %d] Successfully sent to "IPV4_ADDR
          " on port %d, buffer size %u, buffer address %x\n",
          sd, IPV4_ADDR_FORMAT(address), port, length, buffer);
    return 0;
}


void udp_eNB_receiver(struct udp_socket_desc_s *udp_sock_pP)
{
    uint8_t                   l_buffer[2048];
gauthier's avatar
Task  
gauthier committed
235 236 237
        int                n;
        socklen_t          from_len;
        struct sockaddr_in addr;
238 239 240
    MessageDef               *message_p        = NULL;
    udp_data_ind_t           *udp_data_ind_p   = NULL;
    uint8_t                  *forwarded_buffer = NULL;
gauthier's avatar
Task  
gauthier committed
241

242
    while (1) {
gauthier's avatar
Task  
gauthier committed
243 244
        from_len = (socklen_t)sizeof(struct sockaddr_in);

245 246
        LOG_I(UDP_, "before recvfrom sd %d\n", udp_sock_pP->sd);
        if ((n = recvfrom(udp_sock_pP->sd, l_buffer, sizeof(l_buffer), 0,
gauthier's avatar
Task  
gauthier committed
247 248 249 250 251
                          (struct sockaddr *)&addr, &from_len)) < 0) {
            LOG_E(UDP_, "Recvfrom failed %s\n", strerror(errno));
            break;
        } else {
            forwarded_buffer = calloc(n, sizeof(uint8_t));
252
            memcpy(forwarded_buffer, l_buffer, n);
gauthier's avatar
Task  
gauthier committed
253 254 255 256 257 258 259 260
            message_p = itti_alloc_new_message(TASK_UDP, UDP_DATA_IND);
            DevAssert(message_p != NULL);
            udp_data_ind_p = &message_p->ittiMsg.udp_data_ind;
            udp_data_ind_p->buffer        = forwarded_buffer;
            udp_data_ind_p->buffer_length = n;
            udp_data_ind_p->peer_port     = htons(addr.sin_port);
            udp_data_ind_p->peer_address  = addr.sin_addr.s_addr;

261
            LOG_I(UDP_, "Msg of length %d received from %s:%u\n",
gauthier's avatar
Task  
gauthier committed
262
                      n, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
263 264 265 266
            if (itti_send_msg_to_task(udp_sock_pP->task_id, INSTANCE_DEFAULT, message_p) < 0) {
            	LOG_I(UDP_, "Failed to send message %d to task %d\n",
                          UDP_DATA_IND,
                          udp_sock_pP->task_id);
gauthier's avatar
Task  
gauthier committed
267 268 269 270
                break;
            }
        }
    }
271 272
    //close(udp_sock_p->sd);
    //udp_sock_p->sd = -1;
gauthier's avatar
Task  
gauthier committed
273

274 275 276
    //pthread_mutex_lock(&udp_socket_list_mutex);
    //STAILQ_REMOVE(&udp_socket_list, udp_sock_p, udp_socket_desc_s, entries);
    //pthread_mutex_unlock(&udp_socket_list_mutex);
gauthier's avatar
Task  
gauthier committed
277 278 279 280 281
}


void *udp_eNB_task(void *args_p)
{
282 283 284 285 286 287 288
    int                 nb_events;
    struct epoll_event *events;
    MessageDef         *received_message_p    = NULL;
    const char         *msg_name = NULL;
    instance_t          instance  = 0;
    udp_enb_init(NULL);

gauthier's avatar
Task  
gauthier committed
289 290 291
    itti_mark_task_ready(TASK_UDP);
    while(1) {
        itti_receive_msg(TASK_UDP, &received_message_p);
292 293 294 295
        if (received_message_p != NULL) {

            msg_name = ITTI_MSG_NAME (received_message_p);
            instance = ITTI_MSG_INSTANCE (received_message_p);
gauthier's avatar
Task  
gauthier committed
296 297 298 299 300 301

        switch (ITTI_MSG_ID(received_message_p))
        {
            case UDP_INIT: {
                udp_init_t *udp_init_p;
                udp_init_p = &received_message_p->ittiMsg.udp_init;
302 303 304 305
                    udp_eNB_create_socket(
                    udp_init_p->port,
                    udp_init_p->address,
                    ITTI_MSG_ORIGIN_ID(received_message_p));
gauthier's avatar
Task  
gauthier committed
306
            } break;
307

gauthier's avatar
Task  
gauthier committed
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
            case UDP_DATA_REQ: {
                int     udp_sd = -1;
                ssize_t bytes_written;

                struct udp_socket_desc_s *udp_sock_p = NULL;
                udp_data_req_t           *udp_data_req_p;
                struct sockaddr_in        peer_addr;

                udp_data_req_p = &received_message_p->ittiMsg.udp_data_req;

                memset(&peer_addr, 0, sizeof(struct sockaddr_in));

                peer_addr.sin_family       = AF_INET;
                peer_addr.sin_port         = htons(udp_data_req_p->peer_port);
                peer_addr.sin_addr.s_addr  = udp_data_req_p->peer_address;

                pthread_mutex_lock(&udp_socket_list_mutex);
325
                    udp_sock_p = udp_eNB_get_socket_desc(ITTI_MSG_ORIGIN_ID(received_message_p));
gauthier's avatar
Task  
gauthier committed
326 327

                if (udp_sock_p == NULL) {
328 329 330 331
                        LOG_E(UDP_,
                                "Failed to retrieve the udp socket descriptor "
                                "associated with task %d\n",
                                ITTI_MSG_ORIGIN_ID(received_message_p));
gauthier's avatar
Task  
gauthier committed
332 333 334 335 336 337 338 339 340
                    pthread_mutex_unlock(&udp_socket_list_mutex);
                    if (udp_data_req_p->buffer) {
                        free(udp_data_req_p->buffer);
                    }
                    goto on_error;
                }
                udp_sd = udp_sock_p->sd;
                pthread_mutex_unlock(&udp_socket_list_mutex);

341 342 343
                LOG_I(UDP_, "[%d] Sending message of size %u to "IPV4_ADDR" and port %u\n",
                            udp_sd,
                            udp_data_req_p->buffer_length,
gauthier's avatar
Task  
gauthier committed
344 345 346
                          IPV4_ADDR_FORMAT(udp_data_req_p->peer_address),
                          udp_data_req_p->peer_port);

347 348 349 350 351
                    bytes_written = sendto(
                            udp_sd,
                            udp_data_req_p->buffer,
                            udp_data_req_p->buffer_length,
                            0,
gauthier's avatar
Task  
gauthier committed
352 353 354 355 356 357 358 359
                                       (struct sockaddr *)&peer_addr,
                                       sizeof(struct sockaddr_in));

                if (bytes_written != udp_data_req_p->buffer_length) {
                    LOG_E(UDP_, "There was an error while writing to socket "
                    "(%d:%s)\n", errno, strerror(errno));
                }
            } break;
360

gauthier's avatar
Task  
gauthier committed
361 362 363
            case TERMINATE_MESSAGE: {
                itti_exit_task();
            } break;
364

gauthier's avatar
Task  
gauthier committed
365 366
            case MESSAGE_TEST: {
            } break;
367

gauthier's avatar
Task  
gauthier committed
368
            default: {
369 370 371
                LOG_I(UDP_, "Unkwnon message ID %d:%s\n",
                            ITTI_MSG_ID(received_message_p),
                            ITTI_MSG_NAME(received_message_p));
gauthier's avatar
Task  
gauthier committed
372 373 374
            } break;
        }
on_error:
375
            itti_free (ITTI_MSG_ORIGIN_ID(received_message_p), received_message_p);
gauthier's avatar
Task  
gauthier committed
376
        received_message_p = NULL;
377 378 379 380 381 382
        }
        nb_events = itti_get_events(TASK_UDP, &events);
        /* Now handle notifications for other sockets */
        if (nb_events > 0) {
            udp_eNB_process_file_descriptors(events, nb_events);
        }
gauthier's avatar
Task  
gauthier committed
383 384 385 386 387 388
    }
    return NULL;
}

int udp_enb_init(const Enb_properties_t *enb_config_p)
{
389
    LOG_I(UDP_, "Initializing UDP task interface\n");
gauthier's avatar
Task  
gauthier committed
390
    STAILQ_INIT(&udp_socket_list);
391
    LOG_I(UDP_, "Initializing UDP task interface: DONE\n");
gauthier's avatar
Task  
gauthier committed
392 393
    return 0;
}