/*******************************************************************************

 Eurecom OpenAirInterface
 Copyright(c) 1999 - 2012 Eurecom

 This program is free software; you can redistribute it and/or modify it
 under the terms and conditions of the GNU General Public License,
 version 2, as published by the Free Software Foundation.

 This program is distributed in the hope it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 more details.

 You should have received a copy of the GNU General Public License along with
 this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.

 The full GNU General Public License is included in this distribution in
 the file called "COPYING".

 Contact Information
 Openair Admin: openair_admin@eurecom.fr
 Openair Tech : openair_tech@eurecom.fr
 Forums       : http://forums.eurecom.fr/openairinterface
 Address      : EURECOM, Campus SophiaTech, 450 Route des Chappes
 06410 Biot FRANCE

 *******************************************************************************/
30

31
#define _GNU_SOURCE
32 33 34 35 36 37
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
38
#include <signal.h>
39 40 41

#include "assertions.h"

42 43 44
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include "liblfds611.h"
45

46 47
#include "intertask_interface.h"
#include "intertask_interface_dump.h"
48

49 50 51 52
#ifdef RTAI
# include <rtai_shm.h>
#endif

53 54 55 56
#if defined(OAI_EMU) || defined(RTAI)
# include "vcd_signal_dumper.h"
#endif

57 58 59
/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
60 61 62 63
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

64
#include "signals.h"
65 66
#include "timer.h"

67 68
/* Compile-time trace switches.
 * itti_debug      : enables every ITTI_DEBUG() trace.
 * itti_debug_poll : additionally enables the "no message" trace of itti_poll_msg().
 */
const int itti_debug = 0;
const int itti_debug_poll = 0;

/* Trace helpers.
 * ITTI_DEBUG prints only when itti_debug is non-zero, ITTI_ERROR always prints.
 * The RTAI flavour goes through rt_printk and never flushes; the regular
 * flavour writes to stdout and flushes right after printing. The flush of
 * ITTI_DEBUG is part of the guarded statement so that a disabled trace costs
 * nothing (no fflush on every call).
 */
#ifdef RTAI
# define ITTI_DEBUG(x, args...) do { if (itti_debug) rt_printk("[ITTI][D]"x, ##args); } \
    while(0)
# define ITTI_ERROR(x, args...) do { rt_printk("[ITTI][E]"x, ##args); } \
    while(0)
#else
# define ITTI_DEBUG(x, args...) do { if (itti_debug) { fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout); } } \
    while(0)
# define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } \
    while(0)
#endif
82 83 84 85

/* Full size in bytes of a message of the given id: ITTI header + payload size
 * registered for that id in the messages table.
 */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[(mESSAGEiD)].size)

/* <sys/eventfd.h> only provides EFD_SEMAPHORE on Linux >= 2.6.30. When it is
 * missing, ITTI keeps its own per-thread counter (see sem_counter).
 */
#ifndef EFD_SEMAPHORE
# define KERNEL_VERSION_PRE_2_6_30 1
#endif

/* Memory sizing constants for RTAI builds */
#ifdef RTAI
# define ITTI_MEM_PAGE_SIZE (1024)
# define ITTI_MEM_SIZE      (16 * 1024 * 1024)
#endif

95
/* Life cycle of an ITTI thread. The first enumerator is 0, which is also the
 * state of a freshly calloc'ed thread descriptor.
 */
typedef enum task_state_s {
    TASK_STATE_NOT_CONFIGURED, /* No thread has been created yet */
    TASK_STATE_STARTING,       /* Set by itti_create_task() before the thread is spawned */
    TASK_STATE_READY,          /* Set by itti_mark_task_ready(); messages can be delivered */
    TASK_STATE_ENDED,          /* Messages sent to such a task are not queued */
    TASK_STATE_MAX,
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
100
typedef struct message_list_s {
101
    MessageDef *msg; ///< Pointer to the message
102

103 104
    message_number_t message_number; ///< Unique message number
    uint32_t message_priority; ///< Message priority
105
} message_list_t;
106

107 108 109
typedef struct thread_desc_s {
    /* pthread associated with the thread */
    pthread_t task_thread;
110

111 112
    /* State of the thread */
    volatile task_state_t task_state;
113 114 115 116

    /* This fd is used internally by ITTI. */
    int epoll_fd;

117
    /* The thread fd */
118 119 120 121 122
    int task_event_fd;

    /* Number of events to monitor */
    uint16_t nb_events;

123
#if defined(KERNEL_VERSION_PRE_2_6_30)
124
    eventfd_t sem_counter;
125 126
#endif

127 128 129 130 131 132
    /* Array of events monitored by the task.
     * By default only one fd is monitored (the one used to received messages
     * from other tasks).
     * More events can be suscribed later by the task itself.
     */
    struct epoll_event *events;
133 134

    int epoll_nb_events;
135 136 137 138 139 140 141

#ifdef RTAI
    /* Flag to mark real time thread */
    unsigned real_time;

    /* Counter to indicate from RTAI threads that messages are pending for the thread */
    unsigned messages_pending;
142
#endif
143 144 145 146 147
} thread_desc_t;

/* Run-time description of one ITTI task (several tasks may share a thread) */
typedef struct task_desc_s {
    /* Lock-free queue holding the messages addressed to the task */
    struct lfds611_queue_state *message_queue;
} task_desc_t;

150
typedef struct itti_desc_s {
151
    thread_desc_t *threads;
152
    task_desc_t   *tasks;
153

154
    /* Current message number. Incremented every call to send_msg_to_task */
155
    message_number_t message_number __attribute__((aligned(8)));
156 157

    thread_id_t thread_max;
158
    task_id_t task_max;
159 160
    MessagesIds messages_id_max;

161 162
    pthread_t thread_handling_signals;

163
    const task_info_t *tasks_info;
164 165
    const message_info_t *messages_info;

166
    itti_lte_time_t lte_time;
167 168

    int running;
169 170 171 172

    volatile uint32_t created_tasks;
    volatile uint32_t ready_tasks;
    volatile int      wait_tasks;
173 174 175
#ifdef RTAI
    pthread_t rt_relay_thread;
#endif
176 177 178 179 180 181

#if defined(OAI_EMU) || defined(RTAI)
    uint64_t vcd_poll_msg;
    uint64_t vcd_receive_msg;
    uint64_t vcd_send_msg;
#endif
182 183 184
} itti_desc_t;

static itti_desc_t itti_desc;
185

186 187 188 189 190 191 192 193 194 195 196
/* Allocates "size" bytes on behalf of task "task_id".
 * RTAI and regular builds both take the memory from the libc heap.
 * The allocation result is verified with DevCheck before being returned.
 */
void *itti_malloc(task_id_t task_id, ssize_t size)
{
    void *ptr = malloc(size);

    DevCheck(ptr != NULL, size, task_id, 0);

    return ptr;
}

/* Releases memory obtained from itti_malloc.
 * "ptr" must not be NULL (asserted); "task_id" is currently not used.
 * RTAI and regular builds both give the memory back to the libc heap.
 */
void itti_free(task_id_t task_id, void *ptr)
{
    DevAssert(ptr != NULL);

    free(ptr);
}

212
static inline message_number_t itti_increment_message_number(void) {
213 214 215 216
    /* Atomic operation supported by GCC: returns the current message number
     * and then increment it by 1.
     * This can be done without mutex.
     */
217
    return __sync_fetch_and_add (&itti_desc.message_number, 1);
218 219
}

220
/* Returns the priority registered for "message_id" in the messages table.
 * The identifier is range checked with DevCheck first.
 */
static inline uint32_t itti_get_message_priority(MessagesIds message_id)
{
    const message_info_t *info;

    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    info = &itti_desc.messages_info[message_id];

    return info->priority;
}

226
/* Returns the name registered for "message_id" in the messages table.
 * The identifier is range checked with DevCheck first.
 */
const char *itti_get_message_name(MessagesIds message_id)
{
    const message_info_t *info;

    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    info = &itti_desc.messages_info[message_id];

    return info->name;
}

232
/* Returns the name registered for "task_id" in the tasks table.
 * The identifier is range checked with DevCheck first.
 */
const char *itti_get_task_name(task_id_t task_id)
{
    const task_info_t *info;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);

    info = &itti_desc.tasks_info[task_id];

    return info->name;
}

239
static task_id_t itti_get_current_task_id(void)
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
{
    task_id_t task_id;
    thread_id_t thread_id;
    pthread_t thread = pthread_self ();

    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
    {
        thread_id = TASK_GET_THREAD_ID(task_id);
        if (itti_desc.threads[thread_id].task_thread == thread)
        {
            return task_id;
        }
    }

    return TASK_UNKNOWN;
}

257 258 259 260 261 262
void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
    itti_desc.lte_time.frame = frame;
    itti_desc.lte_time.slot = slot;
}

263
/* Sends a copy of "message_p" to one task of every READY thread, except the
 * thread of the task that built the message.
 *
 * For each thread the destination is the first task (in increasing id order,
 * continuing from the previous destination) that runs in that thread.
 * Ownership of "message_p" is taken: it is released before returning.
 *
 * Returns 0.
 */
int itti_send_broadcast_message(MessageDef *message_p) {
    task_id_t destination_task_id;
    task_id_t origin_task_id;
    thread_id_t origin_thread_id;
    uint32_t thread_id;
    size_t message_size;
    int ret = 0;
    int result;

    DevAssert(message_p != NULL);

    origin_task_id = message_p->ittiMsgHeader.originTaskId;
    origin_thread_id = TASK_GET_THREAD_ID(origin_task_id);

    /* Messages are allocated as header + payload, which can be smaller than
     * sizeof(MessageDef): copy exactly what was allocated to avoid reading
     * past the end of the original buffer.
     */
    message_size = sizeof(MessageHeader) + message_p->ittiMsgHeader.ittiMsgSize;

    destination_task_id = TASK_FIRST;
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
        MessageDef *new_message_p;

        /* Move on to the first task running in this thread */
        while (thread_id != TASK_GET_THREAD_ID(destination_task_id))
        {
            destination_task_id++;
        }

        /* Skip task that broadcast the message */
        if (thread_id == origin_thread_id) {
            continue;
        }

        /* Skip tasks which are not running */
        if (itti_desc.threads[thread_id].task_state != TASK_STATE_READY) {
            continue;
        }

        new_message_p = itti_malloc (origin_task_id, message_size);
        DevAssert(new_message_p != NULL);

        memcpy (new_message_p, message_p, message_size);
        result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
        DevCheck(result >= 0, message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
    }

    /* Every destination got its own copy, the original is not needed anymore */
    itti_free (origin_task_id, message_p);

    return ret;
}

Cedric Roux's avatar
Cedric Roux committed
302 303
/* Allocates a message made of the ITTI header followed by "size" bytes of
 * payload, and fills in the message id, origin task and payload size.
 *
 * When "origin_task_id" is TASK_UNKNOWN, the origin is looked up from the
 * calling thread. The other header fields and the payload are left
 * uninitialised.
 */
inline MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
    MessageDef *temp;

    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

#if defined(OAI_EMU) || defined(RTAI)
    /* Trace the requested size for the duration of the allocation */
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, size);
#endif

    /* Try to identify real origin task ID */
    if (origin_task_id == TASK_UNKNOWN)
    {
        origin_task_id = itti_get_current_task_id();
    }

    temp = itti_malloc (origin_task_id, sizeof(MessageHeader) + size);
    DevAssert(temp != NULL);

    temp->ittiMsgHeader.ittiMsgSize = size;
    temp->ittiMsgHeader.originTaskId = origin_task_id;
    temp->ittiMsgHeader.messageId = message_id;

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, 0);
#endif

    return temp;
}

Cedric Roux's avatar
Cedric Roux committed
332 333 334 335 336
/* Allocates a message whose payload has the size registered for
 * "message_id" in the messages table.
 *
 * The identifier is range checked before the table is indexed, so that an
 * invalid id is reported instead of reading outside of the table.
 */
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

337
/* Sends "message" to task "destination_task_id".
 *
 * The header is completed (destination, instance, LTE time), the message is
 * given a unique number and dumped, then:
 *  - destination TASK_UNKNOWN: the message was only meant to be dumped and
 *    is released;
 *  - destination thread ENDED: the message cannot be delivered and is
 *    released (it would be leaked otherwise as nobody else owns it);
 *  - otherwise the destination thread must be READY: the message is queued
 *    and the destination is notified, either through its event fd or, when
 *    the sender is an RTAI real time thread, through its pending counter.
 *
 * Ownership of "message" is always taken by this function.
 * Returns 0.
 */
int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
{
    thread_id_t destination_thread_id;
    task_id_t origin_task_id;
    message_list_t *new;
    uint32_t priority;
    message_number_t message_number;
    uint32_t message_id;

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_send_msg, 1L << destination_task_id));
#endif

    DevAssert(message != NULL);
    DevCheck(destination_task_id < itti_desc.task_max, destination_task_id, itti_desc.task_max, 0);

    destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
    message->ittiMsgHeader.destinationTaskId = destination_task_id;
    message->ittiMsgHeader.instance = instance;
    message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
    message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
    message_id = message->ittiMsgHeader.messageId;
    DevCheck(message_id < itti_desc.messages_id_max, itti_desc.messages_id_max, message_id, 0);

    origin_task_id = ITTI_MSG_ORIGIN_ID(message);

    priority = itti_get_message_priority (message_id);

    /* Increment the global message number */
    message_number = itti_increment_message_number ();

    itti_dump_queue_message (origin_task_id, message_number, message, itti_desc.messages_info[message_id].name,
                             sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);

    if (destination_task_id != TASK_UNKNOWN)
    {
#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_IN);
#endif

        if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED)
        {
            ITTI_DEBUG(" Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n",
                       itti_desc.messages_info[message_id].name,
                       message_number,
                       priority,
                       itti_get_task_name(origin_task_id),
                       destination_task_id,
                       itti_get_task_name(destination_task_id));

            /* Nobody will ever dequeue this message: release it here */
            itti_free(origin_task_id, message);

#if defined(OAI_EMU) || defined(RTAI)
            /* Close the trace opened above on this path too */
            vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif
        }
        else
        {
            /* We cannot send a message if the task is not running */
            DevCheck(itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY, destination_thread_id,
                     itti_desc.threads[destination_thread_id].task_state, message_id);

            /* Allocate new list element */
            new = (message_list_t *) itti_malloc (origin_task_id, sizeof(struct message_list_s));
            DevAssert(new != NULL);

            /* Fill in members */
            new->msg = message;
            new->message_number = message_number;
            new->message_priority = priority;

            /* Enqueue message in destination task queue */
            lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new);

#if defined(OAI_EMU) || defined(RTAI)
            vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif

#ifdef RTAI
            if (itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].real_time)
            {
                /* This is a RT task, increase destination task messages pending counter */
                __sync_fetch_and_add (&itti_desc.threads[destination_thread_id].messages_pending, 1);
            }
            else
#endif
            {
                /* Only use event fd for tasks, subtasks will poll the queue */
                if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN)
                {
                    ssize_t write_ret;
                    eventfd_t sem_counter = 1;

                    /* Call to write for an event fd must be of 8 bytes */
                    write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
                    DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, destination_thread_id);
                }
            }

            ITTI_DEBUG(" Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
                       itti_desc.messages_info[message_id].name,
                       message_number,
                       priority,
                       itti_get_task_name(origin_task_id),
                       destination_task_id,
                       itti_get_task_name(destination_task_id));
        }
    } else {
        /* This is a debug message to TASK_UNKNOWN, we can safely release it */
        itti_free(origin_task_id, message);
    }

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_send_msg, ~(1L << destination_task_id)));
#endif

    return 0;
}

452 453
/* Adds "fd" to the file descriptors monitored by the thread of "task_id".
 *
 * The events array of the thread is grown by one entry and the fd is
 * registered (EPOLLIN | EPOLLERR) in the epoll instance of the thread.
 * Allocation or registration failures are fatal (asserted).
 */
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
    thread_id_t thread_id;
    struct epoll_event event;
    struct epoll_event *events;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevCheck(fd >= 0, fd, 0, 0);

    thread_id = TASK_GET_THREAD_ID(task_id);

    /* Reallocate the events. The result is checked before the descriptor is
     * updated, and the count is only increased once the array is large
     * enough to hold that many events.
     */
    events = realloc(
        itti_desc.threads[thread_id].events,
        (itti_desc.threads[thread_id].nb_events + 1) * sizeof(struct epoll_event));
    DevAssert(events != NULL);

    itti_desc.threads[thread_id].events = events;
    itti_desc.threads[thread_id].nb_events++;

    event.events  = EPOLLIN | EPOLLERR;
    /* Clear the whole union before setting the fd member */
    event.data.u64 = 0;
    event.data.fd  = fd;

    /* Add the event fd to the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
        &event) != 0)
    {
        ITTI_ERROR(" epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }

    ITTI_DEBUG(" Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
}

/* Removes "fd" from the file descriptors monitored by the thread of
 * "task_id" and shrinks the events array of the thread by one entry.
 *
 * A failure to remove the fd from the epoll instance is fatal (asserted).
 */
void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
    thread_id_t thread_id;
    struct epoll_event *events;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevCheck(fd >= 0, fd, 0, 0);

    thread_id = TASK_GET_THREAD_ID(task_id);
    /* Remove the event fd from the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
    {
        ITTI_ERROR(" epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }

    itti_desc.threads[thread_id].nb_events--;

    if (itti_desc.threads[thread_id].nb_events == 0)
    {
        /* realloc with a size of 0 is implementation defined: release explicitly */
        free(itti_desc.threads[thread_id].events);
        itti_desc.threads[thread_id].events = NULL;
    }
    else
    {
        events = realloc(
            itti_desc.threads[thread_id].events,
            itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));

        /* When shrinking fails the former (larger) array is still valid:
         * keep it rather than losing the only pointer to it.
         */
        if (events != NULL)
        {
            itti_desc.threads[thread_id].events = events;
        }
    }
}

/* Gives access to the events reported by the last epoll_wait done for the
 * thread of "task_id".
 *
 * "*events" is set to the events array of the thread (still owned by ITTI).
 * Returns the number of events stored by the last epoll_wait.
 */
int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
    const thread_desc_t *thread_desc;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);

    thread_desc = &itti_desc.threads[TASK_GET_THREAD_ID(task_id)];
    *events = thread_desc->events;

    return thread_desc->epoll_nb_events;
}

520 521
/* Waits on the epoll instance of the thread of "task_id" and, when the ITTI
 * event fd is readable, dequeues one message into "*received_msg".
 *
 * polling != 0: returns immediately when nothing is pending.
 * polling == 0: blocks until at least one monitored fd has an event.
 *
 * "*received_msg" is left to NULL when the wake-up was only caused by fds
 * subscribed by the task; these events can be read with itti_get_events().
 */
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
    thread_id_t thread_id;
    int epoll_ret = 0;
    int epoll_timeout = 0;
    int i;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevAssert(received_msg != NULL);

    thread_id = TASK_GET_THREAD_ID(task_id);
    *received_msg = NULL;

    if (polling) {
        /* In polling mode we set the timeout to 0 causing epoll_wait to return
         * immediately.
         */
        epoll_timeout = 0;
    } else {
        /* timeout = -1 causes the epoll_wait to wait indefinitely.
         */
        epoll_timeout = -1;
    }

    /* Restart the wait when it is interrupted by a signal */
    do {
        epoll_ret = epoll_wait(itti_desc.threads[thread_id].epoll_fd,
                               itti_desc.threads[thread_id].events,
                               itti_desc.threads[thread_id].nb_events,
                               epoll_timeout);
    } while (epoll_ret < 0 && errno == EINTR);

    if (epoll_ret < 0) {
        ITTI_ERROR(" epoll_wait failed for task %s: %s\n",
                   itti_get_task_name(task_id), strerror(errno));
        DevAssert(0 == 1);
    }
    if (epoll_ret == 0 && polling) {
        /* No data to read -> return */
        return;
    }

    itti_desc.threads[thread_id].epoll_nb_events = epoll_ret;

    for (i = 0; i < epoll_ret; i++) {
        /* Check if there is an event for ITTI for the event fd */
        if ((itti_desc.threads[thread_id].events[i].events & EPOLLIN) &&
            (itti_desc.threads[thread_id].events[i].data.fd == itti_desc.threads[thread_id].task_event_fd))
        {
            struct message_list_s *message = NULL;
            eventfd_t sem_counter;
            ssize_t   read_ret;

            /* In semaphore mode the read yields 1; otherwise it yields the
             * whole pending count and resets the event fd counter.
             */
            read_ret = read (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
            DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0);

#if defined(KERNEL_VERSION_PRE_2_6_30)
            /* Store the number of messages signalled but not dequeued yet.
             * The counter belongs to the thread descriptor, which is indexed
             * by thread id (not by task id).
             */
            itti_desc.threads[thread_id].sem_counter = sem_counter - 1;
#endif

            if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
                /* No element in list -> this should not happen */
                DevParam(task_id, epoll_ret, 0);
            }
            DevAssert(message != NULL);
            *received_msg = message->msg;
            itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
            /* Mark that the event has been processed */
            itti_desc.threads[thread_id].events[i].events &= ~EPOLLIN;
            return;
        }
    }
}

/* Blocking reception of a message for "task_id".
 *
 * On return "*received_msg" points to the received message, or is NULL when
 * the task was woken up by one of the fds it subscribed to.
 *
 * When the event fd cannot be used in semaphore mode, messages already
 * signalled by a previous read of the event fd are dequeued directly,
 * without waiting on epoll.
 */
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
#if defined(KERNEL_VERSION_PRE_2_6_30)
    thread_id_t thread_id;
#endif

    /* Validate the parameters before they are used as index or dereferenced */
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevAssert(received_msg != NULL);

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
#endif

#if defined(KERNEL_VERSION_PRE_2_6_30)
    /* Thread descriptors are indexed by thread id */
    thread_id = TASK_GET_THREAD_ID(task_id);

    /* Use the stored value of the semaphore counter */
    if (itti_desc.threads[thread_id].sem_counter > 0) {
        struct message_list_s *message = NULL;

        if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
            /* No element in list -> this should not happen */
            DevParam(task_id, itti_desc.threads[thread_id].sem_counter, 0);
        }
        DevAssert(message != NULL);
        *received_msg = message->msg;
        itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);

        itti_desc.threads[thread_id].sem_counter--;
    } else
#endif
    {
        itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
    }

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_receive_msg, 1L << task_id));
#endif
}

626 627
/* Non blocking reception of a message for "task_id".
 *
 * The message queue of the task is read directly, the event fd of the
 * thread is not involved. "*received_msg" is set to the dequeued message,
 * or to NULL when the queue is empty.
 */
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
    struct message_list_s *message;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevAssert(received_msg != NULL);

    *received_msg = NULL;

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_poll_msg, 1L << task_id));
#endif

    /* "message" is only valid when the dequeue reports an element */
    if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1)
    {
        *received_msg = message->msg;
        /* Only the list element is released, the message goes to the caller */
        free (message);
    }

    if ((*received_msg == NULL) && (itti_debug_poll)) {
        ITTI_DEBUG(" No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
    }

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_poll_msg, ~(1L << task_id)));
#endif
}

657
/* Creates the thread running task "task_id" and waits until the new thread
 * has reported itself READY.
 *
 * start_routine: entry point of the thread, must not be NULL.
 * args_p:        argument given to start_routine.
 *
 * The thread must not have been configured before. Returns 0.
 */
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
    int result;

    DevAssert(start_routine != NULL);
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
    DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, task_id, thread_id,
             itti_desc.threads[thread_id].task_state);

    itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;

    ITTI_DEBUG(" Creating thread for task %s ...\n", itti_get_task_name(task_id));

    result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
    /* pthread_create returns 0 on success and a positive error number on
     * failure: testing for a non negative value would never detect an error.
     */
    DevCheck(result == 0, task_id, thread_id, result);

    itti_desc.created_tasks ++;

    /* Wait till the thread is completely ready */
    while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
        usleep (1000);

    return 0;
}

682 683 684 685 686 687 688 689 690 691 692
#ifdef RTAI
/* Flags the thread running "task_id" as a real time thread. */
void itti_set_task_real_time(task_id_t task_id)
{
    thread_id_t thread_id;

    thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    (&itti_desc.threads[thread_id])->real_time = TRUE;
}
#endif

693 694 695 696 697 698 699 700 701 702
/* Sets the flag that holds ready tasks back (non-zero) or releases them (0),
 * then verifies that every created task has reported itself ready.
 */
void itti_wait_ready(int wait_tasks)
{
    itti_desc.wait_tasks = wait_tasks;

    ITTI_DEBUG(" wait for tasks: %s, created tasks %d, ready tasks %d\n",
               itti_desc.wait_tasks ? "yes" : "no",
               itti_desc.created_tasks,
               itti_desc.ready_tasks);

    DevCheck(itti_desc.created_tasks == itti_desc.ready_tasks, itti_desc.created_tasks, itti_desc.ready_tasks, itti_desc.wait_tasks);
}

703 704
/* Called by a task, from its own thread, once it is initialised.
 *
 * Registers the thread in the dump and queue sub-systems, accounts the task
 * as ready, moves its thread to TASK_STATE_READY and then holds the caller
 * for as long as the wait flag set by itti_wait_ready() is non-zero.
 */
void itti_mark_task_ready(task_id_t task_id)
{
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    /* Register the thread in itti dump */
    itti_dump_thread_use_ring_buffer();

    /* Mark the thread as using LFDS queue */
    lfds611_queue_use(itti_desc.tasks[task_id].message_queue);

#ifdef RTAI
    /* Assign low priority to created threads */
    {
        struct sched_param sched_param;
        sched_param.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
        sched_setscheduler(0, SCHED_FIFO, &sched_param);
    }
#endif

    /* Several tasks can get here at the same time: the counter is updated
     * atomically. It is updated before the READY state is published because
     * itti_create_task() returns as soon as it sees READY, after which
     * itti_wait_ready() may compare the created and ready counters.
     */
    __sync_fetch_and_add (&itti_desc.ready_tasks, 1);
    itti_desc.threads[thread_id].task_state = TASK_STATE_READY;

    while (itti_desc.wait_tasks != 0)
    {
        usleep (10000);
    }

    ITTI_DEBUG(" task %s started\n", itti_get_task_name(task_id));
}

735
/* Terminates the calling thread. When VCD traces are compiled in, the
 * "receive" bit of the calling task is cleared first.
 */
void itti_exit_task(void) {
#if defined(OAI_EMU) || defined(RTAI)
    const task_id_t task_id = itti_get_current_task_id();

    /* Nothing to trace when the caller is not a known task */
    if (task_id > TASK_UNKNOWN)
    {
        vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                                __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
    }
#endif

    pthread_exit (NULL);
}

748
/* Asks every task to terminate, wakes up the thread handling the signals
 * (SIGUSR1) if there is one, then terminates the calling thread.
 */
void itti_terminate_tasks(task_id_t task_id) {
    // Sends Terminate signals to all tasks.
    itti_send_terminate_message (task_id);

    /* itti_init stores -1 as "no thread" marker. The handle must be compared
     * with that marker: a ">= 0" test is always true when pthread_t is an
     * unsigned type and would signal an invalid thread.
     */
    if (itti_desc.thread_handling_signals != (pthread_t) -1) {
        pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
    }

    pthread_exit (NULL);
}

759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779
#ifdef RTAI
/* Relay thread: while ITTI is running, wakes up every 100 us and, for each
 * READY non real time thread, moves the count of pending messages to the
 * event fd of the thread.
 *
 * "arg" is not used. Returns NULL when ITTI stops running.
 */
static void *itti_rt_relay_thread(void *arg)
{
    thread_id_t thread_id;
    unsigned pending_messages;

    while (itti_desc.running)
    {
        usleep (100);

        /* Checks for all non real time tasks if they have pending messages */
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
        {
            ssize_t write_ret;
            eventfd_t sem_counter;

            if (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
            {
                continue;
            }
            if (itti_desc.threads[thread_id].real_time != FALSE)
            {
                continue;
            }

            /* Atomically read the counter and reset it to 0 */
            pending_messages = __sync_fetch_and_and (&itti_desc.threads[thread_id].messages_pending, 0);
            if (pending_messages == 0)
            {
                continue;
            }

            sem_counter = pending_messages;

            /* Call to write for an event fd must be of 8 bytes */
            write_ret = write (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
            DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, thread_id);
        }
    }
    return NULL;
}
#endif

793
int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
794
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) {
795 796
    task_id_t task_id;
    thread_id_t thread_id;
797 798
    int ret;

799
    itti_desc.message_number = 1;
800

801
    ITTI_DEBUG(" Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);
802

803
    CHECK_INIT_RETURN(signal_mask());
804

805
    /* Saves threads and messages max values */
806
    itti_desc.task_max = task_max;
807 808
    itti_desc.thread_max = thread_max;
    itti_desc.messages_id_max = messages_id_max;
809
    itti_desc.thread_handling_signals = -1;
810
    itti_desc.tasks_info = tasks_info;
811 812 813
    itti_desc.messages_info = messages_info;

    /* Allocates memory for tasks info */
814 815 816 817
    itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

    /* Allocates memory for threads info */
    itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));
818 819

    /* Initializing each queue and related stuff */
820
    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
821
    {
822
        ITTI_DEBUG(" Initializing %stask %s%s%s\n",
823 824 825 826 827 828
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
                   itti_desc.tasks_info[task_id].name,
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
                   itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");

829
        ITTI_DEBUG(" Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);
830 831 832

        ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);
        if (ret < 0)
833
        {
834
            ITTI_ERROR(" lfds611_queue_new failed for task %u\n", task_id);
835 836
            DevAssert(0 == 1);
        }
837 838 839 840 841 842
    }

    /* Initializing each thread */
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
    {
        itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
843

844 845
        itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);
        if (itti_desc.threads[thread_id].epoll_fd == -1) {
846
            ITTI_ERROR(" Failed to create new epoll fd: %s\n", strerror(errno));
847 848 849 850
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

851 852 853 854 855 856
# if defined(KERNEL_VERSION_PRE_2_6_30)
        /* SR: for kernel versions < 2.6.30 EFD_SEMAPHORE is not defined.
         * A read operation on the event fd will return the 8 byte value.
         */
        itti_desc.threads[thread_id].task_event_fd = eventfd(0, 0);
# else
857
        itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
858
# endif
859
        if (itti_desc.threads[thread_id].task_event_fd == -1)
860
        {
861
            ITTI_ERROR(" eventfd failed: %s\n", strerror(errno));
862 863 864 865
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

866
        itti_desc.threads[thread_id].nb_events = 1;
867

868
        itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event));
869

870 871
        itti_desc.threads[thread_id].events->events  = EPOLLIN | EPOLLERR;
        itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd;
872 873

        /* Add the event fd to the list of monitored events */
874 875
        if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
            itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0)
876
        {
877
            ITTI_ERROR(" epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno));
878 879 880
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }
881

882 883
        ITTI_DEBUG(" Successfully subscribed fd %d for thread %d\n",
                   itti_desc.threads[thread_id].task_event_fd, thread_id);
884

885 886 887
#ifdef RTAI
        itti_desc.threads[thread_id].real_time = FALSE;
        itti_desc.threads[thread_id].messages_pending = 0;
888
#endif
889
    }
890

891
    itti_desc.running = 1;
892 893 894
    itti_desc.wait_tasks = 0;
    itti_desc.created_tasks = 0;
    itti_desc.ready_tasks = 0;
895 896 897
#ifdef RTAI
    /* Start RT relay thread */
    DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0);
898 899

    rt_global_heap_open();
900
#endif
901

902 903 904 905 906 907
#if defined(OAI_EMU) || defined(RTAI)
    itti_desc.vcd_poll_msg = 0;
    itti_desc.vcd_receive_msg = 0;
    itti_desc.vcd_send_msg = 0;
#endif

908
    itti_dump_init (messages_definition_xml, dump_file_name);
909

910
    CHECK_INIT_RETURN(timer_init ());
911 912 913 914

    return 0;
}

915 916
void itti_wait_tasks_end(void) {
    int end = 0;
917 918
    int thread_id;
    task_id_t task_id;
919 920 921
    int ready_tasks;
    int result;
    int retries = 10;
922 923 924 925 926 927 928 929

    itti_desc.thread_handling_signals = pthread_self ();

    /* Handle signals here */
    while (end == 0) {
        signal_handle (&end);
    }

930 931 932
    do {
        ready_tasks = 0;

933 934
        task_id = TASK_FIRST;
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.task_max; thread_id++) {
935
            /* Skip tasks which are not running */
936 937 938 939 940
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
                while (thread_id != TASK_GET_THREAD_ID(task_id))
                {
                    task_id++;
                }
941

942
                result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL);
943

944
                ITTI_DEBUG(" Thread %s join status %d\n", itti_get_task_name(task_id), result);
945 946 947

                if (result == 0) {
                    /* Thread has terminated */
948
                    itti_desc.threads[thread_id].task_state = TASK_STATE_ENDED;
949 950 951 952 953 954 955 956 957
                }
                else {
                    /* Thread is still running, count it */
                    ready_tasks++;
                }
            }
        }
        if (ready_tasks > 0) {
            usleep (100 * 1000);
958
        }
959 960
    } while ((ready_tasks > 0) && (retries--));

961
    itti_desc.running = 0;
962

963
    if (ready_tasks > 0) {
964
        ITTI_DEBUG(" Some threads are still running, force exit\n");
965
        exit (0);
966
    }
967 968

    itti_dump_exit();
969 970 971
}

/* Allocates a TERMINATE_MESSAGE with itti_alloc_new_message() for task_id
 * and hands it to itti_send_broadcast_message().
 */
void itti_send_terminate_message(task_id_t task_id)
{
    MessageDef *message_p = itti_alloc_new_message (task_id, TERMINATE_MESSAGE);

    itti_send_broadcast_message (message_p);
}