intertask_interface.c 35.8 KB
Newer Older
Cedric Roux's avatar
   
Cedric Roux committed
1
2
/*******************************************************************************

 Eurecom OpenAirInterface
 Copyright(c) 1999 - 2012 Eurecom

 This program is free software; you can redistribute it and/or modify it
 under the terms and conditions of the GNU General Public License,
 version 2, as published by the Free Software Foundation.

 This program is distributed in the hope it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 more details.

 You should have received a copy of the GNU General Public License along with
 this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.

 The full GNU General Public License is included in this distribution in
 the file called "COPYING".

 Contact Information
 Openair Admin: openair_admin@eurecom.fr
 Openair Tech : openair_tech@eurecom.fr
 Forums       : http://forums.eurecom.fr/openairinterface
 Address      : EURECOM, Campus SophiaTech, 450 Route des Chappes
 06410 Biot FRANCE

 *******************************************************************************/
Cedric Roux's avatar
   
Cedric Roux committed
30

31
#define _GNU_SOURCE
Cedric Roux's avatar
   
Cedric Roux committed
32
33
34
35
36
37
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
38
#include <signal.h>
Cedric Roux's avatar
   
Cedric Roux committed
39

40
41
#include <sys/epoll.h>
#include <sys/eventfd.h>
42

43
44
45
46
#ifdef RTAI
# include <rtai_shm.h>
#endif

47
48
49
50
51
52
#include "liblfds611.h"

#include "assertions.h"
#include "intertask_interface.h"
#include "intertask_interface_dump.h"

53
#if defined(OAI_EMU) || defined(RTAI)
54
# include "memory_pools.h"
55
56
57
# include "vcd_signal_dumper.h"
#endif

58
59
60
/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
Cedric Roux's avatar
   
Cedric Roux committed
61
62
63
64
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

65
#include "signals.h"
Cedric Roux's avatar
   
Cedric Roux committed
66
67
#include "timer.h"

68
69
70
71
72
73
74
75
76
77
/* ITTI debug groups: one bit per trace category. */
#define ITTI_DEBUG_POLL             (1<<0)
#define ITTI_DEBUG_SEND             (1<<1)
#define ITTI_DEBUG_EVEN_FD          (1<<2)
#define ITTI_DEBUG_INIT             (1<<3)
#define ITTI_DEBUG_EXIT             (1<<4)
#define ITTI_DEBUG_ISSUES           (1<<5)
#define ITTI_DEBUG_MP_STATISTICS    (1<<6)

/* Mask of the groups for which ITTI_DEBUG actually prints. */
const int itti_debug = ITTI_DEBUG_ISSUES | ITTI_DEBUG_MP_STATISTICS;

/* ITTI_DEBUG(m, x, args...): print the printf-style message x, prefixed with
 * "[ITTI][D]", when group mask m intersects itti_debug.
 * ITTI_ERROR(x, args...): always print x, prefixed with "[ITTI][E]".
 *
 * x must be a string literal (it is concatenated with the prefix).
 * With RTAI the output goes through rt_printk and is never flushed; otherwise
 * it goes to stdout, which is flushed on every invocation (for ITTI_DEBUG
 * this happens whether or not the group is enabled).
 */
#ifdef RTAI
# define ITTI_DEBUG(m, x, args...) \
    do { if ((m) & itti_debug) { rt_printk("[ITTI][D]"x, ##args); } } while(0)
# define ITTI_ERROR(x, args...) \
    do { rt_printk("[ITTI][E]"x, ##args); } while(0)
#else
# define ITTI_DEBUG(m, x, args...) \
    do { if ((m) & itti_debug) { fprintf(stdout, "[ITTI][D]"x, ##args); } fflush (stdout); } while(0)
# define ITTI_ERROR(x, args...) \
    do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } while(0)
#endif
Cedric Roux's avatar
   
Cedric Roux committed
91
92
93
94

/* Global message size: the common message header followed by the payload size
 * registered for message id mESSAGEiD in itti_desc.messages_info.
 */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[(mESSAGEiD)].size)

/* When <sys/eventfd.h> does not provide EFD_SEMAPHORE, enable the fallback
 * code that keeps its own per-thread counter (sem_counter) of the messages
 * signalled through the event fd.
 */
#if !defined(EFD_SEMAPHORE)
# define KERNEL_VERSION_PRE_2_6_30 1
#endif

/* RTAI-only memory sizing constants (their users are outside this excerpt). */
#ifdef RTAI
# define ITTI_MEM_PAGE_SIZE (1024)
# define ITTI_MEM_SIZE      (16 * 1024 * 1024)
#endif

Cedric Roux's avatar
   
Cedric Roux committed
104
/* Life cycle of an ITTI thread, stored in thread_desc_t.task_state.
 *
 * itti_create_task() requires NOT_CONFIGURED, switches to STARTING and then
 * waits for READY; messages are only queued to READY threads and are dropped
 * with a trace for ENDED ones. TASK_STATE_MAX is the number of states.
 */
typedef enum task_state_s {
    TASK_STATE_NOT_CONFIGURED,
    TASK_STATE_STARTING,
    TASK_STATE_READY,
    TASK_STATE_ENDED,
    TASK_STATE_MAX,
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
109
typedef struct message_list_s {
110
    MessageDef *msg; ///< Pointer to the message
Cedric Roux's avatar
   
Cedric Roux committed
111

112
113
    message_number_t message_number; ///< Unique message number
    uint32_t message_priority; ///< Message priority
114
} message_list_t;
Cedric Roux's avatar
   
Cedric Roux committed
115

116
117
118
typedef struct thread_desc_s {
    /* pthread associated with the thread */
    pthread_t task_thread;
119

120
121
    /* State of the thread */
    volatile task_state_t task_state;
122
123
124
125

    /* This fd is used internally by ITTI. */
    int epoll_fd;

126
    /* The thread fd */
127
128
129
130
131
    int task_event_fd;

    /* Number of events to monitor */
    uint16_t nb_events;

132
#if defined(KERNEL_VERSION_PRE_2_6_30)
Cedric Roux's avatar
Cedric Roux committed
133
    eventfd_t sem_counter;
134
135
#endif

136
137
138
139
140
141
    /* Array of events monitored by the task.
     * By default only one fd is monitored (the one used to received messages
     * from other tasks).
     * More events can be suscribed later by the task itself.
     */
    struct epoll_event *events;
142
143

    int epoll_nb_events;
144
145
146
147
148
149
150

#ifdef RTAI
    /* Flag to mark real time thread */
    unsigned real_time;

    /* Counter to indicate from RTAI threads that messages are pending for the thread */
    unsigned messages_pending;
151
#endif
152
153
154
155
156
} thread_desc_t;

/* Per-task ITTI state; the itti_desc.tasks array is indexed by task id. */
typedef struct task_desc_s {
    /* Lock-free queue (liblfds) of message_list_t elements addressed to the task */
    struct lfds611_queue_state *message_queue;
} task_desc_t;

159
typedef struct itti_desc_s {
160
    thread_desc_t *threads;
161
    task_desc_t   *tasks;
162

Cedric Roux's avatar
   
Cedric Roux committed
163
    /* Current message number. Incremented every call to send_msg_to_task */
164
    message_number_t message_number __attribute__((aligned(8)));
Cedric Roux's avatar
   
Cedric Roux committed
165
166

    thread_id_t thread_max;
167
    task_id_t task_max;
Cedric Roux's avatar
   
Cedric Roux committed
168
169
    MessagesIds messages_id_max;

170
171
    pthread_t thread_handling_signals;

172
    const task_info_t *tasks_info;
Cedric Roux's avatar
   
Cedric Roux committed
173
174
    const message_info_t *messages_info;

175
    itti_lte_time_t lte_time;
176
177

    int running;
178
179
180
181

    volatile uint32_t created_tasks;
    volatile uint32_t ready_tasks;
    volatile int      wait_tasks;
182
183
184
#ifdef RTAI
    pthread_t rt_relay_thread;
#endif
185
186

#if defined(OAI_EMU) || defined(RTAI)
187
188
    memory_pools_handle_t memory_pools_handle;

189
190
191
192
    uint64_t vcd_poll_msg;
    uint64_t vcd_receive_msg;
    uint64_t vcd_send_msg;
#endif
193
194
195
} itti_desc_t;

static itti_desc_t itti_desc;
Cedric Roux's avatar
   
Cedric Roux committed
196

197
/* Allocate size bytes on behalf of ITTI.
 *
 * With OAI_EMU or RTAI the memory comes from the ITTI memory pools and is
 * tagged with the origin and destination task ids; otherwise it comes from
 * malloc() and the task ids are only used in the failure message.
 *
 * Never returns NULL: an allocation failure triggers AssertFatal (after the
 * pool statistics have been printed, when pools are in use).
 */
void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size)
{
    void *ptr;

#if !(defined(OAI_EMU) || defined(RTAI))
    ptr = malloc (size);
#else
    ptr = memory_pools_allocate (itti_desc.memory_pools_handle, size, origin_task_id, destination_task_id);
    if (!ptr)
    {
        /* Report the state of the pools before asserting below */
        char *pools_report = memory_pools_statistics (itti_desc.memory_pools_handle);

        ITTI_ERROR (" Memory pools statistics:\n%s", pools_report);
        free (pools_report);
    }
#endif

    AssertFatal (ptr != NULL, "Memory allocation of %d bytes failed (%d -> %d)\n", (int) size, origin_task_id, destination_task_id);

    return ptr;
}

/* Release memory obtained from itti_malloc().
 *
 * ptr must not be NULL (AssertFatal otherwise). task_id is handed to the
 * memory pools when they are in use and is otherwise only reported in the
 * assertion message.
 */
void itti_free(task_id_t task_id, void *ptr)
{
    AssertFatal (ptr != NULL, "Trying to free a NULL pointer (%d)\n", task_id);

#if !(defined(OAI_EMU) || defined(RTAI))
    free (ptr);
#else
    memory_pools_free (itti_desc.memory_pools_handle, ptr, task_id);
#endif
}

230
static inline message_number_t itti_increment_message_number(void) {
Cedric Roux's avatar
   
Cedric Roux committed
231
232
233
234
    /* Atomic operation supported by GCC: returns the current message number
     * and then increment it by 1.
     * This can be done without mutex.
     */
235
    return __sync_fetch_and_add (&itti_desc.message_number, 1);
Cedric Roux's avatar
   
Cedric Roux committed
236
237
}

238
/* Return the priority registered for message_id in the messages table.
 * message_id must be lower than itti_desc.messages_id_max (AssertFatal otherwise).
 */
static inline uint32_t itti_get_message_priority(MessagesIds message_id) {
    const message_info_t *info;

    AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)\n", message_id, itti_desc.messages_id_max);

    info = &itti_desc.messages_info[message_id];

    return info->priority;
}

244
/* Return the name registered for message_id in the messages table.
 * message_id must be lower than itti_desc.messages_id_max (AssertFatal otherwise).
 */
const char *itti_get_message_name(MessagesIds message_id) {
    const message_info_t *info;

    AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)\n", message_id, itti_desc.messages_id_max);

    info = &itti_desc.messages_info[message_id];

    return info->name;
}

250
/* Return the name registered for task_id in the tasks table.
 * task_id must be lower than itti_desc.task_max (AssertFatal otherwise).
 */
const char *itti_get_task_name(task_id_t task_id)
{
    const task_info_t *info;

    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);

    info = &itti_desc.tasks_info[task_id];

    return info->name;
}

257
static task_id_t itti_get_current_task_id(void)
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
{
    task_id_t task_id;
    thread_id_t thread_id;
    pthread_t thread = pthread_self ();

    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
    {
        thread_id = TASK_GET_THREAD_ID(task_id);
        if (itti_desc.threads[thread_id].task_thread == thread)
        {
            return task_id;
        }
    }

    return TASK_UNKNOWN;
}

275
276
277
278
279
280
void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
    itti_desc.lte_time.frame = frame;
    itti_desc.lte_time.slot = slot;
}

281
/* Send a copy of message_p to one task of every thread that is in the READY
 * state, except the thread of the originating task.
 *
 * For each thread the destination is the first task id (searching upwards
 * from the previous destination) that maps to that thread.
 *
 * message_p: message to broadcast, allocated through ITTI. Ownership is taken:
 *            the original is released before returning, the receivers get
 *            their own copies.
 * Returns 0; any failure is fatal (AssertFatal).
 */
int itti_send_broadcast_message(MessageDef *message_p) {
    task_id_t destination_task_id;
    task_id_t origin_task_id;
    thread_id_t origin_thread_id;
    uint32_t thread_id;
    size_t message_size;
    int result;

    AssertFatal (message_p != NULL, "Trying to broadcast a NULL message\n");

    origin_task_id = message_p->ittiMsgHeader.originTaskId;
    origin_thread_id = TASK_GET_THREAD_ID(origin_task_id);

    /* Messages are allocated as header + payload (see
     * itti_alloc_new_message_sized), which is usually not sizeof(MessageDef):
     * copy exactly the bytes that were allocated. */
    message_size = sizeof(MessageHeader) + message_p->ittiMsgHeader.ittiMsgSize;

    destination_task_id = TASK_FIRST;
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
        MessageDef *new_message_p;

        /* Advance to the next task running in this thread */
        while (thread_id != TASK_GET_THREAD_ID(destination_task_id))
        {
            destination_task_id++;
            AssertFatal (destination_task_id < itti_desc.task_max, "No task found for thread %u\n", thread_id);
        }

        /* Skip task that broadcast the message */
        if (thread_id == origin_thread_id) {
            continue;
        }

        /* Skip tasks which are not running */
        if (itti_desc.threads[thread_id].task_state != TASK_STATE_READY) {
            continue;
        }

        new_message_p = itti_malloc (origin_task_id, destination_task_id, message_size);
        AssertFatal (new_message_p != NULL, "New message allocation failed\n");

        memcpy (new_message_p, message_p, message_size);
        result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
        AssertFatal (result >= 0, "Failed to send message %d to thread %d (task %d)\n", message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
    }

    /* Every receiver got its own copy: the original is no longer needed */
    itti_free (ITTI_MSG_ORIGIN_ID(message_p), message_p);

    return 0;
}

Cedric Roux's avatar
Cedric Roux committed
320
321
/* Allocate a message made of the common header followed by size payload bytes
 * and fill in its id, origin task and payload size.
 *
 * origin_task_id: sending task; when TASK_UNKNOWN, the task of the calling
 *                 thread is looked up and used instead.
 * message_id:     must be lower than itti_desc.messages_id_max.
 * size:           payload size in bytes (header excluded).
 *
 * The payload itself is left uninitialized. Never returns NULL (allocation
 * failure is fatal in itti_malloc).
 */
inline MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
    MessageDef *new_msg;

    AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)\n", message_id, itti_desc.messages_id_max);

#if defined(OAI_EMU) || defined(RTAI)
    /* Trace the start of the allocation with the requested size */
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, size);
#endif

    /* Try to identify real origin task ID */
    if (origin_task_id == TASK_UNKNOWN)
    {
        origin_task_id = itti_get_current_task_id();
    }

    /* The destination is not known yet at allocation time */
    new_msg = itti_malloc (origin_task_id, TASK_UNKNOWN, sizeof(MessageHeader) + size);

    new_msg->ittiMsgHeader.messageId = message_id;
    new_msg->ittiMsgHeader.originTaskId = origin_task_id;
    new_msg->ittiMsgHeader.ittiMsgSize = size;

#if defined(OAI_EMU) || defined(RTAI)
    /* Trace the end of the allocation */
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, 0);
#endif

    return new_msg;
}

Cedric Roux's avatar
Cedric Roux committed
349
350
351
352
353
/* Allocate a message whose payload has the size registered for message_id in
 * the messages table. See itti_alloc_new_message_sized() for the handling of
 * origin_task_id and the content of the returned message.
 *
 * message_id must be lower than itti_desc.messages_id_max (AssertFatal otherwise).
 */
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
    /* Validate the id before it is used to index the messages table below */
    AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)\n", message_id, itti_desc.messages_id_max);

    return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

354
/* Queue message for destination_task_id and wake its thread up.
 *
 * The header of the message is completed (destination, instance, current LTE
 * time), the message is given the next global message number and passed to
 * the ITTI dump facility, then:
 *  - destination TASK_UNKNOWN: the message is only dumped and is released;
 *  - destination thread ENDED: the message is dropped with a trace and released;
 *  - otherwise the destination thread must be READY (AssertFatal); the message
 *    is enqueued and the thread is notified through its event fd, unless the
 *    destination is a sub-task (which polls its queue) or, with RTAI, the
 *    sender is a real-time thread (which only bumps messages_pending).
 *
 * In every case the caller gives up ownership of message.
 * Returns 0; failures are fatal (AssertFatal).
 */
int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
{
    thread_id_t destination_thread_id;
    task_id_t origin_task_id;
    message_list_t *new;
    uint32_t priority;
    message_number_t message_number;
    uint32_t message_id;

#if defined(OAI_EMU) || defined(RTAI)
    /* Raise the bit of the destination task in the "send" VCD variable */
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_send_msg, 1L << destination_task_id));
#endif

    AssertFatal (message != NULL, "Message is NULL\n");
    AssertFatal (destination_task_id < itti_desc.task_max, "Destination task id (%d) is out of range (%d)\n", destination_task_id, itti_desc.task_max);

    destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
    message->ittiMsgHeader.destinationTaskId = destination_task_id;
    message->ittiMsgHeader.instance = instance;
    message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
    message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
    message_id = message->ittiMsgHeader.messageId;
    AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)\n", message_id, itti_desc.messages_id_max);

    origin_task_id = ITTI_MSG_ORIGIN_ID(message);

    priority = itti_get_message_priority (message_id);

    /* Increment the global message number */
    message_number = itti_increment_message_number ();

    itti_dump_queue_message (origin_task_id, message_number, message, itti_desc.messages_info[message_id].name,
                             sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);

    if (destination_task_id != TASK_UNKNOWN)
    {
#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_IN);

        memory_pools_set_info (itti_desc.memory_pools_handle, message, 1, destination_task_id);
#endif

        if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED)
        {
            ITTI_DEBUG(ITTI_DEBUG_ISSUES, " Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n",
                       itti_desc.messages_info[message_id].name,
                       message_number,
                       priority,
                       itti_get_task_name(origin_task_id),
                       destination_task_id,
                       itti_get_task_name(destination_task_id));

            /* Nobody will ever dequeue this message and the caller has given
             * up ownership: release it here, otherwise it is leaked. */
            itti_free (origin_task_id, message);
            /* NOTE(review): no VCD_FUNCTION_OUT is emitted on this path for
             * the VCD_FUNCTION_IN dumped above. */
        }
        else
        {
            /* We cannot send a message if the task is not running */
            AssertFatal (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY, "Cannot send message %d to thread %d, it is not in ready state (%d)\n",
                         message_id, destination_thread_id, itti_desc.threads[destination_thread_id].task_state);

            /* Allocate new list element */
            new = (message_list_t *) itti_malloc (origin_task_id, destination_task_id, sizeof(struct message_list_s));

            /* Fill in members */
            new->msg = message;
            new->message_number = message_number;
            new->message_priority = priority;

            /* Enqueue message in destination task queue */
            lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new);

#if defined(OAI_EMU) || defined(RTAI)
            vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif

#ifdef RTAI
            if (itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].real_time)
            {
                /* This is a RT task, increase destination task messages pending counter */
                __sync_fetch_and_add (&itti_desc.threads[destination_thread_id].messages_pending, 1);
            }
            else
#endif
            {
                /* Only use event fd for tasks, subtasks will poll the queue */
                if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN)
                {
                    ssize_t write_ret;
                    eventfd_t sem_counter = 1;

                    /* Call to write for an event fd must be of 8 bytes */
                    write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
                    AssertFatal (write_ret == sizeof(sem_counter), "Write to task message FD (%d) failed (%d/%d)\n",
                                 destination_thread_id, (int) write_ret, (int) sizeof(sem_counter));
                }
            }

            ITTI_DEBUG(ITTI_DEBUG_SEND, " Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
                       itti_desc.messages_info[message_id].name,
                       message_number,
                       priority,
                       itti_get_task_name(origin_task_id),
                       destination_task_id,
                       itti_get_task_name(destination_task_id));
        }
    } else {
        /* This is a debug message to TASK_UNKNOWN, we can safely release it */
        itti_free(origin_task_id, message);
    }

#if defined(OAI_EMU) || defined(RTAI)
    /* Clear the bit of the destination task in the "send" VCD variable */
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_send_msg, ~(1L << destination_task_id)));
#endif

    return 0;
}

471
472
/* Make the thread of task_id also monitor fd (readable or error condition)
 * with the epoll instance it uses to wait for ITTI messages.
 *
 * The thread's events array is grown by one entry so that epoll_wait() can
 * report the additional fd. Any failure is fatal (AssertFatal).
 */
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
    thread_id_t thread_id;
    struct epoll_event event;
    struct epoll_event *events;

    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);

    thread_id = TASK_GET_THREAD_ID(task_id);

    /* Grow the events array through a temporary: the array and its size are
     * only updated once the reallocation is known to have succeeded. */
    events = realloc(
        itti_desc.threads[thread_id].events,
        ((size_t) itti_desc.threads[thread_id].nb_events + 1) * sizeof(struct epoll_event));
    AssertFatal (events != NULL, "Failed to reallocate events for task %s, fd %d\n",
                 itti_get_task_name(task_id), fd);

    itti_desc.threads[thread_id].events = events;
    itti_desc.threads[thread_id].nb_events++;

    event.events  = EPOLLIN | EPOLLERR;
    /* Clear the whole data union before setting the (smaller) fd member */
    event.data.u64 = 0;
    event.data.fd  = fd;

    /* Add the event fd to the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
        &event) != 0)
    {
        /* Always assert on this condition */
        AssertFatal (0, "epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n",
                     itti_get_task_name(task_id), fd, strerror(errno));
    }

    ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
}

/* Stop monitoring fd in the epoll instance of the thread of task_id and
 * shrink the thread's events array by one entry.
 *
 * fd must be a valid descriptor previously subscribed for that thread;
 * a failure of epoll_ctl is fatal (AssertFatal).
 */
void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
    thread_id_t thread_id;

    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);
    AssertFatal (fd >= 0, "File descriptor (%d) is invalid\n", fd);

    thread_id = TASK_GET_THREAD_ID(task_id);
    /* Remove the event fd from the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
    {
        /* Always assert on this condition */
        AssertFatal (0, "epoll_ctl (EPOLL_CTL_DEL) failed for task %s, fd %d: %s\n",
                     itti_get_task_name(task_id), fd, strerror(errno));
    }

    /* nb_events is unsigned: never let it wrap around */
    AssertFatal (itti_desc.threads[thread_id].nb_events > 0, "No event subscribed for task %s, fd %d\n",
                 itti_get_task_name(task_id), fd);
    itti_desc.threads[thread_id].nb_events--;

    if (itti_desc.threads[thread_id].nb_events == 0)
    {
        /* realloc() with a size of 0 is not portable: release explicitly */
        free (itti_desc.threads[thread_id].events);
        itti_desc.threads[thread_id].events = NULL;
    }
    else
    {
        struct epoll_event *events = realloc(
            itti_desc.threads[thread_id].events,
            itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));

        /* If shrinking fails the previous (larger) array is still valid:
         * keep it rather than overwriting the pointer with NULL. */
        if (events != NULL)
        {
            itti_desc.threads[thread_id].events = events;
        }
    }
}

/* Give access to the events reported by the last epoll_wait() performed for
 * the thread of task_id.
 *
 * events: output, set to the thread's events array (still owned by ITTI).
 * Returns the number of entries filled in by that last epoll_wait().
 */
int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
    const thread_desc_t *thread_desc;

    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);

    thread_desc = &itti_desc.threads[TASK_GET_THREAD_ID(task_id)];
    *events = thread_desc->events;

    return thread_desc->epoll_nb_events;
}

536
537
/* Wait on the epoll instance of the thread of task_id and, if the thread's
 * message event fd is readable, dequeue one message for task_id.
 *
 * polling:      non-zero -> return immediately when nothing is pending;
 *               zero     -> block until at least one event is reported.
 * received_msg: output, set to the dequeued message or to NULL when the wait
 *               ended without a message event (timeout in polling mode, or
 *               only events on other subscribed fds, which the task can
 *               retrieve with itti_get_events()).
 */
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
    thread_id_t thread_id;
    int epoll_ret = 0;
    int epoll_timeout = 0;
    int i;

    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);
    AssertFatal (received_msg != NULL, "Received message is NULL\n");

    thread_id = TASK_GET_THREAD_ID(task_id);
    *received_msg = NULL;

    if (polling) {
        /* In polling mode we set the timeout to 0 causing epoll_wait to return
         * immediately.
         */
        epoll_timeout = 0;
    } else {
        /* timeout = -1 causes the epoll_wait to wait indefinitely.
         */
        epoll_timeout = -1;
    }

    /* Restart the wait when it is interrupted by a signal */
    do {
        epoll_ret = epoll_wait(itti_desc.threads[thread_id].epoll_fd,
                               itti_desc.threads[thread_id].events,
                               itti_desc.threads[thread_id].nb_events,
                               epoll_timeout);
    } while (epoll_ret < 0 && errno == EINTR);

    if (epoll_ret < 0) {
        AssertFatal (0, "epoll_wait failed for task %s: %s\n", itti_get_task_name(task_id), strerror(errno));
    }
    if (epoll_ret == 0 && polling) {
        /* No data to read -> return */
        return;
    }

    itti_desc.threads[thread_id].epoll_nb_events = epoll_ret;

    for (i = 0; i < epoll_ret; i++) {
        /* Check if there is an event for ITTI for the event fd */
        if ((itti_desc.threads[thread_id].events[i].events & EPOLLIN) &&
            (itti_desc.threads[thread_id].events[i].data.fd == itti_desc.threads[thread_id].task_event_fd))
        {
            struct message_list_s *message = NULL;
            eventfd_t sem_counter;
            ssize_t   read_ret;

            /* An event fd read always transfers 8 bytes: the counter value */
            read_ret = read (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
            AssertFatal (read_ret == sizeof(sem_counter), "Read from task message FD (%d) failed (%d/%d)\n", thread_id, (int) read_ret, (int) sizeof(sem_counter));

#if defined(KERNEL_VERSION_PRE_2_6_30)
            /* Store the value of the semaphore counter: one message is
             * consumed below, the others are left for itti_receive_msg().
             * The counter belongs to the thread, like the event fd it was
             * read from, so it is indexed by thread id. */
            itti_desc.threads[thread_id].sem_counter = sem_counter - 1;
#endif

            if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
                /* No element in list -> this should not happen */
                AssertFatal (0, "No message in queue for task %d while there are %d events and some for the messages queue\n", task_id, epoll_ret);
            }
            AssertFatal(message != NULL, "Message from message queue is NULL\n");
            *received_msg = message->msg;
            /* Only the list element is released, the message goes to the caller */
            itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
            /* Mark that the event has been processed */
            itti_desc.threads[thread_id].events[i].events &= ~EPOLLIN;
            return;
        }
    }
}

/* Blocking reception for task_id.
 *
 * received_msg: output, set to the next message of the task, or to NULL when
 *               the wait was ended by events on other subscribed fds only.
 *               The caller becomes the owner of the returned message.
 *
 * When EFD_SEMAPHORE is not available, messages already accounted for by a
 * previous event fd read (sem_counter) are served directly from the queue
 * without waiting.
 */
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
#if defined(KERNEL_VERSION_PRE_2_6_30)
    thread_id_t thread_id;
#endif

#if defined(OAI_EMU) || defined(RTAI)
    /* Clear the bit of the task in the "receive" VCD variable while waiting */
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
#endif

    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);
    AssertFatal (received_msg != NULL, "Received message is NULL\n");

#if defined(KERNEL_VERSION_PRE_2_6_30)
    /* The counter is stored per thread (see the event fd read above) */
    thread_id = TASK_GET_THREAD_ID(task_id);

    if (itti_desc.threads[thread_id].sem_counter > 0) {
        struct message_list_s *message = NULL;

        if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
            /* No element in list -> this should not happen */
            DevParam(task_id, itti_desc.threads[thread_id].sem_counter, 0);
        }
        DevAssert(message != NULL);
        *received_msg = message->msg;
        itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);

        itti_desc.threads[thread_id].sem_counter--;
    } else
#endif
    {
        itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
    }

#if defined(OAI_EMU) || defined(RTAI)
    /* Raise the bit of the task again once the reception is over */
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_receive_msg, 1L << task_id));
#endif
}

640
/* Non-blocking reception: take the next message directly from the queue of
 * task_id, without going through the event fd.
 *
 * received_msg: output, set to the dequeued message (now owned by the caller)
 *               or to NULL when the queue is empty.
 */
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
    struct message_list_s *message;

    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);

    *received_msg = NULL;

#if defined(OAI_EMU) || defined(RTAI)
    /* Raise the bit of the task in the "poll" VCD variable */
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_poll_msg, 1L << task_id));
#endif

    /* A return value of 1 means an element was dequeued */
    if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1)
    {
        *received_msg = message->msg;
        /* Only the list element is released, the message goes to the caller */
        itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
    }

    if (*received_msg == NULL) {
        ITTI_DEBUG(ITTI_DEBUG_POLL, " No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
    }

#if defined(OAI_EMU) || defined(RTAI)
    /* Clear the bit of the task in the "poll" VCD variable */
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_poll_msg, ~(1L << task_id)));
#endif
}

670
/*
 * Creates the thread that runs a task and blocks until that thread has
 * reported itself ready.
 *
 * task_id:       task to start; its thread slot is derived with
 *                TASK_GET_THREAD_ID() and must still be in state
 *                TASK_STATE_NOT_CONFIGURED.
 * start_routine: thread entry point, must not be NULL.
 * args_p:        opaque argument forwarded to start_routine.
 *
 * Returns 0. Every failure path goes through AssertFatal instead of an
 * error return.
 */
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
    int result;

    AssertFatal (start_routine != NULL, "Start routine is NULL\n");
    AssertFatal (thread_id < itti_desc.thread_max, "Thread id (%d) is out of range (%d)\n", thread_id, itti_desc.thread_max);
    AssertFatal (itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, "Task %d, thread %d state is not correct (%d)\n",
                 task_id, thread_id, itti_desc.threads[thread_id].task_state);

    itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;

    ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating thread for task %s ...\n", itti_get_task_name(task_id));

    result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
    /* pthread_create() returns 0 on success and a POSITIVE error number on
     * failure, so the check must be "== 0": a ">= 0" test can never fail and
     * would leave the wait loop below spinning forever on a thread that was
     * never started. */
    AssertFatal (result == 0, "Thread creation for task %d, thread %d failed (%d)\n", task_id, thread_id, result);

    itti_desc.created_tasks ++;

    /* Wait till the thread is completely ready: the state is switched to
     * TASK_STATE_READY by the new thread itself (see itti_mark_task_ready) */
    while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
        usleep (1000);

    return 0;
}

695
696
697
698
699
700
701
702
703
704
705
#ifdef RTAI
/*
 * Flags the thread that runs task_id as a real-time thread.
 * The flag is read by itti_rt_relay_thread(), which only relays pending
 * messages for threads whose real_time flag is FALSE.
 *
 * task_id: task whose thread (TASK_GET_THREAD_ID) is to be flagged.
 */
void itti_set_task_real_time(task_id_t task_id)
{
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    /* The thread id indexes itti_desc.threads[], so it must be in range */
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    itti_desc.threads[thread_id].real_time = TRUE;
}
#endif

706
707
708
709
void itti_wait_ready(int wait_tasks)
{
    itti_desc.wait_tasks = wait_tasks;

710
    ITTI_DEBUG(ITTI_DEBUG_INIT, " wait for tasks: %s, created tasks %d, ready tasks %d\n", itti_desc.wait_tasks ? "yes" : "no",
711
712
        itti_desc.created_tasks, itti_desc.ready_tasks);

winckel's avatar
winckel committed
713
714
    AssertFatal (itti_desc.created_tasks == itti_desc.ready_tasks, "Number of created tasks (%d) does not match ready tasks (%d), wait task %d\n",
                 itti_desc.created_tasks, itti_desc.ready_tasks, itti_desc.wait_tasks);
715
716
}

717
718
/*
 * Called from a task's own thread once it has finished its initialisation.
 * Registers the thread with the dump ring buffer and with its LFDS message
 * queue, publishes the READY state that itti_create_task() is polling for,
 * and then sleeps for as long as itti_desc.wait_tasks is non-zero.
 *
 * task_id: the calling task; must be below itti_desc.task_max.
 */
void itti_mark_task_ready(task_id_t task_id)
{
    thread_id_t thread_id;

    /* task_id indexes itti_desc.tasks[] below, so validate it first
     * (same check and message as itti_poll_msg) */
    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);

    thread_id = TASK_GET_THREAD_ID(task_id);
    AssertFatal (thread_id < itti_desc.thread_max, "Thread id (%d) is out of range (%d)\n", thread_id, itti_desc.thread_max);

    /* Register the thread in itti dump */
    itti_dump_thread_use_ring_buffer();

    /* Mark the thread as using LFDS queue */
    lfds611_queue_use(itti_desc.tasks[task_id].message_queue);

#ifdef RTAI
    /* Assign low priority to created threads */
    {
        struct sched_param sched_param;
        sched_param.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
        /* NOTE(review): the return value of sched_setscheduler() is ignored,
         * so a failure to change the policy goes unnoticed. */
        sched_setscheduler(0, SCHED_FIFO, &sched_param);
    }
#endif

    /* Count this task as ready BEFORE publishing TASK_STATE_READY.
     * itti_create_task() returns as soon as it observes READY, and the caller
     * may then run itti_wait_ready(), which asserts
     * created_tasks == ready_tasks. With the increment done after the state
     * change that assertion could fire spuriously. The atomic add also keeps
     * the increment from being reordered after the state store. */
    __sync_fetch_and_add (&itti_desc.ready_tasks, 1);
    itti_desc.threads[thread_id].task_state = TASK_STATE_READY;

    /* Hold the task here until itti_wait_ready() clears wait_tasks */
    while (itti_desc.wait_tasks != 0)
    {
        usleep (10000);
    }

    ITTI_DEBUG(ITTI_DEBUG_INIT, " task %s started\n", itti_get_task_name(task_id));
}

749
/*
 * Terminates the calling thread with pthread_exit(NULL).
 * In OAI_EMU / RTAI builds the bit of the current task is first cleared
 * from the VCD "receive" variable and the new value is dumped.
 */
void itti_exit_task(void) {
#if defined(OAI_EMU) || defined(RTAI)
    task_id_t current_task = itti_get_current_task_id();

    /* Only ids above TASK_UNKNOWN own a bit in the VCD variable */
    if (current_task > TASK_UNKNOWN)
    {
        const long task_bit = 1L << current_task;

        vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                                __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~task_bit));
    }
#endif

    pthread_exit (NULL);
}

762
/*
 * Starts the shutdown sequence and then ends the calling thread.
 *
 * task_id: forwarded to itti_send_terminate_message().
 *
 * After the terminate message has been sent, the thread recorded in
 * itti_desc.thread_handling_signals (if one was recorded) is sent SIGUSR1.
 * The function does not return: it finishes with pthread_exit(NULL).
 */
void itti_terminate_tasks(task_id_t task_id) {
    // Sends Terminate signals to all tasks.
    itti_send_terminate_message (task_id);

    /* itti_init() stores -1 in thread_handling_signals as the "no thread
     * registered" marker. The value is handed to pthread_kill() as a
     * pthread_t, which is an unsigned integer type on Linux/glibc, so a
     * ">= 0" test is always true and would signal the invalid sentinel.
     * Compare against the sentinel explicitly instead. */
    if (itti_desc.thread_handling_signals != (pthread_t) -1) {
        pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
    }

    pthread_exit (NULL);
}

773
774
775
776
777
778
779
780
#ifdef RTAI
/*
 * Body of the relay thread started by itti_init() in RTAI builds.
 * While itti_desc.running is set, it periodically collects the
 * messages_pending counter of every ready, non real-time thread and writes
 * that count to the thread's event fd.
 *
 * arg: unused.
 * Returns NULL once itti_desc.running becomes 0.
 */
static void *itti_rt_relay_thread(void *arg)
{
    thread_id_t thread_id;

    while (itti_desc.running)
    {
        /* Poll a little more than twice per slot to keep the latency between
         * RT and other tasks small */
        usleep (200);

#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_RELAY_THREAD, VCD_FUNCTION_IN);
#endif

        /* Look at every thread; only ready, non real-time ones are served */
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
        {
            unsigned pending_count;
            eventfd_t sem_counter;
            ssize_t write_ret;

            if (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
            {
                continue;
            }

            if (itti_desc.threads[thread_id].real_time != FALSE)
            {
                continue;
            }

            /* Atomically fetch the pending count and reset it to zero */
            pending_count = __sync_fetch_and_and (&itti_desc.threads[thread_id].messages_pending, 0);

            if (pending_count == 0)
            {
                continue;
            }

            sem_counter = pending_count;

            /* Call to write for an event fd must be of 8 bytes */
            write_ret = write (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
            DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, thread_id);
        }

#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_RELAY_THREAD, VCD_FUNCTION_OUT);
#endif
    }

    return NULL;
}
#endif

815
int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
816
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) {
817
818
    task_id_t task_id;
    thread_id_t thread_id;
819
820
    int ret;

821
    itti_desc.message_number = 1;
Cedric Roux's avatar
   
Cedric Roux committed
822

823
    ITTI_DEBUG(ITTI_DEBUG_INIT, " Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);
824

825
    CHECK_INIT_RETURN(signal_mask());
826

Cedric Roux's avatar
   
Cedric Roux committed
827
    /* Saves threads and messages max values */
828
    itti_desc.task_max = task_max;
Cedric Roux's avatar
   
Cedric Roux committed
829
830
    itti_desc.thread_max = thread_max;
    itti_desc.messages_id_max = messages_id_max;
831
    itti_desc.thread_handling_signals = -1;
832
    itti_desc.tasks_info = tasks_info;
Cedric Roux's avatar
   
Cedric Roux committed
833
834
835
    itti_desc.messages_info = messages_info;

    /* Allocates memory for tasks info */
836
837
838
839
    itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

    /* Allocates memory for threads info */
    itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));
Cedric Roux's avatar
   
Cedric Roux committed
840
841

    /* Initializing each queue and related stuff */
842
    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
843
    {
844
        ITTI_DEBUG(ITTI_DEBUG_INIT, " Initializing %stask %s%s%s\n",
845
846
847
848
849
850
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
                   itti_desc.tasks_info[task_id].name,
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
                   itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");

851
        ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);
852
853
854

        ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);
        if (ret < 0)
855
        {
winckel's avatar
winckel committed
856
            AssertFatal (0, "lfds611_queue_new failed for task %s\n", itti_get_task_name(task_id));
857
        }
858
859
860
861
862
863
    }

    /* Initializing each thread */
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
    {
        itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
864

865
866
        itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);
        if (itti_desc.threads[thread_id].epoll_fd == -1) {
867
            /* Always assert on this condition */
winckel's avatar
winckel committed
868
            AssertFatal (0, "Failed to create new epoll fd: %s\n", strerror(errno));
869
870
        }

871
872
873
874
875
876
# if defined(KERNEL_VERSION_PRE_2_6_30)
        /* SR: for kernel versions < 2.6.30 EFD_SEMAPHORE is not defined.
         * A read operation on the event fd will return the 8 byte value.
         */
        itti_desc.threads[thread_id].task_event_fd = eventfd(0, 0);
# else
877
        itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
878
# endif
879
        if (itti_desc.threads[thread_id].task_event_fd == -1)
880
        {
881
            /* Always assert on this condition */
winckel's avatar
winckel committed
882
            AssertFatal (0, " eventfd failed: %s\n", strerror(errno));
883
884
        }

885
        itti_desc.threads[thread_id].nb_events = 1;
886

887
        itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event));
888

889
890
        itti_desc.threads[thread_id].events->events  = EPOLLIN | EPOLLERR;
        itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd;
891
892

        /* Add the event fd to the list of monitored events */
893
894
        if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
            itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0)
895
896
        {
            /* Always assert on this condition */
winckel's avatar
winckel committed
897
            AssertFatal (0, " epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno));
898
        }
899

900
        ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for thread %d\n",
901
                   itti_desc.threads[thread_id].task_event_fd, thread_id);
902

903
904
905
#ifdef RTAI
        itti_desc.threads[thread_id].real_time = FALSE;
        itti_desc.threads[thread_id].messages_pending = 0;
906
#endif
907
    }
908

909
    itti_desc.running = 1;
910
911
912
    itti_desc.wait_tasks = 0;
    itti_desc.created_tasks = 0;
    itti_desc.ready_tasks = 0;
913
914
915
#ifdef RTAI
    /* Start RT relay thread */
    DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0);
916
917

    rt_global_heap_open();
918
#endif
919

920
921
922
923
#if defined(OAI_EMU) || defined(RTAI)
    itti_desc.memory_pools_handle = memory_pools_create (4);
    memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + ITTI_QUEUE_MAX_ELEMENTS,       50);
    memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + (2 * ITTI_QUEUE_MAX_ELEMENTS), 100);
winckel's avatar
winckel committed
924
    memory_pools_add_pool (itti_desc.memory_pools_handle, 10000,                                1000);
winckel's avatar
winckel committed
925
    memory_pools_add_pool (itti_desc.memory_pools_handle,  500,                                 20000);
926
927
928
929

    {
        char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle);

930
        ITTI_DEBUG(ITTI_DEBUG_MP_STATISTICS, " Memory pools statistics:\n%s", statistics);
931
932
933
934
        free (statistics);
    }
#endif

935
936
937
938
939
940
#if defined(OAI_EMU) || defined(RTAI)
    itti_desc.vcd_poll_msg = 0;
    itti_desc.vcd_receive_msg = 0;
    itti_desc.vcd_send_msg = 0;
#endif

941
    itti_dump_init (messages_definition_xml, dump_file_name);
Cedric Roux's avatar
   
Cedric Roux committed
942