[GITLAB] - A technical upgrade is planned today at 6PM on our GITLAB server.

udp_eNB_task.c 13.1 KB
Newer Older
gauthier's avatar
gauthier committed
1
/*******************************************************************************
gauthier's avatar
gauthier committed
2 3
    OpenAirInterface
    Copyright(c) 1999 - 2014 Eurecom
4

gauthier's avatar
gauthier committed
5 6 7 8
    OpenAirInterface is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.
9 10


gauthier's avatar
gauthier committed
11 12 13 14
    OpenAirInterface is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.
15

gauthier's avatar
gauthier committed
16 17 18 19
    You should have received a copy of the GNU General Public License
    along with OpenAirInterface.The full GNU General Public License is
   included in this distribution in the file called "COPYING". If not,
   see <http://www.gnu.org/licenses/>.
20

gauthier's avatar
gauthier committed
21 22 23
  Contact Information
  OpenAirInterface Admin: openair_admin@eurecom.fr
  OpenAirInterface Tech : openair_tech@eurecom.fr
24
  OpenAirInterface Dev  : openair4g-devel@lists.eurecom.fr
gauthier's avatar
gauthier committed
25 26 27 28

  Address      : Eurecom, Compus SophiaTech 450, route des chappes, 06451 Biot, France.

 *******************************************************************************/
29 30 31 32 33 34
/*! \file udp_eNB_task.c
* \brief
* \author Sebastien ROUX, Lionel Gauthier
* \company Eurecom
* \email: lionel.gauthier@eurecom.fr
*/
gauthier's avatar
gauthier committed
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <pthread.h>

#include "queue.h"
#include "intertask_interface.h"
#include "assertions.h"
#include "udp_eNB_task.h"

#include "UTIL/LOG/log.h"
gauthier's avatar
gauthier committed
53
#include "UTIL/LOG/vcd_signal_dumper.h"
54
#include "msc.h"
gauthier's avatar
gauthier committed
55

56

gauthier's avatar
gauthier committed
57 58 59 60 61 62 63 64 65
#define IPV4_ADDR    "%u.%u.%u.%u"
#define IPV4_ADDR_FORMAT(aDDRESS)               \
    (uint8_t)((aDDRESS)  & 0x000000ff),         \
    (uint8_t)(((aDDRESS) & 0x0000ff00) >> 8 ),  \
    (uint8_t)(((aDDRESS) & 0x00ff0000) >> 16),  \
    (uint8_t)(((aDDRESS) & 0xff000000) >> 24)


struct udp_socket_desc_s {
66
  int       sd;              /* Socket descriptor to use */
gauthier's avatar
gauthier committed
67

68
  pthread_t listener_thread; /* Thread affected to recv */
gauthier's avatar
gauthier committed
69

70 71
  char     *local_address;   /* Local ipv4 address to use */
  uint16_t  local_port;      /* Local port to use */
gauthier's avatar
gauthier committed
72

73
  task_id_t task_id;         /* Task who has requested the new endpoint */
gauthier's avatar
gauthier committed
74

75
  STAILQ_ENTRY(udp_socket_desc_s) entries;
gauthier's avatar
gauthier committed
76 77 78 79 80 81
};

static STAILQ_HEAD(udp_socket_list_s, udp_socket_desc_s) udp_socket_list;
static pthread_mutex_t udp_socket_list_mutex = PTHREAD_MUTEX_INITIALIZER;


82 83 84 85 86
static
struct udp_socket_desc_s *
udp_eNB_get_socket_desc(task_id_t task_id);

void udp_eNB_process_file_descriptors(
87 88
  struct epoll_event *events,
  int nb_events);
89 90 91 92

static
int
udp_eNB_create_socket(
93 94 95
  int port,
  char *ip_addr,
  task_id_t task_id);
96 97 98

int
udp_eNB_send_to(
99 100 101 102 103
  int sd,
  uint16_t port,
  uint32_t address,
  const uint8_t *buffer,
  uint32_t length);
104 105 106 107 108 109

void udp_eNB_receiver(struct udp_socket_desc_s *udp_sock_pP);

void *udp_eNB_task(void *args_p);

int udp_enb_init(const Enb_properties_t *enb_config_p);
gauthier's avatar
gauthier committed
110 111 112
/* @brief Retrieve the descriptor associated with the task_id
 */
static
113
struct udp_socket_desc_s *udp_eNB_get_socket_desc(task_id_t task_id)
gauthier's avatar
gauthier committed
114
{
115
  struct udp_socket_desc_s *udp_sock_p = NULL;
gauthier's avatar
gauthier committed
116

117
#if defined(LOG_UDP) && LOG_UDP > 0
118
  LOG_T(UDP_, "Looking for task %d\n", task_id);
119
#endif
gauthier's avatar
gauthier committed
120

121 122
  STAILQ_FOREACH(udp_sock_p, &udp_socket_list, entries) {
    if (udp_sock_p->task_id == task_id) {
123
#if defined(LOG_UDP) && LOG_UDP > 0
124
      LOG_T(UDP_, "Found matching task desc\n");
125
#endif
126
      break;
gauthier's avatar
gauthier committed
127
    }
128 129
  }
  return udp_sock_p;
gauthier's avatar
gauthier committed
130 131
}

132 133
void udp_eNB_process_file_descriptors(struct epoll_event *events, int nb_events)
{
134 135
  int                       i;
  struct udp_socket_desc_s *udp_sock_p = NULL;
136

137 138 139
  if (events == NULL) {
    return;
  }
140

141 142 143
  for (i = 0; i < nb_events; i++) {
    STAILQ_FOREACH(udp_sock_p, &udp_socket_list, entries) {
      if (udp_sock_p->sd == events[i].data.fd) {
144
#if defined(LOG_UDP) && LOG_UDP > 0
145
        LOG_D(UDP_, "Found matching task desc\n");
146
#endif
147 148 149
        udp_eNB_receiver(udp_sock_p);
        break;
      }
150
    }
151
  }
152 153
}

gauthier's avatar
gauthier committed
154
static
155
int udp_eNB_create_socket(int port, char *ip_addr, task_id_t task_id)
gauthier's avatar
gauthier committed
156 157
{

158 159 160
  struct udp_socket_desc_s  *udp_socket_desc_p = NULL;
  int                       sd, rc;
  struct sockaddr_in        sin;
gauthier's avatar
gauthier committed
161

162
  LOG_I(UDP_, "Initializing UDP for local address %s with port %d\n", ip_addr, port);
gauthier's avatar
gauthier committed
163

164 165
  sd = socket(AF_INET, SOCK_DGRAM, 0);
  AssertFatal(sd > 0, "UDP: Failed to create new socket: (%s:%d)\n", strerror(errno), errno);
gauthier's avatar
gauthier committed
166

167 168 169
  memset(&sin, 0, sizeof(struct sockaddr_in));
  sin.sin_family      = AF_INET;
  sin.sin_port        = htons(port);
gauthier's avatar
gauthier committed
170

171
  if (ip_addr == NULL) {
172
    sin.sin_addr.s_addr = inet_addr(INADDR_ANY);
173 174 175
  } else {
    sin.sin_addr.s_addr = inet_addr(ip_addr);
  }
gauthier's avatar
gauthier committed
176

177 178
  if ((rc = bind(sd, (struct sockaddr *)&sin, sizeof(struct sockaddr_in))) < 0) {
    close(sd);
179 180
    AssertFatal(rc >= 0, "UDP: Failed to bind socket: (%s:%d) address %s port %u\n",
                strerror(errno), errno, ip_addr, port);
181
  }
gauthier's avatar
gauthier committed
182

183 184
  /* Create a new descriptor for this connection */
  udp_socket_desc_p = calloc(1, sizeof(struct udp_socket_desc_s));
gauthier's avatar
gauthier committed
185

186
  DevAssert(udp_socket_desc_p != NULL);
gauthier's avatar
gauthier committed
187

188 189 190 191
  udp_socket_desc_p->sd            = sd;
  udp_socket_desc_p->local_address = ip_addr;
  udp_socket_desc_p->local_port    = port;
  udp_socket_desc_p->task_id       = task_id;
192

193 194 195 196 197 198 199 200
  LOG_I(UDP_, "Inserting new descriptor for task %d, sd %d\n", udp_socket_desc_p->task_id, udp_socket_desc_p->sd);
  pthread_mutex_lock(&udp_socket_list_mutex);
  STAILQ_INSERT_TAIL(&udp_socket_list, udp_socket_desc_p, entries);
  pthread_mutex_unlock(&udp_socket_list_mutex);

  itti_subscribe_event_fd(TASK_UDP, sd);
  LOG_I(UDP_, "Initializing UDP for local address %s with port %d: DONE\n", ip_addr, port);
  return sd;
gauthier's avatar
gauthier committed
201 202
}

203 204
int
udp_eNB_send_to(
205 206 207 208 209
  int sd,
  uint16_t port,
  uint32_t address,
  const uint8_t *buffer,
  uint32_t length)
gauthier's avatar
gauthier committed
210
{
211 212
  struct sockaddr_in to;
  socklen_t          to_length;
gauthier's avatar
gauthier committed
213

214 215 216 217
  if (sd <= 0 || ((buffer == NULL) && (length > 0))) {
    LOG_E(UDP_, "udp_send_to: bad param\n");
    return -1;
  }
gauthier's avatar
gauthier committed
218

219 220
  memset(&to, 0, sizeof(struct sockaddr_in));
  to_length = sizeof(to);
221

222 223 224 225 226 227 228 229 230 231 232
  to.sin_family      = AF_INET;
  to.sin_port        = htons(port);
  to.sin_addr.s_addr = address;

  if (sendto(sd, (void *)buffer, (size_t)length, 0, (struct sockaddr *)&to,
             to_length) < 0) {
    LOG_E(UDP_,
          "[SD %d] Failed to send data to "IPV4_ADDR" on port %d, buffer size %u\n",
          sd, IPV4_ADDR_FORMAT(address), port, length);
    return -1;
  }
233

234
#if defined(LOG_UDP) && LOG_UDP > 0
235 236 237
  LOG_I(UDP_, "[SD %d] Successfully sent to "IPV4_ADDR
        " on port %d, buffer size %u, buffer address %x\n",
        sd, IPV4_ADDR_FORMAT(address), port, length, buffer);
238
#endif
239
  return 0;
240 241 242 243 244
}


void udp_eNB_receiver(struct udp_socket_desc_s *udp_sock_pP)
{
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
  uint8_t                   l_buffer[2048];
  int                n;
  socklen_t          from_len;
  struct sockaddr_in addr;
  MessageDef               *message_p        = NULL;
  udp_data_ind_t           *udp_data_ind_p   = NULL;
  uint8_t                  *forwarded_buffer = NULL;

  if (1) {
    from_len = (socklen_t)sizeof(struct sockaddr_in);

    if ((n = recvfrom(udp_sock_pP->sd, l_buffer, sizeof(l_buffer), 0,
                      (struct sockaddr *)&addr, &from_len)) < 0) {
      LOG_E(UDP_, "Recvfrom failed %s\n", strerror(errno));
      return;
    } else if (n == 0) {
      LOG_W(UDP_, "Recvfrom returned 0\n");
      return;
    } else {
      forwarded_buffer = itti_malloc(TASK_UDP, udp_sock_pP->task_id, n*sizeof(uint8_t));
      DevAssert(forwarded_buffer != NULL);
      memcpy(forwarded_buffer, l_buffer, n);
      message_p = itti_alloc_new_message(TASK_UDP, UDP_DATA_IND);
      DevAssert(message_p != NULL);
      udp_data_ind_p = &message_p->ittiMsg.udp_data_ind;
      udp_data_ind_p->buffer        = forwarded_buffer;
      udp_data_ind_p->buffer_length = n;
      udp_data_ind_p->peer_port     = htons(addr.sin_port);
      udp_data_ind_p->peer_address  = addr.sin_addr.s_addr;
gauthier's avatar
gauthier committed
274

275
#if defined(LOG_UDP) && LOG_UDP > 0
276 277
      LOG_I(UDP_, "Msg of length %d received from %s:%u\n",
            n, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
278
#endif
279 280 281 282 283 284 285

      if (itti_send_msg_to_task(udp_sock_pP->task_id, INSTANCE_DEFAULT, message_p) < 0) {
        LOG_I(UDP_, "Failed to send message %d to task %d\n",
              UDP_DATA_IND,
              udp_sock_pP->task_id);
        return;
      }
gauthier's avatar
gauthier committed
286
    }
287 288 289 290
  }

  //close(udp_sock_p->sd);
  //udp_sock_p->sd = -1;
gauthier's avatar
gauthier committed
291

292 293 294
  //pthread_mutex_lock(&udp_socket_list_mutex);
  //STAILQ_REMOVE(&udp_socket_list, udp_sock_p, udp_socket_desc_s, entries);
  //pthread_mutex_unlock(&udp_socket_list_mutex);
gauthier's avatar
gauthier committed
295 296 297 298 299
}


void *udp_eNB_task(void *args_p)
{
300 301 302
  int                 nb_events;
  struct epoll_event *events;
  MessageDef         *received_message_p    = NULL;
303 304
  //const char         *msg_name = NULL;
  //instance_t          instance  = 0;
305 306 307
  udp_enb_init(NULL);

  itti_mark_task_ready(TASK_UDP);
308
  MSC_START_USE();
309 310 311

  while(1) {
    itti_receive_msg(TASK_UDP, &received_message_p);
gauthier's avatar
gauthier committed
312
    VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_UDP_ENB_TASK, VCD_FUNCTION_IN);
313
#if defined(LOG_UDP) && LOG_UDP > 0
314
    LOG_D(UDP_, "Got message %p\n", &received_message_p);
315
#endif
316 317 318

    if (received_message_p != NULL) {

319 320
      //msg_name = ITTI_MSG_NAME (received_message_p);
      //instance = ITTI_MSG_INSTANCE (received_message_p);
321 322 323 324 325 326 327 328 329 330 331 332 333 334

      switch (ITTI_MSG_ID(received_message_p)) {
      case UDP_INIT: {
        LOG_D(UDP_, "Received UDP_INIT\n");
        udp_init_t *udp_init_p;
        udp_init_p = &received_message_p->ittiMsg.udp_init;
        udp_eNB_create_socket(
          udp_init_p->port,
          udp_init_p->address,
          ITTI_MSG_ORIGIN_ID(received_message_p));
      }
      break;

      case UDP_DATA_REQ: {
335
#if defined(LOG_UDP) && LOG_UDP > 0
336
        LOG_D(UDP_, "Received UDP_DATA_REQ\n");
337
#endif
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
        int     udp_sd = -1;
        ssize_t bytes_written;

        struct udp_socket_desc_s *udp_sock_p = NULL;
        udp_data_req_t           *udp_data_req_p;
        struct sockaddr_in        peer_addr;

        udp_data_req_p = &received_message_p->ittiMsg.udp_data_req;

        memset(&peer_addr, 0, sizeof(struct sockaddr_in));

        peer_addr.sin_family       = AF_INET;
        peer_addr.sin_port         = htons(udp_data_req_p->peer_port);
        peer_addr.sin_addr.s_addr  = udp_data_req_p->peer_address;

        pthread_mutex_lock(&udp_socket_list_mutex);
        udp_sock_p = udp_eNB_get_socket_desc(ITTI_MSG_ORIGIN_ID(received_message_p));

        if (udp_sock_p == NULL) {
          LOG_E(UDP_,
                "Failed to retrieve the udp socket descriptor "
                "associated with task %d\n",
                ITTI_MSG_ORIGIN_ID(received_message_p));
          pthread_mutex_unlock(&udp_socket_list_mutex);

          if (udp_data_req_p->buffer) {
            itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), udp_data_req_p->buffer);
          }

          goto on_error;
        }

        udp_sd = udp_sock_p->sd;
        pthread_mutex_unlock(&udp_socket_list_mutex);
gauthier's avatar
gauthier committed
372

373
#if defined(LOG_UDP) && LOG_UDP > 0
374 375 376 377 378
        LOG_D(UDP_, "[%d] Sending message of size %u to "IPV4_ADDR" and port %u\n",
              udp_sd,
              udp_data_req_p->buffer_length,
              IPV4_ADDR_FORMAT(udp_data_req_p->peer_address),
              udp_data_req_p->peer_port);
379
#endif
380 381 382 383 384 385 386 387 388 389 390 391
        bytes_written = sendto(
                          udp_sd,
                          &udp_data_req_p->buffer[udp_data_req_p->buffer_offset],
                          udp_data_req_p->buffer_length,
                          0,
                          (struct sockaddr *)&peer_addr,
                          sizeof(struct sockaddr_in));

        if (bytes_written != udp_data_req_p->buffer_length) {
          LOG_E(UDP_, "There was an error while writing to socket %u rc %d"
                "(%d:%s) May be normal if GTPU kernel module loaded on same host (may NF_DROP IP packet)\n",
                udp_sd, bytes_written, errno, strerror(errno));
392
        }
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423

        itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), udp_data_req_p->buffer);
      }
      break;

      case TERMINATE_MESSAGE: {
        LOG_W(UDP_, "Received TERMINATE_MESSAGE\n");
        itti_exit_task();
      }
      break;

      case MESSAGE_TEST: {
      } break;

      default: {
        LOG_W(UDP_, "Unkwnon message ID %d:%s\n",
              ITTI_MSG_ID(received_message_p),
              ITTI_MSG_NAME(received_message_p));
      }
      break;
      }

on_error:
      itti_free (ITTI_MSG_ORIGIN_ID(received_message_p), received_message_p);
      received_message_p = NULL;
    }

    nb_events = itti_get_events(TASK_UDP, &events);

    /* Now handle notifications for other sockets */
    if (nb_events > 0) {
424
#if defined(LOG_UDP) && LOG_UDP > 0
425
      LOG_D(UDP_, "UDP task Process %d events\n",nb_events);
426
#endif
427
      udp_eNB_process_file_descriptors(events, nb_events);
gauthier's avatar
gauthier committed
428
    }
429

gauthier's avatar
gauthier committed
430
    VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_UDP_ENB_TASK, VCD_FUNCTION_OUT);
431 432 433 434
  }

  LOG_N(UDP_, "Task UDP eNB exiting\n");
  return NULL;
gauthier's avatar
gauthier committed
435 436 437 438
}

int udp_enb_init(const Enb_properties_t *enb_config_p)
{
439 440 441 442
  LOG_I(UDP_, "Initializing UDP task interface\n");
  STAILQ_INIT(&udp_socket_list);
  LOG_I(UDP_, "Initializing UDP task interface: DONE\n");
  return 0;
gauthier's avatar
gauthier committed
443
}