/*******************************************************************************

 Eurecom OpenAirInterface
 Copyright(c) 1999 - 2012 Eurecom

 This program is free software; you can redistribute it and/or modify it
 under the terms and conditions of the GNU General Public License,
 version 2, as published by the Free Software Foundation.

 This program is distributed in the hope it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 more details.

 You should have received a copy of the GNU General Public License along with
 this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.

 The full GNU General Public License is included in this distribution in
 the file called "COPYING".

 Contact Information
 Openair Admin: openair_admin@eurecom.fr
 Openair Tech : openair_tech@eurecom.fr
 Forums       : http://forums.eurecom.fr/openairinterface
 Address      : EURECOM, Campus SophiaTech, 450 Route des Chappes
 06410 Biot FRANCE

 *******************************************************************************/

#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <signal.h>

#include <sys/epoll.h>
#include <sys/eventfd.h>

#ifdef RTAI
# include <rtai_shm.h>
#endif

#include "liblfds611.h"

#include "assertions.h"
#include "intertask_interface.h"
#include "intertask_interface_dump.h"

#if defined(OAI_EMU) || defined(RTAI)
# include "memory_pools.h"
# include "vcd_signal_dumper.h"
#endif

/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

#include "signals.h"
#include "timer.h"

/* ITTI DEBUG groups: one bit per category of trace, tested against itti_debug */
#define ITTI_DEBUG_POLL             (1<<0)
#define ITTI_DEBUG_SEND             (1<<1)
#define ITTI_DEBUG_EVEN_FD          (1<<2)
#define ITTI_DEBUG_INIT             (1<<3)
#define ITTI_DEBUG_EXIT             (1<<4)
#define ITTI_DEBUG_ISSUES           (1<<5)
#define ITTI_DEBUG_MP_STATISTICS    (1<<6)

/* Mask of the debug groups that ITTI_DEBUG actually prints */
const int itti_debug = ITTI_DEBUG_ISSUES | ITTI_DEBUG_MP_STATISTICS;

/* Trace macros. "x" must be a string literal (it is concatenated with the
 * prefix). Don't flush if using RTAI.
 * Note: in the non-RTAI ITTI_DEBUG, stdout is flushed on every invocation,
 * even when the group "m" is disabled.
 */
#ifdef RTAI
# define ITTI_DEBUG(m, x, args...) do { if ((m) & itti_debug) rt_printk("[ITTI][D]"x, ##args); } \
    while(0)
# define ITTI_ERROR(x, args...) do { rt_printk("[ITTI][E]"x, ##args); } \
    while(0)
#else
# define ITTI_DEBUG(m, x, args...) do { if ((m) & itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout); } \
    while(0)
# define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } \
    while(0)
#endif

/* Global message size */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)

/* Without EFD_SEMAPHORE a read on the event fd returns the whole counter, so
 * the remainder is tracked by hand (see sem_counter in thread_desc_t).
 */
#ifndef EFD_SEMAPHORE
# define KERNEL_VERSION_PRE_2_6_30 1
#endif

#ifdef RTAI
# define ITTI_MEM_PAGE_SIZE (1024)
# define ITTI_MEM_SIZE      (16 * 1024 * 1024)
#endif

/* States recorded per ITTI thread. TASK_STATE_MAX is not a state, it is the
 * number of states.
 */
typedef enum task_state_s {
    TASK_STATE_NOT_CONFIGURED,
    TASK_STATE_STARTING,
    TASK_STATE_READY,
    TASK_STATE_ENDED,
    TASK_STATE_MAX,
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
109
typedef struct message_list_s {
110
    MessageDef *msg; ///< Pointer to the message
111

112 113
    message_number_t message_number; ///< Unique message number
    uint32_t message_priority; ///< Message priority
114
} message_list_t;
115

116 117 118
typedef struct thread_desc_s {
    /* pthread associated with the thread */
    pthread_t task_thread;
119

120 121
    /* State of the thread */
    volatile task_state_t task_state;
122 123 124 125

    /* This fd is used internally by ITTI. */
    int epoll_fd;

126
    /* The thread fd */
127 128 129 130 131
    int task_event_fd;

    /* Number of events to monitor */
    uint16_t nb_events;

132
#if defined(KERNEL_VERSION_PRE_2_6_30)
133
    eventfd_t sem_counter;
134 135
#endif

136 137 138 139 140 141
    /* Array of events monitored by the task.
     * By default only one fd is monitored (the one used to received messages
     * from other tasks).
     * More events can be suscribed later by the task itself.
     */
    struct epoll_event *events;
142 143

    int epoll_nb_events;
144 145 146 147 148 149 150

#ifdef RTAI
    /* Flag to mark real time thread */
    unsigned real_time;

    /* Counter to indicate from RTAI threads that messages are pending for the thread */
    unsigned messages_pending;
151
#endif
152 153 154 155 156
} thread_desc_t;

/* Per-task bookkeeping kept by ITTI */
typedef struct task_desc_s {
    /* Queue of messages belonging to the task */
    struct lfds611_queue_state *message_queue;
} task_desc_t;

typedef struct itti_desc_s {
160
    thread_desc_t *threads;
161
    task_desc_t   *tasks;
162

163
    /* Current message number. Incremented every call to send_msg_to_task */
164
    message_number_t message_number __attribute__((aligned(8)));
165 166

    thread_id_t thread_max;
167
    task_id_t task_max;
168 169
    MessagesIds messages_id_max;

170 171
    pthread_t thread_handling_signals;

172
    const task_info_t *tasks_info;
173 174
    const message_info_t *messages_info;

175
    itti_lte_time_t lte_time;
176 177

    int running;
178 179 180 181

    volatile uint32_t created_tasks;
    volatile uint32_t ready_tasks;
    volatile int      wait_tasks;
182 183 184
#ifdef RTAI
    pthread_t rt_relay_thread;
#endif
185 186

#if defined(OAI_EMU) || defined(RTAI)
187 188
    memory_pools_handle_t memory_pools_handle;

189 190 191 192
    uint64_t vcd_poll_msg;
    uint64_t vcd_receive_msg;
    uint64_t vcd_send_msg;
#endif
193 194 195
} itti_desc_t;

static itti_desc_t itti_desc;
196

197
/* Allocates "size" bytes on behalf of ITTI.
 *
 * With OAI_EMU or RTAI the memory comes from the ITTI memory pools and is
 * tagged with the origin and destination task ids, otherwise from malloc.
 * The result is checked with DevCheck; it must be released with itti_free.
 *
 * @param origin_task_id      task requesting the memory
 * @param destination_task_id task the memory is intended for
 * @param size                number of bytes to allocate
 * @returns the allocated memory
 */
void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size)
{
    void *ptr = NULL;

#if defined(OAI_EMU) || defined(RTAI)
    ptr = memory_pools_allocate (itti_desc.memory_pools_handle, size, origin_task_id, destination_task_id);

    if (ptr == NULL)
    {
        /* Dump the pools usage before the check below, so that the diagnostic
         * is still printed when the check stops the program.
         */
        char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle);

        ITTI_ERROR (" Memory pools statistics:\n%s", statistics);
        free (statistics);
    }
#else
    ptr = malloc (size);
#endif

    DevCheck(ptr != NULL, size, origin_task_id, destination_task_id);

    return ptr;
}

/* Releases memory obtained from itti_malloc.
 *
 * @param task_id task releasing the memory (only used by the memory pools)
 * @param ptr     memory to release, must not be NULL
 */
void itti_free(task_id_t task_id, void *ptr)
{
    DevAssert(ptr != NULL);

#if defined(OAI_EMU) || defined(RTAI)
    memory_pools_free (itti_desc.memory_pools_handle, ptr, task_id);
#else
    free (ptr);
#endif
}

static inline message_number_t itti_increment_message_number(void) {
233 234 235 236
    /* Atomic operation supported by GCC: returns the current message number
     * and then increment it by 1.
     * This can be done without mutex.
     */
237
    return __sync_fetch_and_add (&itti_desc.message_number, 1);
238 239
}

240
/* Returns the priority configured for "message_id" in the messages table. */
static inline uint32_t itti_get_message_priority(MessagesIds message_id) {
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].priority);
}

/* Returns the name configured for "message_id" in the messages table. */
const char *itti_get_message_name(MessagesIds message_id) {
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].name);
}

/* Returns the name configured for "task_id" in the tasks table. */
const char *itti_get_task_name(task_id_t task_id)
{
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);

    return (itti_desc.tasks_info[task_id].name);
}

static task_id_t itti_get_current_task_id(void)
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
{
    task_id_t task_id;
    thread_id_t thread_id;
    pthread_t thread = pthread_self ();

    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
    {
        thread_id = TASK_GET_THREAD_ID(task_id);
        if (itti_desc.threads[thread_id].task_thread == thread)
        {
            return task_id;
        }
    }

    return TASK_UNKNOWN;
}

277 278 279 280 281 282
/* Records the current frame and slot; itti_send_msg_to_task copies them into
 * the header of every message sent afterwards.
 */
void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
    itti_desc.lte_time.frame = frame;
    itti_desc.lte_time.slot = slot;
}

/* Sends a copy of "message_p" to one task of every READY thread, except the
 * thread of the task that originated the message, then releases "message_p".
 *
 * @param message_p message to broadcast, must not be NULL; ownership is taken
 * @returns 0
 */
int itti_send_broadcast_message(MessageDef *message_p) {
    task_id_t destination_task_id;
    task_id_t origin_task_id;
    thread_id_t origin_thread_id;
    uint32_t thread_id;
    size_t message_size;
    int result;

    DevAssert(message_p != NULL);

    origin_task_id = message_p->ittiMsgHeader.originTaskId;
    origin_thread_id = TASK_GET_THREAD_ID(origin_task_id);

    /* Messages are allocated as header + payload (see
     * itti_alloc_new_message_sized), which can be smaller than
     * sizeof(MessageDef): copy only what was really allocated.
     */
    message_size = sizeof(MessageHeader) + message_p->ittiMsgHeader.ittiMsgSize;

    destination_task_id = TASK_FIRST;
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
        MessageDef *new_message_p;

        /* Move on to the first task running on this thread */
        while (thread_id != TASK_GET_THREAD_ID(destination_task_id))
        {
            destination_task_id++;
            /* Do not search past the end of the tasks table */
            DevCheck(destination_task_id < itti_desc.task_max, destination_task_id, itti_desc.task_max, thread_id);
        }
        /* Skip task that broadcast the message */
        if (thread_id != origin_thread_id) {
            /* Skip tasks which are not running */
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
                new_message_p = itti_malloc (origin_task_id, destination_task_id, message_size);
                DevAssert(new_message_p != NULL);

                memcpy (new_message_p, message_p, message_size);
                result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
                DevCheck(result >= 0, message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
            }
        }
    }
    /* Every destination got its own copy, the original is no longer needed */
    itti_free (ITTI_MSG_ORIGIN_ID(message_p), message_p);

    return 0;
}

/* Allocates a message made of a header followed by "size" bytes of payload
 * and fills in its message id, origin task id and payload size.
 *
 * @param origin_task_id task creating the message; when TASK_UNKNOWN the
 *                       task is looked up from the calling thread
 * @param message_id     identifier of the message, below messages_id_max
 * @param size           payload size in bytes
 * @returns the new message; the payload is not initialized
 */
inline MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
    MessageDef *temp = NULL;

    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, size);
#endif

    if (origin_task_id == TASK_UNKNOWN)
    {
        /* Try to identify real origin task ID */
        origin_task_id = itti_get_current_task_id();
    }

    temp = itti_malloc (origin_task_id, TASK_UNKNOWN, sizeof(MessageHeader) + size);
    DevAssert(temp != NULL);

    temp->ittiMsgHeader.messageId = message_id;
    temp->ittiMsgHeader.originTaskId = origin_task_id;
    temp->ittiMsgHeader.ittiMsgSize = size;

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, 0);
#endif

    return temp;
}

/* Allocates a message whose payload has the size configured for
 * "message_id" in the messages table.
 *
 * @param origin_task_id task creating the message (TASK_UNKNOWN allowed)
 * @param message_id     identifier of the message, below messages_id_max
 * @returns the new message, see itti_alloc_new_message_sized
 */
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
    /* Validate the id before it is used to index the messages table */
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

/* Sends "message" to the queue of "destination_task_id".
 *
 * The header is completed (destination, instance, LTE time), the message is
 * dumped through itti_dump_queue_message, then:
 *  - destination TASK_UNKNOWN: the message was only meant to be dumped and is
 *    released;
 *  - destination thread ENDED: the message is dropped and released;
 *  - otherwise the message is enqueued and the destination is woken up through
 *    its event fd (sub-tasks poll their queue; messages from RTAI real time
 *    tasks only increase a counter that the relay thread forwards later).
 *
 * In every case the ownership of "message" is taken: the caller must not use
 * it after the call.
 *
 * @param destination_task_id task to send to, below task_max
 * @param instance            instance stored in the message header
 * @param message             message to send, must not be NULL
 * @returns 0
 */
int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
{
    thread_id_t destination_thread_id;
    task_id_t origin_task_id;
    message_list_t *new;
    uint32_t priority;
    message_number_t message_number;
    uint32_t message_id;

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_send_msg, 1L << destination_task_id));
#endif

    DevAssert(message != NULL);
    DevCheck(destination_task_id < itti_desc.task_max, destination_task_id, itti_desc.task_max, 0);

    destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
    message->ittiMsgHeader.destinationTaskId = destination_task_id;
    message->ittiMsgHeader.instance = instance;
    message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
    message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
    message_id = message->ittiMsgHeader.messageId;
    DevCheck(message_id < itti_desc.messages_id_max, itti_desc.messages_id_max, message_id, 0);

    origin_task_id = ITTI_MSG_ORIGIN_ID(message);

    priority = itti_get_message_priority (message_id);

    /* Increment the global message number */
    message_number = itti_increment_message_number ();

    itti_dump_queue_message (origin_task_id, message_number, message, itti_desc.messages_info[message_id].name,
                             sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);

    if (destination_task_id != TASK_UNKNOWN)
    {
#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_IN);
#endif

        if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED)
        {
            ITTI_DEBUG(ITTI_DEBUG_ISSUES, " Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n",
                       itti_desc.messages_info[message_id].name,
                       message_number,
                       priority,
                       itti_get_task_name(origin_task_id),
                       destination_task_id,
                       itti_get_task_name(destination_task_id));

            /* Nobody will ever dequeue this message: release it here as the
             * sender gave up its ownership.
             */
            itti_free(origin_task_id, message);

#if defined(OAI_EMU) || defined(RTAI)
            /* Balance the FUNCTION_IN emitted above */
            vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif
        }
        else
        {
            /* We cannot send a message if the task is not running */
            DevCheck(itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY, destination_thread_id,
                     itti_desc.threads[destination_thread_id].task_state, message_id);

            /* Allocate new list element */
            new = (message_list_t *) itti_malloc (origin_task_id, destination_task_id, sizeof(struct message_list_s));
            DevAssert(new != NULL);

            /* Fill in members */
            new->msg = message;
            new->message_number = message_number;
            new->message_priority = priority;

            /* Enqueue message in destination task queue */
            lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new);

#if defined(OAI_EMU) || defined(RTAI)
            vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif

#ifdef RTAI
            if (itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].real_time)
            {
                /* This is a RT task, increase destination task messages pending counter */
                __sync_fetch_and_add (&itti_desc.threads[destination_thread_id].messages_pending, 1);
            }
            else
#endif
            {
                /* Only use event fd for tasks, subtasks will poll the queue */
                if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN)
                {
                    ssize_t write_ret;
                    eventfd_t sem_counter = 1;

                    /* Call to write for an event fd must be of 8 bytes */
                    write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
                    DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, destination_thread_id);
                }
            }

            ITTI_DEBUG(ITTI_DEBUG_SEND, " Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
                       itti_desc.messages_info[message_id].name,
                       message_number,
                       priority,
                       itti_get_task_name(origin_task_id),
                       destination_task_id,
                       itti_get_task_name(destination_task_id));
        }
    } else {
        /* This is a debug message to TASK_UNKNOWN, we can safely release it */
        itti_free(origin_task_id, message);
    }

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_send_msg, ~(1L << destination_task_id)));
#endif

    return 0;
}

/* Adds "fd" to the set of file descriptors monitored by the thread of
 * "task_id" (EPOLLIN and EPOLLERR) and grows the thread events array by one
 * entry.
 *
 * @param task_id task subscribing the fd, below task_max
 * @param fd      file descriptor to monitor, must be >= 0
 */
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
    thread_id_t thread_id;
    struct epoll_event event;
    struct epoll_event *events;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevCheck(fd >= 0, fd, 0, 0);

    thread_id = TASK_GET_THREAD_ID(task_id);

    /* Reallocate the events. The result goes through a temporary so that the
     * current array is neither lost nor replaced by NULL on failure.
     */
    events = realloc(
        itti_desc.threads[thread_id].events,
        (itti_desc.threads[thread_id].nb_events + 1) * sizeof(struct epoll_event));
    if (events == NULL)
    {
        ITTI_ERROR(" realloc of events failed for task %s, fd %d\n",
                   itti_get_task_name(task_id), fd);
        /* Always assert on this condition */
        DevAssert(0 == 1);
        return;
    }
    itti_desc.threads[thread_id].events = events;
    itti_desc.threads[thread_id].nb_events++;

    event.events  = EPOLLIN | EPOLLERR;
    /* Clear the whole data union before setting the fd member */
    event.data.u64 = 0;
    event.data.fd  = fd;

    /* Add the event fd to the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
        &event) != 0)
    {
        ITTI_ERROR(" epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }

    ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
}

/* Removes "fd" from the set of file descriptors monitored by the thread of
 * "task_id" and shrinks the thread events array by one entry.
 *
 * @param task_id task unsubscribing the fd, below task_max
 * @param fd      file descriptor to stop monitoring, must be >= 0
 */
void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
    thread_id_t thread_id;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevCheck(fd >= 0, fd, 0, 0);

    thread_id = TASK_GET_THREAD_ID(task_id);
    /* Remove the event fd from the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
    {
        ITTI_ERROR(" epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }

    itti_desc.threads[thread_id].nb_events--;

    /* Shrink the events array. realloc with a size of 0 is avoided (its result
     * is implementation-defined) and a failed shrink keeps the current,
     * larger array, which is still valid.
     */
    if (itti_desc.threads[thread_id].nb_events > 0)
    {
        struct epoll_event *events = realloc(
            itti_desc.threads[thread_id].events,
            itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));

        if (events != NULL)
        {
            itti_desc.threads[thread_id].events = events;
        }
    }
}

/* Gives access to the events reported by the last epoll_wait done for the
 * thread of "task_id".
 *
 * @param task_id task asking for its events, below task_max
 * @param events  output: set to the thread events array (owned by ITTI)
 * @returns the number of events stored by the last receive
 */
int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
    thread_id_t thread_id;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);

    thread_id = TASK_GET_THREAD_ID(task_id);
    *events = itti_desc.threads[thread_id].events;

    return itti_desc.threads[thread_id].epoll_nb_events;
}

/* Waits on the epoll fd of the thread of "task_id" and, when the ITTI event
 * fd is readable, dequeues one message from the task queue.
 *
 * @param task_id      receiving task, below task_max
 * @param polling      non zero: return immediately when nothing is pending;
 *                     zero: block until an event is reported
 * @param received_msg output: the dequeued message, or NULL when the wake-up
 *                     was caused by another fd (see itti_get_events) or when
 *                     nothing was pending in polling mode
 */
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
    thread_id_t thread_id;
    int epoll_ret = 0;
    int epoll_timeout = 0;
    int i;

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevAssert(received_msg != NULL);

    thread_id = TASK_GET_THREAD_ID(task_id);
    *received_msg = NULL;

    if (polling) {
        /* In polling mode we set the timeout to 0 causing epoll_wait to return
         * immediately.
         */
        epoll_timeout = 0;
    } else {
        /* timeout = -1 causes the epoll_wait to wait indefinitely.
         */
        epoll_timeout = -1;
    }

    /* Restart the wait when it is interrupted by a signal */
    do {
        epoll_ret = epoll_wait(itti_desc.threads[thread_id].epoll_fd,
                               itti_desc.threads[thread_id].events,
                               itti_desc.threads[thread_id].nb_events,
                               epoll_timeout);
    } while (epoll_ret < 0 && errno == EINTR);

    if (epoll_ret < 0) {
        ITTI_ERROR(" epoll_wait failed for task %s: %s\n",
                   itti_get_task_name(task_id), strerror(errno));
        DevAssert(0 == 1);
    }
    if (epoll_ret == 0 && polling) {
        /* No data to read -> return */
        return;
    }

    itti_desc.threads[thread_id].epoll_nb_events = epoll_ret;

    for (i = 0; i < epoll_ret; i++) {
        /* Check if there is an event for ITTI for the event fd */
        if ((itti_desc.threads[thread_id].events[i].events & EPOLLIN) &&
            (itti_desc.threads[thread_id].events[i].data.fd == itti_desc.threads[thread_id].task_event_fd))
        {
            struct message_list_s *message = NULL;
            eventfd_t sem_counter;
            ssize_t   read_ret;

            /* In semaphore mode the value read is 1; without EFD_SEMAPHORE it
             * is the whole counter, which is reset by the read.
             */
            read_ret = read (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
            DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0);

#if defined(KERNEL_VERSION_PRE_2_6_30)
            /* Store the value of the semaphore counter: one message is taken
             * below, the others are served by itti_receive_msg without
             * waiting. The threads table is indexed by thread id.
             */
            itti_desc.threads[thread_id].sem_counter = sem_counter - 1;
#endif

            if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
                /* No element in list -> this should not happen */
                DevParam(task_id, epoll_ret, 0);
            }
            DevAssert(message != NULL);
            *received_msg = message->msg;
            /* Only the list element is released, the message goes to the caller */
            itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
            /* Mark that the event has been processed */
            itti_desc.threads[thread_id].events[i].events &= ~EPOLLIN;
            return;
        }
    }
}

/* Blocks until a message or an event is available for "task_id".
 *
 * @param task_id      receiving task, below task_max
 * @param received_msg output: the received message, or NULL when the task was
 *                     woken up by one of its other subscribed fds
 */
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
#if defined(KERNEL_VERSION_PRE_2_6_30)
    thread_id_t thread_id;
#endif

    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevAssert(received_msg != NULL);

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
#endif

#if defined(KERNEL_VERSION_PRE_2_6_30)
    thread_id = TASK_GET_THREAD_ID(task_id);

    /* Messages already accounted for by a previous read of the event fd are
     * served directly from the queue.
     */
    if (itti_desc.threads[thread_id].sem_counter > 0) {
        struct message_list_s *message = NULL;

        if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
            /* No element in list -> this should not happen */
            DevParam(task_id, itti_desc.threads[thread_id].sem_counter, 0);
        }
        DevAssert(message != NULL);
        *received_msg = message->msg;
        itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);

        itti_desc.threads[thread_id].sem_counter--;
    } else
#endif
    {
        itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
    }

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_receive_msg, 1L << task_id));
#endif
}

/* Takes one message from the queue of "task_id" without waiting.
 *
 * @param task_id      polling task, below task_max
 * @param received_msg output: the dequeued message, or NULL when the queue
 *                     was empty
 */
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
    DevAssert(received_msg != NULL);

    *received_msg = NULL;

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_poll_msg, 1L << task_id));
#endif

    {
        struct message_list_s *message = NULL;

        if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1)
        {
            *received_msg = message->msg;
            /* Only the list element is released, the message goes to the caller */
            itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
        }
    }

    if (*received_msg == NULL) {
        ITTI_DEBUG(ITTI_DEBUG_POLL, " No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
    }

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_poll_msg, ~(1L << task_id)));
#endif
}

/* Creates the pthread of "task_id" and waits until the new thread has marked
 * itself READY.
 *
 * @param task_id       task to start; its thread must be NOT_CONFIGURED
 * @param start_routine entry point of the thread, must not be NULL
 * @param args_p        argument given to start_routine
 * @returns 0 on success, -1 when the thread could not be created
 */
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
    int result;

    DevAssert(start_routine != NULL);
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
    DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, task_id, thread_id,
             itti_desc.threads[thread_id].task_state);

    itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;

    ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating thread for task %s ...\n", itti_get_task_name(task_id));

    /* pthread_create returns 0 on success and a positive error number on
     * failure, so the result must be compared with 0.
     */
    result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
    DevCheck(result == 0, task_id, thread_id, result);
    if (result != 0)
    {
        /* No thread will ever mark itself ready: do not wait for it */
        itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
        return -1;
    }

    itti_desc.created_tasks ++;

    /* Wait till the thread is completely ready */
    while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
    {
        usleep (1000);
    }

    return 0;
}

#ifdef RTAI
/* Flags the thread of "task_id" as a real time thread: messages it sends are
 * signalled through the pending counter instead of the event fd.
 */
void itti_set_task_real_time(task_id_t task_id)
{
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    itti_desc.threads[thread_id].real_time = TRUE;
}
#endif

void itti_wait_ready(int wait_tasks)
{
    itti_desc.wait_tasks = wait_tasks;

717
    ITTI_DEBUG(ITTI_DEBUG_INIT, " wait for tasks: %s, created tasks %d, ready tasks %d\n", itti_desc.wait_tasks ? "yes" : "no",
718 719 720 721 722
        itti_desc.created_tasks, itti_desc.ready_tasks);

    DevCheck(itti_desc.created_tasks == itti_desc.ready_tasks, itti_desc.created_tasks, itti_desc.ready_tasks, itti_desc.wait_tasks);
}

723 724
/* Called by a task from its own thread once its initialization is done:
 * registers the thread with the dump and queue libraries, marks it READY and
 * then holds it while itti_desc.wait_tasks is non zero.
 *
 * @param task_id the calling task
 */
void itti_mark_task_ready(task_id_t task_id)
{
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    /* Register the thread in itti dump */
    itti_dump_thread_use_ring_buffer();

    /* Mark the thread as using LFDS queue */
    lfds611_queue_use(itti_desc.tasks[task_id].message_queue);

#ifdef RTAI
    /* Assign low priority to created threads */
    {
        struct sched_param sched_param;
        sched_param.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
        sched_setscheduler(0, SCHED_FIFO, &sched_param);
    }
#endif

    /* Count the task before publishing READY: itti_create_task returns as soon
     * as it sees READY, and itti_wait_ready then expects ready_tasks to match
     * created_tasks. The increment is atomic because every starting thread
     * updates this counter.
     */
    __sync_fetch_and_add (&itti_desc.ready_tasks, 1);
    itti_desc.threads[thread_id].task_state = TASK_STATE_READY;

    while (itti_desc.wait_tasks != 0)
    {
        usleep (10000);
    }

    ITTI_DEBUG(ITTI_DEBUG_INIT, " task %s started\n", itti_get_task_name(task_id));
}

/* Terminates the calling thread. With OAI_EMU or RTAI the bit of the calling
 * task is first cleared in the "receive" VCD trace variable.
 */
void itti_exit_task(void) {
#if defined(OAI_EMU) || defined(RTAI)
    task_id_t task_id = itti_get_current_task_id();

    if (task_id > TASK_UNKNOWN)
    {
        vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                                __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
    }
#endif
    pthread_exit (NULL);
}

/* Asks every task to terminate, wakes up the signal handling thread with
 * SIGUSR1 when one is registered, then terminates the calling thread.
 *
 * @param task_id task requesting the termination
 */
void itti_terminate_tasks(task_id_t task_id) {
    // Sends Terminate signals to all tasks.
    itti_send_terminate_message (task_id);

    /* itti_init stores -1 while no signal handling thread is registered.
     * Compare with that sentinel converted to pthread_t: a ">= 0" test is
     * always true where pthread_t is an unsigned type (e.g. glibc).
     */
    if (itti_desc.thread_handling_signals != (pthread_t) -1) {
        pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
    }

    pthread_exit (NULL);
}

#ifdef RTAI
/* Body of the relay thread: while ITTI is running, periodically forwards to
 * the event fd of each READY non real time thread the number of messages
 * that real time tasks have queued for it (messages_pending).
 *
 * @param arg unused
 * @returns NULL
 */
static void *itti_rt_relay_thread(void *arg)
{
    thread_id_t thread_id;
    unsigned pending_messages;

    while (itti_desc.running)
    {
        usleep (200); // Poll for messages a little more than 2 time by slot to get a small latency between RT and other tasks

#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_RELAY_THREAD, VCD_FUNCTION_IN);
#endif

        /* Checks for all non real time tasks if they have pending messages */
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
        {
            if ((itti_desc.threads[thread_id].task_state == TASK_STATE_READY)
                    && (itti_desc.threads[thread_id].real_time == FALSE))
            {
                /* Atomically read the counter and reset it to 0 */
                pending_messages = __sync_fetch_and_and (&itti_desc.threads[thread_id].messages_pending, 0);

                if (pending_messages > 0)
                {
                    ssize_t write_ret;
                    eventfd_t sem_counter = pending_messages;

                    /* Call to write for an event fd must be of 8 bytes */
                    write_ret = write (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
                    DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, thread_id);
                }
            }
        }

#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_RELAY_THREAD, VCD_FUNCTION_OUT);
#endif
    }
    return NULL;
}
#endif

int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
822
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) {
823 824
    task_id_t task_id;
    thread_id_t thread_id;
825 826
    int ret;

827
    itti_desc.message_number = 1;
828

829
    ITTI_DEBUG(ITTI_DEBUG_INIT, " Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);
830

831
    CHECK_INIT_RETURN(signal_mask());
832

833
    /* Saves threads and messages max values */
834
    itti_desc.task_max = task_max;
835 836
    itti_desc.thread_max = thread_max;
    itti_desc.messages_id_max = messages_id_max;
837
    itti_desc.thread_handling_signals = -1;
838
    itti_desc.tasks_info = tasks_info;
839 840 841
    itti_desc.messages_info = messages_info;

    /* Allocates memory for tasks info */
842 843 844 845
    itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

    /* Allocates memory for threads info */
    itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));
846 847

    /* Initializing each queue and related stuff */
848
    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
849
    {
850
        ITTI_DEBUG(ITTI_DEBUG_INIT, " Initializing %stask %s%s%s\n",
851 852 853 854 855 856
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
                   itti_desc.tasks_info[task_id].name,
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
                   itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");

857
        ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);
858 859 860

        ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);
        if (ret < 0)
861
        {
862
            ITTI_ERROR(" lfds611_queue_new failed for task %u\n", task_id);
863 864
            DevAssert(0 == 1);
        }
865 866 867 868 869 870
    }

    /* Initializing each thread */
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
    {
        itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
871

872 873
        itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);
        if (itti_desc.threads[thread_id].epoll_fd == -1) {
874
            ITTI_ERROR(" Failed to create new epoll fd: %s\n", strerror(errno));
875 876 877 878
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

879 880 881 882 883 884
# if defined(KERNEL_VERSION_PRE_2_6_30)
        /* SR: for kernel versions < 2.6.30 EFD_SEMAPHORE is not defined.
         * A read operation on the event fd will return the 8 byte value.
         */
        itti_desc.threads[thread_id].task_event_fd = eventfd(0, 0);
# else
885
        itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
886
# endif
887
        if (itti_desc.threads[thread_id].task_event_fd == -1)
888
        {
889
            ITTI_ERROR(" eventfd failed: %s\n", strerror(errno));
890 891 892 893
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

894
        itti_desc.threads[thread_id].nb_events = 1;
895

896
        itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event));
897

898 899
        itti_desc.threads[thread_id].events->events  = EPOLLIN | EPOLLERR;
        itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd;
900 901

        /* Add the event fd to the list of monitored events */
902 903
        if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
            itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0)
904
        {
905
            ITTI_ERROR(" epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno));
906 907 908
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }
909

910
        ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for thread %d\n",
911
                   itti_desc.threads[thread_id].task_event_fd, thread_id);
912

913 914 915
#ifdef RTAI
        itti_desc.threads[thread_id].real_time = FALSE;
        itti_desc.threads[thread_id].messages_pending = 0;
916
#endif
917
    }
918

919
    itti_desc.running = 1;
920 921 922
    itti_desc.wait_tasks = 0;
    itti_desc.created_tasks = 0;
    itti_desc.ready_tasks = 0;
923 924 925
#ifdef RTAI
    /* Start RT relay thread */
    DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0);
926 927

    rt_global_heap_open();
928
#endif
929

930 931 932 933 934 935 936 937 938 939
#if defined(OAI_EMU) || defined(RTAI)
    itti_desc.memory_pools_handle = memory_pools_create (4);
    memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + ITTI_QUEUE_MAX_ELEMENTS,       50);
    memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + (2 * ITTI_QUEUE_MAX_ELEMENTS), 100);
    memory_pools_add_pool (itti_desc.memory_pools_handle, 1000,                                 1000);
    memory_pools_add_pool (itti_desc.memory_pools_handle, 1000,                                 10000);

    {
        char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle);

940
        ITTI_DEBUG(ITTI_DEBUG_MP_STATISTICS, " Memory pools statistics:\n%s", statistics);
941 942 943 944
        free (statistics);
    }
#endif

945 946 947 948 949 950
#if defined(OAI_EMU) || defined(RTAI)
    itti_desc.vcd_poll_msg = 0;
    itti_desc.vcd_receive_msg = 0;
    itti_desc.vcd_send_msg = 0;
#endif

951
    itti_dump_init (messages_definition_xml, dump_file_name);
952

953
    CHECK_INIT_RETURN(timer_init ());
954 955 956 957

    return 0;
}

958 959
void itti_wait_tasks_end(void) {
    int end = 0;
960 961
    int thread_id;
    task_id_t task_id;
962 963 964
    int ready_tasks;
    int result;
    int retries = 10;
965 966 967 968 969 970 971 972

    itti_desc.thread_handling_signals = pthread_self ();

    /* Handle signals here */
    while (end == 0) {
        signal_handle (&end);
    }

973 974 975
    do {
        ready_tasks = 0;

976 977
        task_id = TASK_FIRST;
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.task_max; thread_id++) {
978
            /* Skip tasks which are not running */
979 980 981 982 983
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
                while (thread_id != TASK_GET_THREAD_ID(task_id))
                {
                    task_id++;
                }
984

985
                result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL);
986

987
                ITTI_DEBUG(ITTI_DEBUG_EXIT, " Thread %s join status %d\n", itti_get_task_name(task_id), result);
988 989 990

                if (result == 0) {
                    /* Thread has terminated */