intertask_interface.c 27.6 KB
Newer Older
1 2
/*******************************************************************************

3 4
 Eurecom OpenAirInterface
 Copyright(c) 1999 - 2012 Eurecom
5

6 7 8
 This program is free software; you can redistribute it and/or modify it
 under the terms and conditions of the GNU General Public License,
 version 2, as published by the Free Software Foundation.
9

10 11 12 13
 This program is distributed in the hope it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 more details.
14

15 16 17
 You should have received a copy of the GNU General Public License along with
 this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
18

19 20
 The full GNU General Public License is included in this distribution in
 the file called "COPYING".
21

22 23 24 25 26 27
 Contact Information
 Openair Admin: openair_admin@eurecom.fr
 Openair Tech : openair_tech@eurecom.fr
 Forums       : http://forums.eurecom.fr/openairinterface
 Address      : EURECOM, Campus SophiaTech, 450 Route des Chappes
 06410 Biot FRANCE
28

29
 *******************************************************************************/
30

31
#define _GNU_SOURCE
32 33 34 35 36 37
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
38
#include <signal.h>
39

40 41 42 43
#ifdef RTAI
# include <rtai_fifos.h>
#endif

44 45
#include "assertions.h"

46 47 48
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include "liblfds611.h"
49

50 51
#include "intertask_interface.h"
#include "intertask_interface_dump.h"
52

53 54 55 56
#if defined(OAI_EMU) || defined(RTAI)
# include "vcd_signal_dumper.h"
#endif

57 58 59
/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
60 61 62 63
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

64
#include "signals.h"
65 66
#include "timer.h"

67 68
const int itti_debug = 0;
const int itti_debug_poll = 0;
69

70 71 72 73 74 75 76 77
/* Don't flush if using RTAI */
#ifdef RTAI
# define ITTI_DEBUG(x, args...) do { if (itti_debug) rt_printk("[ITTI][D]"x, ##args); } \
    while(0)
# define ITTI_ERROR(x, args...) do { rt_printk("[ITTI][E]"x, ##args); } \
    while(0)
#else
# define ITTI_DEBUG(x, args...) do { if (itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout); } \
78
    while(0)
79
# define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } \
80
    while(0)
81
#endif
82 83 84 85 86

/* Global message size */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)

typedef enum task_state_s {
87
    TASK_STATE_NOT_CONFIGURED, TASK_STATE_STARTING, TASK_STATE_READY, TASK_STATE_ENDED, TASK_STATE_MAX,
88 89 90
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
91
typedef struct message_list_s {
92
    MessageDef *msg; ///< Pointer to the message
93

94 95
    message_number_t message_number; ///< Unique message number
    uint32_t message_priority; ///< Message priority
96
} message_list_t;
97

98 99 100
typedef struct thread_desc_s {
    /* pthread associated with the thread */
    pthread_t task_thread;
101

102 103
    /* State of the thread */
    volatile task_state_t task_state;
104 105 106 107

    /* This fd is used internally by ITTI. */
    int epoll_fd;

108
    /* The thread fd */
109 110 111 112 113 114 115 116 117 118 119
    int task_event_fd;

    /* Number of events to monitor */
    uint16_t nb_events;

    /* Array of events monitored by the task.
     * By default only one fd is monitored (the one used to received messages
     * from other tasks).
     * More events can be suscribed later by the task itself.
     */
    struct epoll_event *events;
120 121

    int epoll_nb_events;
122 123 124 125 126 127 128

#ifdef RTAI
    /* Flag to mark real time thread */
    unsigned real_time;

    /* Counter to indicate from RTAI threads that messages are pending for the thread */
    unsigned messages_pending;
129
#endif
130 131 132 133 134
} thread_desc_t;

typedef struct task_desc_s {
    /* Queue of messages belonging to the task */
    struct lfds611_queue_state *message_queue;
135 136
} task_desc_t;

137
typedef struct itti_desc_s {
138
    thread_desc_t *threads;
139
    task_desc_t *tasks;
140

141
    /* Current message number. Incremented every call to send_msg_to_task */
142
    message_number_t message_number __attribute__((aligned(8)));
143 144

    thread_id_t thread_max;
145
    task_id_t task_max;
146 147
    MessagesIds messages_id_max;

148 149
    pthread_t thread_handling_signals;

150
    const task_info_t *tasks_info;
151 152
    const message_info_t *messages_info;

153
    itti_lte_time_t lte_time;
154 155 156 157 158

    int running;
#ifdef RTAI
    pthread_t rt_relay_thread;
#endif
159 160 161
} itti_desc_t;

static itti_desc_t itti_desc;
162

163
static inline message_number_t itti_increment_message_number(void) {
164 165 166 167
    /* Atomic operation supported by GCC: returns the current message number
     * and then increment it by 1.
     * This can be done without mutex.
     */
168
    return __sync_fetch_and_add (&itti_desc.message_number, 1);
169 170
}

171
static inline uint32_t itti_get_message_priority(MessagesIds message_id) {
172 173 174 175 176
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].priority);
}

177
const char *itti_get_message_name(MessagesIds message_id) {
178 179 180 181 182
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].name);
}

183
const char *itti_get_task_name(task_id_t task_id)
Cedric Roux's avatar
Cedric Roux committed
184
{
185
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
Cedric Roux's avatar
Cedric Roux committed
186

187
    return (itti_desc.tasks_info[task_id].name);
Cedric Roux's avatar
Cedric Roux committed
188 189
}

190
static task_id_t itti_get_current_task_id(void)
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
{
    task_id_t task_id;
    thread_id_t thread_id;
    pthread_t thread = pthread_self ();

    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
    {
        thread_id = TASK_GET_THREAD_ID(task_id);
        if (itti_desc.threads[thread_id].task_thread == thread)
        {
            return task_id;
        }
    }

    return TASK_UNKNOWN;
}

208 209 210 211 212 213
void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
    itti_desc.lte_time.frame = frame;
    itti_desc.lte_time.slot = slot;
}

214
int itti_send_broadcast_message(MessageDef *message_p) {
215
    task_id_t destination_task_id;
216
    thread_id_t origin_thread_id;
217
    uint32_t thread_id;
218
    int ret = 0;
219
    int result;
220

221
    DevAssert(message_p != NULL);
222

Cedric Roux's avatar
Cedric Roux committed
223
    origin_thread_id = TASK_GET_THREAD_ID(message_p->ittiMsgHeader.originTaskId);
224

225 226
    destination_task_id = TASK_FIRST;
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
227 228
        MessageDef *new_message_p;

229 230 231 232
        while (thread_id != TASK_GET_THREAD_ID(destination_task_id))
        {
            destination_task_id++;
        }
233
        /* Skip task that broadcast the message */
234
        if (thread_id != origin_thread_id) {
235
            /* Skip tasks which are not running */
236
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
237
                new_message_p = malloc (sizeof(MessageDef));
238
                DevAssert(message_p != NULL);
239 240

                memcpy (new_message_p, message_p, sizeof(MessageDef));
241
                result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
Cedric Roux's avatar
Cedric Roux committed
242
                DevCheck(result >= 0, message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
243
            }
244 245
        }
    }
246
    free (message_p);
247 248 249 250

    return ret;
}

Cedric Roux's avatar
Cedric Roux committed
251 252
inline MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
253 254
    MessageDef *temp = NULL;

255
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);
256

257 258 259 260
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, size);
#endif

261 262 263 264 265 266
    if (origin_task_id == TASK_UNKNOWN)
    {
        /* Try to identify real origin task ID */
        origin_task_id = itti_get_current_task_id();
    }

267
    temp = malloc (sizeof(MessageHeader) + size);
268
    DevAssert(temp != NULL);
269

Cedric Roux's avatar
Cedric Roux committed
270 271 272
    temp->ittiMsgHeader.messageId = message_id;
    temp->ittiMsgHeader.originTaskId = origin_task_id;
    temp->ittiMsgHeader.ittiMsgSize = size;
273

274 275 276 277
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, 0);
#endif

278 279 280
    return temp;
}

Cedric Roux's avatar
Cedric Roux committed
281 282 283 284 285
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
    return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

286
int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
Cedric Roux's avatar
Cedric Roux committed
287
{
288
    thread_id_t destination_thread_id;
289
    thread_id_t origin_task_id;
290
    message_list_t *new;
291 292 293
    uint32_t priority;
    message_number_t message_number;
    uint32_t message_id;
294

winckel's avatar
winckel committed
295 296 297 298
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG, 1L << destination_task_id);
#endif

299
    DevAssert(message != NULL);
300
    DevCheck(destination_task_id < itti_desc.task_max, destination_task_id, itti_desc.task_max, 0);
301

302 303
    destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
    message->ittiMsgHeader.destinationTaskId = destination_task_id;
Cedric Roux's avatar
Cedric Roux committed
304 305 306 307
    message->ittiMsgHeader.instance = instance;
    message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
    message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
    message_id = message->ittiMsgHeader.messageId;
308 309
    DevCheck(message_id < itti_desc.messages_id_max, itti_desc.messages_id_max, message_id, 0);

310 311
    origin_task_id = ITTI_MSG_ORIGIN_ID(message);

312
    priority = itti_get_message_priority (message_id);
313

314 315
    /* Increment the global message number */
    message_number = itti_increment_message_number ();
316

317 318 319
/*
 *
 #ifdef RTAI
320 321 322 323
    if ((pthread_self() == itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].task_thread) ||
        (task_id == TASK_UNKNOWN) ||
        ((TASK_GET_PARENT_TASK_ID(origin_task_id) != TASK_UNKNOWN) &&
        (pthread_self() == itti_desc.threads[TASK_GET_PARENT_TASK_ID(origin_task_id)].task_thread)))
324
#endif
325 326 327 328 329
*/
    itti_dump_queue_message (message_number, message, itti_desc.messages_info[message_id].name,
                             sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);

    if (destination_task_id != TASK_UNKNOWN)
330
    {
winckel's avatar
winckel committed
331 332 333 334
#if defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_IN);
#endif

335
        /* We cannot send a message if the task is not running */
336 337
        DevCheck(itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY, itti_desc.threads[destination_thread_id].task_state,
                 TASK_STATE_READY, destination_thread_id);
338

339
        /* Allocate new list element */
340
        new = (message_list_t *) malloc (sizeof(struct message_list_s));
341
        DevAssert(new != NULL);
342

343 344 345 346
        /* Fill in members */
        new->msg = message;
        new->message_number = message_number;
        new->message_priority = priority;
347

348 349
        /* Enqueue message in destination task queue */
        lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new);
350

winckel's avatar
winckel committed
351 352 353 354
#if defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif

355 356
#ifdef RTAI
        if (itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].real_time)
357
        {
358 359
            /* This is a RT task, increase destination task messages pending counter */
            __sync_fetch_and_add (&itti_desc.threads[destination_thread_id].messages_pending, 1);
360
        }
361 362 363 364 365 366 367 368 369 370 371 372 373
        else
#endif
        {
            /* Only use event fd for tasks, subtasks will pool the queue */
            if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN)
            {
                ssize_t write_ret;
                uint64_t sem_counter = 1;

                /* Call to write for an event fd must be of 8 bytes */
                write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
                DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, destination_thread_id);
            }
374
        }
375

376 377 378 379 380 381 382
        ITTI_DEBUG("Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
                   itti_desc.messages_info[message_id].name,
                   message_number,
                   priority,
                   itti_get_task_name(origin_task_id),
                   destination_task_id,
                   itti_get_task_name(destination_task_id));
383
    }
384

385
#if defined(OAI_EMU) || defined(RTAI)
winckel's avatar
winckel committed
386
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG, 0);
387 388
#endif

389 390 391
    return 0;
}

392 393
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
394
    thread_id_t thread_id;
395 396
    struct epoll_event event;

397
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
398 399
    DevCheck(fd >= 0, fd, 0, 0);

400 401
    thread_id = TASK_GET_THREAD_ID(task_id);
    itti_desc.threads[thread_id].nb_events++;
402 403

    /* Reallocate the events */
404 405 406
    itti_desc.threads[thread_id].events = realloc(
        itti_desc.threads[thread_id].events,
        itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));
407

408
    event.events  = EPOLLIN | EPOLLERR;
409 410 411
    event.data.fd = fd;

    /* Add the event fd to the list of monitored events */
412
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
413 414 415 416 417 418 419
        &event) != 0)
    {
        ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }
420 421

    ITTI_DEBUG("Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
422 423 424 425
}

void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
426 427
    thread_id_t thread_id;

428
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
429 430
    DevCheck(fd >= 0, fd, 0, 0);

431
    thread_id = TASK_GET_THREAD_ID(task_id);
432
    /* Add the event fd to the list of monitored events */
433
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
434 435 436 437 438 439 440
    {
        ITTI_ERROR("epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }

441 442 443 444
    itti_desc.threads[thread_id].nb_events--;
    itti_desc.threads[thread_id].events = realloc(
        itti_desc.threads[thread_id].events,
        itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));
445 446 447 448
}

int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
449 450
    thread_id_t thread_id;

451
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
452

453 454
    thread_id = TASK_GET_THREAD_ID(task_id);
    *events = itti_desc.threads[thread_id].events;
455

456
    return itti_desc.threads[thread_id].epoll_nb_events;
457 458
}

459 460
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
461
    thread_id_t thread_id;
462 463
    int epoll_ret = 0;
    int epoll_timeout = 0;
464
    int i;
465

466
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
467 468
    DevAssert(received_msg != NULL);

469
    thread_id = TASK_GET_THREAD_ID(task_id);
470 471 472 473 474 475 476 477 478 479 480 481 482
    *received_msg = NULL;

    if (polling) {
        /* In polling mode we set the timeout to 0 causing epoll_wait to return
         * immediately.
         */
        epoll_timeout = 0;
    } else {
        /* timeout = -1 causes the epoll_wait to wait indefinetely.
         */
        epoll_timeout = -1;
    }

483
    do {
484 485 486
        epoll_ret = epoll_wait(itti_desc.threads[thread_id].epoll_fd,
                               itti_desc.threads[thread_id].events,
                               itti_desc.threads[thread_id].nb_events,
487 488
                               epoll_timeout);
    } while (epoll_ret < 0 && errno == EINTR);
489 490 491

    if (epoll_ret < 0) {
        ITTI_ERROR("epoll_wait failed for task %s: %s\n",
492
                   itti_get_task_name(task_id), strerror(errno));
493 494 495 496 497 498 499
        DevAssert(0 == 1);
    }
    if (epoll_ret == 0 && polling) {
        /* No data to read -> return */
        return;
    }

500
    itti_desc.threads[thread_id].epoll_nb_events = epoll_ret;
501

502
    for (i = 0; i < epoll_ret; i++) {
503
        /* Check if there is an event for ITTI for the event fd */
504 505
        if ((itti_desc.threads[thread_id].events[i].events & EPOLLIN) &&
            (itti_desc.threads[thread_id].events[i].data.fd == itti_desc.threads[thread_id].task_event_fd))
506
        {
507 508
            struct message_list_s *message;
            uint64_t sem_counter;
509
            ssize_t  read_ret;
510 511

            /* Read will always return 1 */
512
            read_ret = read (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
513
            DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0);
514

515
            if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
516
                /* No element in list -> this should not happen */
517
                DevParam(task_id, epoll_ret, 0);
518 519
            }
            *received_msg = message->msg;
520
            free (message);
521
            return;
522 523 524 525 526 527
        }
    }
}

void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
528
#if defined(OAI_EMU) || defined(RTAI)
winckel's avatar
winckel committed
529
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG, 0);
530
#endif
531
    itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
532

winckel's avatar
winckel committed
533 534
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG, 1L << task_id);
535
#endif
536 537
}

538 539
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
540
    DevAssert(received_msg != NULL);
541 542 543

    *received_msg = NULL;

544
#if defined(OAI_EMU) || defined(RTAI)
winckel's avatar
winckel committed
545
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG, 1L << task_id);
546 547 548 549 550 551 552 553 554 555 556
#endif

    {
        struct message_list_s *message;

        if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1)
        {
            *received_msg = message->msg;
            free (message);
        }
    }
557

558
    if ((itti_debug_poll) && (*received_msg == NULL)) {
559
        ITTI_DEBUG("No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
560
    }
561 562

#if defined(OAI_EMU) || defined(RTAI)
winckel's avatar
winckel committed
563
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG, 0);
564
#endif
565 566
}

567
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
568
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
569
    int result;
570 571

    DevAssert(start_routine != NULL);
572
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
573 574
    DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, task_id, thread_id,
             itti_desc.threads[thread_id].task_state);
575

576
    itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;
577

578 579
    ITTI_DEBUG("Create thread for task %s\n", itti_get_task_name(task_id));

580
    result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
581
    DevCheck(result >= 0, task_id, thread_id, result);
582 583

    /* Wait till the thread is completely ready */
584
    while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
585
        ;
586 587 588
    return 0;
}

589 590 591 592 593 594 595 596 597 598 599
#ifdef RTAI
void itti_set_task_real_time(task_id_t task_id)
{
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    itti_desc.threads[thread_id].real_time = TRUE;
}
#endif

600 601
void itti_mark_task_ready(task_id_t task_id)
{
602 603
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

604 605
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

606 607 608 609 610 611 612 613 614
#ifdef RTAI
    /* Assign low priority to created threads */
    {
        struct sched_param sched_param;
        sched_param.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
        sched_setscheduler(0, SCHED_FIFO, &sched_param);
    }
#endif

615 616 617
    /* Register the thread in itti dump */
    itti_dump_thread_use_ring_buffer();

618 619
    /* Mark the thread as using LFDS queue */
    lfds611_queue_use(itti_desc.tasks[task_id].message_queue);
620

621
    itti_desc.threads[thread_id].task_state = TASK_STATE_READY;
622 623
}

624 625 626 627
void itti_exit_task(void) {
    pthread_exit (NULL);
}

628
void itti_terminate_tasks(task_id_t task_id) {
Cedric Roux's avatar
Cedric Roux committed
629
    // Sends Terminate signals to all tasks.
630 631 632 633 634 635 636
    itti_send_terminate_message (task_id);

    if (itti_desc.thread_handling_signals >= 0) {
        pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
    }

    pthread_exit (NULL);
637 638
}

639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672
#ifdef RTAI
static void *itti_rt_relay_thread(void *arg)
{
    thread_id_t thread_id;
    unsigned pending_messages;

    while (itti_desc.running)
    {
        usleep (100);

        /* Checks for all non real time tasks if they have pending messages */
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
        {
            if ((itti_desc.threads[thread_id].task_state == TASK_STATE_READY)
                    && (itti_desc.threads[thread_id].real_time == FALSE))
            {
                pending_messages = __sync_fetch_and_and (&itti_desc.threads[thread_id].messages_pending, 0);

                if (pending_messages > 0)
                {
                    ssize_t write_ret;
                    uint64_t sem_counter = pending_messages;

                    /* Call to write for an event fd must be of 8 bytes */
                    write_ret = write (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
                    DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, thread_id);
                }
            }
        }
    }
    return NULL;
}
#endif

673
int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
674
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) {
675 676
    task_id_t task_id;
    thread_id_t thread_id;
677 678
    int ret;

679
    itti_desc.message_number = 1;
680

681
    ITTI_DEBUG("Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);
682

683
#if !defined(RTAI)
684 685 686
    /* SR: disable signals module for RTAI (need to harmonize management
     * between softmodem and oaisim).
     */
687
    CHECK_INIT_RETURN(signal_init());
688
#endif
689

690
    /* Saves threads and messages max values */
691
    itti_desc.task_max = task_max;
692 693
    itti_desc.thread_max = thread_max;
    itti_desc.messages_id_max = messages_id_max;
694
    itti_desc.thread_handling_signals = -1;
695
    itti_desc.tasks_info = tasks_info;
696 697 698
    itti_desc.messages_info = messages_info;

    /* Allocates memory for tasks info */
699 700 701 702
    itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

    /* Allocates memory for threads info */
    itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));
703 704

    /* Initializing each queue and related stuff */
705
    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
706
    {
707 708 709 710 711 712 713
        ITTI_DEBUG("Initializing %stask %s%s%s\n",
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
                   itti_desc.tasks_info[task_id].name,
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
                   itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");

714
        ITTI_DEBUG("Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);
715 716 717

        ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);
        if (ret < 0)
718
        {
719
            ITTI_ERROR("lfds611_queue_new failed for task %u\n", task_id);
720 721 722
            DevAssert(0 == 1);
        }

723 724 725 726 727 728
# ifdef RTAI
        if (task_id == TASK_L2L1)
        {
            ret = rtf_sem_init(56, 0);
        }
# endif
729 730 731 732 733 734
    }

    /* Initializing each thread */
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
    {
        itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
735

736 737
        itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);
        if (itti_desc.threads[thread_id].epoll_fd == -1) {
738 739 740 741 742
            ITTI_ERROR("Failed to create new epoll fd: %s\n", strerror(errno));
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

743 744
        itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
        if (itti_desc.threads[thread_id].task_event_fd == -1)
745
        {
746 747 748 749 750
            ITTI_ERROR("eventfd failed: %s\n", strerror(errno));
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

751
        itti_desc.threads[thread_id].nb_events = 1;
752

753
        itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event));
754

755 756
        itti_desc.threads[thread_id].events->events  = EPOLLIN | EPOLLERR;
        itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd;
757 758

        /* Add the event fd to the list of monitored events */
759 760
        if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
            itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0)
761
        {
762
            ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno));
763 764 765
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }
766 767

        ITTI_DEBUG("Successfully subscribed fd %d for task %s\n",
768
                   itti_desc.threads[thread_id].task_event_fd, itti_get_task_name(task_id));
769

770 771 772
#ifdef RTAI
        itti_desc.threads[thread_id].real_time = FALSE;
        itti_desc.threads[thread_id].messages_pending = 0;
773
#endif
774
    }
775

776
    itti_desc.running = 1;
777 778 779 780
#ifdef RTAI
    /* Start RT relay thread */
    DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0);
#endif
781

782
    itti_dump_init (messages_definition_xml, dump_file_name);
783

784 785 786
#ifndef RTAI
     CHECK_INIT_RETURN(timer_init ());
#endif
787 788 789 790

    return 0;
}

791 792
void itti_wait_tasks_end(void) {
    int end = 0;
793 794
    int thread_id;
    task_id_t task_id;
795 796 797
    int ready_tasks;
    int result;
    int retries = 10;
798 799 800 801 802 803 804 805

    itti_desc.thread_handling_signals = pthread_self ();

    /* Handle signals here */
    while (end == 0) {
        signal_handle (&end);
    }

806 807 808
    do {
        ready_tasks = 0;

809 810
        task_id = TASK_FIRST;
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.task_max; thread_id++) {
811
            /* Skip tasks which are not running */
812 813 814 815 816
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
                while (thread_id != TASK_GET_THREAD_ID(task_id))
                {
                    task_id++;
                }
817

818
                result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL);
819

820
                ITTI_DEBUG("Thread %s join status %d\n", itti_get_task_name(task_id), result);
821 822 823

                if (result == 0) {
                    /* Thread has terminated */
824
                    itti_desc.threads[thread_id].task_state = TASK_STATE_ENDED;
825 826 827 828 829 830 831 832 833
                }
                else {
                    /* Thread is still running, count it */
                    ready_tasks++;
                }
            }
        }
        if (ready_tasks > 0) {
            usleep (100 * 1000);
834
        }
835 836
    } while ((ready_tasks > 0) && (retries--));

837
    itti_desc.running = 0;
838

839 840 841
    if (ready_tasks > 0) {
        ITTI_DEBUG("Some threads are still running, force exit\n");
        exit (0);
842
    }
843 844

    itti_dump_exit();
845 846 847
}

void itti_send_terminate_message(task_id_t task_id) {
848 849
    MessageDef *terminate_message_p;

850
    terminate_message_p = itti_alloc_new_message (task_id, TERMINATE_MESSAGE);
851

852
    itti_send_broadcast_message (terminate_message_p);
853
}