/*******************************************************************************

 Eurecom OpenAirInterface
 Copyright(c) 1999 - 2012 Eurecom

 This program is free software; you can redistribute it and/or modify it
 under the terms and conditions of the GNU General Public License,
 version 2, as published by the Free Software Foundation.

 This program is distributed in the hope it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 more details.

 You should have received a copy of the GNU General Public License along with
 this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.

 The full GNU General Public License is included in this distribution in
 the file called "COPYING".

 Contact Information
 Openair Admin: openair_admin@eurecom.fr
 Openair Tech : openair_tech@eurecom.fr
 Forums       : http://forums.eurecom.fr/openairinterface
 Address      : EURECOM, Campus SophiaTech, 450 Route des Chappes
 06410 Biot FRANCE

 *******************************************************************************/

/* _GNU_SOURCE must be defined before the first system header is included. */
#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <signal.h>

#include "assertions.h"

#include <sys/epoll.h>
#include <sys/eventfd.h>
#include "liblfds611.h"

#include "intertask_interface.h"
#include "intertask_interface_dump.h"

#ifdef RTAI
# include <rtai_shm.h>
#endif

#if defined(OAI_EMU) || defined(RTAI)
# include "vcd_signal_dumper.h"
#endif

/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

#include "signals.h"
#include "timer.h"

/* Trace switches: a non-zero value enables the corresponding output.
 * itti_debug gates every ITTI_DEBUG() trace; itti_debug_poll additionally
 * gates the "no message" trace emitted by itti_poll_msg().
 */
const int itti_debug = 0;
const int itti_debug_poll = 0;

/* ITTI_DEBUG(fmt, ...) prints a "[ITTI][D]" trace only when itti_debug is set.
 * ITTI_ERROR(fmt, ...) always prints a "[ITTI][E]" trace.
 * fmt must be a string literal (it is concatenated with the prefix).
 *
 * Don't flush if using RTAI.
 * Otherwise stdout is flushed after each trace that is actually printed; the
 * flush is kept inside the itti_debug test so that disabled traces cost
 * nothing.
 */
#ifdef RTAI
# define ITTI_DEBUG(x, args...) do { if (itti_debug) rt_printk("[ITTI][D]"x, ##args); } \
    while(0)
# define ITTI_ERROR(x, args...) do { rt_printk("[ITTI][E]"x, ##args); } \
    while(0)
#else
# define ITTI_DEBUG(x, args...) do { if (itti_debug) { fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout); } } \
    while(0)
# define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } \
    while(0)
#endif

/* Global message size: the common header plus the payload size registered
 * for this message id in itti_desc.messages_info.
 */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[(mESSAGEiD)].size)

/* When <sys/eventfd.h> does not provide EFD_SEMAPHORE the event fd cannot be
 * used in semaphore mode; ITTI then keeps its own per-thread counter of
 * pending notifications (see thread_desc_t.sem_counter).
 */
#ifndef EFD_SEMAPHORE
# define KERNEL_VERSION_PRE_2_6_30 1
#endif

#ifdef RTAI
# define ITTI_MEM_PAGE_SIZE (1024)
# define ITTI_MEM_SIZE      (16 * 1024 * 1024)
#endif

/* Life cycle of an ITTI thread.
 * TASK_STATE_NOT_CONFIGURED is the zero value; itti_create_task() moves a
 * thread to STARTING and itti_mark_task_ready() to READY. Messages sent to
 * a thread in the ENDED state are not queued (see itti_send_msg_to_task()).
 * TASK_STATE_MAX is the number of states, not a state.
 */
typedef enum task_state_s {
    TASK_STATE_NOT_CONFIGURED,
    TASK_STATE_STARTING,
    TASK_STATE_READY,
    TASK_STATE_ENDED,
    TASK_STATE_MAX,
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
100
typedef struct message_list_s {
101
    MessageDef *msg; ///< Pointer to the message
102

103 104
    message_number_t message_number; ///< Unique message number
    uint32_t message_priority; ///< Message priority
105
} message_list_t;
typedef struct thread_desc_s {
    /* pthread associated with the thread */
    pthread_t task_thread;
110

111 112
    /* State of the thread */
    volatile task_state_t task_state;
113 114 115 116

    /* This fd is used internally by ITTI. */
    int epoll_fd;

117
    /* The thread fd */
118 119 120 121 122
    int task_event_fd;

    /* Number of events to monitor */
    uint16_t nb_events;

123
#if defined(KERNEL_VERSION_PRE_2_6_30)
124
    eventfd_t sem_counter;
125 126
#endif

127 128 129 130 131 132
    /* Array of events monitored by the task.
     * By default only one fd is monitored (the one used to received messages
     * from other tasks).
     * More events can be suscribed later by the task itself.
     */
    struct epoll_event *events;
133 134

    int epoll_nb_events;
135 136 137 138 139 140 141

#ifdef RTAI
    /* Flag to mark real time thread */
    unsigned real_time;

    /* Counter to indicate from RTAI threads that messages are pending for the thread */
    unsigned messages_pending;
142
#endif
143 144 145 146 147
} thread_desc_t;

/* Per-task bookkeeping; itti_desc.tasks holds one entry per task id. */
typedef struct task_desc_s {
    /* Queue of messages belonging to the task */
    struct lfds611_queue_state *message_queue;
} task_desc_t;

typedef struct itti_desc_s {
151
    thread_desc_t *threads;
152
    task_desc_t   *tasks;
153

154
    /* Current message number. Incremented every call to send_msg_to_task */
155
    message_number_t message_number __attribute__((aligned(8)));
156 157

    thread_id_t thread_max;
158
    task_id_t task_max;
159 160
    MessagesIds messages_id_max;

161 162
    pthread_t thread_handling_signals;

163
    const task_info_t *tasks_info;
164 165
    const message_info_t *messages_info;

166
    itti_lte_time_t lte_time;
167 168

    int running;
169 170 171 172

    volatile uint32_t created_tasks;
    volatile uint32_t ready_tasks;
    volatile int      wait_tasks;
173 174 175
#ifdef RTAI
    pthread_t rt_relay_thread;
#endif
176 177 178 179 180 181

#if defined(OAI_EMU) || defined(RTAI)
    uint64_t vcd_poll_msg;
    uint64_t vcd_receive_msg;
    uint64_t vcd_send_msg;
#endif
182 183 184
} itti_desc_t;

static itti_desc_t itti_desc;
/* Allocates 'size' bytes for ITTI.
 *
 * task_id is only reported by the allocation check; it does not select an
 * allocator. The standard heap is used with and without RTAI (the
 * rt_malloc() variant was disabled in the original code).
 *
 * Returns the pointer given by malloc(); a NULL result is reported through
 * DevCheck.
 */
void *itti_malloc(task_id_t task_id, ssize_t size)
{
    void *ptr = malloc(size);

    DevCheck(ptr != NULL, size, task_id, 0);

    return ptr;
}

/* Releases memory obtained from itti_malloc().
 *
 * ptr must not be NULL (checked with DevAssert). task_id is accepted for
 * symmetry with itti_malloc() and is not used.
 */
void itti_free(task_id_t task_id, void *ptr)
{
    DevAssert(ptr != NULL);

    /* The same allocator is used whether or not RTAI is enabled. */
    free(ptr);
}

static inline message_number_t itti_increment_message_number(void) {
213 214 215 216
    /* Atomic operation supported by GCC: returns the current message number
     * and then increment it by 1.
     * This can be done without mutex.
     */
217
    return __sync_fetch_and_add (&itti_desc.message_number, 1);
218 219
}

/* Returns the priority registered for message_id in the messages table.
 * message_id must be below itti_desc.messages_id_max (checked with DevCheck).
 */
static inline uint32_t itti_get_message_priority(MessagesIds message_id) {
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].priority);
}

/* Returns the name registered for message_id in the messages table.
 * message_id must be below itti_desc.messages_id_max (checked with DevCheck).
 * The returned string belongs to the table and must not be freed.
 */
const char *itti_get_message_name(MessagesIds message_id) {
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].name);
}

/* Returns the name registered for task_id in the tasks table.
 * task_id must be below itti_desc.task_max (checked with DevCheck).
 * The returned string belongs to the table and must not be freed.
 */
const char *itti_get_task_name(task_id_t task_id)
{
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);

    return (itti_desc.tasks_info[task_id].name);
}

static task_id_t itti_get_current_task_id(void)
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
{
    task_id_t task_id;
    thread_id_t thread_id;
    pthread_t thread = pthread_self ();

    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
    {
        thread_id = TASK_GET_THREAD_ID(task_id);
        if (itti_desc.threads[thread_id].task_thread == thread)
        {
            return task_id;
        }
    }

    return TASK_UNKNOWN;
}

void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
    itti_desc.lte_time.frame = frame;
    itti_desc.lte_time.slot = slot;
}

/* Sends a copy of message_p to every thread in the READY state, except the
 * thread of the task that originated the message.
 *
 * message_p must not be NULL; it is consumed (released) by this function.
 * Each copy is delivered through itti_send_msg_to_task() with
 * INSTANCE_DEFAULT.
 *
 * Returns 0.
 */
int itti_send_broadcast_message(MessageDef *message_p) {
    task_id_t destination_task_id;
    task_id_t origin_task_id;
    thread_id_t origin_thread_id;
    uint32_t thread_id;
    int ret = 0;
    int result;

    DevAssert(message_p != NULL);

    origin_task_id = message_p->ittiMsgHeader.originTaskId;
    origin_thread_id = TASK_GET_THREAD_ID(origin_task_id);

    destination_task_id = TASK_FIRST;
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
        MessageDef *new_message_p;

        /* Advance to the first task running on this thread.
         * NOTE(review): this loop is unbounded and relies on every thread id
         * having at least one task at or after the current task id.
         */
        while (thread_id != TASK_GET_THREAD_ID(destination_task_id))
        {
            destination_task_id++;
        }

        /* Skip task that broadcast the message */
        if (thread_id == origin_thread_id) {
            continue;
        }

        /* Skip tasks which are not running */
        if (itti_desc.threads[thread_id].task_state != TASK_STATE_READY) {
            continue;
        }

        new_message_p = itti_malloc (origin_task_id, sizeof(MessageDef));
        /* Check the copy that was just allocated (the original code
         * re-checked message_p here).
         */
        DevAssert(new_message_p != NULL);

        /* NOTE(review): this reads sizeof(MessageDef) bytes from message_p,
         * while itti_alloc_new_message_sized() only allocates
         * sizeof(MessageHeader) + size bytes. Confirm the source buffer is
         * always large enough.
         */
        memcpy (new_message_p, message_p, sizeof(MessageDef));
        result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
        DevCheck(result >= 0, message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
    }

    /* Release through the ITTI allocator wrapper, like every other message */
    itti_free (origin_task_id, message_p);

    return ret;
}

/* Allocates a message made of the common header followed by 'size' payload
 * bytes, and fills in the message id, origin task and payload size.
 *
 * origin_task_id may be TASK_UNKNOWN, in which case the calling thread is
 * looked up to find the real origin. message_id must be below
 * itti_desc.messages_id_max (checked with DevCheck).
 *
 * The other header fields and the payload are left uninitialised. The
 * message is meant to be handed to itti_send_msg_to_task(), which completes
 * the header and takes ownership.
 */
inline MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
    MessageDef *temp = NULL;

    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, size);
#endif

    if (origin_task_id == TASK_UNKNOWN)
    {
        /* Try to identify real origin task ID */
        origin_task_id = itti_get_current_task_id();
    }

    temp = itti_malloc (origin_task_id, sizeof(MessageHeader) + size);
    DevAssert(temp != NULL);

    temp->ittiMsgHeader.messageId = message_id;
    temp->ittiMsgHeader.originTaskId = origin_task_id;
    temp->ittiMsgHeader.ittiMsgSize = size;

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, 0);
#endif

    return temp;
}

/* Allocates a message whose payload size is the one registered for
 * message_id in the messages table.
 *
 * message_id is validated here, before the table is indexed to read the
 * size (the callee only validates it afterwards).
 */
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

/* Completes the header of 'message' and delivers it to destination_task_id.
 *
 * The destination, instance and current LTE time are written into the
 * header, a global message number is assigned and the message is passed to
 * the ITTI dump. Then:
 *  - destination TASK_UNKNOWN: the message is released (debug-only message);
 *  - destination thread ENDED: the message is traced as undeliverable and
 *    released;
 *  - otherwise the destination thread must be READY: the message is linked
 *    into the destination task queue and the destination thread is notified
 *    through its event fd (only for tasks without a parent; from an RTAI real
 *    time task a pending counter is incremented instead).
 *
 * In every case ownership of 'message' is taken by this function; the caller
 * must not use it afterwards.
 *
 * Returns 0.
 */
int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
{
    thread_id_t destination_thread_id;
    task_id_t origin_task_id;
    message_list_t *element;
    uint32_t priority;
    message_number_t message_number;
    uint32_t message_id;
    int enqueue_ret;

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_send_msg, 1L << destination_task_id));
#endif

    DevAssert(message != NULL);
    DevCheck(destination_task_id < itti_desc.task_max, destination_task_id, itti_desc.task_max, 0);

    destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
    message->ittiMsgHeader.destinationTaskId = destination_task_id;
    message->ittiMsgHeader.instance = instance;
    message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
    message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
    message_id = message->ittiMsgHeader.messageId;
    DevCheck(message_id < itti_desc.messages_id_max, itti_desc.messages_id_max, message_id, 0);

    origin_task_id = ITTI_MSG_ORIGIN_ID(message);

    priority = itti_get_message_priority (message_id);

    /* Increment the global message number */
    message_number = itti_increment_message_number ();

    itti_dump_queue_message (origin_task_id, message_number, message, itti_desc.messages_info[message_id].name,
                             sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);

    if (destination_task_id != TASK_UNKNOWN)
    {
#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_IN);
#endif

        if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED)
        {
            ITTI_DEBUG(" Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n",
                       itti_desc.messages_info[message_id].name,
                       message_number,
                       priority,
                       itti_get_task_name(origin_task_id),
                       destination_task_id,
                       itti_get_task_name(destination_task_id));

            /* Nobody will ever receive this message: release it here,
             * otherwise it is leaked (the caller has given up ownership).
             */
            itti_free(origin_task_id, message);

#if defined(OAI_EMU) || defined(RTAI)
            /* Close the enqueue marker opened above on this path too */
            vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif
        }
        else
        {
            /* We cannot send a message if the task is not running */
            DevCheck(itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY, destination_thread_id,
                     itti_desc.threads[destination_thread_id].task_state, message_id);

            /* Allocate new list element */
            element = (message_list_t *) itti_malloc (origin_task_id, sizeof(struct message_list_s));
            DevAssert(element != NULL);

            /* Fill in members */
            element->msg = message;
            element->message_number = message_number;
            element->message_priority = priority;

            /* Enqueue message in destination task queue.
             * The enqueue reports 0 when the queue has no free element left;
             * notifying the receiver in that case would make it dequeue from
             * an empty queue, so the failure is reported here.
             */
            enqueue_ret = lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, element);
            DevCheck(enqueue_ret == 1, enqueue_ret, destination_task_id, message_id);

#if defined(OAI_EMU) || defined(RTAI)
            vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif

#ifdef RTAI
            if (itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].real_time)
            {
                /* This is a RT task, increase destination task messages pending counter */
                __sync_fetch_and_add (&itti_desc.threads[destination_thread_id].messages_pending, 1);
            }
            else
#endif
            {
                /* Only use event fd for tasks, subtasks will pool the queue */
                if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN)
                {
                    ssize_t write_ret;
                    eventfd_t sem_counter = 1;

                    /* Call to write for an event fd must be of 8 bytes */
                    write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
                    DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, destination_thread_id);
                }
            }

            ITTI_DEBUG(" Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
                       itti_desc.messages_info[message_id].name,
                       message_number,
                       priority,
                       itti_get_task_name(origin_task_id),
                       destination_task_id,
                       itti_get_task_name(destination_task_id));
        }
    } else {
        /* This is a debug message to TASK_UNKNOWN, we can safely release it */
        itti_free(origin_task_id, message);
    }

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_send_msg, ~(1L << destination_task_id)));
#endif

    return 0;
}

/* Adds 'fd' to the set of descriptors monitored by the thread of task_id.
 *
 * The thread's event array is grown by one entry and the fd is registered
 * on the thread's epoll instance for EPOLLIN | EPOLLERR. Events reported
 * for this fd are retrieved with itti_get_events().
 *
 * task_id must be a valid task id and fd must be >= 0 (checked with
 * DevCheck). A failing epoll_ctl() is traced and asserted.
 */
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
    thread_id_t thread_id;
    struct epoll_event event;
    struct epoll_event *resized_events;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevCheck(fd >= 0, fd, 0, 0);

    thread_id = TASK_GET_THREAD_ID(task_id);
    itti_desc.threads[thread_id].nb_events++;

    /* Reallocate the events. The result is checked before it replaces the
     * current array, so that an allocation failure is reported here instead
     * of showing up later as a NULL array passed to epoll_wait().
     */
    resized_events = realloc(
        itti_desc.threads[thread_id].events,
        itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));
    DevAssert(resized_events != NULL);
    itti_desc.threads[thread_id].events = resized_events;

    event.events  = EPOLLIN | EPOLLERR;
    /* Clear the whole data union before storing the (smaller) fd member */
    event.data.u64 = 0;
    event.data.fd  = fd;

    /* Add the event fd to the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
        &event) != 0)
    {
        ITTI_ERROR(" epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }

    ITTI_DEBUG(" Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
}

/* Removes 'fd' from the set of descriptors monitored by the thread of
 * task_id and shrinks the thread's event array by one entry.
 *
 * task_id must be a valid task id and fd must be >= 0 (checked with
 * DevCheck). A failing epoll_ctl() is traced and asserted.
 */
void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
    thread_id_t thread_id;
    struct epoll_event *resized_events;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevCheck(fd >= 0, fd, 0, 0);

    thread_id = TASK_GET_THREAD_ID(task_id);
    /* Remove the event fd from the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
    {
        ITTI_ERROR(" epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }

    itti_desc.threads[thread_id].nb_events--;

    resized_events = realloc(
        itti_desc.threads[thread_id].events,
        itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));
    /* A NULL result is only acceptable when no event is left to monitor */
    DevAssert((resized_events != NULL) || (itti_desc.threads[thread_id].nb_events == 0));
    itti_desc.threads[thread_id].events = resized_events;
}

/* Gives access to the events reported by the last epoll_wait() done for the
 * thread of task_id.
 *
 * *events is set to the thread's event array (owned by ITTI, not to be
 * freed). The return value is the number of entries filled in by the last
 * epoll_wait(); the entry used for ITTI's own message notification has its
 * EPOLLIN bit cleared once processed.
 */
int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
    thread_id_t thread_id;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevAssert(events != NULL);

    thread_id = TASK_GET_THREAD_ID(task_id);
    *events = itti_desc.threads[thread_id].events;

    return itti_desc.threads[thread_id].epoll_nb_events;
}

/* Waits on the epoll instance of the thread of task_id and, when the
 * thread's own event fd is readable, dequeues one message for task_id.
 *
 * polling != 0: epoll_wait() returns immediately; polling == 0: it blocks
 * until at least one monitored fd has an event. epoll_wait() is restarted
 * when interrupted by a signal.
 *
 * *received_msg is NULL on return when no ITTI message was received (nothing
 * pending in polling mode, or only other subscribed fds had events; those
 * are available through itti_get_events()).
 */
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
    thread_id_t thread_id;
    int epoll_ret = 0;
    int epoll_timeout = 0;
    int i;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevAssert(received_msg != NULL);

    thread_id = TASK_GET_THREAD_ID(task_id);
    *received_msg = NULL;

    /* In polling mode we set the timeout to 0 causing epoll_wait to return
     * immediately; timeout = -1 causes the epoll_wait to wait indefinitely.
     */
    epoll_timeout = polling ? 0 : -1;

    do {
        epoll_ret = epoll_wait(itti_desc.threads[thread_id].epoll_fd,
                               itti_desc.threads[thread_id].events,
                               itti_desc.threads[thread_id].nb_events,
                               epoll_timeout);
    } while (epoll_ret < 0 && errno == EINTR);

    if (epoll_ret < 0) {
        ITTI_ERROR(" epoll_wait failed for task %s: %s\n",
                   itti_get_task_name(task_id), strerror(errno));
        DevAssert(0 == 1);
    }
    if (epoll_ret == 0 && polling) {
        /* No data to read -> return */
        return;
    }

    itti_desc.threads[thread_id].epoll_nb_events = epoll_ret;

    for (i = 0; i < epoll_ret; i++) {
        /* Check if there is an event for ITTI for the event fd */
        if ((itti_desc.threads[thread_id].events[i].events & EPOLLIN) &&
            (itti_desc.threads[thread_id].events[i].data.fd == itti_desc.threads[thread_id].task_event_fd))
        {
            struct message_list_s *message = NULL;
            eventfd_t sem_counter;
            ssize_t   read_ret;

            /* Consume the notification: sem_counter receives the value read
             * from the event fd.
             */
            read_ret = read (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
            DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0);

#if defined(KERNEL_VERSION_PRE_2_6_30)
            /* Store the value of the semaphore counter: one notification is
             * consumed now, the others are served by itti_receive_msg()
             * without waiting. The counter lives in the thread descriptor,
             * hence the thread id index.
             */
            itti_desc.threads[thread_id].sem_counter = sem_counter - 1;
#endif

            if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
                /* No element in list -> this should not happen */
                DevParam(task_id, epoll_ret, 0);
            }
            DevAssert(message != NULL);
            *received_msg = message->msg;
            itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
            /* Mark that the event has been processed */
            itti_desc.threads[thread_id].events[i].events &= ~EPOLLIN;
            return;
        }
    }
}

/* Blocks until a message is available for task_id, or until another fd
 * subscribed by the task has an event.
 *
 * On return *received_msg points to the received message, or is NULL when
 * the wake-up was caused only by other subscribed fds. The list element used
 * to queue the message is released here; the message itself is handed to the
 * caller.
 */
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
#if defined(KERNEL_VERSION_PRE_2_6_30)
    thread_id_t thread_id;
#endif

    /* Validate the arguments before any table is indexed with them */
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevAssert(received_msg != NULL);

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
#endif

#if defined(KERNEL_VERSION_PRE_2_6_30)
    thread_id = TASK_GET_THREAD_ID(task_id);

    /* Notifications already read from the event fd are served first */
    if (itti_desc.threads[thread_id].sem_counter > 0) {
        struct message_list_s *message = NULL;

        if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
            /* No element in list -> this should not happen */
            DevParam(task_id, itti_desc.threads[thread_id].sem_counter, 0);
        }
        DevAssert(message != NULL);
        *received_msg = message->msg;
        itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);

        itti_desc.threads[thread_id].sem_counter--;
    } else
#endif
    {
        itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
    }

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_receive_msg, 1L << task_id));
#endif
}

/* Non-blocking receive: takes one message from the queue of task_id if one
 * is available.
 *
 * *received_msg is set to the message, or to NULL when the queue is empty.
 * The queue is read directly; the thread's event fd is not touched.
 */
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevAssert(received_msg != NULL);

    *received_msg = NULL;

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_poll_msg, 1L << task_id));
#endif

    {
        struct message_list_s *message = NULL;

        if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1)
        {
            DevAssert(message != NULL);
            *received_msg = message->msg;
            /* Release the list element like the blocking receive path does */
            itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
        }
    }

    if ((itti_debug_poll) && (*received_msg == NULL)) {
        ITTI_DEBUG(" No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
    }

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_poll_msg, ~(1L << task_id)));
#endif
}

/* Creates the thread that runs task_id and waits until it is ready.
 *
 * start_routine (must not be NULL) is started with args_p in a new pthread.
 * The thread of task_id must still be in the NOT_CONFIGURED state. The
 * caller is blocked, polling every millisecond, until the new thread has
 * called itti_mark_task_ready().
 *
 * Returns 0.
 */
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
    int result;

    DevAssert(start_routine != NULL);
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
    DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, task_id, thread_id,
             itti_desc.threads[thread_id].task_state);

    itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;

    ITTI_DEBUG(" Creating thread for task %s ...\n", itti_get_task_name(task_id));

    result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
    /* pthread_create() returns 0 on success and a positive error number on
     * failure, so only a comparison with 0 can detect a failure.
     */
    DevCheck(result == 0, task_id, thread_id, result);

    itti_desc.created_tasks ++;

    /* Wait till the thread is completely ready */
    while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
        usleep (1000);

    return 0;
}

#ifdef RTAI
/* Flags the thread of task_id as a real time thread. Messages sent from such
 * a thread are signalled through the destination's pending counter instead
 * of its event fd (see itti_send_msg_to_task()).
 */
void itti_set_task_real_time(task_id_t task_id)
{
    thread_id_t thread_id;

    thread_id = TASK_GET_THREAD_ID(task_id);
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    itti_desc.threads[thread_id].real_time = TRUE;
}
#endif

/* Sets the start-up barrier flag.
 *
 * While the flag is non-zero, threads calling itti_mark_task_ready() are
 * held back; setting it to 0 lets them proceed. The number of created and
 * ready tasks is expected to match when this is called (checked with
 * DevCheck).
 */
void itti_wait_ready(int wait_tasks)
{
    itti_desc.wait_tasks = wait_tasks;

    ITTI_DEBUG(" wait for tasks: %s, created tasks %d, ready tasks %d\n",
               itti_desc.wait_tasks ? "yes" : "no",
               itti_desc.created_tasks,
               itti_desc.ready_tasks);

    DevCheck(itti_desc.created_tasks == itti_desc.ready_tasks,
             itti_desc.created_tasks,
             itti_desc.ready_tasks,
             itti_desc.wait_tasks);
}

/* Called by a task thread once its own initialisation is done.
 *
 * Registers the calling thread with the ITTI dump and with the task's
 * message queue, marks the thread READY (which releases the creator blocked
 * in itti_create_task()) and then waits, polling every 10 ms, until the
 * start-up barrier set by itti_wait_ready() is lifted.
 */
void itti_mark_task_ready(task_id_t task_id)
{
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    /* Register the thread in itti dump */
    itti_dump_thread_use_ring_buffer();

    /* Mark the thread as using LFDS queue */
    lfds611_queue_use(itti_desc.tasks[task_id].message_queue);

#ifdef RTAI
    /* Assign low priority to created threads */
    {
        struct sched_param sched_param;
        sched_param.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
        sched_setscheduler(0, SCHED_FIFO, &sched_param);
    }
#endif

    itti_desc.threads[thread_id].task_state = TASK_STATE_READY;
    /* Several task threads can reach this point at the same time: the
     * counter is updated atomically so that no increment is lost.
     */
    __sync_fetch_and_add (&itti_desc.ready_tasks, 1);

    while (itti_desc.wait_tasks != 0)
    {
        usleep (10000);
    }

    ITTI_DEBUG(" task %s started\n", itti_get_task_name(task_id));
}

/* Terminates the calling thread.
 *
 * When VCD tracing is compiled in and the caller is a known ITTI task, its
 * bit is first cleared in the "receive message" trace variable.
 * Does not return.
 */
void itti_exit_task(void) {
#if defined(OAI_EMU) || defined(RTAI)
    task_id_t task_id = itti_get_current_task_id();

    if (task_id > TASK_UNKNOWN)
    {
        vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                                __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
    }
#endif
    pthread_exit (NULL);
}

/* Asks every task to terminate, wakes up the signal handling thread (if one
 * has been registered) with SIGUSR1, then terminates the calling thread.
 * Does not return.
 */
void itti_terminate_tasks(task_id_t task_id) {
    // Sends Terminate signals to all tasks.
    itti_send_terminate_message (task_id);

    /* itti_init() stores -1 as "no signal handling thread". pthread_t is an
     * unsigned integer type on glibc, where a ">= 0" test is always true, so
     * the sentinel itself is compared.
     */
    if (itti_desc.thread_handling_signals != (pthread_t) -1) {
        pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
    }

    pthread_exit (NULL);
}

#ifdef RTAI
/* Relay between real time senders and non real time receivers.
 *
 * Real time tasks do not write to the destination event fd; they only
 * increment the destination's messages_pending counter (see
 * itti_send_msg_to_task()). While itti_desc.running is set, this thread
 * wakes up every 200 us, atomically collects and clears that counter for
 * each READY non real time thread, and writes the collected count to the
 * thread's event fd.
 *
 * arg is unused. Returns NULL when itti_desc.running is cleared.
 */
static void *itti_rt_relay_thread(void *arg)
{
    thread_id_t thread_id;
    unsigned pending_messages;

    while (itti_desc.running)
    {
        usleep (200); // Poll for messages a little more than 2 time by slot to get a small latency between RT and other tasks

#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_RELAY_THREAD, VCD_FUNCTION_IN);
#endif

        /* Checks for all non real time tasks if they have pending messages */
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
        {
            if ((itti_desc.threads[thread_id].task_state == TASK_STATE_READY)
                    && (itti_desc.threads[thread_id].real_time == FALSE))
            {
                /* Atomically read the counter and reset it to 0 */
                pending_messages = __sync_fetch_and_and (&itti_desc.threads[thread_id].messages_pending, 0);

                if (pending_messages > 0)
                {
                    ssize_t write_ret;
                    eventfd_t sem_counter = pending_messages;

                    /* Call to write for an event fd must be of 8 bytes */
                    write_ret = write (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
                    DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, thread_id);
                }
            }
        }

#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_RELAY_THREAD, VCD_FUNCTION_OUT);
#endif
    }
    return NULL;
}
#endif

int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
802
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) {
803 804
    task_id_t task_id;
    thread_id_t thread_id;
805 806
    int ret;

807
    itti_desc.message_number = 1;
808

809
    ITTI_DEBUG(" Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);
810

811
    CHECK_INIT_RETURN(signal_mask());
812

813
    /* Saves threads and messages max values */
814
    itti_desc.task_max = task_max;
815 816
    itti_desc.thread_max = thread_max;
    itti_desc.messages_id_max = messages_id_max;
817
    itti_desc.thread_handling_signals = -1;
818
    itti_desc.tasks_info = tasks_info;
819 820 821
    itti_desc.messages_info = messages_info;

    /* Allocates memory for tasks info */
822 823 824 825
    itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

    /* Allocates memory for threads info */
    itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));
826 827

    /* Initializing each queue and related stuff */
828
    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
829
    {
830
        ITTI_DEBUG(" Initializing %stask %s%s%s\n",
831 832 833 834 835 836
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
                   itti_desc.tasks_info[task_id].name,
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
                   itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");

837
        ITTI_DEBUG(" Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);
838 839 840

        ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);
        if (ret < 0)
841
        {
842
            ITTI_ERROR(" lfds611_queue_new failed for task %u\n", task_id);
843 844
            DevAssert(0 == 1);
        }
845 846 847 848 849 850
    }

    /* Initializing each thread */
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
    {
        itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
851

852 853
        itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);
        if (itti_desc.threads[thread_id].epoll_fd == -1) {
854
            ITTI_ERROR(" Failed to create new epoll fd: %s\n", strerror(errno));
855 856 857 858
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

859 860 861 862 863 864
# if defined(KERNEL_VERSION_PRE_2_6_30)
        /* SR: for kernel versions < 2.6.30 EFD_SEMAPHORE is not defined.
         * A read operation on the event fd will return the 8 byte value.
         */
        itti_desc.threads[thread_id].task_event_fd = eventfd(0, 0);
# else
865
        itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
866
# endif
867
        if (itti_desc.threads[thread_id].task_event_fd == -1)
868
        {
869
            ITTI_ERROR(" eventfd failed: %s\n", strerror(errno));
870 871 872 873
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

874
        itti_desc.threads[thread_id].nb_events = 1;
875

876
        itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event));
877

878 879
        itti_desc.threads[thread_id].events->events  = EPOLLIN | EPOLLERR;
        itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd;
880 881

        /* Add the event fd to the list of monitored events */
882 883
        if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
            itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0)
884
        {
885
            ITTI_ERROR(" epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno));
886 887 888
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }
889

890 891
        ITTI_DEBUG(" Successfully subscribed fd %d for thread %d\n",
                   itti_desc.threads[thread_id].task_event_fd, thread_id);
892

893 894 895
#ifdef RTAI
        itti_desc.threads[thread_id].real_time = FALSE;
        itti_desc.threads[thread_id].messages_pending = 0;
896
#endif
897
    }
898

899
    itti_desc.running = 1;
900 901 902
    itti_desc.wait_tasks = 0;
    itti_desc.created_tasks = 0;
    itti_desc.ready_tasks = 0;
903 904 905
#ifdef RTAI
    /* Start RT relay thread */
    DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0);
906 907

    rt_global_heap_open();
908
#endif
909

910 911 912 913 914 915
#if defined(OAI_EMU) || defined(RTAI)
    itti_desc.vcd_poll_msg = 0;
    itti_desc.vcd_receive_msg = 0;
    itti_desc.vcd_send_msg = 0;
#endif

916
    itti_dump_init (messages_definition_xml, dump_file_name);
917

918
    CHECK_INIT_RETURN(timer_init ());
919 920 921 922

    return 0;
}

923 924
void itti_wait_tasks_end(void) {
    int end = 0;
925 926
    int thread_id;
    task_id_t task_id;
927 928 929
    int ready_tasks;
    int result;
    int retries = 10;
930 931 932 933 934 935 936 937

    itti_desc.thread_handling_signals = pthread_self ();

    /* Handle signals here */
    while (end == 0) {
        signal_handle (&end);
    }

938 939 940
    do {
        ready_tasks = 0;

941 942
        task_id = TASK_FIRST;
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.task_max; thread_id++) {
943
            /* Skip tasks which are not running */
944 945 946 947 948
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
                while (thread_id != TASK_GET_THREAD_ID(task_id))
                {
                    task_id++;
                }
949

950
                result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL);
951

952
                ITTI_DEBUG(" Thread %s join status %d\n", itti_get_task_name(task_id), result);
953 954 955

                if (result == 0) {
                    /* Thread has terminated */
956
                    itti_desc.threads[thread_id].task_state = TASK_STATE_ENDED;
957 958 959 960 961 962 963 964 965
                }
                else {
                    /* Thread is still running, count it */
                    ready_tasks++;
                }
            }
        }
        if (ready_tasks > 0) {
            usleep (100 * 1000);
966
        }
967 968
    } while ((ready_tasks > 0) && (retries--));

969
    itti_desc.running = 0;
970

971
    if (ready_tasks > 0) {
972
        ITTI_DEBUG(" Some threads are still running, force exit\n");
973
        exit (0);
974
    }
975 976

    itti_dump_exit();
977 978 979
}

/*
 * Allocates a TERMINATE_MESSAGE on behalf of task_id and hands it to
 * itti_send_broadcast_message().
 *
 * task_id: identifier passed to itti_alloc_new_message() for the new message.
 */
void itti_send_terminate_message(task_id_t task_id) {
    itti_send_broadcast_message (itti_alloc_new_message (task_id, TERMINATE_MESSAGE));
}