/*******************************************************************************
    OpenAirInterface
    Copyright(c) 1999 - 2014 Eurecom

    OpenAirInterface is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.


    OpenAirInterface is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with OpenAirInterface. The full GNU General Public License is
    included in this distribution in the file called "COPYING". If not,
    see <http://www.gnu.org/licenses/>.

  Contact Information
  OpenAirInterface Admin: openair_admin@eurecom.fr
  OpenAirInterface Tech : openair_tech@eurecom.fr
  OpenAirInterface Dev  : openair4g-devel@eurecom.fr

  Address      : Eurecom, Campus SophiaTech 450, route des chappes, 06451 Biot, France.

 *******************************************************************************/
29 30 31


/** @brief Intertask Interface Signal Dumper
 * Allows users to connect their itti_analyzer to this process and dump
 * signals exchanged between tasks.
 * @author Sebastien Roux <sebastien.roux@eurecom.fr>
 */

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <error.h>
45
#include <sched.h>
46 47 48 49 50 51 52

#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/types.h>
#include <arpa/inet.h>

53 54
#include <sys/eventfd.h>

55
#include "assertions.h"
56
#include "liblfds611.h"
57

winckel's avatar
winckel committed
58
#include "itti_types.h"
59 60 61
#include "intertask_interface.h"
#include "intertask_interface_dump.h"

62
#if defined(OAI_EMU) || defined(RTAI)
63 64 65
#include "vcd_signal_dumper.h"
#endif

66
/* Bit mask of the debug categories that ITTI_DUMP_DEBUG actually prints.
 * 0 silences every debug trace; e.g. 0x8 | 0x4 | 0x2 enables three categories.
 */
static const int itti_dump_debug = 0;

/* ITTI_DUMP_DEBUG(m, x, ...): print the printf-style format x (a string
 * literal, prefixed with "[ITTI_DUMP][D]") when the category mask m shares at
 * least one bit with itti_dump_debug. Under RTAI the trace goes through
 * rt_printk, otherwise to stdout.
 */
#ifdef RTAI
# define ITTI_DUMP_DEBUG(m, x, ...)                                 \
    do {                                                            \
        if ((m) & itti_dump_debug)                                  \
            rt_printk("[ITTI_DUMP][D]"x, ##__VA_ARGS__);            \
    } while(0)
#else
# define ITTI_DUMP_DEBUG(m, x, ...)                                 \
    do {                                                            \
        if ((m) & itti_dump_debug)                                  \
            fprintf(stdout, "[ITTI_DUMP][D]"x, ##__VA_ARGS__);      \
    } while(0)
#endif

/* ITTI_DUMP_ERROR(x, ...): unconditionally print the printf-style format x
 * (a string literal, prefixed with "[ITTI_DUMP][E]") to stdout.
 */
#define ITTI_DUMP_ERROR(x, ...)                                     \
    do {                                                            \
        fprintf(stdout, "[ITTI_DUMP][E]"x, ##__VA_ARGS__);          \
    } while(0)
77

78 79 80 81
/* When the included headers do not provide EFD_SEMAPHORE, flag the build as
 * targeting an old kernel. The dump thread tests this flag to decide whether
 * one event notification drains the whole ring buffer or a single element.
 */
#ifndef EFD_SEMAPHORE
# define KERNEL_VERSION_PRE_2_6_30 1
#endif

82
typedef struct itti_dump_queue_item_s {
83 84 85 86 87
    MessageDef *data;
    uint32_t    data_size;
    uint32_t    message_number;
    uint32_t    message_type;
    uint32_t    message_size;
88
} itti_dump_queue_item_t;
89 90 91 92 93 94 95

/* Description of one connected analyzer */
typedef struct {
    int      sd;                   /* Client socket descriptor, -1 when the slot is free */
    uint32_t last_message_number;  /* Reset to 0 when the client disconnects */
} itti_client_desc_t;

typedef struct itti_desc_s {
96 97 98
    /* Asynchronous thread that write to file/accept new clients */
    pthread_t      itti_acceptor_thread;
    pthread_attr_t attr;
99 100 101

    /* List of messages to dump.
     * NOTE: we limit the size of this queue to retain only the last exchanged
102
     * messages. The size can be increased by setting up the ITTI_QUEUE_MAX_ELEMENTS
103 104
     * in mme_default_values.h or by putting a custom in the configuration file.
     */
105 106
    struct lfds611_ringbuffer_state *itti_message_queue;

107 108
    int nb_connected;

109
#ifndef RTAI
110 111
    /* Event fd used to notify new messages (semaphore) */
    int event_fd;
112 113 114
#else
    unsigned long messages_in_queue __attribute__((aligned(8)));
#endif
115 116 117

    int itti_listen_socket;

118 119 120 121
    itti_client_desc_t itti_clients[ITTI_DUMP_MAX_CON];
} itti_desc_t;

typedef struct {
winckel's avatar
winckel committed
122
    itti_socket_header_t socket_header;
123

winckel's avatar
winckel committed
124
    itti_signal_header_t signal_header;
125 126 127 128 129

    /* Message payload is added here, this struct is used as an header */
} itti_dump_message_t;

typedef struct {
winckel's avatar
winckel committed
130
    itti_socket_header_t socket_header;
131 132
} itti_statistic_message_t;

winckel's avatar
winckel committed
133 134 135
static const itti_message_types_t itti_dump_xml_definition_end =  ITTI_DUMP_XML_DEFINITION_END;
static const itti_message_types_t itti_dump_message_type_end =    ITTI_DUMP_MESSAGE_TYPE_END;

136
static itti_desc_t itti_dump_queue;
137
static FILE *dump_file = NULL;
138
static int itti_dump_running = 1;
139

140
static volatile uint32_t pending_messages = 0;
141

142 143
/*------------------------------------------------------------------------------*/
/** @brief Send one queued message to a connected analyzer.
 * Builds a contiguous buffer made of the dump header, the message payload and
 * the end-of-message marker, then sends it completely on the socket.
 * @param sd      Socket descriptor of the analyzer, must be > 0
 * @param message Queue item to send, must not be NULL
 * @returns the number of bytes sent, or -1 if send() failed
 */
static int itti_dump_send_message(int sd, itti_dump_queue_item_t *message)
{
    itti_dump_message_t *new_message;
    ssize_t  bytes_sent;
    size_t   total_sent = 0;
    size_t   size;
    uint8_t *data_ptr;

    /* Validate the arguments before message is dereferenced */
    AssertFatal (sd > 0, "Socket descriptor (%d) is invalid!\n", sd);
    AssertFatal (message != NULL, "Message is NULL!\n");

    /* Allocate memory for message header and payload.
     * Zero-filled so that no uninitialised byte is ever sent to the peer.
     */
    size = sizeof(itti_dump_message_t) + message->data_size + sizeof(itti_message_types_t);

    new_message = calloc(1, size);
    AssertFatal (new_message != NULL, "New message allocation failed!\n");

    /* Preparing the header */
    new_message->socket_header.message_size = size;
    new_message->socket_header.message_type = ITTI_DUMP_MESSAGE_TYPE;
    /* Adds message number in unsigned decimal ASCII format */
    snprintf(new_message->signal_header.message_number_char, sizeof(new_message->signal_header.message_number_char),
             MESSAGE_NUMBER_CHAR_FORMAT, message->message_number);
    /* The last character of the number field is a line feed, not a NUL */
    new_message->signal_header.message_number_char[sizeof(new_message->signal_header.message_number_char) - 1] = '\n';

    data_ptr = (uint8_t *) new_message;

    /* Appends message payload, then the end-of-message marker */
    memcpy(data_ptr + sizeof(itti_dump_message_t), message->data, message->data_size);
    memcpy(data_ptr + sizeof(itti_dump_message_t) + message->data_size,
           &itti_dump_message_type_end, sizeof(itti_message_types_t));

    /* send() may accept only a part of the buffer: loop until all is sent */
    while (total_sent < size) {
        bytes_sent = send(sd, &data_ptr[total_sent], size - total_sent, 0);
        if (bytes_sent < 0) {
            if (errno == EINTR) {
                /* Interrupted by a signal before any data was sent: retry */
                continue;
            }
            ITTI_DUMP_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n",
                       sd, size, errno, strerror(errno));
            free(new_message);
            return -1;
        }
        total_sent += (size_t) bytes_sent;
    }

    free(new_message);
    return (int) total_sent;
}

187
/** @brief Append one queued message to the dump file.
 * Writes the dump header, the message payload and the end-of-message marker,
 * then flushes the file.
 * @param message Queue item to write
 * @returns 1 if the message has been written, 0 if there is no dump file,
 * no message, or if writing failed
 */
static int itti_dump_fwrite_message(itti_dump_queue_item_t *message)
{
    itti_dump_message_t  new_message_header;

    if ((dump_file == NULL) || (message == NULL)) {
        return (0);
    }

    /* Start from a zeroed header so no uninitialised stack byte reaches the file */
    memset(&new_message_header, 0, sizeof(new_message_header));

    new_message_header.socket_header.message_size = message->message_size + sizeof(itti_dump_message_t) + sizeof(itti_message_types_t);
    new_message_header.socket_header.message_type = message->message_type;
    snprintf(new_message_header.signal_header.message_number_char, sizeof(new_message_header.signal_header.message_number_char),
             MESSAGE_NUMBER_CHAR_FORMAT, message->message_number);
    /* The last character of the number field is a line feed, not a NUL */
    new_message_header.signal_header.message_number_char[sizeof(new_message_header.signal_header.message_number_char) - 1] = '\n';

    /* fwrite() of a zero-sized item returns 0, so an empty payload is skipped
     * rather than being reported as a failure.
     */
    if ((fwrite (&new_message_header, sizeof(itti_dump_message_t), 1, dump_file) != 1) ||
        ((message->data_size > 0) && (fwrite (message->data, message->data_size, 1, dump_file) != 1)) ||
        (fwrite (&itti_dump_message_type_end, sizeof(itti_message_types_t), 1, dump_file) != 1) ||
        (fflush (dump_file) != 0)) {
        ITTI_DUMP_ERROR(" Failed to write message %u to dump file (%d:%s)\n",
                        message->message_number, errno, strerror(errno));
        return (0);
    }

    return (1);
}

210 211 212
static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml,
                                         const uint32_t message_definition_xml_length)
{
213 214 215 216 217
    itti_socket_header_t *itti_dump_message;
    /* Allocate memory for message header and payload */
    size_t itti_dump_message_size;
    ssize_t bytes_sent = 0, total_sent = 0;
    uint8_t *data_ptr;
218

219 220
    AssertFatal (sd > 0, "Socket descriptor (%d) is invalid!\n", sd);
    AssertFatal (message_definition_xml != NULL, "Message definition XML is NULL!\n");
221

winckel's avatar
winckel committed
222
    itti_dump_message_size = sizeof(itti_socket_header_t) + message_definition_xml_length + sizeof(itti_message_types_t);
223 224 225

    itti_dump_message = calloc(1, itti_dump_message_size);

226
    ITTI_DUMP_DEBUG(0x2, "[%d] Sending XML definition message of size %zu to observer peer\n",
227 228 229 230 231 232
               sd, itti_dump_message_size);

    itti_dump_message->message_size = itti_dump_message_size;
    itti_dump_message->message_type = ITTI_DUMP_XML_DEFINITION;

    /* Copying message definition */
winckel's avatar
winckel committed
233 234
    memcpy (&itti_dump_message[1], message_definition_xml, message_definition_xml_length);
    memcpy (((void *) &itti_dump_message[1]) + message_definition_xml_length, &itti_dump_xml_definition_end, sizeof(itti_message_types_t));
235 236 237 238 239 240

    data_ptr = (uint8_t *)&itti_dump_message[0];

    do {
        bytes_sent = send(sd, &data_ptr[total_sent], itti_dump_message_size - total_sent, 0);
        if (bytes_sent < 0) {
241
            ITTI_DUMP_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n",
242 243 244 245 246 247 248 249 250
                       sd, itti_dump_message_size, errno, strerror(errno));
            free(itti_dump_message);
            return -1;
        }
        total_sent += bytes_sent;
    } while (total_sent != itti_dump_message_size);

    free(itti_dump_message);

251 252 253
    return 0;
}

254 255 256 257 258 259
static void itti_dump_user_data_delete_function(void *user_data, void *user_state)
{
    if (user_data != NULL)
    {
        itti_dump_queue_item_t *item;
        task_id_t task_id;
260
        int result;
261 262 263 264 265

        item = (itti_dump_queue_item_t *)user_data;

        if (item->data != NULL)
        {
266

267
            task_id = ITTI_MSG_ORIGIN_ID(item->data);
268 269
            result = itti_free(task_id, item->data);
            AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
270 271 272 273 274
        }
        else
        {
            task_id = TASK_UNKNOWN;
        }
275 276
        result = itti_free(task_id, item);
        AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
277 278 279
    }
}

280
/** @brief Insert a queue item in the dump ring buffer and notify the dump thread.
 * When the ring buffer is full the oldest item is overwritten and freed; in
 * that case no new notification is raised.
 * @param new          Queue item to insert, must not be NULL
 * @param message_size Size stored in the item
 * @param message_type Type stored in the item
 * @returns always 0
 */
static int itti_dump_enqueue_message(itti_dump_queue_item_t *new, uint32_t message_size,
                                     uint32_t message_type)
{
    struct lfds611_freelist_element *new_queue_element = NULL;
    int overwrite_flag;

    AssertFatal (new != NULL, "Message to queue is NULL!\n");

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE, VCD_FUNCTION_IN);
#endif

    new->message_type = message_type;
    new->message_size = message_size;

    /* Reserve a ring element; overwrite_flag tells if it still carries an old item */
    ITTI_DUMP_DEBUG (0x1, " itti_dump_enqueue_message: lfds611_ringbuffer_get_write_element\n");
    new_queue_element = lfds611_ringbuffer_get_write_element (itti_dump_queue.itti_message_queue, &new_queue_element, &overwrite_flag);

    if (overwrite_flag != 0)
    {
        /* The element is recycled: release the item it was holding */
        void *old = NULL;

        lfds611_freelist_get_user_data_from_element(new_queue_element, &old);
        ITTI_DUMP_DEBUG (0x4, " overwrite_flag set, freeing old data %p %p\n", new_queue_element, old);
        itti_dump_user_data_delete_function (old, NULL);
    }

    /* Publish the new item */
    lfds611_freelist_set_user_data_in_element(new_queue_element, (void *) new);
    lfds611_ringbuffer_put_write_element(itti_dump_queue.itti_message_queue, new_queue_element);

    /* Only a really new element (not a recycled one) is signalled and counted */
    if (overwrite_flag == 0)
    {
#ifdef RTAI
        __sync_fetch_and_add (&itti_dump_queue.messages_in_queue, 1);
#else
        {
            ssize_t   write_ret;
            eventfd_t sem_counter = 1;

            /* Call to write for an event fd must be of 8 bytes */
            write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
            AssertFatal (write_ret == sizeof(sem_counter), "Write to dump event failed (%d/%d)!\n", (int) write_ret, (int) sizeof(sem_counter));
        }
#endif
        __sync_fetch_and_add (&pending_messages, 1);
    }

    ITTI_DUMP_DEBUG (0x2, " Added element to queue %p %p, pending %u, type %u\n", new_queue_element, new, pending_messages, message_type);

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif

    return 0;
}

335
static void itti_dump_socket_exit(void)
336
{
337 338
#ifndef RTAI
    close(itti_dump_queue.event_fd);
339
#endif
340
    close(itti_dump_queue.itti_listen_socket);
341

342 343
    /* Leave the thread as we detected end signal */
    pthread_exit(NULL);
344 345
}

346
static int itti_dump_flush_ring_buffer(int flush_all)
347 348
{
    struct lfds611_freelist_element *element = NULL;
349 350 351
    void   *user_data;
    int     j;
    int     consumer;
352 353 354

#ifdef RTAI
    unsigned long number_of_messages;
355
#endif
356

357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
    /* Check if there is a least one consumer */
    consumer = 0;
    if (dump_file != NULL)
    {
        consumer = 1;
    }
    else
    {
        for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
            if (itti_dump_queue.itti_clients[j].sd > 0) {
                consumer = 1;
                break;
            }
        }
    }
372

373 374 375 376
    if (consumer > 0)
    {
#ifdef RTAI
        number_of_messages = itti_dump_queue.messages_in_queue;
377

378 379 380 381 382
        ITTI_DUMP_DEBUG(0x4, "%lu elements in queue\n", number_of_messages);

        if (number_of_messages == 0) {
            return (consumer);
        }
383

384
        __sync_sub_and_fetch(&itti_dump_queue.messages_in_queue, number_of_messages);
385 386
#endif

387 388 389
        do {
            /* Acquire the ring element */
            lfds611_ringbuffer_get_read_element(itti_dump_queue.itti_message_queue, &element);
390

391
            __sync_fetch_and_sub (&pending_messages, 1);
392

393 394 395 396 397 398 399 400
            if (element == NULL)
            {
                if (flush_all != 0)
                {
                    flush_all = 0;
                }
                else
                {
401
                    AssertFatal (0, "Dump event with no data!\n");
402 403 404 405 406 407
                }
            }
            else
            {
                /* Retrieve user part of the message */
                lfds611_freelist_get_user_data_from_element(element, &user_data);
408

409
                ITTI_DUMP_DEBUG (0x2, " removed element from queue %p %p, pending %u\n", element, user_data, pending_messages);
410

411 412 413 414 415
                if (((itti_dump_queue_item_t *)user_data)->message_type == ITTI_DUMP_EXIT_SIGNAL)
                {
                    lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element);
                    itti_dump_socket_exit();
                }
416

417 418
                /* Write message to file */
                itti_dump_fwrite_message((itti_dump_queue_item_t *)user_data);
419

420 421 422 423 424 425 426
                /* Send message to remote analyzer */
                for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
                    if (itti_dump_queue.itti_clients[j].sd > 0) {
                        itti_dump_send_message(itti_dump_queue.itti_clients[j].sd,
                                                (itti_dump_queue_item_t *)user_data);
                    }
                }
427

428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452
                itti_dump_user_data_delete_function (user_data, NULL);
                lfds611_freelist_set_user_data_in_element(element, NULL);

                /* We have finished with this element, reinsert it in the ring buffer */
                lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element);
            }
        } while(flush_all
    #ifdef RTAI
                && --number_of_messages
    #endif
                );
    }

    return (consumer);
}

static int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t xml_definition_length)
{
    if (itti_dump_queue.nb_connected < ITTI_DUMP_MAX_CON) {
        uint8_t i;

        for (i = 0; i < ITTI_DUMP_MAX_CON; i++) {
            /* Let's find a place to store the new client */
            if (itti_dump_queue.itti_clients[i].sd == -1) {
                break;
453 454 455
            }
        }

456 457
        ITTI_DUMP_DEBUG(0x2, " Found place to store new connection: %d\n", i);

458
        AssertFatal (i < ITTI_DUMP_MAX_CON, "No more connection available (%d/%d) for socked %d!\n", i, ITTI_DUMP_MAX_CON, sd);
459 460 461 462 463

        ITTI_DUMP_DEBUG(0x2, " Socket %d accepted\n", sd);

        /* Send the XML message definition */
        if (itti_dump_send_xml_definition(sd, xml_definition, xml_definition_length) < 0) {
464
            AssertError (0, {}, "Failed to send XML definition!\n");
465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480
            close (sd);
            return -1;
        }

        itti_dump_queue.itti_clients[i].sd = sd;
        itti_dump_queue.nb_connected++;
    } else {
        ITTI_DUMP_DEBUG(0x2, " Socket %d rejected\n", sd);
        /* We have reached max number of users connected...
         * Reject the connection.
         */
        close (sd);
        return -1;
    }

    return 0;
481 482
}

483 484 485 486 487 488 489
static void *itti_dump_socket(void *arg_p)
{
    uint32_t message_definition_xml_length;
    char *message_definition_xml;
    int rc;
    int itti_listen_socket, max_sd;
    int on = 1;
490
    fd_set read_set, working_set;
491 492
    struct sockaddr_in servaddr; /* socket address structure */

493 494 495 496 497
    struct timeval *timeout_p = NULL;
#ifdef RTAI
    struct timeval  timeout;
#endif

498
    ITTI_DUMP_DEBUG(0x2, " Creating TCP dump socket on port %u\n", ITTI_PORT);
499 500

    message_definition_xml = (char *)arg_p;
501
    AssertFatal (message_definition_xml != NULL, "Message definition XML is NULL!\n");
502 503 504 505

    message_definition_xml_length = strlen(message_definition_xml) + 1;

    if ((itti_listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
506
        ITTI_DUMP_ERROR(" ocket creation failed (%d:%s)\n", errno, strerror(errno));
507 508 509 510 511 512 513
        pthread_exit(NULL);
    }

    /* Allow socket reuse */
    rc = setsockopt(itti_listen_socket, SOL_SOCKET, SO_REUSEADDR,
                    (char *)&on, sizeof(on));
    if (rc < 0) {
514
        ITTI_DUMP_ERROR(" setsockopt SO_REUSEADDR failed (%d:%s)\n", errno, strerror(errno));
515 516 517 518 519 520 521 522 523
        close(itti_listen_socket);
        pthread_exit(NULL);
    }

    /* Set socket to be non-blocking.
     * NOTE: sockets accepted will inherit this option.
     */
    rc = ioctl(itti_listen_socket, FIONBIO, (char *)&on);
    if (rc < 0) {
524
        ITTI_DUMP_ERROR(" ioctl FIONBIO (non-blocking) failed (%d:%s)\n", errno, strerror(errno));
525 526 527 528 529 530 531 532 533 534
        close(itti_listen_socket);
        pthread_exit(NULL);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family      = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port        = htons(ITTI_PORT);

    if (bind(itti_listen_socket, (struct sockaddr *) &servaddr,
535
             sizeof(servaddr)) < 0) {
536
        ITTI_DUMP_ERROR(" Bind failed (%d:%s)\n", errno, strerror(errno));
537 538
        pthread_exit(NULL);
    }
539
    if (listen(itti_listen_socket, 5) < 0) {
540
        ITTI_DUMP_ERROR(" Listen failed (%d:%s)\n", errno, strerror(errno));
541 542 543
        pthread_exit(NULL);
    }

544 545 546 547 548
    FD_ZERO(&read_set);

    /* Add the listener */
    FD_SET(itti_listen_socket, &read_set);

549
#ifndef RTAI
550 551 552 553 554
    /* Add the event fd */
    FD_SET(itti_dump_queue.event_fd, &read_set);

    /* Max of both sd */
    max_sd = itti_listen_socket > itti_dump_queue.event_fd ? itti_listen_socket : itti_dump_queue.event_fd;
555 556 557
#else
    max_sd = itti_listen_socket;
#endif
558 559

    itti_dump_queue.itti_listen_socket = itti_listen_socket;
560 561 562 563 564 565 566 567 568

    /* Loop waiting for incoming connects or for incoming data
     * on any of the connected sockets.
     */
    while (1) {
        int desc_ready;
        int client_socket = -1;
        int i;

569
        memcpy(&working_set, &read_set, sizeof(read_set));
570 571 572 573 574 575 576 577
#ifdef RTAI
        timeout.tv_sec  = 0;
        timeout.tv_usec = 100000;

        timeout_p = &timeout;
#else
        timeout_p = NULL;
#endif
578 579 580 581

        /* No timeout: select blocks till a new event has to be handled
         * on sd's.
         */
582
        rc = select(max_sd + 1, &working_set, NULL, NULL, timeout_p);
583 584

        if (rc < 0) {
585
            ITTI_DUMP_ERROR(" select failed (%d:%s)\n", errno, strerror(errno));
586
            pthread_exit(NULL);
587 588
        } else if (rc == 0) {
            /* Timeout */
589 590 591 592 593 594 595 596 597 598 599 600
            if (itti_dump_flush_ring_buffer(1) == 0)
            {
                if (itti_dump_running)
                {
                    ITTI_DUMP_DEBUG (0x4, " No messages consumers, waiting ...\n");
                    usleep(100 * 1000);
                }
                else
                {
                    itti_dump_socket_exit();
                }
            }
601 602 603
        }

        desc_ready = rc;
604 605 606 607
        for (i = 0; i <= max_sd && desc_ready > 0; i++)
        {
            if (FD_ISSET(i, &working_set))
            {
608
                desc_ready -= 1;
609

610
#ifndef RTAI
611 612
                if (i == itti_dump_queue.event_fd) {
                    /* Notification of new element to dump from other tasks */
613 614
                    eventfd_t sem_counter;
                    ssize_t   read_ret;
615

616
                    /* Read will always return 1 for kernel versions > 2.6.30 */
617 618
                    read_ret = read (itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
                    if (read_ret < 0) {
619
                        ITTI_DUMP_ERROR(" Failed read for semaphore: %s\n", strerror(errno));
620 621
                        pthread_exit(NULL);
                    }
622
                    AssertFatal (read_ret == sizeof(sem_counter), "Failed to read from dump event FD (%d/%d)!\n", (int) read_ret, (int) sizeof(sem_counter));
623
#if defined(KERNEL_VERSION_PRE_2_6_30)
624
                    if (itti_dump_flush_ring_buffer(1) == 0)
625
#else
626
                    if (itti_dump_flush_ring_buffer(0) == 0)
627
#endif
628 629 630 631 632 633 634 635 636 637 638 639
                    {
                        if (itti_dump_running)
                        {
                            ITTI_DUMP_DEBUG (0x4, " No messages consumers, waiting ...\n");
                            usleep(100 * 1000);
#ifndef RTAI
                            {
                                ssize_t   write_ret;

                                sem_counter = 1;
                                /* Call to write for an event fd must be of 8 bytes */
                                write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
640
                                AssertFatal (write_ret == sizeof(sem_counter), "Failed to write to dump event FD (%d/%d)!\n", (int) write_ret, (int) sem_counter);
641 642 643 644 645 646 647 648 649 650 651 652
                            }
#endif
                        }
                        else
                        {
                            itti_dump_socket_exit();
                        }
                    }
                    else
                    {
                        ITTI_DUMP_DEBUG(0x1, " Write element to file\n");
                    }
653 654 655
                } else
#endif
                if (i == itti_listen_socket) {
656 657 658 659 660
                    do {
                        client_socket = accept(itti_listen_socket, NULL, NULL);
                        if (client_socket < 0) {
                            if (errno == EWOULDBLOCK || errno == EAGAIN) {
                                /* No more new connection */
661
                                ITTI_DUMP_DEBUG(0x2, " No more new connection\n");
662 663
                                continue;
                            } else {
664
                                ITTI_DUMP_ERROR(" accept failed (%d:%s)\n", errno, strerror(errno));
665 666 667 668
                                pthread_exit(NULL);
                            }
                        }
                        if (itti_dump_handle_new_connection(client_socket, message_definition_xml,
669 670
                            message_definition_xml_length) == 0)
                        {
671 672 673
                            /* The socket has been accepted.
                             * We have to update the set to include this new sd.
                             */
674
                            FD_SET(client_socket, &read_set);
675 676 677 678 679 680 681 682 683 684
                            if (client_socket > max_sd)
                                max_sd = client_socket;
                        }
                    } while(client_socket != -1);
                } else {
                    /* For now the MME itti dumper should not receive data
                     * other than connection oriented (CLOSE).
                     */
                    uint8_t j;

685
                    ITTI_DUMP_DEBUG(0x2, " Socket %d disconnected\n", i);
686 687 688 689 690

                    /* Close the socket and update info related to this connection */
                    close(i);

                    for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
691
                        if (itti_dump_queue.itti_clients[j].sd == i)
692 693 694 695 696 697
                            break;
                    }

                    /* In case we don't find the matching sd in list of known
                     * connections -> assert.
                     */
698
                    AssertFatal (j < ITTI_DUMP_MAX_CON, "Connection index not found (%d/%d) for socked %d!\n", j, ITTI_DUMP_MAX_CON, i);
699 700 701 702

                    /* Re-initialize the socket to -1 so we can accept new
                     * incoming connections.
                     */
703 704 705
                    itti_dump_queue.itti_clients[j].sd                  = -1;
                    itti_dump_queue.itti_clients[j].last_message_number = 0;
                    itti_dump_queue.nb_connected--;
706 707

                    /* Remove the socket from the FD set and update the max sd */
708
                    FD_CLR(i, &read_set);
709 710
                    if (i == max_sd)
                    {
711
                        if (itti_dump_queue.nb_connected == 0) {
712 713 714
                            /* No more new connection max_sd = itti_listen_socket */
                            max_sd = itti_listen_socket;
                        } else {
715
                            while (FD_ISSET(max_sd, &read_set) == 0) {
716 717 718 719 720 721 722 723 724 725 726
                                max_sd -= 1;
                            }
                        }
                    }
                }
            }
        }
    }
    return NULL;
}

727 728 729 730 731 732
/*------------------------------------------------------------------------------*/
/** @brief Copy a message and queue the copy for dumping.
 * Does nothing when the dumper is not running.
 * @param sender_task    Task on behalf of which the memory is allocated
 * @param message_number Number of the message
 * @param message_p      Message to dump, must not be NULL
 * @param message_name   Name of the message, must not be NULL
 * @param message_size   Size in bytes of the message
 * @returns always 0
 */
int itti_dump_queue_message(task_id_t sender_task,
                            message_number_t message_number,
                            MessageDef *message_p,
                            const char *message_name,
                            const uint32_t message_size)
{
    itti_dump_queue_item_t *queue_item;

    if (!itti_dump_running)
    {
        return 0;
    }

    AssertFatal (message_name != NULL, "Message name is NULL!\n");
    AssertFatal (message_p != NULL, "Message is NULL!\n");

    /* Allocate the queue item */
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN);
#endif
    queue_item = itti_malloc(sender_task, TASK_MAX, sizeof(*queue_item));
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT);

    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN);
#endif
    /* Allocate room for the private copy of the message */
    queue_item->data = itti_malloc(sender_task, TASK_MAX, message_size);
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT);
#endif

    /* The queue works on a copy: the caller keeps ownership of message_p */
    queue_item->message_number  = message_number;
    queue_item->data_size       = message_size;
    memcpy(queue_item->data, message_p, message_size);

    itti_dump_enqueue_message(queue_item, message_size, ITTI_DUMP_MESSAGE_TYPE);

    return 0;
}

767 768 769 770 771 772
/** @brief Declare the calling thread as a user of the dump ring buffer.
 * This function should be called by each thread that will use the ring buffer.
 */
void itti_dump_thread_use_ring_buffer(void)
{
    lfds611_ringbuffer_use(itti_dump_queue.itti_message_queue);
}

773
int itti_dump_init(const char * const messages_definition_xml, const char * const dump_file_name)
774
{
775
    int i, ret;
776 777
    struct sched_param scheduler_param;

778
    scheduler_param.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
779

780 781
    if (dump_file_name != NULL)
    {
winckel's avatar
winckel committed
782
        dump_file = fopen(dump_file_name, "wb");
783 784 785

        if (dump_file == NULL)
        {
786
            ITTI_DUMP_ERROR(" can not open dump file \"%s\" (%d:%s)\n", dump_file_name, errno, strerror(errno));
787 788 789
        }
        else
        {
790
            /* Output the XML to file */
winckel's avatar
winckel committed
791
            uint32_t message_size = strlen(messages_definition_xml) + 1;
792 793
            itti_socket_header_t header;

winckel's avatar
winckel committed
794
            header.message_size = sizeof(itti_socket_header_t) + message_size + sizeof(itti_message_types_t);
795 796 797 798
            header.message_type = ITTI_DUMP_XML_DEFINITION;

            fwrite (&header, sizeof(itti_socket_header_t), 1, dump_file);
            fwrite (messages_definition_xml, message_size, 1, dump_file);
winckel's avatar
winckel committed
799
            fwrite (&itti_dump_xml_definition_end, sizeof(itti_message_types_t), 1, dump_file);
800
            fflush (dump_file);
801 802 803
        }
    }

804
    memset(&itti_dump_queue, 0, sizeof(itti_desc_t));
805

806
    ITTI_DUMP_DEBUG(0x2, " Creating new ring buffer for itti dump of %u elements\n",
807 808 809 810 811 812 813 814
                    ITTI_QUEUE_MAX_ELEMENTS);

    if (lfds611_ringbuffer_new(&itti_dump_queue.itti_message_queue,
                               ITTI_QUEUE_MAX_ELEMENTS,
                               NULL,
                               NULL) != 1)
    {
        /* Always assert on this condition */
815
        AssertFatal (0, " Failed to create ring buffer!\n");
816 817
    }

818 819 820
#ifdef RTAI
    itti_dump_queue.messages_in_queue = 0;
#else
821 822 823
# if defined(KERNEL_VERSION_PRE_2_6_30)
    itti_dump_queue.event_fd = eventfd(0, 0);
# else
824
    itti_dump_queue.event_fd = eventfd(0, EFD_SEMAPHORE);
825
# endif
826
    if (itti_dump_queue.event_fd == -1) {
827
        /* Always assert on this condition */
828
        AssertFatal (0, "eventfd failed: %s!\n", strerror(errno));
829
    }
830
#endif
831 832

    itti_dump_queue.nb_connected = 0;
833 834

    for(i = 0; i < ITTI_DUMP_MAX_CON; i++) {
835 836 837 838 839 840 841
        itti_dump_queue.itti_clients[i].sd = -1;
        itti_dump_queue.itti_clients[i].last_message_number = 0;
    }

    /* initialized with default attributes */
    ret = pthread_attr_init(&itti_dump_queue.attr);
    if (ret < 0) {
842
        AssertFatal (0, "pthread_attr_init failed (%d:%s)!\n", errno, strerror(errno));
843
    }
844

845
    ret = pthread_attr_setschedpolicy(&itti_dump_queue.attr, SCHED_FIFO);
846
    if (ret < 0) {
847
        AssertFatal (0, "pthread_attr_setschedpolicy (SCHED_IDLE) failed (%d:%s)!\n", errno, strerror(errno));
848 849 850
    }
    ret = pthread_attr_setschedparam(&itti_dump_queue.attr, &scheduler_param);
    if (ret < 0) {
851
        AssertFatal (0, "pthread_attr_setschedparam failed (%d:%s)!\n", errno, strerror(errno));
852
    }
853 854 855 856

    ret = pthread_create(&itti_dump_queue.itti_acceptor_thread, &itti_dump_queue.attr,
                         &itti_dump_socket, (void *)messages_definition_xml);
    if (ret < 0) {
857
        AssertFatal (0, "pthread_create failed (%d:%s)!\n", errno, strerror(errno));
858
    }
859

860 861
    return 0;
}
862 863 864

void itti_dump_exit(void)
{
865
    void *arg;
winckel's avatar
winckel committed
866 867
    itti_dump_queue_item_t *new;

868 869
    new = itti_malloc(TASK_UNKNOWN, TASK_UNKNOWN, sizeof(itti_dump_queue_item_t));
    memset(new, 0, sizeof(itti_dump_queue_item_t));
870

871 872 873
    /* Set a flag to stop recording message */
    itti_dump_running = 0;

874
    /* Send the exit signal to other thread */
winckel's avatar
winckel committed
875
    itti_dump_enqueue_message(new, 0, ITTI_DUMP_EXIT_SIGNAL);
876

877
    ITTI_DUMP_DEBUG(0x2, " waiting for dumper thread to finish\n");
878 879 880 881

    /* wait for the thread to terminate */
    pthread_join(itti_dump_queue.itti_acceptor_thread, &arg);

882
    ITTI_DUMP_DEBUG(0x2, " dumper thread correctly exited\n");
883

884 885
    if (dump_file != NULL)
    {
886
        /* Synchronise file and then close it */
887
        fclose(dump_file);
888
        dump_file = NULL;
889 890
    }

891 892 893 894 895 896
    if (itti_dump_queue.itti_message_queue)
    {
        lfds611_ringbuffer_delete(itti_dump_queue.itti_message_queue,
                                  itti_dump_user_data_delete_function, NULL);
    }
}