intertask_interface.c 30.3 KB
Newer Older
Cedric Roux's avatar
 
Cedric Roux committed
1 2
/*******************************************************************************

3 4
 Eurecom OpenAirInterface
 Copyright(c) 1999 - 2012 Eurecom
Cedric Roux's avatar
 
Cedric Roux committed
5

6 7 8
 This program is free software; you can redistribute it and/or modify it
 under the terms and conditions of the GNU General Public License,
 version 2, as published by the Free Software Foundation.
Cedric Roux's avatar
 
Cedric Roux committed
9

10 11 12 13
 This program is distributed in the hope it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 more details.
Cedric Roux's avatar
 
Cedric Roux committed
14

15 16 17
 You should have received a copy of the GNU General Public License along with
 this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
Cedric Roux's avatar
 
Cedric Roux committed
18

19 20
 The full GNU General Public License is included in this distribution in
 the file called "COPYING".
Cedric Roux's avatar
 
Cedric Roux committed
21

22 23 24 25 26 27
 Contact Information
 Openair Admin: openair_admin@eurecom.fr
 Openair Tech : openair_tech@eurecom.fr
 Forums       : http://forums.eurecom.fr/openairinterface
 Address      : EURECOM, Campus SophiaTech, 450 Route des Chappes
 06410 Biot FRANCE
Cedric Roux's avatar
 
Cedric Roux committed
28

29
 *******************************************************************************/
Cedric Roux's avatar
 
Cedric Roux committed
30

31
#define _GNU_SOURCE
Cedric Roux's avatar
 
Cedric Roux committed
32 33 34 35 36 37
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
38
#include <signal.h>
Cedric Roux's avatar
 
Cedric Roux committed
39

40 41 42 43
#ifdef RTAI
# include <rtai_fifos.h>
#endif

Cedric Roux's avatar
 
Cedric Roux committed
44 45 46
#include "queue.h"
#include "assertions.h"

47 48 49 50 51 52
#if defined(ENABLE_EVENT_FD)
# include <sys/epoll.h>
# include <sys/eventfd.h>
# include "liblfds611.h"
#endif

Cedric Roux's avatar
 
Cedric Roux committed
53 54
#include "intertask_interface.h"
#include "intertask_interface_dump.h"
55

56 57 58 59
#if defined(OAI_EMU) || defined(RTAI)
# include "vcd_signal_dumper.h"
#endif

60 61 62
/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
Cedric Roux's avatar
 
Cedric Roux committed
63 64 65 66
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

67
#include "signals.h"
Cedric Roux's avatar
 
Cedric Roux committed
68 69
#include "timer.h"

70 71
const int itti_debug = 0;
const int itti_debug_poll = 0;
Cedric Roux's avatar
 
Cedric Roux committed
72

73 74 75 76 77 78 79 80
/* Don't flush if using RTAI */
#ifdef RTAI
# define ITTI_DEBUG(x, args...) do { if (itti_debug) rt_printk("[ITTI][D]"x, ##args); } \
    while(0)
# define ITTI_ERROR(x, args...) do { rt_printk("[ITTI][E]"x, ##args); } \
    while(0)
#else
# define ITTI_DEBUG(x, args...) do { if (itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout); } \
Cedric Roux's avatar
 
Cedric Roux committed
81
    while(0)
82
# define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } \
Cedric Roux's avatar
 
Cedric Roux committed
83
    while(0)
84
#endif
Cedric Roux's avatar
 
Cedric Roux committed
85 86 87 88 89

/* Global message size */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)

typedef enum task_state_s {
90
    TASK_STATE_NOT_CONFIGURED, TASK_STATE_STARTING, TASK_STATE_READY, TASK_STATE_ENDED, TASK_STATE_MAX,
Cedric Roux's avatar
 
Cedric Roux committed
91 92 93
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
94
typedef struct message_list_s {
95
#if !defined(ENABLE_EVENT_FD)
Cedric Roux's avatar
 
Cedric Roux committed
96
    STAILQ_ENTRY(message_list_s) next_element;
97
#endif
Cedric Roux's avatar
 
Cedric Roux committed
98

99
    MessageDef *msg; ///< Pointer to the message
Cedric Roux's avatar
 
Cedric Roux committed
100

101 102
    message_number_t message_number; ///< Unique message number
    uint32_t message_priority; ///< Message priority
103
} message_list_t;
Cedric Roux's avatar
 
Cedric Roux committed
104

105 106 107
typedef struct thread_desc_s {
    /* pthread associated with the thread */
    pthread_t task_thread;
108

109 110 111 112
    /* State of the thread */
    volatile task_state_t task_state;
} thread_desc_t;

Cedric Roux's avatar
 
Cedric Roux committed
113 114
typedef struct task_desc_s {
    /* Queue of messages belonging to the task */
115
#if !defined(ENABLE_EVENT_FD)
Cedric Roux's avatar
Cedric Roux committed
116
    STAILQ_HEAD(message_queue_head, message_list_s) message_queue;
Cedric Roux's avatar
 
Cedric Roux committed
117 118

    /* Number of messages in the queue */
119
    volatile uint32_t message_in_queue;
Cedric Roux's avatar
 
Cedric Roux committed
120
    /* Mutex for the message queue */
121
    pthread_mutex_t message_queue_mutex;
Cedric Roux's avatar
 
Cedric Roux committed
122
    /* Conditional var for message queue and task synchro */
123
    pthread_cond_t message_queue_cond_var;
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
#else
    struct lfds611_queue_state *message_queue;

    /* This fd is used internally by ITTI. */
    int epoll_fd;

    /* The task fd */
    int task_event_fd;

    /* Number of events to monitor */
    uint16_t nb_events;

    /* Array of events monitored by the task.
     * By default only one fd is monitored (the one used to received messages
     * from other tasks).
     * More events can be suscribed later by the task itself.
     */
    struct epoll_event *events;
142 143

    int epoll_nb_events;
144
#endif
Cedric Roux's avatar
 
Cedric Roux committed
145 146
} task_desc_t;

147
typedef struct itti_desc_s {
148
    thread_desc_t *threads;
Cedric Roux's avatar
 
Cedric Roux committed
149
    task_desc_t *tasks;
150

Cedric Roux's avatar
 
Cedric Roux committed
151
    /* Current message number. Incremented every call to send_msg_to_task */
152
    message_number_t message_number __attribute__((aligned(8)));
Cedric Roux's avatar
 
Cedric Roux committed
153 154

    thread_id_t thread_max;
155
    task_id_t task_max;
Cedric Roux's avatar
 
Cedric Roux committed
156 157
    MessagesIds messages_id_max;

158 159
    pthread_t thread_handling_signals;

160
    const task_info_t *tasks_info;
Cedric Roux's avatar
 
Cedric Roux committed
161 162
    const message_info_t *messages_info;

163 164 165 166
    itti_lte_time_t lte_time;
} itti_desc_t;

static itti_desc_t itti_desc;
Cedric Roux's avatar
 
Cedric Roux committed
167

168
static inline message_number_t itti_increment_message_number(void) {
Cedric Roux's avatar
 
Cedric Roux committed
169 170 171 172
    /* Atomic operation supported by GCC: returns the current message number
     * and then increment it by 1.
     * This can be done without mutex.
     */
173
    return __sync_fetch_and_add (&itti_desc.message_number, 1);
Cedric Roux's avatar
 
Cedric Roux committed
174 175
}

176
static inline uint32_t itti_get_message_priority(MessagesIds message_id) {
Cedric Roux's avatar
 
Cedric Roux committed
177 178 179 180 181
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].priority);
}

182
const char *itti_get_message_name(MessagesIds message_id) {
Cedric Roux's avatar
 
Cedric Roux committed
183 184 185 186 187
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].name);
}

188
const char *itti_get_task_name(task_id_t task_id)
Cedric Roux's avatar
Cedric Roux committed
189
{
190
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
Cedric Roux's avatar
Cedric Roux committed
191

192
    return (itti_desc.tasks_info[task_id].name);
Cedric Roux's avatar
Cedric Roux committed
193 194
}

195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
static task_id_t itti_get_current_task_id()
{
    task_id_t task_id;
    thread_id_t thread_id;
    pthread_t thread = pthread_self ();

    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
    {
        thread_id = TASK_GET_THREAD_ID(task_id);
        if (itti_desc.threads[thread_id].task_thread == thread)
        {
            return task_id;
        }
    }

    return TASK_UNKNOWN;
}

213 214 215 216 217 218
void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
    itti_desc.lte_time.frame = frame;
    itti_desc.lte_time.slot = slot;
}

219
int itti_send_broadcast_message(MessageDef *message_p) {
220
    task_id_t destination_task_id;
221
    thread_id_t origin_thread_id;
222
    uint32_t thread_id;
Cedric Roux's avatar
 
Cedric Roux committed
223
    int ret = 0;
224
    int result;
Cedric Roux's avatar
 
Cedric Roux committed
225

226
    DevAssert(message_p != NULL);
Cedric Roux's avatar
 
Cedric Roux committed
227

Cedric Roux's avatar
Cedric Roux committed
228
    origin_thread_id = TASK_GET_THREAD_ID(message_p->ittiMsgHeader.originTaskId);
Cedric Roux's avatar
 
Cedric Roux committed
229

230 231
    destination_task_id = TASK_FIRST;
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
232 233
        MessageDef *new_message_p;

234 235 236 237
        while (thread_id != TASK_GET_THREAD_ID(destination_task_id))
        {
            destination_task_id++;
        }
238
        /* Skip task that broadcast the message */
239
        if (thread_id != origin_thread_id) {
240
            /* Skip tasks which are not running */
241
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
242
                new_message_p = malloc (sizeof(MessageDef));
243
                DevAssert(message_p != NULL);
244 245

                memcpy (new_message_p, message_p, sizeof(MessageDef));
246
                result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
Cedric Roux's avatar
Cedric Roux committed
247
                DevCheck(result >= 0, message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
248
            }
Cedric Roux's avatar
 
Cedric Roux committed
249 250
        }
    }
251
    free (message_p);
Cedric Roux's avatar
 
Cedric Roux committed
252 253 254 255

    return ret;
}

Cedric Roux's avatar
Cedric Roux committed
256 257
inline MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
Cedric Roux's avatar
 
Cedric Roux committed
258 259
    MessageDef *temp = NULL;

260
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);
Cedric Roux's avatar
 
Cedric Roux committed
261

262 263 264 265 266 267
    if (origin_task_id == TASK_UNKNOWN)
    {
        /* Try to identify real origin task ID */
        origin_task_id = itti_get_current_task_id();
    }

Cedric Roux's avatar
Cedric Roux committed
268
    temp = calloc (1, sizeof(MessageHeader) + size);
269
    DevAssert(temp != NULL);
Cedric Roux's avatar
 
Cedric Roux committed
270

Cedric Roux's avatar
Cedric Roux committed
271 272 273
    temp->ittiMsgHeader.messageId = message_id;
    temp->ittiMsgHeader.originTaskId = origin_task_id;
    temp->ittiMsgHeader.ittiMsgSize = size;
Cedric Roux's avatar
 
Cedric Roux committed
274 275 276 277

    return temp;
}

Cedric Roux's avatar
Cedric Roux committed
278 279 280 281 282 283 284
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
    return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message)
{
285
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
286
    thread_id_t origin_task_id;
287
    message_list_t *new;
288 289 290
    uint32_t priority;
    message_number_t message_number;
    uint32_t message_id;
Cedric Roux's avatar
 
Cedric Roux committed
291

292
    DevAssert(message != NULL);
293
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
Cedric Roux's avatar
 
Cedric Roux committed
294

Cedric Roux's avatar
Cedric Roux committed
295 296 297 298 299
    message->ittiMsgHeader.destinationTaskId = task_id;
    message->ittiMsgHeader.instance = instance;
    message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
    message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
    message_id = message->ittiMsgHeader.messageId;
Cedric Roux's avatar
 
Cedric Roux committed
300 301
    DevCheck(message_id < itti_desc.messages_id_max, itti_desc.messages_id_max, message_id, 0);

302 303 304 305 306 307 308
    origin_task_id = ITTI_MSG_ORIGIN_ID(message);

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                            task_id);
#endif

309
    priority = itti_get_message_priority (message_id);
Cedric Roux's avatar
 
Cedric Roux committed
310

311 312
    /* Increment the global message number */
    message_number = itti_increment_message_number ();
Cedric Roux's avatar
 
Cedric Roux committed
313

314 315 316 317 318 319 320 321 322 323 324 325 326
#ifdef RTAI
    if (pthread_self() != itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].task_thread)
#endif
        itti_dump_queue_message (message_number, message, itti_desc.messages_info[message_id].name,
                                 sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);

    ITTI_DEBUG("Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
               itti_desc.messages_info[message_id].name,
               message_number,
               priority,
               itti_get_task_name(origin_task_id),
               task_id,
               itti_get_task_name(task_id));
327

328 329 330 331 332
    if (task_id != TASK_UNKNOWN)
    {
        /* We cannot send a message if the task is not running */
        DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_READY, itti_desc.threads[thread_id].task_state,
                 TASK_STATE_READY, thread_id);
Cedric Roux's avatar
 
Cedric Roux committed
333

334 335 336
#if !defined(ENABLE_EVENT_FD)
        /* Lock the mutex to get exclusive access to the list */
        pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
Cedric Roux's avatar
 
Cedric Roux committed
337

338 339 340 341
        /* Check the number of messages in the queue */
        DevCheck(itti_desc.tasks[task_id].message_in_queue < itti_desc.tasks_info[task_id].queue_size,
                 task_id, itti_desc.tasks[task_id].message_in_queue, itti_desc.tasks_info[task_id].queue_size);
#endif
Cedric Roux's avatar
 
Cedric Roux committed
342

343
        /* Allocate new list element */
344
        new = (message_list_t *) malloc (sizeof(struct message_list_s));
345
        DevAssert(new != NULL);
Cedric Roux's avatar
 
Cedric Roux committed
346

347 348 349 350
        /* Fill in members */
        new->msg = message;
        new->message_number = message_number;
        new->message_priority = priority;
Cedric Roux's avatar
 
Cedric Roux committed
351

352
#if defined(ENABLE_EVENT_FD)
353 354 355 356 357 358 359 360 361 362 363 364 365
# ifdef RTAI
        if ((pthread_self() != itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].task_thread) &&
            (TASK_GET_PARENT_TASK_ID(origin_task_id) != TASK_UNKNOWN))
        {
            /* This is the RT task, -> enqueue in the parent thread */
            lfds611_queue_enqueue(itti_desc.tasks[TASK_GET_PARENT_TASK_ID(origin_task_id)].message_queue, new);

            /* Signal from RT thread */
//             rtf_sem_post(56);
        } else
# endif
        /* No need to use a event fd for subtasks */
        if (TASK_GET_PARENT_TASK_ID(task_id) == TASK_UNKNOWN)
366
        {
367
            ssize_t  write_ret;
368
            uint64_t sem_counter = 1;
369

370
            lfds611_queue_enqueue(itti_desc.tasks[task_id].message_queue, new);
371

372
            /* Call to write for an event fd must be of 8 bytes */
373
            write_ret = write(itti_desc.tasks[task_id].task_event_fd, &sem_counter, sizeof(sem_counter));
374 375 376
            DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, task_id);
        } else {
            lfds611_queue_enqueue(itti_desc.tasks[task_id].message_queue, new);
377
        }
378
#else
379 380 381 382
        if (STAILQ_EMPTY (&itti_desc.tasks[task_id].message_queue)) {
            STAILQ_INSERT_HEAD (&itti_desc.tasks[task_id].message_queue, new, next_element);
        }
        else {
Cedric Roux's avatar
 
Cedric Roux committed
383 384 385 386
//         struct message_list_s *insert_after = NULL;
//         struct message_list_s *temp;
// 
//         /* This method is inefficient... */
387
//         STAILQ_FOREACH(temp, &itti_desc.tasks[task_id].message_queue, next_element) {
Cedric Roux's avatar
 
Cedric Roux committed
388 389 390 391 392 393 394 395 396 397 398 399 400
//             struct message_list_s *next;
//             next = STAILQ_NEXT(temp, next_element);
//             /* Increment message priority to create a sort of
//              * priority based scheduler */
// //             if (temp->message_priority < TASK_PRIORITY_MAX) {
// //                 temp->message_priority++;
// //             }
//             if (next && next->message_priority < priority) {
//                 insert_after = temp;
//                 break;
//             }
//         }
//         if (insert_after == NULL) {
401
        STAILQ_INSERT_TAIL (&itti_desc.tasks[task_id].message_queue, new, next_element);
Cedric Roux's avatar
 
Cedric Roux committed
402
//         } else {
403
//             STAILQ_INSERT_AFTER(&itti_desc.tasks[task_id].message_queue, insert_after, new,
Cedric Roux's avatar
 
Cedric Roux committed
404 405
//                                 next_element);
//         }
406
        }
Cedric Roux's avatar
 
Cedric Roux committed
407

408 409 410 411 412 413 414 415
        /* Update the number of messages in the queue */
        itti_desc.tasks[task_id].message_in_queue++;
        if (itti_desc.tasks[task_id].message_in_queue == 1) {
            /* Emit a signal to wake up target task thread */
            pthread_cond_signal (&itti_desc.tasks[task_id].message_queue_cond_var);
        }
        /* Release the mutex */
        pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
416
#endif
417
    }
418

419 420 421 422 423
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG_END,
                                            task_id);
#endif

Cedric Roux's avatar
 
Cedric Roux committed
424 425 426
    return 0;
}

427
#if defined(ENABLE_EVENT_FD)
428 429 430 431
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
    struct epoll_event event;

432
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
433 434
    DevCheck(fd >= 0, fd, 0, 0);

435
    itti_desc.tasks[task_id].nb_events++;
436 437

    /* Reallocate the events */
438 439 440
    itti_desc.tasks[task_id].events = realloc(
        itti_desc.tasks[task_id].events,
        itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event));
441

442
    event.events  = EPOLLIN | EPOLLERR;
443 444 445
    event.data.fd = fd;

    /* Add the event fd to the list of monitored events */
446
    if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD, fd,
447 448 449 450 451 452 453
        &event) != 0)
    {
        ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }
454 455

    ITTI_DEBUG("Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
456 457 458 459
}

void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
460
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
461 462 463
    DevCheck(fd >= 0, fd, 0, 0);

    /* Add the event fd to the list of monitored events */
464
    if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
465 466 467 468 469 470 471
    {
        ITTI_ERROR("epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }

472 473 474 475
    itti_desc.tasks[task_id].nb_events--;
    itti_desc.tasks[task_id].events = realloc(
        itti_desc.tasks[task_id].events,
        itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event));
476 477 478 479
}

int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
480
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
481

482
    *events = itti_desc.tasks[task_id].events;
483

484
    return itti_desc.tasks[task_id].epoll_nb_events;
485 486
}

487 488 489 490
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
    int epoll_ret = 0;
    int epoll_timeout = 0;
491
    int i;
492

493
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
    DevAssert(received_msg != NULL);

    *received_msg = NULL;

    if (polling) {
        /* In polling mode we set the timeout to 0 causing epoll_wait to return
         * immediately.
         */
        epoll_timeout = 0;
    } else {
        /* timeout = -1 causes the epoll_wait to wait indefinetely.
         */
        epoll_timeout = -1;
    }

509
    do {
510 511 512
        epoll_ret = epoll_wait(itti_desc.tasks[task_id].epoll_fd,
                               itti_desc.tasks[task_id].events,
                               itti_desc.tasks[task_id].nb_events,
513 514
                               epoll_timeout);
    } while (epoll_ret < 0 && errno == EINTR);
515 516 517

    if (epoll_ret < 0) {
        ITTI_ERROR("epoll_wait failed for task %s: %s\n",
518
                   itti_get_task_name(task_id), strerror(errno));
519 520 521 522 523 524 525
        DevAssert(0 == 1);
    }
    if (epoll_ret == 0 && polling) {
        /* No data to read -> return */
        return;
    }

526
    itti_desc.tasks[task_id].epoll_nb_events = epoll_ret;
527

528
    for (i = 0; i < epoll_ret; i++) {
529
        /* Check if there is an event for ITTI for the event fd */
530 531
        if ((itti_desc.tasks[task_id].events[i].events & EPOLLIN) &&
            (itti_desc.tasks[task_id].events[i].data.fd == itti_desc.tasks[task_id].task_event_fd))
532
        {
533 534
            struct message_list_s *message;
            uint64_t sem_counter;
535
            ssize_t  read_ret;
536 537

            /* Read will always return 1 */
538 539
            read_ret = read (itti_desc.tasks[task_id].task_event_fd, &sem_counter, sizeof(sem_counter));
            DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0);
540

541
            if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
542
                /* No element in list -> this should not happen */
543
                DevParam(task_id, epoll_ret, 0);
544 545
            }
            *received_msg = message->msg;
546
            free (message);
547
            return;
548 549 550 551 552 553 554
        }
    }
}
#endif

void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
555 556 557 558
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            task_id);
#endif
559 560 561
#if defined(ENABLE_EVENT_FD)
    itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
#else
562
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
Cedric Roux's avatar
 
Cedric Roux committed
563 564 565
    DevAssert(received_msg != NULL);

    // Lock the mutex to get exclusive access to the list
566
    pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
Cedric Roux's avatar
 
Cedric Roux committed
567

568 569
    if (itti_desc.tasks[task_id].message_in_queue == 0) {
        ITTI_DEBUG("Message in queue[(%u:%s)] == 0, waiting\n", task_id, itti_get_task_name(task_id));
Cedric Roux's avatar
 
Cedric Roux committed
570
        // Wait while list == 0
571 572 573 574
        pthread_cond_wait (&itti_desc.tasks[task_id].message_queue_cond_var,
                           &itti_desc.tasks[task_id].message_queue_mutex);
        ITTI_DEBUG("Receiver queue[(%u:%s)] got new message notification\n",
                   task_id, itti_get_task_name(task_id));
Cedric Roux's avatar
 
Cedric Roux committed
575 576
    }

577
    if (!STAILQ_EMPTY (&itti_desc.tasks[task_id].message_queue)) {
578
        message_list_t *temp = STAILQ_FIRST (&itti_desc.tasks[task_id].message_queue);
Cedric Roux's avatar
 
Cedric Roux committed
579 580 581 582 583

        /* Update received_msg reference */
        *received_msg = temp->msg;

        /* Remove message from queue */
584
        STAILQ_REMOVE_HEAD (&itti_desc.tasks[task_id].message_queue, next_element);
585
        free (temp);
586
        itti_desc.tasks[task_id].message_in_queue--;
Cedric Roux's avatar
 
Cedric Roux committed
587 588
    }
    // Release the mutex
589
    pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
590
#endif
591 592 593 594
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG_END,
                                            task_id);
#endif
Cedric Roux's avatar
 
Cedric Roux committed
595 596
}

597 598
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
599
    DevAssert(received_msg != NULL);
Cedric Roux's avatar
 
Cedric Roux committed
600 601 602

    *received_msg = NULL;

603 604 605 606 607
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                            task_id);
#endif

608
#if defined(ENABLE_EVENT_FD)
609 610 611 612 613 614 615 616 617
    {
        struct message_list_s *message;

        if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1)
        {
            *received_msg = message->msg;
            free (message);
        }
    }
618
#else
619
    if (itti_desc.tasks[task_id].message_in_queue != 0) {
620
        message_list_t *temp;
Cedric Roux's avatar
 
Cedric Roux committed
621 622

        // Lock the mutex to get exclusive access to the list
623
        pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
Cedric Roux's avatar
 
Cedric Roux committed
624

625
        STAILQ_FOREACH (temp, &itti_desc.tasks[task_id].message_queue, next_element)
Cedric Roux's avatar
 
Cedric Roux committed
626
        {
627 628 629 630 631 632 633 634 635
            /* Update received_msg reference */
            *received_msg = temp->msg;

            /* Remove message from queue */
            STAILQ_REMOVE (&itti_desc.tasks[task_id].message_queue, temp, message_list_s, next_element);
            free (temp);
            itti_desc.tasks[task_id].message_in_queue--;

            ITTI_DEBUG(
Cedric Roux's avatar
Cedric Roux committed
636 637
                       "Receiver queue[(%u:%s)] got new message %s, number %lu\n",
                       task_id, itti_get_task_name(task_id), itti_desc.messages_info[temp->msg->ittiMsgHeader.messageId].name, temp->message_number);
638
            break;
Cedric Roux's avatar
 
Cedric Roux committed
639 640 641
        }

        // Release the mutex
642
        pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
Cedric Roux's avatar
 
Cedric Roux committed
643
    }
644
#endif
Cedric Roux's avatar
 
Cedric Roux committed
645

646
    if ((itti_debug_poll) && (*received_msg == NULL)) {
647
        ITTI_DEBUG("No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
Cedric Roux's avatar
 
Cedric Roux committed
648
    }
649 650 651 652 653

#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG_END,
                                            task_id);
#endif
Cedric Roux's avatar
 
Cedric Roux committed
654 655
}

656
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
Cedric Roux's avatar
 
Cedric Roux committed
657
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
658
    int result;
Cedric Roux's avatar
 
Cedric Roux committed
659 660

    DevAssert(start_routine != NULL);
661
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
662 663
    DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, task_id, thread_id,
             itti_desc.threads[thread_id].task_state);
Cedric Roux's avatar
 
Cedric Roux committed
664

665
    itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;
Cedric Roux's avatar
 
Cedric Roux committed
666

667 668
    ITTI_DEBUG("Create thread for task %s\n", itti_get_task_name(task_id));

669
    result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
670
    DevCheck(result >= 0, task_id, thread_id, result);
Cedric Roux's avatar
 
Cedric Roux committed
671 672

    /* Wait till the thread is completely ready */
673
    while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
674
        ;
Cedric Roux's avatar
 
Cedric Roux committed
675 676 677
    return 0;
}

678 679
void itti_mark_task_ready(task_id_t task_id)
{
Cedric Roux's avatar
 
Cedric Roux committed
680 681
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

682 683
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

684 685 686 687 688 689 690 691 692
#ifdef RTAI
    /* Assign low priority to created threads */
    {
        struct sched_param sched_param;
        sched_param.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
        sched_setscheduler(0, SCHED_FIFO, &sched_param);
    }
#endif

693 694 695
    /* Register the thread in itti dump */
    itti_dump_thread_use_ring_buffer();

696 697 698 699
#if defined(ENABLE_EVENT_FD)
    /* Mark the thread as using LFDS queue */
    lfds611_queue_use(itti_desc.tasks[task_id].message_queue);
#else
Cedric Roux's avatar
 
Cedric Roux committed
700
    // Lock the mutex to get exclusive access to the list
701
    pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
702 703
#endif

704
    itti_desc.threads[thread_id].task_state = TASK_STATE_READY;
705

706
#if !defined(ENABLE_EVENT_FD)
Cedric Roux's avatar
 
Cedric Roux committed
707
    // Release the mutex
708
    pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
709
#endif
710 711
}

712 713 714 715
void itti_exit_task(void) {
    pthread_exit (NULL);
}

716
void itti_terminate_tasks(task_id_t task_id) {
Cedric Roux's avatar
Cedric Roux committed
717
    // Sends Terminate signals to all tasks.
718 719 720 721 722 723 724
    itti_send_terminate_message (task_id);

    if (itti_desc.thread_handling_signals >= 0) {
        pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
    }

    pthread_exit (NULL);
Cedric Roux's avatar
 
Cedric Roux committed
725 726
}

727
int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
728
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) {
729 730
    task_id_t task_id;
    thread_id_t thread_id;
731 732
    int ret;

733
    itti_desc.message_number = 1;
Cedric Roux's avatar
 
Cedric Roux committed
734

735
    ITTI_DEBUG("Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);
736

737
#if !defined(RTAI)
738 739 740
    /* SR: disable signals module for RTAI (need to harmonize management
     * between softmodem and oaisim).
     */
741
    CHECK_INIT_RETURN(signal_init());
742
#endif
743

Cedric Roux's avatar
 
Cedric Roux committed
744
    /* Saves threads and messages max values */
745
    itti_desc.task_max = task_max;
Cedric Roux's avatar
 
Cedric Roux committed
746 747
    itti_desc.thread_max = thread_max;
    itti_desc.messages_id_max = messages_id_max;
748
    itti_desc.thread_handling_signals = -1;
749
    itti_desc.tasks_info = tasks_info;
Cedric Roux's avatar
 
Cedric Roux committed
750 751 752
    itti_desc.messages_info = messages_info;

    /* Allocates memory for tasks info */
753 754 755 756
    itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

    /* Allocates memory for threads info */
    itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));
Cedric Roux's avatar
 
Cedric Roux committed
757 758

    /* Initializing each queue and related stuff */
759
    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
760
    {
761 762 763 764 765 766 767
        ITTI_DEBUG("Initializing %stask %s%s%s\n",
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
                   itti_desc.tasks_info[task_id].name,
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
                   itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");

768
#if defined(ENABLE_EVENT_FD)
769
        ITTI_DEBUG("Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);
770 771 772

        ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);
        if (ret < 0)
773
        {
774
            ITTI_ERROR("lfds611_queue_new failed for task %u\n", task_id);
775 776 777
            DevAssert(0 == 1);
        }

778 779 780 781 782 783 784
# ifdef RTAI
        if (task_id == TASK_L2L1)
        {
            ret = rtf_sem_init(56, 0);
        }
# endif

785 786
        itti_desc.tasks[task_id].epoll_fd = epoll_create1(0);
        if (itti_desc.tasks[task_id].epoll_fd == -1) {
787 788 789 790 791
            ITTI_ERROR("Failed to create new epoll fd: %s\n", strerror(errno));
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

792
        itti_desc.tasks[task_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
793 794
        if (itti_desc.tasks[task_id].task_event_fd == -1)
        {
795 796 797 798 799
            ITTI_ERROR("eventfd failed: %s\n", strerror(errno));
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

800
        itti_desc.tasks[task_id].nb_events = 1;
801

802
        itti_desc.tasks[task_id].events = calloc(1, sizeof(struct epoll_event));
803

804
        itti_desc.tasks[task_id].events->events  = EPOLLIN | EPOLLERR;
805
        itti_desc.tasks[task_id].events->data.fd = itti_desc.tasks[task_id].task_event_fd;
806 807

        /* Add the event fd to the list of monitored events */
808 809
        if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD,
            itti_desc.tasks[task_id].task_event_fd, itti_desc.tasks[task_id].events) != 0)
810
        {
811
            ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno));
812 813 814
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }
815 816 817

        ITTI_DEBUG("Successfully subscribed fd %d for task %s\n",
                   itti_desc.tasks[task_id].task_event_fd, itti_get_task_name(task_id));
818
#else
819 820
        STAILQ_INIT (&itti_desc.tasks[task_id].message_queue);
        itti_desc.tasks[task_id].message_in_queue = 0;
821

Cedric Roux's avatar
 
Cedric Roux committed
822
        // Initialize mutexes
823
        pthread_mutex_init (&itti_desc.tasks[task_id].message_queue_mutex, NULL);
824

Cedric Roux's avatar
 
Cedric Roux committed
825
        // Initialize Cond vars
826
        pthread_cond_init (&itti_desc.tasks[task_id].message_queue_cond_var, NULL);
827
#endif
828
    }
829

830
    /* Initializing each thread */
831
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
832
    {
833
        itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
Cedric Roux's avatar
 
Cedric Roux committed
834
    }
835

836
    itti_dump_init (messages_definition_xml, dump_file_name);
Cedric Roux's avatar
 
Cedric Roux committed
837

838 839 840
#ifndef RTAI
     CHECK_INIT_RETURN(timer_init ());
#endif
Cedric Roux's avatar
 
Cedric Roux committed
841 842 843 844

    return 0;
}

845 846
void itti_wait_tasks_end(void) {
    int end = 0;
847 848
    int thread_id;
    task_id_t task_id;
849 850 851
    int ready_tasks;
    int result;
    int retries = 10;
852 853 854 855 856 857 858 859

    itti_desc.thread_handling_signals = pthread_self ();

    /* Handle signals here */
    while (end == 0) {
        signal_handle (&end);
    }

860 861 862
    do {
        ready_tasks = 0;

863 864
        task_id = TASK_FIRST;
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.task_max; thread_id++) {
865
            /* Skip tasks which are not running */
866 867 868 869 870
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
                while (thread_id != TASK_GET_THREAD_ID(task_id))
                {
                    task_id++;
                }