/*******************************************************************************
    OpenAirInterface
    Copyright(c) 1999 - 2014 Eurecom

    OpenAirInterface is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.


    OpenAirInterface is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with OpenAirInterface. The full GNU General Public License is
    included in this distribution in the file called "COPYING". If not,
    see <http://www.gnu.org/licenses/>.

  Contact Information
  OpenAirInterface Admin: openair_admin@eurecom.fr
  OpenAirInterface Tech : openair_tech@eurecom.fr
  OpenAirInterface Dev  : openair4g-devel@eurecom.fr

  Address      : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE

 *******************************************************************************/
29 30 31


/** @brief Intertask Interface Signal Dumper
 * Allows users to connect their itti_analyzer to this process and dump
 * the signals exchanged between tasks.
 * @author Sebastien Roux <sebastien.roux@eurecom.fr>
 */

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <error.h>
45
#include <sched.h>
46 47 48 49 50 51 52

#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/types.h>
#include <arpa/inet.h>

53 54
#include <sys/eventfd.h>

55
#include "assertions.h"
56
#include "liblfds611.h"
57

winckel's avatar
winckel committed
58
#include "itti_types.h"
59 60 61
#include "intertask_interface.h"
#include "intertask_interface_dump.h"

62
#if defined(OAI_EMU) || defined(RTAI)
63 64 65
#include "vcd_signal_dumper.h"
#endif

66
/* Bitmask of the debug categories printed by ITTI_DUMP_DEBUG; 0 silences all of them. */
static const int itti_dump_debug = 0; // 0x8 | 0x4 | 0x2;

/* ITTI_DUMP_DEBUG(m, fmt, ...): print a "[ITTI_DUMP][D]"-prefixed trace when at
 * least one bit of mask m is set in itti_dump_debug. fmt must be a string
 * literal because it is concatenated with the prefix.
 */
#ifdef RTAI
# define ITTI_DUMP_DEBUG(m, x, ...) \
    do { \
        if ((m) & itti_dump_debug) \
            rt_printk("[ITTI_DUMP][D]"x, ##__VA_ARGS__); \
    } while(0)
#else
# define ITTI_DUMP_DEBUG(m, x, ...) \
    do { \
        if ((m) & itti_dump_debug) \
            fprintf(stdout, "[ITTI_DUMP][D]"x, ##__VA_ARGS__); \
    } while(0)
#endif

/* ITTI_DUMP_ERROR(fmt, ...): unconditionally print a "[ITTI_DUMP][E]"-prefixed
 * trace on stdout. fmt must be a string literal.
 */
#define ITTI_DUMP_ERROR(x, ...) \
    do { \
        fprintf(stdout, "[ITTI_DUMP][E]"x, ##__VA_ARGS__); \
    } while(0)
77

78

79
typedef struct itti_dump_queue_item_s {
80 81 82 83 84
    MessageDef *data;
    uint32_t    data_size;
    uint32_t    message_number;
    uint32_t    message_type;
    uint32_t    message_size;
85
} itti_dump_queue_item_t;
86 87 88 89 90 91 92

/* Bookkeeping for one remote analyzer connection slot. */
typedef struct {
    int      sd;                  /* Client socket descriptor; -1 marks a free slot */
    uint32_t last_message_number; /* Reset to 0 when the slot is initialised or released */
} itti_client_desc_t;

typedef struct itti_desc_s {
93 94 95
    /* Asynchronous thread that write to file/accept new clients */
    pthread_t      itti_acceptor_thread;
    pthread_attr_t attr;
96 97 98

    /* List of messages to dump.
     * NOTE: we limit the size of this queue to retain only the last exchanged
99
     * messages. The size can be increased by setting up the ITTI_QUEUE_MAX_ELEMENTS
100 101
     * in mme_default_values.h or by putting a custom in the configuration file.
     */
102 103
    struct lfds611_ringbuffer_state *itti_message_queue;

104 105
    int nb_connected;

106
#ifndef RTAI
107 108
    /* Event fd used to notify new messages (semaphore) */
    int event_fd;
109 110 111
#else
    unsigned long messages_in_queue __attribute__((aligned(8)));
#endif
112 113 114

    int itti_listen_socket;

115 116 117 118
    itti_client_desc_t itti_clients[ITTI_DUMP_MAX_CON];
} itti_desc_t;

typedef struct {
winckel's avatar
winckel committed
119
    itti_socket_header_t socket_header;
120

winckel's avatar
winckel committed
121
    itti_signal_header_t signal_header;
122 123 124 125 126

    /* Message payload is added here, this struct is used as an header */
} itti_dump_message_t;

typedef struct {
winckel's avatar
winckel committed
127
    itti_socket_header_t socket_header;
128 129
} itti_statistic_message_t;

winckel's avatar
winckel committed
130 131 132
static const itti_message_types_t itti_dump_xml_definition_end =  ITTI_DUMP_XML_DEFINITION_END;
static const itti_message_types_t itti_dump_message_type_end =    ITTI_DUMP_MESSAGE_TYPE_END;

133
static itti_desc_t itti_dump_queue;
134
static FILE *dump_file = NULL;
135
static int itti_dump_running = 1;
136

137
static volatile uint32_t pending_messages = 0;
138

139 140
/*------------------------------------------------------------------------------*/
/* Send one queued message to a connected analyzer.
 *
 * The record put on the wire is: itti_dump_message_t header, message payload,
 * then the ITTI_DUMP_MESSAGE_TYPE_END trailer.
 *
 * @param sd      connected client socket (must be > 0)
 * @param message queue item to send (must not be NULL)
 * @return number of bytes sent on success, -1 when send() fails
 */
static int itti_dump_send_message(int sd, itti_dump_queue_item_t *message)
{
    itti_dump_message_t *new_message;
    uint8_t *data_ptr;
    size_t size;
    size_t total_sent = 0;

    /* Validate the arguments before message is dereferenced */
    AssertFatal (sd > 0, "Socket descriptor (%d) is invalid!\n", sd);
    AssertFatal (message != NULL, "Message is NULL!\n");

    /* Allocate memory for message header, payload and trailer */
    size = sizeof(itti_dump_message_t) + message->data_size + sizeof(itti_message_types_t);

    new_message = malloc(size);
    AssertFatal (new_message != NULL, "New message allocation failed!\n");

    /* Clear the header so that no uninitialised heap byte is sent */
    memset(new_message, 0, sizeof(itti_dump_message_t));

    /* Preparing the header */
    new_message->socket_header.message_size = size;
    new_message->socket_header.message_type = ITTI_DUMP_MESSAGE_TYPE;
    /* Adds message number in unsigned decimal ASCII format; the last byte of
     * the field is always forced to a newline.
     */
    snprintf(new_message->signal_header.message_number_char, sizeof(new_message->signal_header.message_number_char),
             MESSAGE_NUMBER_CHAR_FORMAT, message->message_number);
    new_message->signal_header.message_number_char[sizeof(new_message->signal_header.message_number_char) - 1] = '\n';

    data_ptr = (uint8_t *) new_message;

    /* Appends message payload, then the end-of-message marker */
    memcpy(data_ptr + sizeof(itti_dump_message_t), message->data, message->data_size);
    memcpy(data_ptr + sizeof(itti_dump_message_t) + message->data_size,
           &itti_dump_message_type_end, sizeof(itti_message_types_t));

    /* send() may accept only part of the buffer: loop until everything is out */
    while (total_sent < size) {
        ssize_t bytes_sent = send(sd, &data_ptr[total_sent], size - total_sent, 0);

        if (bytes_sent < 0) {
            if (errno == EINTR) {
                /* Interrupted by a signal before any data was sent: retry */
                continue;
            }
            ITTI_DUMP_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n",
                       sd, size, errno, strerror(errno));
            free(new_message);
            return -1;
        }
        total_sent += (size_t) bytes_sent;
    }

    free(new_message);
    return (int) total_sent;
}

184
/* Append one queued message to the dump file.
 *
 * The record written is: itti_dump_message_t header, message payload, then the
 * ITTI_DUMP_MESSAGE_TYPE_END trailer. The file is flushed after each record.
 *
 * @param message queue item to write
 * @return 1 when the record has been written, 0 when there is no dump file,
 *         message is NULL or a write failed
 */
static int itti_dump_fwrite_message(itti_dump_queue_item_t *message)
{
    itti_dump_message_t  new_message_header;

    if ((dump_file == NULL) || (message == NULL)) {
        return (0);
    }

    /* Clear the header so that no uninitialised stack byte reaches the file */
    memset(&new_message_header, 0, sizeof(new_message_header));

    new_message_header.socket_header.message_size = message->message_size + sizeof(itti_dump_message_t) + sizeof(itti_message_types_t);
    new_message_header.socket_header.message_type = message->message_type;
    /* Message number in ASCII; the last byte of the field is always a newline */
    snprintf(new_message_header.signal_header.message_number_char, sizeof(new_message_header.signal_header.message_number_char),
             MESSAGE_NUMBER_CHAR_FORMAT, message->message_number);
    new_message_header.signal_header.message_number_char[sizeof(new_message_header.signal_header.message_number_char) - 1] = '\n';

    /* fwrite() of a zero-sized item returns 0, so an empty payload is skipped
     * instead of being reported as an error.
     */
    if ((fwrite (&new_message_header, sizeof(itti_dump_message_t), 1, dump_file) != 1)
        || ((message->data_size > 0) && (fwrite (message->data, message->data_size, 1, dump_file) != 1))
        || (fwrite (&itti_dump_message_type_end, sizeof(itti_message_types_t), 1, dump_file) != 1)) {
        ITTI_DUMP_ERROR(" Failed to write message %u to dump file\n", message->message_number);
        return (0);
    }

    if (fflush (dump_file) != 0) {
        ITTI_DUMP_ERROR(" Failed to flush dump file (%d:%s)\n", errno, strerror(errno));
        return (0);
    }

    return (1);
}

207 208 209
static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml,
                                         const uint32_t message_definition_xml_length)
{
210 211 212 213 214
    itti_socket_header_t *itti_dump_message;
    /* Allocate memory for message header and payload */
    size_t itti_dump_message_size;
    ssize_t bytes_sent = 0, total_sent = 0;
    uint8_t *data_ptr;
215

216 217
    AssertFatal (sd > 0, "Socket descriptor (%d) is invalid!\n", sd);
    AssertFatal (message_definition_xml != NULL, "Message definition XML is NULL!\n");
218

winckel's avatar
winckel committed
219
    itti_dump_message_size = sizeof(itti_socket_header_t) + message_definition_xml_length + sizeof(itti_message_types_t);
220 221 222

    itti_dump_message = calloc(1, itti_dump_message_size);

223
    ITTI_DUMP_DEBUG(0x2, "[%d] Sending XML definition message of size %zu to observer peer\n",
224 225 226 227 228 229
               sd, itti_dump_message_size);

    itti_dump_message->message_size = itti_dump_message_size;
    itti_dump_message->message_type = ITTI_DUMP_XML_DEFINITION;

    /* Copying message definition */
winckel's avatar
winckel committed
230 231
    memcpy (&itti_dump_message[1], message_definition_xml, message_definition_xml_length);
    memcpy (((void *) &itti_dump_message[1]) + message_definition_xml_length, &itti_dump_xml_definition_end, sizeof(itti_message_types_t));
232 233 234 235 236 237

    data_ptr = (uint8_t *)&itti_dump_message[0];

    do {
        bytes_sent = send(sd, &data_ptr[total_sent], itti_dump_message_size - total_sent, 0);
        if (bytes_sent < 0) {
238
            ITTI_DUMP_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n",
239 240 241 242 243 244 245 246 247
                       sd, itti_dump_message_size, errno, strerror(errno));
            free(itti_dump_message);
            return -1;
        }
        total_sent += bytes_sent;
    } while (total_sent != itti_dump_message_size);

    free(itti_dump_message);

248 249 250
    return 0;
}

251 252 253 254 255 256
static void itti_dump_user_data_delete_function(void *user_data, void *user_state)
{
    if (user_data != NULL)
    {
        itti_dump_queue_item_t *item;
        task_id_t task_id;
257
        int result;
258 259 260 261 262

        item = (itti_dump_queue_item_t *)user_data;

        if (item->data != NULL)
        {
263

264
            task_id = ITTI_MSG_ORIGIN_ID(item->data);
265 266
            result = itti_free(task_id, item->data);
            AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
267 268 269 270 271
        }
        else
        {
            task_id = TASK_UNKNOWN;
        }
272 273
        result = itti_free(task_id, item);
        AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
274 275 276
    }
}

277
/* Put an item in the dump ring buffer and wake up the dumper thread.
 *
 * When the ring buffer is full its oldest element is recycled: the item it
 * carried is freed and no new notification is raised.
 *
 * @param new          item to queue (must not be NULL)
 * @param message_size value stored in the item's message_size
 * @param message_type value stored in the item's message_type
 * @return always 0
 */
static int itti_dump_enqueue_message(itti_dump_queue_item_t *new, uint32_t message_size,
                                     uint32_t message_type)
{
    struct lfds611_freelist_element *slot = NULL;
    int recycled;

    AssertFatal (new != NULL, "Message to queue is NULL!\n");

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE, VCD_FUNCTION_IN);
#endif

    new->message_type = message_type;
    new->message_size = message_size;

    ITTI_DUMP_DEBUG (0x1, " itti_dump_enqueue_message: lfds611_ringbuffer_get_write_element\n");
    slot = lfds611_ringbuffer_get_write_element (itti_dump_queue.itti_message_queue, &slot, &recycled);

    if (recycled != 0)
    {
        /* The slot still holds an undumped item: drop it */
        void *stale = NULL;

        lfds611_freelist_get_user_data_from_element(slot, &stale);
        ITTI_DUMP_DEBUG (0x4, " overwrite_flag set, freeing old data %p %p\n", slot, stale);
        itti_dump_user_data_delete_function (stale, NULL);
    }

    lfds611_freelist_set_user_data_in_element(slot, (void *) new);
    lfds611_ringbuffer_put_write_element(itti_dump_queue.itti_message_queue, slot);

    if (recycled == 0)
    {
        /* A new element is available: notify the dumper thread */
#ifdef RTAI
        __sync_fetch_and_add (&itti_dump_queue.messages_in_queue, 1);
#else
        {
            eventfd_t sem_counter = 1;
            ssize_t   write_ret;

            /* Call to write for an event fd must be of 8 bytes */
            write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
            AssertFatal (write_ret == sizeof(sem_counter), "Write to dump event failed (%d/%d)!\n", (int) write_ret, (int) sizeof(sem_counter));
        }
#endif
        __sync_fetch_and_add (&pending_messages, 1);
    }

    ITTI_DUMP_DEBUG (0x2, " Added element to queue %p %p, pending %u, type %u\n", slot, new, pending_messages, message_type);

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif

    return 0;
}

332
static void itti_dump_socket_exit(void)
333
{
334 335
#ifndef RTAI
    close(itti_dump_queue.event_fd);
336
#endif
337
    close(itti_dump_queue.itti_listen_socket);
338

339 340
    /* Leave the thread as we detected end signal */
    pthread_exit(NULL);
341 342
}

343
static int itti_dump_flush_ring_buffer(int flush_all)
344 345
{
    struct lfds611_freelist_element *element = NULL;
346 347 348
    void   *user_data;
    int     j;
    int     consumer;
349 350 351

#ifdef RTAI
    unsigned long number_of_messages;
352
#endif
353

354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
    /* Check if there is a least one consumer */
    consumer = 0;
    if (dump_file != NULL)
    {
        consumer = 1;
    }
    else
    {
        for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
            if (itti_dump_queue.itti_clients[j].sd > 0) {
                consumer = 1;
                break;
            }
        }
    }
369

370 371 372 373
    if (consumer > 0)
    {
#ifdef RTAI
        number_of_messages = itti_dump_queue.messages_in_queue;
374

375 376 377 378 379
        ITTI_DUMP_DEBUG(0x4, "%lu elements in queue\n", number_of_messages);

        if (number_of_messages == 0) {
            return (consumer);
        }
380

381
        __sync_sub_and_fetch(&itti_dump_queue.messages_in_queue, number_of_messages);
382 383
#endif

384 385 386
        do {
            /* Acquire the ring element */
            lfds611_ringbuffer_get_read_element(itti_dump_queue.itti_message_queue, &element);
387

388
            __sync_fetch_and_sub (&pending_messages, 1);
389

390 391 392 393 394 395 396 397
            if (element == NULL)
            {
                if (flush_all != 0)
                {
                    flush_all = 0;
                }
                else
                {
398
                    AssertFatal (0, "Dump event with no data!\n");
399 400 401 402 403 404
                }
            }
            else
            {
                /* Retrieve user part of the message */
                lfds611_freelist_get_user_data_from_element(element, &user_data);
405

406
                ITTI_DUMP_DEBUG (0x2, " removed element from queue %p %p, pending %u\n", element, user_data, pending_messages);
407

408 409 410 411 412
                if (((itti_dump_queue_item_t *)user_data)->message_type == ITTI_DUMP_EXIT_SIGNAL)
                {
                    lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element);
                    itti_dump_socket_exit();
                }
413

414 415
                /* Write message to file */
                itti_dump_fwrite_message((itti_dump_queue_item_t *)user_data);
416

417 418 419 420 421 422 423
                /* Send message to remote analyzer */
                for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
                    if (itti_dump_queue.itti_clients[j].sd > 0) {
                        itti_dump_send_message(itti_dump_queue.itti_clients[j].sd,
                                                (itti_dump_queue_item_t *)user_data);
                    }
                }
424

425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449
                itti_dump_user_data_delete_function (user_data, NULL);
                lfds611_freelist_set_user_data_in_element(element, NULL);

                /* We have finished with this element, reinsert it in the ring buffer */
                lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element);
            }
        } while(flush_all
    #ifdef RTAI
                && --number_of_messages
    #endif
                );
    }

    return (consumer);
}

static int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t xml_definition_length)
{
    if (itti_dump_queue.nb_connected < ITTI_DUMP_MAX_CON) {
        uint8_t i;

        for (i = 0; i < ITTI_DUMP_MAX_CON; i++) {
            /* Let's find a place to store the new client */
            if (itti_dump_queue.itti_clients[i].sd == -1) {
                break;
450 451 452
            }
        }

453 454
        ITTI_DUMP_DEBUG(0x2, " Found place to store new connection: %d\n", i);

455
        AssertFatal (i < ITTI_DUMP_MAX_CON, "No more connection available (%d/%d) for socked %d!\n", i, ITTI_DUMP_MAX_CON, sd);
456 457 458 459 460

        ITTI_DUMP_DEBUG(0x2, " Socket %d accepted\n", sd);

        /* Send the XML message definition */
        if (itti_dump_send_xml_definition(sd, xml_definition, xml_definition_length) < 0) {
461
            AssertError (0, {}, "Failed to send XML definition!\n");
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477
            close (sd);
            return -1;
        }

        itti_dump_queue.itti_clients[i].sd = sd;
        itti_dump_queue.nb_connected++;
    } else {
        ITTI_DUMP_DEBUG(0x2, " Socket %d rejected\n", sd);
        /* We have reached max number of users connected...
         * Reject the connection.
         */
        close (sd);
        return -1;
    }

    return 0;
478 479
}

480 481 482 483 484 485 486
static void *itti_dump_socket(void *arg_p)
{
    uint32_t message_definition_xml_length;
    char *message_definition_xml;
    int rc;
    int itti_listen_socket, max_sd;
    int on = 1;
487
    fd_set read_set, working_set;
488 489
    struct sockaddr_in servaddr; /* socket address structure */

490 491 492 493 494
    struct timeval *timeout_p = NULL;
#ifdef RTAI
    struct timeval  timeout;
#endif

495
    ITTI_DUMP_DEBUG(0x2, " Creating TCP dump socket on port %u\n", ITTI_PORT);
496 497

    message_definition_xml = (char *)arg_p;
498
    AssertFatal (message_definition_xml != NULL, "Message definition XML is NULL!\n");
499 500 501 502

    message_definition_xml_length = strlen(message_definition_xml) + 1;

    if ((itti_listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
503
        ITTI_DUMP_ERROR(" ocket creation failed (%d:%s)\n", errno, strerror(errno));
504 505 506 507 508 509 510
        pthread_exit(NULL);
    }

    /* Allow socket reuse */
    rc = setsockopt(itti_listen_socket, SOL_SOCKET, SO_REUSEADDR,
                    (char *)&on, sizeof(on));
    if (rc < 0) {
511
        ITTI_DUMP_ERROR(" setsockopt SO_REUSEADDR failed (%d:%s)\n", errno, strerror(errno));
512 513 514 515 516 517 518 519 520
        close(itti_listen_socket);
        pthread_exit(NULL);
    }

    /* Set socket to be non-blocking.
     * NOTE: sockets accepted will inherit this option.
     */
    rc = ioctl(itti_listen_socket, FIONBIO, (char *)&on);
    if (rc < 0) {
521
        ITTI_DUMP_ERROR(" ioctl FIONBIO (non-blocking) failed (%d:%s)\n", errno, strerror(errno));
522 523 524 525 526 527 528 529 530 531
        close(itti_listen_socket);
        pthread_exit(NULL);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family      = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port        = htons(ITTI_PORT);

    if (bind(itti_listen_socket, (struct sockaddr *) &servaddr,
532
             sizeof(servaddr)) < 0) {
533
        ITTI_DUMP_ERROR(" Bind failed (%d:%s)\n", errno, strerror(errno));
534 535
        pthread_exit(NULL);
    }
536
    if (listen(itti_listen_socket, 5) < 0) {
537
        ITTI_DUMP_ERROR(" Listen failed (%d:%s)\n", errno, strerror(errno));
538 539 540
        pthread_exit(NULL);
    }

541 542 543 544 545
    FD_ZERO(&read_set);

    /* Add the listener */
    FD_SET(itti_listen_socket, &read_set);

546
#ifndef RTAI
547 548 549 550 551
    /* Add the event fd */
    FD_SET(itti_dump_queue.event_fd, &read_set);

    /* Max of both sd */
    max_sd = itti_listen_socket > itti_dump_queue.event_fd ? itti_listen_socket : itti_dump_queue.event_fd;
552 553 554
#else
    max_sd = itti_listen_socket;
#endif
555 556

    itti_dump_queue.itti_listen_socket = itti_listen_socket;
557 558 559 560 561 562 563 564 565

    /* Loop waiting for incoming connects or for incoming data
     * on any of the connected sockets.
     */
    while (1) {
        int desc_ready;
        int client_socket = -1;
        int i;

566
        memcpy(&working_set, &read_set, sizeof(read_set));
567 568 569 570 571 572 573 574
#ifdef RTAI
        timeout.tv_sec  = 0;
        timeout.tv_usec = 100000;

        timeout_p = &timeout;
#else
        timeout_p = NULL;
#endif
575 576 577 578

        /* No timeout: select blocks till a new event has to be handled
         * on sd's.
         */
579
        rc = select(max_sd + 1, &working_set, NULL, NULL, timeout_p);
580 581

        if (rc < 0) {
582
            ITTI_DUMP_ERROR(" select failed (%d:%s)\n", errno, strerror(errno));
583
            pthread_exit(NULL);
584 585
        } else if (rc == 0) {
            /* Timeout */
586 587 588 589 590 591 592 593 594 595 596 597
            if (itti_dump_flush_ring_buffer(1) == 0)
            {
                if (itti_dump_running)
                {
                    ITTI_DUMP_DEBUG (0x4, " No messages consumers, waiting ...\n");
                    usleep(100 * 1000);
                }
                else
                {
                    itti_dump_socket_exit();
                }
            }
598 599 600
        }

        desc_ready = rc;
601 602 603 604
        for (i = 0; i <= max_sd && desc_ready > 0; i++)
        {
            if (FD_ISSET(i, &working_set))
            {
605
                desc_ready -= 1;
606

607
#ifndef RTAI
608 609
                if (i == itti_dump_queue.event_fd) {
                    /* Notification of new element to dump from other tasks */
610 611
                    eventfd_t sem_counter;
                    ssize_t   read_ret;
612

613
                    /* Read will always return 1 for kernel versions > 2.6.30 */
614 615
                    read_ret = read (itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
                    if (read_ret < 0) {
616
                        ITTI_DUMP_ERROR(" Failed read for semaphore: %s\n", strerror(errno));
617 618
                        pthread_exit(NULL);
                    }
619
                    AssertFatal (read_ret == sizeof(sem_counter), "Failed to read from dump event FD (%d/%d)!\n", (int) read_ret, (int) sizeof(sem_counter));
620 621 622 623 624 625 626 627 628 629 630 631 632
                    if (itti_dump_flush_ring_buffer(0) == 0)
                    {
                        if (itti_dump_running)
                        {
                            ITTI_DUMP_DEBUG (0x4, " No messages consumers, waiting ...\n");
                            usleep(100 * 1000);
#ifndef RTAI
                            {
                                ssize_t   write_ret;

                                sem_counter = 1;
                                /* Call to write for an event fd must be of 8 bytes */
                                write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
633
                                AssertFatal (write_ret == sizeof(sem_counter), "Failed to write to dump event FD (%d/%d)!\n", (int) write_ret, (int) sem_counter);
634 635 636 637 638 639 640 641 642 643 644 645
                            }
#endif
                        }
                        else
                        {
                            itti_dump_socket_exit();
                        }
                    }
                    else
                    {
                        ITTI_DUMP_DEBUG(0x1, " Write element to file\n");
                    }
646 647 648
                } else
#endif
                if (i == itti_listen_socket) {
649 650 651 652 653
                    do {
                        client_socket = accept(itti_listen_socket, NULL, NULL);
                        if (client_socket < 0) {
                            if (errno == EWOULDBLOCK || errno == EAGAIN) {
                                /* No more new connection */
654
                                ITTI_DUMP_DEBUG(0x2, " No more new connection\n");
655 656
                                continue;
                            } else {
657
                                ITTI_DUMP_ERROR(" accept failed (%d:%s)\n", errno, strerror(errno));
658 659 660 661
                                pthread_exit(NULL);
                            }
                        }
                        if (itti_dump_handle_new_connection(client_socket, message_definition_xml,
662 663
                            message_definition_xml_length) == 0)
                        {
664 665 666
                            /* The socket has been accepted.
                             * We have to update the set to include this new sd.
                             */
667
                            FD_SET(client_socket, &read_set);
668 669 670 671 672 673 674 675 676 677
                            if (client_socket > max_sd)
                                max_sd = client_socket;
                        }
                    } while(client_socket != -1);
                } else {
                    /* For now the MME itti dumper should not receive data
                     * other than connection oriented (CLOSE).
                     */
                    uint8_t j;

678
                    ITTI_DUMP_DEBUG(0x2, " Socket %d disconnected\n", i);
679 680 681 682 683

                    /* Close the socket and update info related to this connection */
                    close(i);

                    for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
684
                        if (itti_dump_queue.itti_clients[j].sd == i)
685 686 687 688 689 690
                            break;
                    }

                    /* In case we don't find the matching sd in list of known
                     * connections -> assert.
                     */
691
                    AssertFatal (j < ITTI_DUMP_MAX_CON, "Connection index not found (%d/%d) for socked %d!\n", j, ITTI_DUMP_MAX_CON, i);
692 693 694 695

                    /* Re-initialize the socket to -1 so we can accept new
                     * incoming connections.
                     */
696 697 698
                    itti_dump_queue.itti_clients[j].sd                  = -1;
                    itti_dump_queue.itti_clients[j].last_message_number = 0;
                    itti_dump_queue.nb_connected--;
699 700

                    /* Remove the socket from the FD set and update the max sd */
701
                    FD_CLR(i, &read_set);
702 703
                    if (i == max_sd)
                    {
704
                        if (itti_dump_queue.nb_connected == 0) {
705 706 707
                            /* No more new connection max_sd = itti_listen_socket */
                            max_sd = itti_listen_socket;
                        } else {
708
                            while (FD_ISSET(max_sd, &read_set) == 0) {
709 710 711 712 713 714 715 716 717 718 719
                                max_sd -= 1;
                            }
                        }
                    }
                }
            }
        }
    }
    return NULL;
}

720 721 722 723 724 725
/*------------------------------------------------------------------------------*/
/* Queue a copy of a message for the dumper thread.
 *
 * Does nothing once the dumper has been stopped with itti_dump_exit().
 *
 * @param sender_task    task the allocations are accounted to
 * @param message_number sequence number recorded with the message
 * @param message_p      message to copy (must not be NULL)
 * @param message_name   name of the message (must not be NULL)
 * @param message_size   number of bytes to copy from message_p
 * @return always 0
 */
int itti_dump_queue_message(task_id_t sender_task,
                            message_number_t message_number,
                            MessageDef *message_p,
                            const char *message_name,
                            const uint32_t message_size)
{
    itti_dump_queue_item_t *new;

    if (!itti_dump_running)
    {
        return 0;
    }

    AssertFatal (message_name != NULL, "Message name is NULL!\n");
    AssertFatal (message_p != NULL, "Message is NULL!\n");

    /* Queue item */
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN);
#endif
    new = itti_malloc(sender_task, TASK_MAX, sizeof(itti_dump_queue_item_t));
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT);
#endif

    /* Private copy of the message, the caller keeps its own */
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN);
#endif
    new->data = itti_malloc(sender_task, TASK_MAX, message_size);
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT);
#endif

    memcpy(new->data, message_p, message_size);
    new->message_number  = message_number;
    new->data_size       = message_size;

    itti_dump_enqueue_message(new, message_size, ITTI_DUMP_MESSAGE_TYPE);

    return 0;
}

760 761 762 763 764 765
/* This function should be called by each thread that will use the ring buffer.
 * It forwards to lfds611_ringbuffer_use() on the dump message queue, so the
 * queue must already have been created by itti_dump_init().
 * NOTE(review): presumably a per-thread memory barrier required by liblfds
 * before touching the structure - confirm against the liblfds documentation.
 */
void itti_dump_thread_use_ring_buffer(void)
{
    lfds611_ringbuffer_use(itti_dump_queue.itti_message_queue);
}

766
int itti_dump_init(const char * const messages_definition_xml, const char * const dump_file_name)
767
{
768
    int i, ret;
769 770
    struct sched_param scheduler_param;

771
    scheduler_param.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
772

773 774
    if (dump_file_name != NULL)
    {
winckel's avatar
winckel committed
775
        dump_file = fopen(dump_file_name, "wb");
776 777 778

        if (dump_file == NULL)
        {
779
            ITTI_DUMP_ERROR(" can not open dump file \"%s\" (%d:%s)\n", dump_file_name, errno, strerror(errno));
780 781 782
        }
        else
        {
783
            /* Output the XML to file */
winckel's avatar
winckel committed
784
            uint32_t message_size = strlen(messages_definition_xml) + 1;
785 786
            itti_socket_header_t header;

winckel's avatar
winckel committed
787
            header.message_size = sizeof(itti_socket_header_t) + message_size + sizeof(itti_message_types_t);
788 789 790 791
            header.message_type = ITTI_DUMP_XML_DEFINITION;

            fwrite (&header, sizeof(itti_socket_header_t), 1, dump_file);
            fwrite (messages_definition_xml, message_size, 1, dump_file);
winckel's avatar
winckel committed
792
            fwrite (&itti_dump_xml_definition_end, sizeof(itti_message_types_t), 1, dump_file);
793
            fflush (dump_file);
794 795 796
        }
    }

797
    memset(&itti_dump_queue, 0, sizeof(itti_desc_t));
798

799
    ITTI_DUMP_DEBUG(0x2, " Creating new ring buffer for itti dump of %u elements\n",
800 801 802 803 804 805 806 807
                    ITTI_QUEUE_MAX_ELEMENTS);

    if (lfds611_ringbuffer_new(&itti_dump_queue.itti_message_queue,
                               ITTI_QUEUE_MAX_ELEMENTS,
                               NULL,
                               NULL) != 1)
    {
        /* Always assert on this condition */
808
        AssertFatal (0, " Failed to create ring buffer!\n");
809 810
    }

811 812 813
#ifdef RTAI
    itti_dump_queue.messages_in_queue = 0;
#else
814
    itti_dump_queue.event_fd = eventfd(0, EFD_SEMAPHORE);
815
    if (itti_dump_queue.event_fd == -1) {
816
        /* Always assert on this condition */
817
        AssertFatal (0, "eventfd failed: %s!\n", strerror(errno));
818
    }
819
#endif
820 821

    itti_dump_queue.nb_connected = 0;
822 823

    for(i = 0; i < ITTI_DUMP_MAX_CON; i++) {
824 825 826 827 828 829 830
        itti_dump_queue.itti_clients[i].sd = -1;
        itti_dump_queue.itti_clients[i].last_message_number = 0;
    }

    /* initialized with default attributes */
    ret = pthread_attr_init(&itti_dump_queue.attr);
    if (ret < 0) {
831
        AssertFatal (0, "pthread_attr_init failed (%d:%s)!\n", errno, strerror(errno));
832
    }
833

834
    ret = pthread_attr_setschedpolicy(&itti_dump_queue.attr, SCHED_FIFO);
835
    if (ret < 0) {
836
        AssertFatal (0, "pthread_attr_setschedpolicy (SCHED_IDLE) failed (%d:%s)!\n", errno, strerror(errno));
837 838 839
    }
    ret = pthread_attr_setschedparam(&itti_dump_queue.attr, &scheduler_param);
    if (ret < 0) {
840
        AssertFatal (0, "pthread_attr_setschedparam failed (%d:%s)!\n", errno, strerror(errno));
841
    }
842 843 844 845

    ret = pthread_create(&itti_dump_queue.itti_acceptor_thread, &itti_dump_queue.attr,
                         &itti_dump_socket, (void *)messages_definition_xml);
    if (ret < 0) {
846
        AssertFatal (0, "pthread_create failed (%d:%s)!\n", errno, strerror(errno));
847
    }
848

849 850
    return 0;
}
851 852 853

void itti_dump_exit(void)
{
854
    void *arg;
winckel's avatar
winckel committed
855 856
    itti_dump_queue_item_t *new;

857 858
    new = itti_malloc(TASK_UNKNOWN, TASK_UNKNOWN, sizeof(itti_dump_queue_item_t));
    memset(new, 0, sizeof(itti_dump_queue_item_t));
859

860 861 862
    /* Set a flag to stop recording message */
    itti_dump_running = 0;

863
    /* Send the exit signal to other thread */
winckel's avatar
winckel committed
864
    itti_dump_enqueue_message(new, 0, ITTI_DUMP_EXIT_SIGNAL);
865

866
    ITTI_DUMP_DEBUG(0x2, " waiting for dumper thread to finish\n");
867 868 869 870

    /* wait for the thread to terminate */
    pthread_join(itti_dump_queue.itti_acceptor_thread, &arg);

871
    ITTI_DUMP_DEBUG(0x2, " dumper thread correctly exited\n");
872

873 874
    if (dump_file != NULL)
    {
875
        /* Synchronise file and then close it */
876
        fclose(dump_file);
877
        dump_file = NULL;
878 879
    }

880 881 882 883 884 885
    if (itti_dump_queue.itti_message_queue)
    {
        lfds611_ringbuffer_delete(itti_dump_queue.itti_message_queue,
                                  itti_dump_user_data_delete_function, NULL);
    }
}