/*
 * Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The OpenAirInterface Software Alliance licenses this file to You under
 * the OAI Public License, Version 1.1  (the "License"); you may not use this file
 * except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.openairinterface.org/?page_id=698
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *-------------------------------------------------------------------------------
 * For more information about the OpenAirInterface (OAI) Software Alliance:
 *      contact@openairinterface.org
 */

/*! \file udp_eNB_task.c
* \brief
* \author Sebastien ROUX, Lionel Gauthier
* \company Eurecom
* \email: lionel.gauthier@eurecom.fr
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <pthread.h>

#include "queue.h"
#include "intertask_interface.h"
#include "assertions.h"
#include "udp_eNB_task.h"

#include "UTIL/LOG/log.h"
#include "UTIL/LOG/vcd_signal_dumper.h"
#include "msc.h"
/*
 * printf-style helpers for printing a 32-bit IPv4 address as a dotted quad.
 *
 * IPV4_ADDR is the format-string fragment (four %u conversions).
 * IPV4_ADDR_FORMAT(a) expands to the four matching arguments: the bytes of
 * `a`, least-significant byte first, each cast to uint8_t.
 *
 * The argument is expanded four times, so it must be free of side effects.
 *
 * NOTE(review): callers in this file pass the same value they store in
 * sin_addr.s_addr; least-significant-byte-first then yields the usual dotted
 * order only on a little-endian host -- confirm if big-endian targets matter.
 */
#define IPV4_ADDR    "%u.%u.%u.%u"
#define IPV4_ADDR_FORMAT(aDDRESS)               \
    (uint8_t)((aDDRESS)  & 0x000000ff),         \
    (uint8_t)(((aDDRESS) & 0x0000ff00) >> 8 ),  \
    (uint8_t)(((aDDRESS) & 0x00ff0000) >> 16),  \
    (uint8_t)(((aDDRESS) & 0xff000000) >> 24)


struct udp_socket_desc_s {
59
  int       sd;              /* Socket descriptor to use */
gauthier's avatar
gauthier committed
60

61
  pthread_t listener_thread; /* Thread affected to recv */
gauthier's avatar
gauthier committed
62

63 64
  char     *local_address;   /* Local ipv4 address to use */
  uint16_t  local_port;      /* Local port to use */
gauthier's avatar
gauthier committed
65

66
  task_id_t task_id;         /* Task who has requested the new endpoint */
gauthier's avatar
gauthier committed
67

68
  STAILQ_ENTRY(udp_socket_desc_s) entries;
gauthier's avatar
gauthier committed
69 70 71 72 73 74
};

/* All endpoints opened by udp_eNB_create_socket(); reset by udp_enb_init(). */
static STAILQ_HEAD(udp_socket_list_s, udp_socket_desc_s) udp_socket_list;
/* Taken around the list insertion in udp_eNB_create_socket() and around the
 * per-task lookup in udp_eNB_task(). */
static pthread_mutex_t udp_socket_list_mutex = PTHREAD_MUTEX_INITIALIZER;


/* Forward declarations for the functions defined in this file. */
static struct udp_socket_desc_s *udp_eNB_get_socket_desc(task_id_t task_id);

void udp_eNB_process_file_descriptors(struct epoll_event *events, int nb_events);

static int udp_eNB_create_socket(int port, char *ip_addr, task_id_t task_id);

int udp_eNB_send_to(int sd, uint16_t port, uint32_t address,
                    const uint8_t *buffer, uint32_t length);

void udp_eNB_receiver(struct udp_socket_desc_s *udp_sock_pP);

void *udp_eNB_task(void *args_p);

int udp_enb_init(void);
/* @brief Retrieve the descriptor associated with the task_id
 */
static
106
struct udp_socket_desc_s *udp_eNB_get_socket_desc(task_id_t task_id)
gauthier's avatar
gauthier committed
107
{
108
  struct udp_socket_desc_s *udp_sock_p = NULL;
gauthier's avatar
gauthier committed
109

110
#if defined(LOG_UDP) && LOG_UDP > 0
111
  LOG_T(UDP_, "Looking for task %d\n", task_id);
112
#endif
gauthier's avatar
gauthier committed
113

114 115
  STAILQ_FOREACH(udp_sock_p, &udp_socket_list, entries) {
    if (udp_sock_p->task_id == task_id) {
116
#if defined(LOG_UDP) && LOG_UDP > 0
117
      LOG_T(UDP_, "Found matching task desc\n");
118
#endif
119
      break;
gauthier's avatar
gauthier committed
120
    }
121 122
  }
  return udp_sock_p;
gauthier's avatar
gauthier committed
123 124
}

void udp_eNB_process_file_descriptors(struct epoll_event *events, int nb_events)
{
127 128
  int                       i;
  struct udp_socket_desc_s *udp_sock_p = NULL;
129

130 131 132
  if (events == NULL) {
    return;
  }
133

134 135 136
  for (i = 0; i < nb_events; i++) {
    STAILQ_FOREACH(udp_sock_p, &udp_socket_list, entries) {
      if (udp_sock_p->sd == events[i].data.fd) {
137
#if defined(LOG_UDP) && LOG_UDP > 0
138
        LOG_D(UDP_, "Found matching task desc\n");
139
#endif
140 141 142
        udp_eNB_receiver(udp_sock_p);
        break;
      }
143
    }
144
  }
145 146
}

static
148
int udp_eNB_create_socket(int port, char *ip_addr, task_id_t task_id)
gauthier's avatar
gauthier committed
149 150
{

151 152 153
  struct udp_socket_desc_s  *udp_socket_desc_p = NULL;
  int                       sd, rc;
  struct sockaddr_in        sin;
gauthier's avatar
gauthier committed
154

155
  LOG_I(UDP_, "Initializing UDP for local address %s with port %d\n", ip_addr, port);
gauthier's avatar
gauthier committed
156

157 158
  sd = socket(AF_INET, SOCK_DGRAM, 0);
  AssertFatal(sd > 0, "UDP: Failed to create new socket: (%s:%d)\n", strerror(errno), errno);
gauthier's avatar
gauthier committed
159

160 161 162
  memset(&sin, 0, sizeof(struct sockaddr_in));
  sin.sin_family      = AF_INET;
  sin.sin_port        = htons(port);
gauthier's avatar
gauthier committed
163

164
  if (ip_addr == NULL) {
165
    sin.sin_addr.s_addr = inet_addr(INADDR_ANY);
166 167 168
  } else {
    sin.sin_addr.s_addr = inet_addr(ip_addr);
  }
gauthier's avatar
gauthier committed
169

170 171
  if ((rc = bind(sd, (struct sockaddr *)&sin, sizeof(struct sockaddr_in))) < 0) {
    close(sd);
172 173
    AssertFatal(rc >= 0, "UDP: Failed to bind socket: (%s:%d) address %s port %u\n",
                strerror(errno), errno, ip_addr, port);
174
  }
gauthier's avatar
gauthier committed
175

176 177
  /* Create a new descriptor for this connection */
  udp_socket_desc_p = calloc(1, sizeof(struct udp_socket_desc_s));
gauthier's avatar
gauthier committed
178

179
  DevAssert(udp_socket_desc_p != NULL);
gauthier's avatar
gauthier committed
180

181 182 183 184
  udp_socket_desc_p->sd            = sd;
  udp_socket_desc_p->local_address = ip_addr;
  udp_socket_desc_p->local_port    = port;
  udp_socket_desc_p->task_id       = task_id;
185

186 187 188 189 190 191 192 193
  LOG_I(UDP_, "Inserting new descriptor for task %d, sd %d\n", udp_socket_desc_p->task_id, udp_socket_desc_p->sd);
  pthread_mutex_lock(&udp_socket_list_mutex);
  STAILQ_INSERT_TAIL(&udp_socket_list, udp_socket_desc_p, entries);
  pthread_mutex_unlock(&udp_socket_list_mutex);

  itti_subscribe_event_fd(TASK_UDP, sd);
  LOG_I(UDP_, "Initializing UDP for local address %s with port %d: DONE\n", ip_addr, port);
  return sd;
gauthier's avatar
gauthier committed
194 195
}

int
udp_eNB_send_to(
198 199 200 201 202
  int sd,
  uint16_t port,
  uint32_t address,
  const uint8_t *buffer,
  uint32_t length)
gauthier's avatar
gauthier committed
203
{
204 205
  struct sockaddr_in to;
  socklen_t          to_length;
gauthier's avatar
gauthier committed
206

207 208 209 210
  if (sd <= 0 || ((buffer == NULL) && (length > 0))) {
    LOG_E(UDP_, "udp_send_to: bad param\n");
    return -1;
  }
gauthier's avatar
gauthier committed
211

212 213
  memset(&to, 0, sizeof(struct sockaddr_in));
  to_length = sizeof(to);
214

215 216 217 218 219 220 221 222 223 224 225
  to.sin_family      = AF_INET;
  to.sin_port        = htons(port);
  to.sin_addr.s_addr = address;

  if (sendto(sd, (void *)buffer, (size_t)length, 0, (struct sockaddr *)&to,
             to_length) < 0) {
    LOG_E(UDP_,
          "[SD %d] Failed to send data to "IPV4_ADDR" on port %d, buffer size %u\n",
          sd, IPV4_ADDR_FORMAT(address), port, length);
    return -1;
  }
226

227
#if defined(LOG_UDP) && LOG_UDP > 0
228 229 230
  LOG_I(UDP_, "[SD %d] Successfully sent to "IPV4_ADDR
        " on port %d, buffer size %u, buffer address %x\n",
        sd, IPV4_ADDR_FORMAT(address), port, length, buffer);
231
#endif
232
  return 0;
233 234 235 236 237
}


void udp_eNB_receiver(struct udp_socket_desc_s *udp_sock_pP)
{
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
  uint8_t                   l_buffer[2048];
  int                n;
  socklen_t          from_len;
  struct sockaddr_in addr;
  MessageDef               *message_p        = NULL;
  udp_data_ind_t           *udp_data_ind_p   = NULL;
  uint8_t                  *forwarded_buffer = NULL;

  if (1) {
    from_len = (socklen_t)sizeof(struct sockaddr_in);

    if ((n = recvfrom(udp_sock_pP->sd, l_buffer, sizeof(l_buffer), 0,
                      (struct sockaddr *)&addr, &from_len)) < 0) {
      LOG_E(UDP_, "Recvfrom failed %s\n", strerror(errno));
      return;
    } else if (n == 0) {
      LOG_W(UDP_, "Recvfrom returned 0\n");
      return;
    } else {
      forwarded_buffer = itti_malloc(TASK_UDP, udp_sock_pP->task_id, n*sizeof(uint8_t));
      DevAssert(forwarded_buffer != NULL);
      memcpy(forwarded_buffer, l_buffer, n);
      message_p = itti_alloc_new_message(TASK_UDP, UDP_DATA_IND);
      DevAssert(message_p != NULL);
      udp_data_ind_p = &message_p->ittiMsg.udp_data_ind;
      udp_data_ind_p->buffer        = forwarded_buffer;
      udp_data_ind_p->buffer_length = n;
      udp_data_ind_p->peer_port     = htons(addr.sin_port);
      udp_data_ind_p->peer_address  = addr.sin_addr.s_addr;
gauthier's avatar
gauthier committed
267

268
#if defined(LOG_UDP) && LOG_UDP > 0
269 270
      LOG_I(UDP_, "Msg of length %d received from %s:%u\n",
            n, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
271
#endif
272 273 274 275 276 277 278

      if (itti_send_msg_to_task(udp_sock_pP->task_id, INSTANCE_DEFAULT, message_p) < 0) {
        LOG_I(UDP_, "Failed to send message %d to task %d\n",
              UDP_DATA_IND,
              udp_sock_pP->task_id);
        return;
      }
gauthier's avatar
gauthier committed
279
    }
280 281 282 283
  }

  //close(udp_sock_p->sd);
  //udp_sock_p->sd = -1;
gauthier's avatar
gauthier committed
284

285 286 287
  //pthread_mutex_lock(&udp_socket_list_mutex);
  //STAILQ_REMOVE(&udp_socket_list, udp_sock_p, udp_socket_desc_s, entries);
  //pthread_mutex_unlock(&udp_socket_list_mutex);
gauthier's avatar
gauthier committed
288 289 290 291 292
}


void *udp_eNB_task(void *args_p)
{
293 294 295
  int                 nb_events;
  struct epoll_event *events;
  MessageDef         *received_message_p    = NULL;
296 297
  //const char         *msg_name = NULL;
  //instance_t          instance  = 0;
298
  udp_enb_init();
299 300

  itti_mark_task_ready(TASK_UDP);
301
  MSC_START_USE();
302 303 304

  while(1) {
    itti_receive_msg(TASK_UDP, &received_message_p);
gauthier's avatar
gauthier committed
305
    VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_UDP_ENB_TASK, VCD_FUNCTION_IN);
306
#if defined(LOG_UDP) && LOG_UDP > 0
307
    LOG_D(UDP_, "Got message %p\n", &received_message_p);
308
#endif
309 310 311

    if (received_message_p != NULL) {

312 313
      //msg_name = ITTI_MSG_NAME (received_message_p);
      //instance = ITTI_MSG_INSTANCE (received_message_p);
314 315 316 317 318 319 320 321 322 323 324 325 326 327

      switch (ITTI_MSG_ID(received_message_p)) {
      case UDP_INIT: {
        LOG_D(UDP_, "Received UDP_INIT\n");
        udp_init_t *udp_init_p;
        udp_init_p = &received_message_p->ittiMsg.udp_init;
        udp_eNB_create_socket(
          udp_init_p->port,
          udp_init_p->address,
          ITTI_MSG_ORIGIN_ID(received_message_p));
      }
      break;

      case UDP_DATA_REQ: {
328
#if defined(LOG_UDP) && LOG_UDP > 0
329
        LOG_D(UDP_, "Received UDP_DATA_REQ\n");
330
#endif
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
        int     udp_sd = -1;
        ssize_t bytes_written;

        struct udp_socket_desc_s *udp_sock_p = NULL;
        udp_data_req_t           *udp_data_req_p;
        struct sockaddr_in        peer_addr;

        udp_data_req_p = &received_message_p->ittiMsg.udp_data_req;

        memset(&peer_addr, 0, sizeof(struct sockaddr_in));

        peer_addr.sin_family       = AF_INET;
        peer_addr.sin_port         = htons(udp_data_req_p->peer_port);
        peer_addr.sin_addr.s_addr  = udp_data_req_p->peer_address;

        pthread_mutex_lock(&udp_socket_list_mutex);
        udp_sock_p = udp_eNB_get_socket_desc(ITTI_MSG_ORIGIN_ID(received_message_p));

        if (udp_sock_p == NULL) {
          LOG_E(UDP_,
                "Failed to retrieve the udp socket descriptor "
                "associated with task %d\n",
                ITTI_MSG_ORIGIN_ID(received_message_p));
          pthread_mutex_unlock(&udp_socket_list_mutex);

          if (udp_data_req_p->buffer) {
            itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), udp_data_req_p->buffer);
          }

          goto on_error;
        }

        udp_sd = udp_sock_p->sd;
        pthread_mutex_unlock(&udp_socket_list_mutex);
gauthier's avatar
gauthier committed
365

366
#if defined(LOG_UDP) && LOG_UDP > 0
367 368 369 370 371
        LOG_D(UDP_, "[%d] Sending message of size %u to "IPV4_ADDR" and port %u\n",
              udp_sd,
              udp_data_req_p->buffer_length,
              IPV4_ADDR_FORMAT(udp_data_req_p->peer_address),
              udp_data_req_p->peer_port);
372
#endif
373 374 375 376 377 378 379 380 381
        bytes_written = sendto(
                          udp_sd,
                          &udp_data_req_p->buffer[udp_data_req_p->buffer_offset],
                          udp_data_req_p->buffer_length,
                          0,
                          (struct sockaddr *)&peer_addr,
                          sizeof(struct sockaddr_in));

        if (bytes_written != udp_data_req_p->buffer_length) {
Cedric Roux's avatar
Cedric Roux committed
382
          LOG_E(UDP_, "There was an error while writing to socket %d rc %zd"
383 384
                "(%d:%s) May be normal if GTPU kernel module loaded on same host (may NF_DROP IP packet)\n",
                udp_sd, bytes_written, errno, strerror(errno));
385
        }
386 387 388 389 390 391

        itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), udp_data_req_p->buffer);
      }
      break;

      case TERMINATE_MESSAGE: {
392
        LOG_W(UDP_, " *** Exiting UDP thread\n");
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416
        itti_exit_task();
      }
      break;

      case MESSAGE_TEST: {
      } break;

      default: {
        LOG_W(UDP_, "Unkwnon message ID %d:%s\n",
              ITTI_MSG_ID(received_message_p),
              ITTI_MSG_NAME(received_message_p));
      }
      break;
      }

on_error:
      itti_free (ITTI_MSG_ORIGIN_ID(received_message_p), received_message_p);
      received_message_p = NULL;
    }

    nb_events = itti_get_events(TASK_UDP, &events);

    /* Now handle notifications for other sockets */
    if (nb_events > 0) {
417
#if defined(LOG_UDP) && LOG_UDP > 0
418
      LOG_D(UDP_, "UDP task Process %d events\n",nb_events);
419
#endif
420
      udp_eNB_process_file_descriptors(events, nb_events);
gauthier's avatar
gauthier committed
421
    }
422

gauthier's avatar
gauthier committed
423
    VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_UDP_ENB_TASK, VCD_FUNCTION_OUT);
424 425 426 427
  }

  LOG_N(UDP_, "Task UDP eNB exiting\n");
  return NULL;
gauthier's avatar
gauthier committed
428 429
}

int udp_enb_init(void)
gauthier's avatar
gauthier committed
431
{
432 433 434 435
  LOG_I(UDP_, "Initializing UDP task interface\n");
  STAILQ_INIT(&udp_socket_list);
  LOG_I(UDP_, "Initializing UDP task interface: DONE\n");
  return 0;
gauthier's avatar
gauthier committed
436
}