intertask_interface.c 27.3 KB
Newer Older
1 2
/*******************************************************************************

3 4
 Eurecom OpenAirInterface
 Copyright(c) 1999 - 2012 Eurecom
5

6 7 8
 This program is free software; you can redistribute it and/or modify it
 under the terms and conditions of the GNU General Public License,
 version 2, as published by the Free Software Foundation.
9

10 11 12 13
 This program is distributed in the hope it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 more details.
14

15 16 17
 You should have received a copy of the GNU General Public License along with
 this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
18

19 20
 The full GNU General Public License is included in this distribution in
 the file called "COPYING".
21

22 23 24 25 26 27
 Contact Information
 Openair Admin: openair_admin@eurecom.fr
 Openair Tech : openair_tech@eurecom.fr
 Forums       : http://forums.eurecom.fr/openairinterface
 Address      : EURECOM, Campus SophiaTech, 450 Route des Chappes
 06410 Biot FRANCE
28

29
 *******************************************************************************/
30

31
#define _GNU_SOURCE
32 33 34 35 36 37
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
38
#include <signal.h>
39

40 41 42 43
#ifdef RTAI
# include <rtai_fifos.h>
#endif

44 45
#include "assertions.h"

46 47 48
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include "liblfds611.h"
49

50 51
#include "intertask_interface.h"
#include "intertask_interface_dump.h"
52

53 54 55 56
#if defined(OAI_EMU) || defined(RTAI)
# include "vcd_signal_dumper.h"
#endif

57 58 59
/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
60 61 62 63
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

64
#include "signals.h"
65 66
#include "timer.h"

67 68
/* Set to a non-zero value to enable the ITTI_DEBUG traces of this file. */
const int itti_debug = 0;
/* Set to a non-zero value to additionally trace empty polls in itti_poll_msg()
 * (the trace itself still goes through ITTI_DEBUG).
 */
const int itti_debug_poll = 0;
69

70 71 72 73 74 75 76 77
/* Logging helpers.
 *
 * ITTI_DEBUG only prints when itti_debug is non-zero; ITTI_ERROR always
 * prints. With RTAI the output goes through rt_printk and is not flushed.
 * Otherwise the output goes to stdout and is flushed right after being
 * written. The flush of ITTI_DEBUG is kept inside the itti_debug test so that
 * disabled traces do not force a flush of stdout on every call.
 */
#ifdef RTAI
# define ITTI_DEBUG(x, args...) do { if (itti_debug) rt_printk("[ITTI][D]"x, ##args); } \
    while(0)
# define ITTI_ERROR(x, args...) do { rt_printk("[ITTI][E]"x, ##args); } \
    while(0)
#else
# define ITTI_DEBUG(x, args...) do { if (itti_debug) { fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout); } } \
    while(0)
# define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } \
    while(0)
#endif
82 83 84 85 86

/* Global message size: size of the ITTI message header plus the payload size
 * registered for the given message ID in itti_desc.messages_info.
 */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)

/* Life cycle of the thread backing a task. */
typedef enum task_state_s {
    TASK_STATE_NOT_CONFIGURED,  /* Set by itti_init(), no thread created yet */
    TASK_STATE_STARTING,        /* Set by itti_create_task() before the thread is created */
    TASK_STATE_READY,           /* Set by itti_mark_task_ready() */
    TASK_STATE_ENDED,           /* Set by itti_wait_tasks_end() once the thread has been joined */
    TASK_STATE_MAX,             /* Last enumerator; never assigned in this file */
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
91
typedef struct message_list_s {
92
    MessageDef *msg; ///< Pointer to the message
93

94 95
    message_number_t message_number; ///< Unique message number
    uint32_t message_priority; ///< Message priority
96
} message_list_t;
97

98 99 100
typedef struct thread_desc_s {
    /* pthread associated with the thread */
    pthread_t task_thread;
101

102 103
    /* State of the thread */
    volatile task_state_t task_state;
104 105 106 107

    /* This fd is used internally by ITTI. */
    int epoll_fd;

108
    /* The thread fd */
109 110 111 112 113 114 115 116 117 118 119
    int task_event_fd;

    /* Number of events to monitor */
    uint16_t nb_events;

    /* Array of events monitored by the task.
     * By default only one fd is monitored (the one used to received messages
     * from other tasks).
     * More events can be suscribed later by the task itself.
     */
    struct epoll_event *events;
120 121

    int epoll_nb_events;
122 123 124 125 126 127 128

#ifdef RTAI
    /* Flag to mark real time thread */
    unsigned real_time;

    /* Counter to indicate from RTAI threads that messages are pending for the thread */
    unsigned messages_pending;
129
#endif
130 131 132 133 134
} thread_desc_t;

/* Per-task bookkeeping: one entry per ITTI task (tasks and sub-tasks). */
typedef struct task_desc_s {
    /* Lock-free queue holding the messages addressed to the task */
    struct lfds611_queue_state *message_queue;
} task_desc_t;

137
typedef struct itti_desc_s {
138
    thread_desc_t *threads;
139
    task_desc_t *tasks;
140

141
    /* Current message number. Incremented every call to send_msg_to_task */
142
    message_number_t message_number __attribute__((aligned(8)));
143 144

    thread_id_t thread_max;
145
    task_id_t task_max;
146 147
    MessagesIds messages_id_max;

148 149
    pthread_t thread_handling_signals;

150
    const task_info_t *tasks_info;
151 152
    const message_info_t *messages_info;

153
    itti_lte_time_t lte_time;
154 155 156 157 158

    int running;
#ifdef RTAI
    pthread_t rt_relay_thread;
#endif
159 160 161
} itti_desc_t;

/* Single ITTI context of the process; filled in by itti_init(). */
static itti_desc_t itti_desc;
162

163
static inline message_number_t itti_increment_message_number(void) {
164 165 166 167
    /* Atomic operation supported by GCC: returns the current message number
     * and then increment it by 1.
     * This can be done without mutex.
     */
168
    return __sync_fetch_and_add (&itti_desc.message_number, 1);
169 170
}

171
/* Returns the priority registered for the given message ID.
 * The ID is checked against messages_id_max first.
 */
static inline uint32_t itti_get_message_priority(MessagesIds message_id)
{
    const message_info_t *info;

    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    info = &itti_desc.messages_info[message_id];

    return info->priority;
}

177
/* Returns the name registered for the given message ID.
 * The ID is checked against messages_id_max first.
 */
const char *itti_get_message_name(MessagesIds message_id)
{
    const message_info_t *info;

    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    info = &itti_desc.messages_info[message_id];

    return info->name;
}

183
/* Returns the name registered for the given task ID.
 * The ID is checked against task_max first.
 */
const char *itti_get_task_name(task_id_t task_id)
{
    const task_info_t *info;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);

    info = &itti_desc.tasks_info[task_id];

    return info->name;
}

190
static task_id_t itti_get_current_task_id(void)
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
{
    task_id_t task_id;
    thread_id_t thread_id;
    pthread_t thread = pthread_self ();

    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
    {
        thread_id = TASK_GET_THREAD_ID(task_id);
        if (itti_desc.threads[thread_id].task_thread == thread)
        {
            return task_id;
        }
    }

    return TASK_UNKNOWN;
}

208 209 210 211 212 213
void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
    itti_desc.lte_time.frame = frame;
    itti_desc.lte_time.slot = slot;
}

214
/* Sends a copy of a message to every ready thread except the sender's.
 *
 * message_p: message to broadcast. It is only used as a template: each
 *            destination receives its own heap copy, and message_p itself is
 *            freed before returning.
 *
 * Each copy is sent with itti_send_msg_to_task() to the first task mapped on
 * the destination thread. Threads that are not in TASK_STATE_READY are
 * skipped.
 *
 * Returns 0.
 */
int itti_send_broadcast_message(MessageDef *message_p) {
    task_id_t destination_task_id;
    thread_id_t origin_thread_id;
    uint32_t thread_id;
    size_t message_size;
    int result;

    DevAssert(message_p != NULL);

    origin_thread_id = TASK_GET_THREAD_ID(message_p->ittiMsgHeader.originTaskId);

    /* Messages are allocated as header + ittiMsgSize bytes (see
     * itti_alloc_new_message_sized()), which is not necessarily
     * sizeof(MessageDef): copy exactly what has been allocated.
     */
    message_size = sizeof(MessageHeader) + message_p->ittiMsgHeader.ittiMsgSize;

    destination_task_id = TASK_FIRST;
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
        MessageDef *new_message_p;

        /* Advance to the first task running on this thread */
        while (thread_id != TASK_GET_THREAD_ID(destination_task_id))
        {
            destination_task_id++;
        }

        /* Skip task that broadcast the message */
        if (thread_id == origin_thread_id) {
            continue;
        }

        /* Skip tasks which are not running */
        if (itti_desc.threads[thread_id].task_state != TASK_STATE_READY) {
            continue;
        }

        new_message_p = malloc (message_size);
        DevAssert(new_message_p != NULL);

        memcpy (new_message_p, message_p, message_size);
        result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
        DevCheck(result >= 0, message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
    }

    /* The original message was only a template for the copies */
    free (message_p);

    return 0;
}

Cedric Roux's avatar
Cedric Roux committed
251 252
/* Allocates a zeroed message with an explicit payload size.
 *
 * origin_task_id: sender of the message; when TASK_UNKNOWN, the task of the
 *                 calling thread is looked up and used instead.
 * message_id:     type of the message, checked against messages_id_max.
 * size:           payload size in bytes, stored in the header.
 *
 * Returns a heap message of sizeof(MessageHeader) + size bytes with the
 * message ID, origin task and payload size filled in.
 */
inline MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
    MessageDef *new_message;
    task_id_t sender_task_id = origin_task_id;

    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    /* Try to identify real origin task ID */
    if (sender_task_id == TASK_UNKNOWN)
    {
        sender_task_id = itti_get_current_task_id();
    }

    new_message = calloc (1, sizeof(MessageHeader) + size);
    DevAssert(new_message != NULL);

    new_message->ittiMsgHeader.messageId = message_id;
    new_message->ittiMsgHeader.originTaskId = sender_task_id;
    new_message->ittiMsgHeader.ittiMsgSize = size;

    return new_message;
}

Cedric Roux's avatar
Cedric Roux committed
273 274 275 276 277
/* Allocates a zeroed message whose payload size is the one registered for
 * message_id in the messages information table.
 *
 * origin_task_id: sender of the message (TASK_UNKNOWN to use the task of the
 *                 calling thread).
 * message_id:     type of the message.
 *
 * Returns the message allocated by itti_alloc_new_message_sized().
 */
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
    /* Validate the ID before it is used as an index in messages_info */
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

278
/* Queues a message for a task and signals the destination thread.
 *
 * destination_task_id: task that must receive the message.
 * instance:            instance stored in the message header.
 * message:             heap message to send; on success it is referenced by
 *                      the destination queue and handed to the receiver.
 *
 * The header of the message is completed (destination, instance, LTE time),
 * the message gets a unique number and is passed to the ITTI dump. It is then
 * wrapped in a list element and enqueued in the destination task queue.
 * Unless the sender is an RTAI real time thread, the event fd of the
 * destination thread is written so that the receiver wakes up; sub-tasks
 * (tasks with a parent) are not signalled since they poll their queue.
 *
 * Returns 0.
 */
int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
{
    thread_id_t destination_thread_id;
    task_id_t origin_task_id;
    message_list_t *element;
    uint32_t priority;
    message_number_t message_number;
    uint32_t message_id;

    DevAssert(message != NULL);
    DevCheck(destination_task_id < itti_desc.task_max, destination_task_id, itti_desc.task_max, 0);

    destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
    message->ittiMsgHeader.destinationTaskId = destination_task_id;
    message->ittiMsgHeader.instance = instance;
    message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
    message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
    message_id = message->ittiMsgHeader.messageId;
    DevCheck(message_id < itti_desc.messages_id_max, itti_desc.messages_id_max, message_id, 0);

    origin_task_id = ITTI_MSG_ORIGIN_ID(message);

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                            destination_task_id);
#endif

    priority = itti_get_message_priority (message_id);

    /* Increment the global message number */
    message_number = itti_increment_message_number ();

    itti_dump_queue_message (message_number, message, itti_desc.messages_info[message_id].name,
                             sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);

    if (destination_task_id != TASK_UNKNOWN)
    {
        /* We cannot send a message if the task is not running */
        DevCheck(itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY, itti_desc.threads[destination_thread_id].task_state,
                 TASK_STATE_READY, destination_thread_id);

        /* Allocate new list element */
        element = malloc (sizeof(*element));
        DevAssert(element != NULL);

        /* Fill in members */
        element->msg = message;
        element->message_number = message_number;
        element->message_priority = priority;

        /* Enqueue message in destination task queue */
        lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, element);

#ifdef RTAI
        if (itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].real_time)
        {
            /* This is a RT task, increase destination task messages pending counter */
            __sync_fetch_and_add (&itti_desc.threads[destination_thread_id].messages_pending, 1);
        }
        else
#endif
        {
            /* Only use event fd for tasks, subtasks will poll the queue */
            if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN)
            {
                ssize_t write_ret;
                uint64_t sem_counter = 1;

                /* Call to write for an event fd must be of 8 bytes */
                write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
                DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, destination_thread_id);
            }
        }

        ITTI_DEBUG("Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
                   itti_desc.messages_info[message_id].name,
                   message_number,
                   priority,
                   itti_get_task_name(origin_task_id),
                   destination_task_id,
                   itti_get_task_name(destination_task_id));
    }

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG_END,
                                            destination_task_id);
#endif

    return 0;
}

378 379
/* Registers an additional file descriptor to be monitored by a task.
 *
 * task_id: task whose thread will monitor the descriptor.
 * fd:      descriptor to monitor (must be >= 0); it is added to the epoll
 *          instance of the thread for EPOLLIN and EPOLLERR.
 *
 * The events array of the thread is grown by one entry so that epoll_wait()
 * can report one event per monitored descriptor. Any failure is fatal.
 */
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
    thread_id_t thread_id;
    struct epoll_event event;
    struct epoll_event *grown_events;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevCheck(fd >= 0, fd, 0, 0);

    thread_id = TASK_GET_THREAD_ID(task_id);

    /* Reallocate the events. The result goes through a temporary pointer so
     * that the current array is not lost if the allocation fails.
     */
    grown_events = realloc(
        itti_desc.threads[thread_id].events,
        (itti_desc.threads[thread_id].nb_events + 1) * sizeof(struct epoll_event));
    DevAssert(grown_events != NULL);

    itti_desc.threads[thread_id].events = grown_events;
    itti_desc.threads[thread_id].nb_events++;

    /* Clear the whole structure: only the fd member of the data union is set */
    memset(&event, 0, sizeof(event));
    event.events  = EPOLLIN | EPOLLERR;
    event.data.fd = fd;

    /* Add the event fd to the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
        &event) != 0)
    {
        ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }

    ITTI_DEBUG("Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
}

/* Stops monitoring a file descriptor previously registered with
 * itti_subscribe_event_fd().
 *
 * task_id: task whose thread monitors the descriptor.
 * fd:      descriptor to remove (must be >= 0).
 *
 * The descriptor is removed from the epoll instance of the thread and the
 * events array is shrunk by one entry. A failure of epoll_ctl() is fatal.
 */
void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
    thread_id_t thread_id;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevCheck(fd >= 0, fd, 0, 0);

    thread_id = TASK_GET_THREAD_ID(task_id);
    /* Remove the event fd from the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
    {
        ITTI_ERROR("epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }

    itti_desc.threads[thread_id].nb_events--;

    if (itti_desc.threads[thread_id].nb_events == 0)
    {
        /* Nothing left to monitor: release the array explicitly instead of
         * relying on the implementation-defined behaviour of realloc(p, 0).
         */
        free(itti_desc.threads[thread_id].events);
        itti_desc.threads[thread_id].events = NULL;
    }
    else
    {
        struct epoll_event *shrunk_events;

        shrunk_events = realloc(
            itti_desc.threads[thread_id].events,
            itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));

        /* If shrinking fails the previous (larger) array is still valid:
         * keep using it rather than overwriting the pointer with NULL.
         */
        if (shrunk_events != NULL)
        {
            itti_desc.threads[thread_id].events = shrunk_events;
        }
    }
}

/* Gives access to the events reported by the last wait of a task.
 *
 * task_id: task to query, checked against task_max.
 * events:  output; receives the address of the events array of the thread
 *          running the task.
 *
 * Returns the number of events stored by the last epoll_wait() of the thread.
 */
int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
    const thread_desc_t *thread;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);

    thread = &itti_desc.threads[TASK_GET_THREAD_ID(task_id)];

    *events = thread->events;

    return thread->epoll_nb_events;
}

445 446
/* Waits on the epoll instance of a task and fetches one ITTI message.
 *
 * task_id:      task receiving the message.
 * polling:      non-zero to return immediately when nothing is pending,
 *               zero to block until an event is reported.
 * received_msg: output; set to the received message, or to NULL when no ITTI
 *               message has been fetched (nothing pending in polling mode, or
 *               only events on other subscribed descriptors).
 *
 * The number of events reported by epoll_wait() is stored in the thread
 * descriptor so that it can be read back with itti_get_events().
 */
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
    thread_id_t thread_id;
    int epoll_ret = 0;
    int epoll_timeout;
    int i;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevAssert(received_msg != NULL);

    thread_id = TASK_GET_THREAD_ID(task_id);
    *received_msg = NULL;

    /* In polling mode the timeout is 0, causing epoll_wait to return
     * immediately; otherwise -1 causes epoll_wait to wait indefinitely.
     */
    epoll_timeout = polling ? 0 : -1;

    /* Restart the wait when it is interrupted by a signal */
    do {
        epoll_ret = epoll_wait(itti_desc.threads[thread_id].epoll_fd,
                               itti_desc.threads[thread_id].events,
                               itti_desc.threads[thread_id].nb_events,
                               epoll_timeout);
    } while (epoll_ret < 0 && errno == EINTR);

    if (epoll_ret < 0) {
        ITTI_ERROR("epoll_wait failed for task %s: %s\n",
                   itti_get_task_name(task_id), strerror(errno));
        DevAssert(0 == 1);
    }
    if (epoll_ret == 0 && polling) {
        /* No data to read -> return */
        return;
    }

    itti_desc.threads[thread_id].epoll_nb_events = epoll_ret;

    for (i = 0; i < epoll_ret; i++) {
        /* Check if there is an event for ITTI for the event fd */
        if ((itti_desc.threads[thread_id].events[i].events & EPOLLIN) &&
            (itti_desc.threads[thread_id].events[i].data.fd == itti_desc.threads[thread_id].task_event_fd))
        {
            struct message_list_s *message = NULL;
            uint64_t sem_counter;
            ssize_t  read_ret;

            /* The event fd is created with EFD_SEMAPHORE: each read consumes
             * exactly one notification (the value read is 1).
             */
            read_ret = read (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
            DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0);

            if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
                /* No element in list -> this should not happen */
                DevParam(task_id, epoll_ret, 0);
                /* Nothing has been dequeued: never dereference 'message' */
                return;
            }
            *received_msg = message->msg;
            free (message);
            return;
        }
    }
}

/* Blocking reception of a message for a task.
 *
 * task_id:      task receiving the message.
 * received_msg: output; set by itti_receive_msg_internal_event_fd().
 *
 * When OAI_EMU or RTAI is defined, the start and the end of the call are
 * traced with the VCD signal dumper.
 */
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
    /* 0 selects the blocking (non polling) mode of the internal receive */
    const uint8_t polling = 0;

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            task_id);
#endif

    itti_receive_msg_internal_event_fd(task_id, polling, received_msg);

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG_END,
                                            task_id);
#endif
}

526 527
/* Non blocking reception: takes one message directly from the task queue.
 *
 * task_id:      task whose queue is polled, checked against task_max.
 * received_msg: output; set to the dequeued message, or to NULL when the
 *               queue is empty.
 *
 * The event fd of the thread is not read here; only the queue is accessed.
 */
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
    struct message_list_s *element;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevAssert(received_msg != NULL);

    *received_msg = NULL;

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                            task_id);
#endif

    if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &element) == 1)
    {
        /* Hand the message over and release the list element wrapping it */
        *received_msg = element->msg;
        free (element);
    }

    if ((*received_msg == NULL) && (itti_debug_poll)) {
        ITTI_DEBUG("No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
    }

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG_END,
                                            task_id);
#endif
}

557
/* Creates the thread of a task and waits until it is ready.
 *
 * task_id:       task to start; its thread must still be in
 *                TASK_STATE_NOT_CONFIGURED.
 * start_routine: entry point of the thread (must not be NULL). It is expected
 *                to call itti_mark_task_ready() once initialised.
 * args_p:        argument passed to start_routine.
 *
 * Returns 0 once the thread has reached TASK_STATE_READY.
 */
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
    int result;

    DevAssert(start_routine != NULL);
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
    DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, task_id, thread_id,
             itti_desc.threads[thread_id].task_state);

    itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;

    ITTI_DEBUG("Create thread for task %s\n", itti_get_task_name(task_id));

    result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
    /* pthread_create returns 0 on success and a positive error number on
     * failure, so only 0 means that the thread exists.
     */
    DevCheck(result == 0, task_id, thread_id, result);

    /* Wait till the thread is completely ready (busy wait on the volatile
     * state updated by itti_mark_task_ready()).
     */
    while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
        ;
    return 0;
}

579 580 581 582 583 584 585 586 587 588 589
#ifdef RTAI
/* Flags the thread running the given task as a real time thread.
 * Messages sent from such a thread are signalled to their destination by the
 * relay thread instead of a direct write on the destination event fd.
 */
void itti_set_task_real_time(task_id_t task_id)
{
    thread_id_t thread_id;

    thread_id = TASK_GET_THREAD_ID(task_id);
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    itti_desc.threads[thread_id].real_time = TRUE;
}
#endif

590 591
/* Declares the calling task as ready to receive messages.
 *
 * Called from the thread of the task: it registers the thread with the ITTI
 * dump ring buffer and with the LFDS queue of the task, then switches the
 * thread to TASK_STATE_READY, which releases itti_create_task().
 * With RTAI the calling thread is first moved to SCHED_FIFO at the lowest
 * priority plus one.
 */
void itti_mark_task_ready(task_id_t task_id)
{
    thread_id_t thread_id;

    thread_id = TASK_GET_THREAD_ID(task_id);
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

#ifdef RTAI
    /* Assign low priority to created threads */
    {
        struct sched_param low_priority;

        low_priority.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
        sched_setscheduler(0, SCHED_FIFO, &low_priority);
    }
#endif

    /* Register the thread in itti dump */
    itti_dump_thread_use_ring_buffer();

    /* Mark the thread as using LFDS queue */
    lfds611_queue_use(itti_desc.tasks[task_id].message_queue);

    /* Must come last: other threads may send messages as soon as it is set */
    itti_desc.threads[thread_id].task_state = TASK_STATE_READY;
}

614 615 616 617
/* Terminates the calling thread (never returns to the caller). */
void itti_exit_task(void) {
    pthread_exit (NULL);
}

618
/* Requests the termination of all tasks, then ends the calling thread.
 *
 * task_id: task used as origin of the broadcast terminate message.
 *
 * After the broadcast, the thread handling signals (the one running
 * itti_wait_tasks_end()) is woken up with SIGUSR1 if it has been registered.
 * This function never returns to the caller.
 */
void itti_terminate_tasks(task_id_t task_id) {
    // Sends Terminate signals to all tasks.
    itti_send_terminate_message (task_id);

    /* itti_init() stores -1 as "no thread registered yet". Compare against
     * that sentinel converted to pthread_t: a ">= 0" test is always true when
     * pthread_t is an unsigned type and would signal an invalid thread.
     */
    if (itti_desc.thread_handling_signals != (pthread_t) -1) {
        pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
    }

    pthread_exit (NULL);
}

629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662
#ifdef RTAI
/* Relay thread used with RTAI.
 *
 * While ITTI is running, every 100 microseconds it collects the pending
 * message counter of each ready non real time thread and, when it is not
 * zero, writes that count on the event fd of the thread to wake it up.
 *
 * arg is unused. Returns NULL when itti_desc.running becomes zero.
 */
static void *itti_rt_relay_thread(void *arg)
{
    thread_id_t thread_id;
    unsigned pending_messages;

    while (itti_desc.running)
    {
        usleep (100);

        /* Checks for all non real time tasks if they have pending messages */
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
        {
            ssize_t write_ret;
            uint64_t sem_counter;

            if ((itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
                    || (itti_desc.threads[thread_id].real_time != FALSE))
            {
                continue;
            }

            /* Atomically read the counter and reset it to 0 */
            pending_messages = __sync_fetch_and_and (&itti_desc.threads[thread_id].messages_pending, 0);

            if (pending_messages == 0)
            {
                continue;
            }

            sem_counter = pending_messages;

            /* Call to write for an event fd must be of 8 bytes */
            write_ret = write (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
            DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, thread_id);
        }
    }
    return NULL;
}
#endif

663
int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
664
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) {
665 666
    task_id_t task_id;
    thread_id_t thread_id;
667 668
    int ret;

669
    itti_desc.message_number = 1;
670

671
    ITTI_DEBUG("Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);
672

673
#if !defined(RTAI)
674 675 676
    /* SR: disable signals module for RTAI (need to harmonize management
     * between softmodem and oaisim).
     */
677
    CHECK_INIT_RETURN(signal_init());
678
#endif
679

680
    /* Saves threads and messages max values */
681
    itti_desc.task_max = task_max;
682 683
    itti_desc.thread_max = thread_max;
    itti_desc.messages_id_max = messages_id_max;
684
    itti_desc.thread_handling_signals = -1;
685
    itti_desc.tasks_info = tasks_info;
686 687 688
    itti_desc.messages_info = messages_info;

    /* Allocates memory for tasks info */
689 690 691 692
    itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

    /* Allocates memory for threads info */
    itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));
693 694

    /* Initializing each queue and related stuff */
695
    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
696
    {
697 698 699 700 701 702 703
        ITTI_DEBUG("Initializing %stask %s%s%s\n",
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
                   itti_desc.tasks_info[task_id].name,
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
                   itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");

704
        ITTI_DEBUG("Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);
705 706 707

        ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);
        if (ret < 0)
708
        {
709
            ITTI_ERROR("lfds611_queue_new failed for task %u\n", task_id);
710 711 712
            DevAssert(0 == 1);
        }

713 714 715 716 717 718
# ifdef RTAI
        if (task_id == TASK_L2L1)
        {
            ret = rtf_sem_init(56, 0);
        }
# endif
719 720 721 722 723 724
    }

    /* Initializing each thread */
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
    {
        itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
725

726 727
        itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);
        if (itti_desc.threads[thread_id].epoll_fd == -1) {
728 729 730 731 732
            ITTI_ERROR("Failed to create new epoll fd: %s\n", strerror(errno));
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

733 734
        itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
        if (itti_desc.threads[thread_id].task_event_fd == -1)
735
        {
736 737 738 739 740
            ITTI_ERROR("eventfd failed: %s\n", strerror(errno));
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

741
        itti_desc.threads[thread_id].nb_events = 1;
742

743
        itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event));
744

745 746
        itti_desc.threads[thread_id].events->events  = EPOLLIN | EPOLLERR;
        itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd;
747 748

        /* Add the event fd to the list of monitored events */
749 750
        if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
            itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0)
751
        {
752
            ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno));
753 754 755
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }
756 757

        ITTI_DEBUG("Successfully subscribed fd %d for task %s\n",
758
                   itti_desc.threads[thread_id].task_event_fd, itti_get_task_name(task_id));
759

760 761 762
#ifdef RTAI
        itti_desc.threads[thread_id].real_time = FALSE;
        itti_desc.threads[thread_id].messages_pending = 0;
763
#endif
764
    }
765

766 767 768 769 770
    itti_desc.running = TRUE;
#ifdef RTAI
    /* Start RT relay thread */
    DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0);
#endif
771

772
    itti_dump_init (messages_definition_xml, dump_file_name);
773

774 775 776
#ifndef RTAI
     CHECK_INIT_RETURN(timer_init ());
#endif
777 778 779 780

    return 0;
}

781 782
void itti_wait_tasks_end(void) {
    int end = 0;
783 784
    int thread_id;
    task_id_t task_id;
785 786 787
    int ready_tasks;
    int result;
    int retries = 10;
788 789 790 791 792 793 794 795

    itti_desc.thread_handling_signals = pthread_self ();

    /* Handle signals here */
    while (end == 0) {
        signal_handle (&end);
    }

796 797 798
    do {
        ready_tasks = 0;

799 800
        task_id = TASK_FIRST;
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.task_max; thread_id++) {
801
            /* Skip tasks which are not running */
802 803 804 805 806
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
                while (thread_id != TASK_GET_THREAD_ID(task_id))
                {
                    task_id++;
                }
807

808
                result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL);
809

810
                ITTI_DEBUG("Thread %s join status %d\n", itti_get_task_name(task_id), result);
811 812 813

                if (result == 0) {
                    /* Thread has terminated */
814
                    itti_desc.threads[thread_id].task_state = TASK_STATE_ENDED;
815 816 817 818 819 820 821 822 823
                }
                else {
                    /* Thread is still running, count it */
                    ready_tasks++;
                }
            }
        }
        if (ready_tasks > 0) {
            usleep (100 * 1000);
824
        }
825 826
    } while ((ready_tasks > 0) && (retries--));

827 828
    itti_desc.running = FALSE;

829 830 831
    if (ready_tasks > 0) {
        ITTI_DEBUG("Some threads are still running, force exit\n");
        exit (0);
832
    }
833 834

    itti_dump_exit();
835 836 837
}

/* Broadcasts a TERMINATE_MESSAGE originating from the given task.
 * The message is allocated here and handed over to
 * itti_send_broadcast_message().
 */
void itti_send_terminate_message(task_id_t task_id) {
    MessageDef *terminate_request = itti_alloc_new_message (task_id, TERMINATE_MESSAGE);

    itti_send_broadcast_message (terminate_request);
}