intertask_interface.c 36.5 KB
Newer Older
Cedric Roux's avatar
   
Cedric Roux committed
1
/*******************************************************************************
ghaddab's avatar
ghaddab committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    OpenAirInterface 
    Copyright(c) 1999 - 2014 Eurecom

    OpenAirInterface is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.


    OpenAirInterface is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with OpenAirInterface.The full GNU General Public License is 
   included in this distribution in the file called "COPYING". If not, 
   see <http://www.gnu.org/licenses/>.

  Contact Information
  OpenAirInterface Admin: openair_admin@eurecom.fr
  OpenAirInterface Tech : openair_tech@eurecom.fr
  OpenAirInterface Dev  : openair4g-devel@eurecom.fr
  
ghaddab's avatar
ghaddab committed
26
  Address      : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
Cedric Roux's avatar
   
Cedric Roux committed
27

28
 *******************************************************************************/
Cedric Roux's avatar
   
Cedric Roux committed
29

30
#define _GNU_SOURCE
Cedric Roux's avatar
   
Cedric Roux committed
31
32
33
34
35
36
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
37
#include <signal.h>
Cedric Roux's avatar
   
Cedric Roux committed
38

39
40
#include <sys/epoll.h>
#include <sys/eventfd.h>
41

42
43
44
45
#ifdef RTAI
# include <rtai_shm.h>
#endif

gauthier's avatar
gauthier committed
46
47
48
49
#if !defined(TRUE)
#define TRUE 1
#endif

50
51
52
53
54
55
#include "liblfds611.h"

#include "assertions.h"
#include "intertask_interface.h"
#include "intertask_interface_dump.h"

56
#if defined(OAI_EMU) || defined(RTAI)
57
# include "memory_pools.h"
58
59
60
# include "vcd_signal_dumper.h"
#endif

61
62
63
/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
Cedric Roux's avatar
   
Cedric Roux committed
64
65
66
67
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

68
#include "signals.h"
Cedric Roux's avatar
   
Cedric Roux committed
69
70
#include "timer.h"

71
72
73
74
75
76
77
78
#ifdef RTAI
# include <rtai.h>
# include <rtai_fifos.h>
#    define FIFO_PRINTF_MAX_STRING_SIZE 1000
#    define FIFO_PRINTF_NO              62
#    define FIFO_PRINTF_SIZE            65536
#endif

79
80
81
82
83
84
85
86
87
88
/* ITTI DEBUG groups */
#define ITTI_DEBUG_POLL             (1<<0)
#define ITTI_DEBUG_SEND             (1<<1)
#define ITTI_DEBUG_EVEN_FD          (1<<2)
#define ITTI_DEBUG_INIT             (1<<3)
#define ITTI_DEBUG_EXIT             (1<<4)
#define ITTI_DEBUG_ISSUES           (1<<5)
#define ITTI_DEBUG_MP_STATISTICS    (1<<6)

const int itti_debug = ITTI_DEBUG_ISSUES | ITTI_DEBUG_MP_STATISTICS;
Cedric Roux's avatar
   
Cedric Roux committed
89

90
91
/* Don't flush if using RTAI */
#ifdef RTAI
92
# define ITTI_DEBUG(m, x, args...)  do { if ((m) & itti_debug) rt_log_debug (x, ##args); } while(0);
93
#else
94
# define ITTI_DEBUG(m, x, args...)  do { if ((m) & itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout); } while(0);
95
#endif
96
#define ITTI_ERROR(x, args...) 	    do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } while(0);
Cedric Roux's avatar
   
Cedric Roux committed
97
98
99
100

/* Global message size */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)

101
102
103
104
105
#ifdef RTAI
# define ITTI_MEM_PAGE_SIZE (1024)
# define ITTI_MEM_SIZE      (16 * 1024 * 1024)
#endif

Cedric Roux's avatar
   
Cedric Roux committed
106
typedef enum task_state_s {
107
    TASK_STATE_NOT_CONFIGURED, TASK_STATE_STARTING, TASK_STATE_READY, TASK_STATE_ENDED, TASK_STATE_MAX,
Cedric Roux's avatar
   
Cedric Roux committed
108
109
110
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
111
typedef struct message_list_s {
112
    MessageDef *msg; ///< Pointer to the message
Cedric Roux's avatar
   
Cedric Roux committed
113

114
115
    message_number_t message_number; ///< Unique message number
    uint32_t message_priority; ///< Message priority
116
} message_list_t;
Cedric Roux's avatar
   
Cedric Roux committed
117

118
119
120
typedef struct thread_desc_s {
    /* pthread associated with the thread */
    pthread_t task_thread;
121

122
123
    /* State of the thread */
    volatile task_state_t task_state;
124
125
126
127

    /* This fd is used internally by ITTI. */
    int epoll_fd;

128
    /* The thread fd */
129
130
131
132
133
    int task_event_fd;

    /* Number of events to monitor */
    uint16_t nb_events;

134

135
136
137
138
139
140
    /* Array of events monitored by the task.
     * By default only one fd is monitored (the one used to received messages
     * from other tasks).
     * More events can be suscribed later by the task itself.
     */
    struct epoll_event *events;
141
142

    int epoll_nb_events;
143

knopp's avatar
   
knopp committed
144
//#ifdef RTAI
145
146
147
148
149
    /* Flag to mark real time thread */
    unsigned real_time;

    /* Counter to indicate from RTAI threads that messages are pending for the thread */
    unsigned messages_pending;
knopp's avatar
   
knopp committed
150
//#endif
151
152
153
154
155
} thread_desc_t;

typedef struct task_desc_s {
    /* Queue of messages belonging to the task */
    struct lfds611_queue_state *message_queue;
Cedric Roux's avatar
   
Cedric Roux committed
156
157
} task_desc_t;

158
typedef struct itti_desc_s {
159
    thread_desc_t *threads;
160
    task_desc_t   *tasks;
161

Cedric Roux's avatar
   
Cedric Roux committed
162
    /* Current message number. Incremented every call to send_msg_to_task */
163
    message_number_t message_number __attribute__((aligned(8)));
Cedric Roux's avatar
   
Cedric Roux committed
164
165

    thread_id_t thread_max;
166
    task_id_t task_max;
Cedric Roux's avatar
   
Cedric Roux committed
167
168
    MessagesIds messages_id_max;

169
170
    pthread_t thread_handling_signals;

171
    const task_info_t *tasks_info;
Cedric Roux's avatar
   
Cedric Roux committed
172
173
    const message_info_t *messages_info;

174
    itti_lte_time_t lte_time;
175
176

    int running;
177
178
179
180

    volatile uint32_t created_tasks;
    volatile uint32_t ready_tasks;
    volatile int      wait_tasks;
181
182
183
#ifdef RTAI
    pthread_t rt_relay_thread;
#endif
184
185

#if defined(OAI_EMU) || defined(RTAI)
186
187
    memory_pools_handle_t memory_pools_handle;

188
189
190
191
    uint64_t vcd_poll_msg;
    uint64_t vcd_receive_msg;
    uint64_t vcd_send_msg;
#endif
192
193
194
} itti_desc_t;

static itti_desc_t itti_desc;
Cedric Roux's avatar
   
Cedric Roux committed
195

196
void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size)
197
198
199
{
    void *ptr = NULL;

200
201
202
203
204
205
#if defined(OAI_EMU) || defined(RTAI)
    ptr = memory_pools_allocate (itti_desc.memory_pools_handle, size, origin_task_id, destination_task_id);
    if (ptr == NULL)
    {
        char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle);

206
        ITTI_ERROR (" Memory pools statistics:\n%s", statistics);
207
208
        free (statistics);
    }
winckel's avatar
winckel committed
209
210
#else
    ptr = malloc (size);
211
#endif
winckel's avatar
winckel committed
212

213
    AssertFatal (ptr != NULL, "Memory allocation of %d bytes failed (%d -> %d)!\n", (int) size, origin_task_id, destination_task_id);
214
215
216
217

    return ptr;
}

218
int itti_free(task_id_t task_id, void *ptr)
219
{
220
221
    int result = EXIT_SUCCESS;
    AssertFatal (ptr != NULL, "Trying to free a NULL pointer (%d)!\n", task_id);
222
223

#if defined(OAI_EMU) || defined(RTAI)
224
225
226
    result = memory_pools_free (itti_desc.memory_pools_handle, ptr, task_id);

    AssertError (result == EXIT_SUCCESS, {}, "Failed to free memory at %p (%d)!\n", ptr, task_id);
227
#else
228
    free (ptr);
229
#endif
230
231

    return (result);
232
233
}

234
static inline message_number_t itti_increment_message_number(void) {
Cedric Roux's avatar
   
Cedric Roux committed
235
236
237
238
    /* Atomic operation supported by GCC: returns the current message number
     * and then increment it by 1.
     * This can be done without mutex.
     */
239
    return __sync_fetch_and_add (&itti_desc.message_number, 1);
Cedric Roux's avatar
   
Cedric Roux committed
240
241
}

242
static inline uint32_t itti_get_message_priority(MessagesIds message_id) {
243
    AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);
Cedric Roux's avatar
   
Cedric Roux committed
244
245
246
247

    return (itti_desc.messages_info[message_id].priority);
}

248
const char *itti_get_message_name(MessagesIds message_id) {
249
    AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);
Cedric Roux's avatar
   
Cedric Roux committed
250
251
252
253

    return (itti_desc.messages_info[message_id].name);
}

254
const char *itti_get_task_name(task_id_t task_id)
Cedric Roux's avatar
Cedric Roux committed
255
{
256
257
258
259
260
261
262
263
    if (itti_desc.task_max > 0)
    {
        AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
    }
    else
    {
        return ("ITTI NOT INITIALIZED !!!");
    }
Cedric Roux's avatar
Cedric Roux committed
264

265
    return (itti_desc.tasks_info[task_id].name);
Cedric Roux's avatar
Cedric Roux committed
266
267
}

268
static task_id_t itti_get_current_task_id(void)
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
{
    task_id_t task_id;
    thread_id_t thread_id;
    pthread_t thread = pthread_self ();

    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
    {
        thread_id = TASK_GET_THREAD_ID(task_id);
        if (itti_desc.threads[thread_id].task_thread == thread)
        {
            return task_id;
        }
    }

    return TASK_UNKNOWN;
}

286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
#ifdef RTAI
static void rt_log_debug(char *format, ...)
{
    task_id_t   task_id;
    va_list     args;
    char        log_buffer[FIFO_PRINTF_MAX_STRING_SIZE];
    int         len;

    task_id = itti_get_current_task_id ();
    len = snprintf(log_buffer, FIFO_PRINTF_MAX_STRING_SIZE-1, "[ITTI][D][%s]", itti_get_task_name(task_id));
    va_start(args, format);
    len += vsnprintf(&log_buffer[len], FIFO_PRINTF_MAX_STRING_SIZE-1-len, format, args);
    va_end (args);

    if (task_id != TASK_UNKNOWN)
        fwrite(log_buffer, len, 1, stdout);
    else
        rtf_put (FIFO_PRINTF_NO, log_buffer, len);
}
#endif

307
308
309
310
311
312
void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
    itti_desc.lte_time.frame = frame;
    itti_desc.lte_time.slot = slot;
}

313
int itti_send_broadcast_message(MessageDef *message_p) {
314
    task_id_t destination_task_id;
315
    task_id_t origin_task_id;
316
    thread_id_t origin_thread_id;
317
    uint32_t thread_id;
Cedric Roux's avatar
   
Cedric Roux committed
318
    int ret = 0;
319
    int result;
Cedric Roux's avatar
   
Cedric Roux committed
320

321
    AssertFatal (message_p != NULL, "Trying to broadcast a NULL message!\n");
Cedric Roux's avatar
   
Cedric Roux committed
322

323
324
    origin_task_id = message_p->ittiMsgHeader.originTaskId;
    origin_thread_id = TASK_GET_THREAD_ID(origin_task_id);
Cedric Roux's avatar
   
Cedric Roux committed
325

326
327
    destination_task_id = TASK_FIRST;
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
328
329
        MessageDef *new_message_p;

330
331
332
333
        while (thread_id != TASK_GET_THREAD_ID(destination_task_id))
        {
            destination_task_id++;
        }
334
        /* Skip task that broadcast the message */
335
        if (thread_id != origin_thread_id) {
336
            /* Skip tasks which are not running */
337
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
338
                new_message_p = itti_malloc (origin_task_id, destination_task_id, sizeof(MessageDef));
339
                AssertFatal (new_message_p != NULL, "New message allocation failed!\n");
340
341

                memcpy (new_message_p, message_p, sizeof(MessageDef));
342
                result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
343
                AssertFatal (result >= 0, "Failed to send message %d to thread %d (task %d)!\n", message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
344
            }
Cedric Roux's avatar
   
Cedric Roux committed
345
346
        }
    }
347
348
    result = itti_free (ITTI_MSG_ORIGIN_ID(message_p), message_p);
    AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
Cedric Roux's avatar
   
Cedric Roux committed
349
350
351
352

    return ret;
}

Cedric Roux's avatar
Cedric Roux committed
353
354
inline MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
Cedric Roux's avatar
   
Cedric Roux committed
355
356
    MessageDef *temp = NULL;

357
    AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);
Cedric Roux's avatar
   
Cedric Roux committed
358

359
360
361
362
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, size);
#endif

363
364
365
366
367
368
    if (origin_task_id == TASK_UNKNOWN)
    {
        /* Try to identify real origin task ID */
        origin_task_id = itti_get_current_task_id();
    }

369
    temp = itti_malloc (origin_task_id, TASK_UNKNOWN, sizeof(MessageHeader) + size);
Cedric Roux's avatar
   
Cedric Roux committed
370

Cedric Roux's avatar
Cedric Roux committed
371
372
373
    temp->ittiMsgHeader.messageId = message_id;
    temp->ittiMsgHeader.originTaskId = origin_task_id;
    temp->ittiMsgHeader.ittiMsgSize = size;
Cedric Roux's avatar
   
Cedric Roux committed
374

375
376
377
378
#if defined(OAI_EMU) || defined(RTAI)
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, 0);
#endif

Cedric Roux's avatar
   
Cedric Roux committed
379
380
381
    return temp;
}

Cedric Roux's avatar
Cedric Roux committed
382
383
384
385
386
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
    return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

387
int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
Cedric Roux's avatar
Cedric Roux committed
388
{
389
    thread_id_t destination_thread_id;
390
    thread_id_t origin_task_id;
391
    message_list_t *new;
392
393
394
    uint32_t priority;
    message_number_t message_number;
    uint32_t message_id;
Cedric Roux's avatar
   
Cedric Roux committed
395

winckel's avatar
winckel committed
396
#if defined(OAI_EMU) || defined(RTAI)
397
398
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_send_msg, 1L << destination_task_id));
winckel's avatar
winckel committed
399
400
#endif

401
    AssertFatal (message != NULL, "Message is NULL!\n");
winckel's avatar
winckel committed
402
    AssertFatal (destination_task_id < itti_desc.task_max, "Destination task id (%d) is out of range (%d)\n", destination_task_id, itti_desc.task_max);
Cedric Roux's avatar
   
Cedric Roux committed
403

404
405
    destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
    message->ittiMsgHeader.destinationTaskId = destination_task_id;
Cedric Roux's avatar
Cedric Roux committed
406
407
408
409
    message->ittiMsgHeader.instance = instance;
    message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
    message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
    message_id = message->ittiMsgHeader.messageId;
410
    AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);
Cedric Roux's avatar
   
Cedric Roux committed
411

412
413
    origin_task_id = ITTI_MSG_ORIGIN_ID(message);

414
    priority = itti_get_message_priority (message_id);
Cedric Roux's avatar
   
Cedric Roux committed
415

416
417
    /* Increment the global message number */
    message_number = itti_increment_message_number ();
Cedric Roux's avatar
   
Cedric Roux committed
418

419
    itti_dump_queue_message (origin_task_id, message_number, message, itti_desc.messages_info[message_id].name,
420
421
422
                             sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);

    if (destination_task_id != TASK_UNKNOWN)
423
    {
424
#if defined(OAI_EMU) || defined(RTAI)
winckel's avatar
winckel committed
425
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_IN);
winckel's avatar
winckel committed
426
427

        memory_pools_set_info (itti_desc.memory_pools_handle, message, 1, destination_task_id);
winckel's avatar
winckel committed
428
429
#endif

430
431
        if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED)
        {
432
            ITTI_DEBUG(ITTI_DEBUG_ISSUES, " Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n",
433
434
435
436
437
438
439
440
441
442
                       itti_desc.messages_info[message_id].name,
                       message_number,
                       priority,
                       itti_get_task_name(origin_task_id),
                       destination_task_id,
                       itti_get_task_name(destination_task_id));
        }
        else
        {
            /* We cannot send a message if the task is not running */
gauthier's avatar
gauthier committed
443
444
445
446
447
448
449
            AssertFatal (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY,
                    "Task %s Cannot send message %s (%d) to thread %d, it is not in ready state (%d)!\n",
                    itti_get_task_name(origin_task_id),
                    itti_desc.messages_info[message_id].name,
                    message_id,
                    destination_thread_id,
                    itti_desc.threads[destination_thread_id].task_state);
Cedric Roux's avatar
   
Cedric Roux committed
450

451
            /* Allocate new list element */
452
            new = (message_list_t *) itti_malloc (origin_task_id, destination_task_id, sizeof(struct message_list_s));
Cedric Roux's avatar
   
Cedric Roux committed
453

454
455
456
457
            /* Fill in members */
            new->msg = message;
            new->message_number = message_number;
            new->message_priority = priority;
Cedric Roux's avatar
   
Cedric Roux committed
458

459
460
            /* Enqueue message in destination task queue */
            lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new);
461

462
#if defined(OAI_EMU) || defined(RTAI)
463
            vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
winckel's avatar
winckel committed
464
465
#endif

466
#ifdef RTAI
467
468
469
470
471
472
            if (itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].real_time)
            {
                /* This is a RT task, increase destination task messages pending counter */
                __sync_fetch_and_add (&itti_desc.threads[destination_thread_id].messages_pending, 1);
            }
            else
473
474
#endif
            {
475
476
477
478
                /* Only use event fd for tasks, subtasks will pool the queue */
                if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN)
                {
                    ssize_t write_ret;
Cedric Roux's avatar
Cedric Roux committed
479
                    eventfd_t sem_counter = 1;
480

481
482
                    /* Call to write for an event fd must be of 8 bytes */
                    write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
winckel's avatar
winckel committed
483
484
                    AssertFatal (write_ret == sizeof(sem_counter), "Write to task message FD (%d) failed (%d/%d)\n",
                                 destination_thread_id, (int) write_ret, (int) sizeof(sem_counter));
485
                }
486
            }
Cedric Roux's avatar
   
Cedric Roux committed
487

488
            ITTI_DEBUG(ITTI_DEBUG_SEND, " Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
489
490
491
492
493
494
495
                       itti_desc.messages_info[message_id].name,
                       message_number,
                       priority,
                       itti_get_task_name(origin_task_id),
                       destination_task_id,
                       itti_get_task_name(destination_task_id));
        }
Cedric Roux's avatar
Cedric Roux committed
496
497
    } else {
        /* This is a debug message to TASK_UNKNOWN, we can release safely release it */
498
499
        int result = itti_free(origin_task_id, message);
        AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
500
    }
501

502
#if defined(OAI_EMU) || defined(RTAI)
503
504
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_send_msg, ~(1L << destination_task_id)));
505
506
#endif

Cedric Roux's avatar
   
Cedric Roux committed
507
508
509
    return 0;
}

510
511
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
512
    thread_id_t thread_id;
513
514
    struct epoll_event event;

515
    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
516

517
518
    thread_id = TASK_GET_THREAD_ID(task_id);
    itti_desc.threads[thread_id].nb_events++;
519
520

    /* Reallocate the events */
521
522
523
    itti_desc.threads[thread_id].events = realloc(
        itti_desc.threads[thread_id].events,
        itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));
524

525
    event.events  = EPOLLIN | EPOLLERR;
Cedric Roux's avatar
Cedric Roux committed
526
527
    event.data.u64 = 0;
    event.data.fd  = fd;
528
529

    /* Add the event fd to the list of monitored events */
530
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
531
532
533
        &event) != 0)
    {
        /* Always assert on this condition */
534
        AssertFatal (0, "epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s!\n",
winckel's avatar
winckel committed
535
                     itti_get_task_name(task_id), fd, strerror(errno));
536
    }
537

538
    ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
539
540
541
542
}

void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
543
544
    thread_id_t thread_id;

545
546
    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
    AssertFatal (fd >= 0, "File descriptor (%d) is invalid!\n", fd);
547

548
    thread_id = TASK_GET_THREAD_ID(task_id);
549
    /* Add the event fd to the list of monitored events */
550
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
551
552
    {
        /* Always assert on this condition */
553
        AssertFatal (0, "epoll_ctl (EPOLL_CTL_DEL) failed for task %s, fd %d: %s!\n",
winckel's avatar
winckel committed
554
                     itti_get_task_name(task_id), fd, strerror(errno));
555
556
    }

557
558
559
560
    itti_desc.threads[thread_id].nb_events--;
    itti_desc.threads[thread_id].events = realloc(
        itti_desc.threads[thread_id].events,
        itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));
561
562
563
564
}

int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
565
566
    thread_id_t thread_id;

winckel's avatar
winckel committed
567
    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);
568

569
570
    thread_id = TASK_GET_THREAD_ID(task_id);
    *events = itti_desc.threads[thread_id].events;
571

572
    return itti_desc.threads[thread_id].epoll_nb_events;
573
574
}

575
576
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
577
    thread_id_t thread_id;
578
579
    int epoll_ret = 0;
    int epoll_timeout = 0;
580
    int i;
581

582
583
    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
    AssertFatal (received_msg != NULL, "Received message is NULL!\n");
584

585
    thread_id = TASK_GET_THREAD_ID(task_id);
586
587
588
589
590
591
592
593
    *received_msg = NULL;

    if (polling) {
        /* In polling mode we set the timeout to 0 causing epoll_wait to return
         * immediately.
         */
        epoll_timeout = 0;
    } else {
594
        /* timeout = -1 causes the epoll_wait to wait indefinitely.
595
596
597
598
         */
        epoll_timeout = -1;
    }

599
    do {
600
601
602
        epoll_ret = epoll_wait(itti_desc.threads[thread_id].epoll_fd,
                               itti_desc.threads[thread_id].events,
                               itti_desc.threads[thread_id].nb_events,
603
604
                               epoll_timeout);
    } while (epoll_ret < 0 && errno == EINTR);
605
606

    if (epoll_ret < 0) {
607
        AssertFatal (0, "epoll_wait failed for task %s: %s!\n", itti_get_task_name(task_id), strerror(errno));
608
609
610
611
612
613
    }
    if (epoll_ret == 0 && polling) {
        /* No data to read -> return */
        return;
    }

614
    itti_desc.threads[thread_id].epoll_nb_events = epoll_ret;
615

616
    for (i = 0; i < epoll_ret; i++) {
617
        /* Check if there is an event for ITTI for the event fd */
618
619
        if ((itti_desc.threads[thread_id].events[i].events & EPOLLIN) &&
            (itti_desc.threads[thread_id].events[i].data.fd == itti_desc.threads[thread_id].task_event_fd))
620
        {
621
            struct message_list_s *message = NULL;
622
623
624
            eventfd_t   sem_counter;
            ssize_t     read_ret;
            int         result;
625
626

            /* Read will always return 1 */
627
            read_ret = read (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
628
            AssertFatal (read_ret == sizeof(sem_counter), "Read from task message FD (%d) failed (%d/%d)!\n", thread_id, (int) read_ret, (int) sizeof(sem_counter));
629

630

631
            if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
632
                /* No element in list -> this should not happen */
633
                AssertFatal (0, "No message in queue for task %d while there are %d events and some for the messages queue!\n", task_id, epoll_ret);
634
            }
635
            AssertFatal(message != NULL, "Message from message queue is NULL!\n");
636
            *received_msg = message->msg;
637
638
639
            result = itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
            AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);

640
641
            /* Mark that the event has been processed */
            itti_desc.threads[thread_id].events[i].events &= ~EPOLLIN;
642
            return;
643
644
645
646
647
648
        }
    }
}

void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
649
#if defined(OAI_EMU) || defined(RTAI)
650
651
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
652
#endif
653

654
    itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
Cedric Roux's avatar
   
Cedric Roux committed
655

winckel's avatar
winckel committed
656
#if defined(OAI_EMU) || defined(RTAI)
657
658
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_receive_msg, 1L << task_id));
659
#endif
Cedric Roux's avatar
   
Cedric Roux committed
660
661
}

662
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
663
    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
Cedric Roux's avatar
   
Cedric Roux committed
664
665
666

    *received_msg = NULL;

667
#if defined(OAI_EMU) || defined(RTAI)
668
669
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                            __sync_or_and_fetch (&itti_desc.vcd_poll_msg, 1L << task_id));
670
671
672
673
674
675
676
#endif

    {
        struct message_list_s *message;

        if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1)
        {
677
678
            int result;

679
            *received_msg = message->msg;
680
681
            result = itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
            AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
682
683
        }
    }
Cedric Roux's avatar
   
Cedric Roux committed
684

685
686
    if (*received_msg == NULL) {
        ITTI_DEBUG(ITTI_DEBUG_POLL, " No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
Cedric Roux's avatar
   
Cedric Roux committed
687
    }
688
689

#if defined(OAI_EMU) || defined(RTAI)
690
691
    vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_poll_msg, ~(1L << task_id)));
692
#endif
Cedric Roux's avatar
   
Cedric Roux committed
693
694
}

695
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
Cedric Roux's avatar
   
Cedric Roux committed
696
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
697
    int result;
Cedric Roux's avatar
   
Cedric Roux committed
698

699
700
701
    AssertFatal (start_routine != NULL, "Start routine is NULL!\n");
    AssertFatal (thread_id < itti_desc.thread_max, "Thread id (%d) is out of range (%d)!\n", thread_id, itti_desc.thread_max);
    AssertFatal (itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, "Task %d, thread %d state is not correct (%d)!\n",
winckel's avatar
winckel committed
702
                 task_id, thread_id, itti_desc.threads[thread_id].task_state);
Cedric Roux's avatar
   
Cedric Roux committed
703

704
    itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;
Cedric Roux's avatar
   
Cedric Roux committed
705

706
    ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating thread for task %s ...\n", itti_get_task_name(task_id));
707

708
    result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
709
    AssertFatal (result >= 0, "Thread creation for task %d, thread %d failed (%d)!\n", task_id, thread_id, result);
Cedric Roux's avatar
   
Cedric Roux committed
710

711
712
    itti_desc.created_tasks ++;

Cedric Roux's avatar
   
Cedric Roux committed
713
    /* Wait till the thread is completely ready */
714
    while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
715
716
        usleep (1000);

Cedric Roux's avatar
   
Cedric Roux committed
717
718
719
    return 0;
}

knopp's avatar
   
knopp committed
720
//#ifdef RTAI 
721
722
723
724
725
726
727
728
void itti_set_task_real_time(task_id_t task_id)
{
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    itti_desc.threads[thread_id].real_time = TRUE;
}
knopp's avatar
   
knopp committed
729
//#endif
730

731
732
733
734
void itti_wait_ready(int wait_tasks)
{
    itti_desc.wait_tasks = wait_tasks;

gauthier's avatar
gauthier committed
735
736
737
738
739
    ITTI_DEBUG(ITTI_DEBUG_INIT,
            " wait for tasks: %s, created tasks %d, ready tasks %d\n",
            itti_desc.wait_tasks ? "yes" : "no",
            itti_desc.created_tasks,
            itti_desc.ready_tasks);
740

741
    AssertFatal (itti_desc.created_tasks == itti_desc.ready_tasks, "Number of created tasks (%d) does not match ready tasks (%d), wait task %d!\n",
winckel's avatar
winckel committed
742
                 itti_desc.created_tasks, itti_desc.ready_tasks, itti_desc.wait_tasks);
743
744
}

745
746
void itti_mark_task_ready(task_id_t task_id)
{
Cedric Roux's avatar
   
Cedric Roux committed
747
748
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

749
    AssertFatal (thread_id < itti_desc.thread_max, "Thread id (%d) is out of range (%d)!\n", thread_id, itti_desc.thread_max);
750

751
752
753
754
755
756
    /* Register the thread in itti dump */
    itti_dump_thread_use_ring_buffer();

    /* Mark the thread as using LFDS queue */
    lfds611_queue_use(itti_desc.tasks[task_id].message_queue);

757
758
759
760
761
762
763
764
765
#ifdef RTAI
    /* Assign low priority to created threads */
    {
        struct sched_param sched_param;
        sched_param.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
        sched_setscheduler(0, SCHED_FIFO, &sched_param);
    }
#endif

766
    itti_desc.threads[thread_id].task_state = TASK_STATE_READY;
767
768
769
770
771
772
773
    itti_desc.ready_tasks ++;

    while (itti_desc.wait_tasks != 0)
    {
        usleep (10000);
    }

774
    ITTI_DEBUG(ITTI_DEBUG_INIT, " task %s started\n", itti_get_task_name(task_id));
775
776
}

777
void itti_exit_task(void) {
778
779
780
781
782
783
784
785
786
#if defined(OAI_EMU) || defined(RTAI)
    task_id_t task_id = itti_get_current_task_id();

    if (task_id > TASK_UNKNOWN)
    {
        vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                                __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
    }
#endif
787
788
789
    pthread_exit (NULL);
}

790
void itti_terminate_tasks(task_id_t task_id) {
Cedric Roux's avatar
Cedric Roux committed
791
    // Sends Terminate signals to all tasks.
792
793
794
795
796
797
798
    itti_send_terminate_message (task_id);

    if (itti_desc.thread_handling_signals >= 0) {
        pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
    }

    pthread_exit (NULL);
Cedric Roux's avatar
   
Cedric Roux committed
799
800
}

801
802
803
804
805
806
807
808
#ifdef RTAI
static void *itti_rt_relay_thread(void *arg)
{
    thread_id_t thread_id;
    unsigned pending_messages;

    while (itti_desc.running)
    {
809
810
811
812
813
        usleep (200); // Poll for messages a little more than 2 time by slot to get a small latency between RT and other tasks

#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_RELAY_THREAD, VCD_FUNCTION_IN);
#endif
814
815
816
817
818
819
820
821
822
823
824
825

        /* Checks for all non real time tasks if they have pending messages */
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
        {
            if ((itti_desc.threads[thread_id].task_state == TASK_STATE_READY)
                    && (itti_desc.threads[thread_id].real_time == FALSE))
            {
                pending_messages = __sync_fetch_and_and (&itti_desc.threads[thread_id].messages_pending, 0);

                if (pending_messages > 0)
                {
                    ssize_t write_ret;
Cedric Roux's avatar
Cedric Roux committed
826
                    eventfd_t sem_counter = pending_messages;
827
828
829
830
831
832
833

                    /* Call to write for an event fd must be of 8 bytes */
                    write_ret = write (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
                    DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, thread_id);
                }
            }
        }
834
835
836
837

#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_RELAY_THREAD, VCD_FUNCTION_OUT);
#endif
838
839
840
841
842
    }
    return NULL;
}
#endif

843
int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
844
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) {
845
846
    task_id_t task_id;
    thread_id_t thread_id;
847
848
    int ret;

849
    itti_desc.message_number = 1;
Cedric Roux's avatar
   
Cedric Roux committed
850

851
    ITTI_DEBUG(ITTI_DEBUG_INIT, " Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);
852

853
    CHECK_INIT_RETURN(signal_mask());
854

Cedric Roux's avatar
   
Cedric Roux committed
855
    /* Saves threads and messages max values */
856
    itti_desc.task_max = task_max;
Cedric Roux's avatar
   
Cedric Roux committed
857
858
    itti_desc.thread_max = thread_max;
    itti_desc.messages_id_max = messages_id_max;
859
    itti_desc.thread_handling_signals = -1;
860
    itti_desc.tasks_info = tasks_info;
Cedric Roux's avatar
   
Cedric Roux committed
861
862
863
    itti_desc.messages_info = messages_info;

    /* Allocates memory for tasks info */
864
865
866
867
    itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

    /* Allocates memory for threads info */
    itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));
Cedric Roux's avatar
   
Cedric Roux committed
868
869

    /* Initializing each queue and related stuff */
870
    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
871
    {
872
        ITTI_DEBUG(ITTI_DEBUG_INIT, " Initializing %stask %s%s%s\n",
873
874
875
876
877
878
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
                   itti_desc.tasks_info[task_id].name,
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
                   itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
                   itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");

879
        ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);
880
881
882

        ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);
        if (ret < 0)
883
        {
884
            AssertFatal (0, "lfds611_queue_new failed for task %s!\n", itti_get_task_name(task_id));
885
        }
886
887
888
889
890
891
    }

    /* Initializing each thread */
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
    {
        itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
892

893
894
        itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);
        if (itti_desc.threads[thread_id].epoll_fd == -1) {
895
            /* Always assert on this condition */
896
            AssertFatal (0, "Failed to create new epoll fd: %s!\n", strerror(errno));
897
898
        }

899
900
        itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
        if (itti_desc.threads[thread_id].task_event_fd == -1)
901
        {
902
            /* Always assert on this condition */
903
            AssertFatal (0, " eventfd failed: %s!\n", strerror(errno));
904
905
        }

906
        itti_desc.threads[thread_id].nb_events = 1;
907

908
        itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event));
909

910
911
        itti_desc.threads[thread_id].events->events  = EPOLLIN | EPOLLERR;
        itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd;
912
913

        /* Add the event fd to the list of monitored events */
914
915
        if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
            itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0)
916
917
        {
            /* Always assert on this condition */
918
            AssertFatal (0, " epoll_ctl (EPOLL_CTL_ADD) failed: %s!\n", strerror(errno));
919
        }
920

921
        ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for thread %d\n",
922
                   itti_desc.threads[thread_id].task_event_fd, thread_id);
923

924
925
926
#ifdef RTAI
        itti_desc.threads[thread_id].real_time = FALSE;
        itti_desc.threads[thread_id].messages_pending = 0;
927
#endif
928
    }
929

930
    itti_desc.running = 1;
931
932
933
    itti_desc.wait_tasks = 0;
    itti_desc.created_tasks = 0;
    itti_desc.ready_tasks = 0;
934
935
936
#ifdef RTAI
    /* Start RT relay thread */
    DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0);
937
938

    rt_global_heap_open();
939
#endif
940

941
#if defined(OAI_EMU) || defined(RTAI)
942
    itti_desc.memory_pools_handle = memory_pools_create (5);
943
944
    memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + ITTI_QUEUE_MAX_ELEMENTS,       50);
    memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + (2 * ITTI_QUEUE_MAX_ELEMENTS), 100);
winckel's avatar
winckel committed
945
    memory_pools_add_pool (itti_desc.memory_pools_handle, 10000,                                1000);
946
947
    memory_pools_add_pool (itti_desc.memory_pools_handle,  400,                                 20050);
    memory_pools_add_pool (itti_desc.memory_pools_handle,  100,                                 30050);
948
949
950
951

    {
        char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle);

952
        ITTI_DEBUG(ITTI_DEBUG_MP_STATISTICS, " Memory pools statistics:\n%s", statistics);
953
954
955
956
        free (statistics);
    }
#endif

957
958
959
960
961
962
#if defined(OAI_EMU) || defined(RTAI)
    itti_desc.vcd_poll_msg = 0;
    itti_desc.vcd_receive_msg = 0;
    itti_desc.vcd_send_msg = 0;
#endif