intertask_interface.c 26.4 KB
Newer Older
Cedric Roux's avatar
 
Cedric Roux committed
1
2
/*******************************************************************************

3
4
 Eurecom OpenAirInterface
 Copyright(c) 1999 - 2012 Eurecom
Cedric Roux's avatar
 
Cedric Roux committed
5

6
7
8
 This program is free software; you can redistribute it and/or modify it
 under the terms and conditions of the GNU General Public License,
 version 2, as published by the Free Software Foundation.
Cedric Roux's avatar
 
Cedric Roux committed
9

10
11
12
13
 This program is distributed in the hope it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 more details.
Cedric Roux's avatar
 
Cedric Roux committed
14

15
16
17
 You should have received a copy of the GNU General Public License along with
 this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
Cedric Roux's avatar
 
Cedric Roux committed
18

19
20
 The full GNU General Public License is included in this distribution in
 the file called "COPYING".
Cedric Roux's avatar
 
Cedric Roux committed
21

22
23
24
25
26
27
 Contact Information
 Openair Admin: openair_admin@eurecom.fr
 Openair Tech : openair_tech@eurecom.fr
 Forums       : http://forums.eurecom.fr/openairinterface
 Address      : EURECOM, Campus SophiaTech, 450 Route des Chappes
 06410 Biot FRANCE
Cedric Roux's avatar
 
Cedric Roux committed
28

29
 *******************************************************************************/
Cedric Roux's avatar
 
Cedric Roux committed
30

31
#define _GNU_SOURCE
Cedric Roux's avatar
 
Cedric Roux committed
32
33
34
35
36
37
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
38
#include <signal.h>
Cedric Roux's avatar
 
Cedric Roux committed
39
40
41
42

#include "queue.h"
#include "assertions.h"

43
44
45
46
47
48
#if defined(ENABLE_EVENT_FD)
# include <sys/epoll.h>
# include <sys/eventfd.h>
# include "liblfds611.h"
#endif

Cedric Roux's avatar
 
Cedric Roux committed
49
50
#include "intertask_interface.h"
#include "intertask_interface_dump.h"
51
52
53
54

/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
Cedric Roux's avatar
 
Cedric Roux committed
55
56
57
58
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

59
#include "signals.h"
Cedric Roux's avatar
 
Cedric Roux committed
60
61
#include "timer.h"

62
63
const int itti_debug = 0;
const int itti_debug_poll = 0;
Cedric Roux's avatar
 
Cedric Roux committed
64

65
#define ITTI_DEBUG(x, args...) do { if (itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout); } \
Cedric Roux's avatar
 
Cedric Roux committed
66
    while(0)
67
#define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } \
Cedric Roux's avatar
 
Cedric Roux committed
68
69
70
71
72
73
    while(0)

/* Global message size */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)

typedef enum task_state_s {
74
    TASK_STATE_NOT_CONFIGURED, TASK_STATE_STARTING, TASK_STATE_READY, TASK_STATE_ENDED, TASK_STATE_MAX,
Cedric Roux's avatar
 
Cedric Roux committed
75
76
77
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
78
typedef struct message_list_s {
79
#if !defined(ENABLE_EVENT_FD)
Cedric Roux's avatar
 
Cedric Roux committed
80
    STAILQ_ENTRY(message_list_s) next_element;
81
#endif
Cedric Roux's avatar
 
Cedric Roux committed
82

83
    MessageDef *msg; ///< Pointer to the message
Cedric Roux's avatar
 
Cedric Roux committed
84

85
86
    message_number_t message_number; ///< Unique message number
    uint32_t message_priority; ///< Message priority
87
} message_list_t;
Cedric Roux's avatar
 
Cedric Roux committed
88

89
90
91
92
93
94
95
typedef struct thread_desc_s {
    /* pthread associated with the thread */
    pthread_t task_thread;
    /* State of the thread */
    volatile task_state_t task_state;
} thread_desc_t;

Cedric Roux's avatar
 
Cedric Roux committed
96
97
typedef struct task_desc_s {
    /* Queue of messages belonging to the task */
98
#if !defined(ENABLE_EVENT_FD)
Cedric Roux's avatar
Cedric Roux committed
99
    STAILQ_HEAD(message_queue_head, message_list_s) message_queue;
Cedric Roux's avatar
 
Cedric Roux committed
100
101

    /* Number of messages in the queue */
102
    volatile uint32_t message_in_queue;
Cedric Roux's avatar
 
Cedric Roux committed
103
    /* Mutex for the message queue */
104
    pthread_mutex_t message_queue_mutex;
Cedric Roux's avatar
 
Cedric Roux committed
105
    /* Conditional var for message queue and task synchro */
106
    pthread_cond_t message_queue_cond_var;
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#else
    struct lfds611_queue_state *message_queue;

    /* This fd is used internally by ITTI. */
    int epoll_fd;

    /* The task fd */
    int task_event_fd;

    /* Number of events to monitor */
    uint16_t nb_events;

    /* Array of events monitored by the task.
     * By default only one fd is monitored (the one used to received messages
     * from other tasks).
     * More events can be suscribed later by the task itself.
     */
    struct epoll_event *events;
125
126

    int epoll_nb_events;
127
#endif
Cedric Roux's avatar
 
Cedric Roux committed
128
129
} task_desc_t;

130
typedef struct itti_desc_s {
131
    thread_desc_t *threads;
Cedric Roux's avatar
 
Cedric Roux committed
132
    task_desc_t *tasks;
133

Cedric Roux's avatar
 
Cedric Roux committed
134
    /* Current message number. Incremented every call to send_msg_to_task */
135
    message_number_t message_number __attribute__((aligned(8)));
Cedric Roux's avatar
 
Cedric Roux committed
136
137

    thread_id_t thread_max;
138
    task_id_t task_max;
Cedric Roux's avatar
 
Cedric Roux committed
139
140
    MessagesIds messages_id_max;

141
142
    pthread_t thread_handling_signals;

143
    const task_info_t *tasks_info;
Cedric Roux's avatar
 
Cedric Roux committed
144
145
    const message_info_t *messages_info;

146
147
148
149
    itti_lte_time_t lte_time;
} itti_desc_t;

static itti_desc_t itti_desc;
Cedric Roux's avatar
 
Cedric Roux committed
150

151
static inline message_number_t itti_increment_message_number(void) {
Cedric Roux's avatar
 
Cedric Roux committed
152
153
154
155
    /* Atomic operation supported by GCC: returns the current message number
     * and then increment it by 1.
     * This can be done without mutex.
     */
156
    return __sync_fetch_and_add (&itti_desc.message_number, 1);
Cedric Roux's avatar
 
Cedric Roux committed
157
158
}

159
static inline uint32_t itti_get_message_priority(MessagesIds message_id) {
Cedric Roux's avatar
 
Cedric Roux committed
160
161
162
163
164
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].priority);
}

165
const char *itti_get_message_name(MessagesIds message_id) {
Cedric Roux's avatar
 
Cedric Roux committed
166
167
168
169
170
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].name);
}

171
const char *itti_get_task_name(task_id_t task_id)
Cedric Roux's avatar
Cedric Roux committed
172
{
173
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
Cedric Roux's avatar
Cedric Roux committed
174

175
    return (itti_desc.tasks_info[task_id].name);
Cedric Roux's avatar
Cedric Roux committed
176
177
}

178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
static task_id_t itti_get_current_task_id()
{
    task_id_t task_id;
    thread_id_t thread_id;
    pthread_t thread = pthread_self ();

    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
    {
        thread_id = TASK_GET_THREAD_ID(task_id);
        if (itti_desc.threads[thread_id].task_thread == thread)
        {
            return task_id;
        }
    }

    return TASK_UNKNOWN;
}

196
197
198
199
200
201
void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
    itti_desc.lte_time.frame = frame;
    itti_desc.lte_time.slot = slot;
}

202
int itti_send_broadcast_message(MessageDef *message_p) {
203
    task_id_t destination_task_id;
204
    thread_id_t origin_thread_id;
205
    uint32_t thread_id;
Cedric Roux's avatar
 
Cedric Roux committed
206
    int ret = 0;
207
    int result;
Cedric Roux's avatar
 
Cedric Roux committed
208

209
    DevAssert(message_p != NULL);
Cedric Roux's avatar
 
Cedric Roux committed
210

Cedric Roux's avatar
Cedric Roux committed
211
    origin_thread_id = TASK_GET_THREAD_ID(message_p->ittiMsgHeader.originTaskId);
Cedric Roux's avatar
 
Cedric Roux committed
212

213
214
    destination_task_id = TASK_FIRST;
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
215
216
        MessageDef *new_message_p;

217
218
219
220
        while (thread_id != TASK_GET_THREAD_ID(destination_task_id))
        {
            destination_task_id++;
        }
221
        /* Skip task that broadcast the message */
222
        if (thread_id != origin_thread_id) {
223
            /* Skip tasks which are not running */
224
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
225
                new_message_p = malloc (sizeof(MessageDef));
226
                DevAssert(message_p != NULL);
227
228

                memcpy (new_message_p, message_p, sizeof(MessageDef));
229
                result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
Cedric Roux's avatar
Cedric Roux committed
230
                DevCheck(result >= 0, message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
231
            }
Cedric Roux's avatar
 
Cedric Roux committed
232
233
        }
    }
234
    free (message_p);
Cedric Roux's avatar
 
Cedric Roux committed
235
236
237
238

    return ret;
}

Cedric Roux's avatar
Cedric Roux committed
239
240
inline MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
Cedric Roux's avatar
 
Cedric Roux committed
241
242
    MessageDef *temp = NULL;

243
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);
Cedric Roux's avatar
 
Cedric Roux committed
244

245
246
247
248
249
250
    if (origin_task_id == TASK_UNKNOWN)
    {
        /* Try to identify real origin task ID */
        origin_task_id = itti_get_current_task_id();
    }

Cedric Roux's avatar
Cedric Roux committed
251
    temp = calloc (1, sizeof(MessageHeader) + size);
252
    DevAssert(temp != NULL);
Cedric Roux's avatar
 
Cedric Roux committed
253

Cedric Roux's avatar
Cedric Roux committed
254
255
256
    temp->ittiMsgHeader.messageId = message_id;
    temp->ittiMsgHeader.originTaskId = origin_task_id;
    temp->ittiMsgHeader.ittiMsgSize = size;
Cedric Roux's avatar
 
Cedric Roux committed
257
258
259
260

    return temp;
}

Cedric Roux's avatar
Cedric Roux committed
261
262
263
264
265
266
267
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
    return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message)
{
268
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
269
    message_list_t *new;
270
271
272
    uint32_t priority;
    message_number_t message_number;
    uint32_t message_id;
Cedric Roux's avatar
 
Cedric Roux committed
273

274
    DevAssert(message != NULL);
275
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
Cedric Roux's avatar
 
Cedric Roux committed
276

Cedric Roux's avatar
Cedric Roux committed
277
278
279
280
281
    message->ittiMsgHeader.destinationTaskId = task_id;
    message->ittiMsgHeader.instance = instance;
    message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
    message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
    message_id = message->ittiMsgHeader.messageId;
Cedric Roux's avatar
 
Cedric Roux committed
282
283
    DevCheck(message_id < itti_desc.messages_id_max, itti_desc.messages_id_max, message_id, 0);

284
    priority = itti_get_message_priority (message_id);
Cedric Roux's avatar
 
Cedric Roux committed
285

286
287
    /* Increment the global message number */
    message_number = itti_increment_message_number ();
Cedric Roux's avatar
 
Cedric Roux committed
288

289
    itti_dump_queue_message (message_number, message, itti_desc.messages_info[message_id].name,
Cedric Roux's avatar
Cedric Roux committed
290
                             sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);
291

292
293
294
295
296
    if (task_id != TASK_UNKNOWN)
    {
        /* We cannot send a message if the task is not running */
        DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_READY, itti_desc.threads[thread_id].task_state,
                 TASK_STATE_READY, thread_id);
Cedric Roux's avatar
 
Cedric Roux committed
297

298
299
300
#if !defined(ENABLE_EVENT_FD)
        /* Lock the mutex to get exclusive access to the list */
        pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
Cedric Roux's avatar
 
Cedric Roux committed
301

302
303
304
305
        /* Check the number of messages in the queue */
        DevCheck(itti_desc.tasks[task_id].message_in_queue < itti_desc.tasks_info[task_id].queue_size,
                 task_id, itti_desc.tasks[task_id].message_in_queue, itti_desc.tasks_info[task_id].queue_size);
#endif
Cedric Roux's avatar
 
Cedric Roux committed
306

307
        /* Allocate new list element */
308
        new = (message_list_t *) malloc (sizeof(struct message_list_s));
309
        DevAssert(new != NULL);
Cedric Roux's avatar
 
Cedric Roux committed
310

311
312
313
314
        /* Fill in members */
        new->msg = message;
        new->message_number = message_number;
        new->message_priority = priority;
Cedric Roux's avatar
 
Cedric Roux committed
315

316
#if defined(ENABLE_EVENT_FD)
317
        {
318
            ssize_t  write_ret;
319
            uint64_t sem_counter = 1;
320

321
            lfds611_queue_enqueue(itti_desc.tasks[task_id].message_queue, new);
322

323
            /* Call to write for an event fd must be of 8 bytes */
324
325
            write_ret = write(itti_desc.tasks[task_id].task_event_fd, &sem_counter, sizeof(sem_counter));
            DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, 0);
326
        }
327
#else
328
329
330
331
        if (STAILQ_EMPTY (&itti_desc.tasks[task_id].message_queue)) {
            STAILQ_INSERT_HEAD (&itti_desc.tasks[task_id].message_queue, new, next_element);
        }
        else {
Cedric Roux's avatar
 
Cedric Roux committed
332
333
334
335
//         struct message_list_s *insert_after = NULL;
//         struct message_list_s *temp;
// 
//         /* This method is inefficient... */
336
//         STAILQ_FOREACH(temp, &itti_desc.tasks[task_id].message_queue, next_element) {
Cedric Roux's avatar
 
Cedric Roux committed
337
338
339
340
341
342
343
344
345
346
347
348
349
//             struct message_list_s *next;
//             next = STAILQ_NEXT(temp, next_element);
//             /* Increment message priority to create a sort of
//              * priority based scheduler */
// //             if (temp->message_priority < TASK_PRIORITY_MAX) {
// //                 temp->message_priority++;
// //             }
//             if (next && next->message_priority < priority) {
//                 insert_after = temp;
//                 break;
//             }
//         }
//         if (insert_after == NULL) {
350
        STAILQ_INSERT_TAIL (&itti_desc.tasks[task_id].message_queue, new, next_element);
Cedric Roux's avatar
 
Cedric Roux committed
351
//         } else {
352
//             STAILQ_INSERT_AFTER(&itti_desc.tasks[task_id].message_queue, insert_after, new,
Cedric Roux's avatar
 
Cedric Roux committed
353
354
//                                 next_element);
//         }
355
        }
Cedric Roux's avatar
 
Cedric Roux committed
356

357
358
359
360
361
362
363
364
        /* Update the number of messages in the queue */
        itti_desc.tasks[task_id].message_in_queue++;
        if (itti_desc.tasks[task_id].message_in_queue == 1) {
            /* Emit a signal to wake up target task thread */
            pthread_cond_signal (&itti_desc.tasks[task_id].message_queue_cond_var);
        }
        /* Release the mutex */
        pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
365
#endif
366
    }
367

368
369
    ITTI_DEBUG(
            "Message %s, number %lu with priority %d successfully sent to queue (%u:%s)\n",
370
            itti_desc.messages_info[message_id].name, message_number, priority, task_id, itti_get_task_name(task_id));
Cedric Roux's avatar
 
Cedric Roux committed
371
372
373
    return 0;
}

374
#if defined(ENABLE_EVENT_FD)
375
376
377
378
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
    struct epoll_event event;

379
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
380
381
    DevCheck(fd >= 0, fd, 0, 0);

382
    itti_desc.tasks[task_id].nb_events++;
383
384

    /* Reallocate the events */
385
386
387
    itti_desc.tasks[task_id].events = realloc(
        itti_desc.tasks[task_id].events,
        itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event));
388

389
    event.events  = EPOLLIN | EPOLLERR;
390
391
392
    event.data.fd = fd;

    /* Add the event fd to the list of monitored events */
393
    if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD, fd,
394
395
396
397
398
399
400
        &event) != 0)
    {
        ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }
401
402

    ITTI_DEBUG("Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
403
404
405
406
}

void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
407
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
408
409
410
    DevCheck(fd >= 0, fd, 0, 0);

    /* Add the event fd to the list of monitored events */
411
    if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
412
413
414
415
416
417
418
    {
        ITTI_ERROR("epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }

419
420
421
422
    itti_desc.tasks[task_id].nb_events--;
    itti_desc.tasks[task_id].events = realloc(
        itti_desc.tasks[task_id].events,
        itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event));
423
424
425
426
}

int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
427
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
428

429
    *events = itti_desc.tasks[task_id].events;
430

431
    return itti_desc.tasks[task_id].epoll_nb_events;
432
433
}

434
435
436
437
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
    int epoll_ret = 0;
    int epoll_timeout = 0;
438
    int i;
439

440
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
    DevAssert(received_msg != NULL);

    *received_msg = NULL;

    if (polling) {
        /* In polling mode we set the timeout to 0 causing epoll_wait to return
         * immediately.
         */
        epoll_timeout = 0;
    } else {
        /* timeout = -1 causes the epoll_wait to wait indefinetely.
         */
        epoll_timeout = -1;
    }

456
    do {
457
458
459
        epoll_ret = epoll_wait(itti_desc.tasks[task_id].epoll_fd,
                               itti_desc.tasks[task_id].events,
                               itti_desc.tasks[task_id].nb_events,
460
461
                               epoll_timeout);
    } while (epoll_ret < 0 && errno == EINTR);
462
463
464

    if (epoll_ret < 0) {
        ITTI_ERROR("epoll_wait failed for task %s: %s\n",
465
                   itti_get_task_name(task_id), strerror(errno));
466
467
468
469
470
471
472
        DevAssert(0 == 1);
    }
    if (epoll_ret == 0 && polling) {
        /* No data to read -> return */
        return;
    }

473
    itti_desc.tasks[task_id].epoll_nb_events = epoll_ret;
474

475
    for (i = 0; i < epoll_ret; i++) {
476
        /* Check if there is an event for ITTI for the event fd */
477
478
        if ((itti_desc.tasks[task_id].events[i].events & EPOLLIN) &&
            (itti_desc.tasks[task_id].events[i].data.fd == itti_desc.tasks[task_id].task_event_fd))
479
        {
480
481
            struct message_list_s *message;
            uint64_t sem_counter;
482
            ssize_t  read_ret;
483
484

            /* Read will always return 1 */
485
486
            read_ret = read (itti_desc.tasks[task_id].task_event_fd, &sem_counter, sizeof(sem_counter));
            DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0);
487

488
            if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
489
                /* No element in list -> this should not happen */
490
                DevParam(task_id, epoll_ret, 0);
491
492
            }
            *received_msg = message->msg;
493
            free (message);
494
            return;
495
496
497
498
499
500
501
502
503
504
        }
    }
}
#endif

void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
#if defined(ENABLE_EVENT_FD)
    itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
#else
505
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
Cedric Roux's avatar
 
Cedric Roux committed
506
507
508
    DevAssert(received_msg != NULL);

    // Lock the mutex to get exclusive access to the list
509
    pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
Cedric Roux's avatar
 
Cedric Roux committed
510

511
512
    if (itti_desc.tasks[task_id].message_in_queue == 0) {
        ITTI_DEBUG("Message in queue[(%u:%s)] == 0, waiting\n", task_id, itti_get_task_name(task_id));
Cedric Roux's avatar
 
Cedric Roux committed
513
        // Wait while list == 0
514
515
516
517
        pthread_cond_wait (&itti_desc.tasks[task_id].message_queue_cond_var,
                           &itti_desc.tasks[task_id].message_queue_mutex);
        ITTI_DEBUG("Receiver queue[(%u:%s)] got new message notification\n",
                   task_id, itti_get_task_name(task_id));
Cedric Roux's avatar
 
Cedric Roux committed
518
519
    }

520
    if (!STAILQ_EMPTY (&itti_desc.tasks[task_id].message_queue)) {
521
        message_list_t *temp = STAILQ_FIRST (&itti_desc.tasks[task_id].message_queue);
Cedric Roux's avatar
 
Cedric Roux committed
522
523
524
525
526

        /* Update received_msg reference */
        *received_msg = temp->msg;

        /* Remove message from queue */
527
        STAILQ_REMOVE_HEAD (&itti_desc.tasks[task_id].message_queue, next_element);
528
        free (temp);
529
        itti_desc.tasks[task_id].message_in_queue--;
Cedric Roux's avatar
 
Cedric Roux committed
530
531
    }
    // Release the mutex
532
    pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
533
#endif
Cedric Roux's avatar
 
Cedric Roux committed
534
535
}

536
537
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
538
    DevAssert(received_msg != NULL);
Cedric Roux's avatar
 
Cedric Roux committed
539
540
541

    *received_msg = NULL;

542
543
544
#if defined(ENABLE_EVENT_FD)
    itti_receive_msg_internal_event_fd(task_id, 1, received_msg);
#else
545
    if (itti_desc.tasks[task_id].message_in_queue != 0) {
546
        message_list_t *temp;
Cedric Roux's avatar
 
Cedric Roux committed
547
548

        // Lock the mutex to get exclusive access to the list
549
        pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
Cedric Roux's avatar
 
Cedric Roux committed
550

551
        STAILQ_FOREACH (temp, &itti_desc.tasks[task_id].message_queue, next_element)
Cedric Roux's avatar
 
Cedric Roux committed
552
        {
553
554
555
556
557
558
559
560
561
            /* Update received_msg reference */
            *received_msg = temp->msg;

            /* Remove message from queue */
            STAILQ_REMOVE (&itti_desc.tasks[task_id].message_queue, temp, message_list_s, next_element);
            free (temp);
            itti_desc.tasks[task_id].message_in_queue--;

            ITTI_DEBUG(
Cedric Roux's avatar
Cedric Roux committed
562
563
                       "Receiver queue[(%u:%s)] got new message %s, number %lu\n",
                       task_id, itti_get_task_name(task_id), itti_desc.messages_info[temp->msg->ittiMsgHeader.messageId].name, temp->message_number);
564
            break;
Cedric Roux's avatar
 
Cedric Roux committed
565
566
567
        }

        // Release the mutex
568
        pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
Cedric Roux's avatar
 
Cedric Roux committed
569
    }
570
#endif
Cedric Roux's avatar
 
Cedric Roux committed
571

572
    if ((itti_debug_poll) && (*received_msg == NULL)) {
573
        ITTI_DEBUG("No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
Cedric Roux's avatar
 
Cedric Roux committed
574
575
576
    }
}

577
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
Cedric Roux's avatar
 
Cedric Roux committed
578
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
579
    int result;
Cedric Roux's avatar
 
Cedric Roux committed
580
581

    DevAssert(start_routine != NULL);
582
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
583
584
    DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, task_id, thread_id,
             itti_desc.threads[thread_id].task_state);
Cedric Roux's avatar
 
Cedric Roux committed
585

586
    itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;
Cedric Roux's avatar
 
Cedric Roux committed
587

588
589
    ITTI_DEBUG("Create thread for task %s\n", itti_get_task_name(task_id));

590
    result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
591
    DevCheck(result >= 0, task_id, thread_id, result);
Cedric Roux's avatar
 
Cedric Roux committed
592
593

    /* Wait till the thread is completely ready */
594
    while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
595
        ;
Cedric Roux's avatar
 
Cedric Roux committed
596
597
598
    return 0;
}

599
void itti_mark_task_ready(task_id_t task_id) {
Cedric Roux's avatar
 
Cedric Roux committed
600
601
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

602
603
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

604
#if !defined(ENABLE_EVENT_FD)
Cedric Roux's avatar
 
Cedric Roux committed
605
    // Lock the mutex to get exclusive access to the list
606
    pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
607
608
#endif

609
    itti_desc.threads[thread_id].task_state = TASK_STATE_READY;
610

611
#if !defined(ENABLE_EVENT_FD)
Cedric Roux's avatar
 
Cedric Roux committed
612
    // Release the mutex
613
    pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
614
#endif
615
616
}

617
618
619
620
void itti_exit_task(void) {
    pthread_exit (NULL);
}

621
void itti_terminate_tasks(task_id_t task_id) {
Cedric Roux's avatar
Cedric Roux committed
622
    // Sends Terminate signals to all tasks.
623
624
625
626
627
628
629
    itti_send_terminate_message (task_id);

    if (itti_desc.thread_handling_signals >= 0) {
        pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
    }

    pthread_exit (NULL);
Cedric Roux's avatar
 
Cedric Roux committed
630
631
}

632
int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
633
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) {
634
635
    task_id_t task_id;
    thread_id_t thread_id;
636
    itti_desc.message_number = 1;
Cedric Roux's avatar
 
Cedric Roux committed
637

638
    ITTI_DEBUG("Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);
639

640
#if !defined(RTAI)
641
    CHECK_INIT_RETURN(signal_init());
642
#endif
643

Cedric Roux's avatar
 
Cedric Roux committed
644
    /* Saves threads and messages max values */
645
    itti_desc.task_max = task_max;
Cedric Roux's avatar
 
Cedric Roux committed
646
647
    itti_desc.thread_max = thread_max;
    itti_desc.messages_id_max = messages_id_max;
648
    itti_desc.thread_handling_signals = -1;
649
    itti_desc.tasks_info = tasks_info;
Cedric Roux's avatar
 
Cedric Roux committed
650
651
652
    itti_desc.messages_info = messages_info;

    /* Allocates memory for tasks info */
653
654
655
656
    itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

    /* Allocates memory for threads info */
    itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));
Cedric Roux's avatar
 
Cedric Roux committed
657
658

    /* Initializing each queue and related stuff */
659
    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
660
661
    {
#if defined(ENABLE_EVENT_FD)
662
663
        ITTI_DEBUG("Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);
        if (lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size) < 0)
664
        {
665
            ITTI_ERROR("lfds611_queue_new failed for task %u\n", task_id);
666
667
668
            DevAssert(0 == 1);
        }

669
670
        itti_desc.tasks[task_id].epoll_fd = epoll_create1(0);
        if (itti_desc.tasks[task_id].epoll_fd == -1) {
671
672
673
674
675
            ITTI_ERROR("Failed to create new epoll fd: %s\n", strerror(errno));
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

676
        itti_desc.tasks[task_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
677
678
        if (itti_desc.tasks[task_id].task_event_fd == -1)
        {
679
680
681
682
683
            ITTI_ERROR("eventfd failed: %s\n", strerror(errno));
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

684
        itti_desc.tasks[task_id].nb_events = 1;
685

686
        itti_desc.tasks[task_id].events = malloc(sizeof(struct epoll_event));
687

688
        itti_desc.tasks[task_id].events->events  = EPOLLIN | EPOLLERR;
689
        itti_desc.tasks[task_id].events->data.fd = itti_desc.tasks[task_id].task_event_fd;
690
691

        /* Add the event fd to the list of monitored events */
692
693
        if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD,
            itti_desc.tasks[task_id].task_event_fd, itti_desc.tasks[task_id].events) != 0)
694
        {
695
            ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno));
696
697
698
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }
699
700
701

        ITTI_DEBUG("Successfully subscribed fd %d for task %s\n",
                   itti_desc.tasks[task_id].task_event_fd, itti_get_task_name(task_id));
702
#else
703
704
        STAILQ_INIT (&itti_desc.tasks[task_id].message_queue);
        itti_desc.tasks[task_id].message_in_queue = 0;
705

Cedric Roux's avatar
 
Cedric Roux committed
706
        // Initialize mutexes
707
        pthread_mutex_init (&itti_desc.tasks[task_id].message_queue_mutex, NULL);
708

Cedric Roux's avatar
 
Cedric Roux committed
709
        // Initialize Cond vars
710
        pthread_cond_init (&itti_desc.tasks[task_id].message_queue_cond_var, NULL);
711
#endif
712
    }
713

714
    /* Initializing each thread */
715
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
716
    {
717
        itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
Cedric Roux's avatar
 
Cedric Roux committed
718
    }
719

720
    itti_dump_init (messages_definition_xml, dump_file_name);
Cedric Roux's avatar
 
Cedric Roux committed
721

722
    CHECK_INIT_RETURN(timer_init ());
Cedric Roux's avatar
 
Cedric Roux committed
723
724
725
726

    return 0;
}

727
728
void itti_wait_tasks_end(void) {
    int end = 0;
729
730
    int thread_id;
    task_id_t task_id;
731
732
733
    int ready_tasks;
    int result;
    int retries = 10;
734
735
736
737
738
739
740
741

    itti_desc.thread_handling_signals = pthread_self ();

    /* Handle signals here */
    while (end == 0) {
        signal_handle (&end);
    }

742
743
744
    do {
        ready_tasks = 0;

745
746
        task_id = TASK_FIRST;
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.task_max; thread_id++) {
747
            /* Skip tasks which are not running */
748
749
750
751
752
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
                while (thread_id != TASK_GET_THREAD_ID(task_id))
                {
                    task_id++;
                }
753

754
                result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL);
755

756
                ITTI_DEBUG("Thread %s join status %d\n", itti_get_task_name(task_id), result);
757
758
759

                if (result == 0) {
                    /* Thread has terminated */
760
                    itti_desc.threads[thread_id].task_state = TASK_STATE_ENDED;
761
762
763
764
765
766
767
768
769
                }
                else {
                    /* Thread is still running, count it */
                    ready_tasks++;
                }
            }
        }
        if (ready_tasks > 0) {
            usleep (100 * 1000);
770
        }
771
772
773
774
775
    } while ((ready_tasks > 0) && (retries--));

    if (ready_tasks > 0) {
        ITTI_DEBUG("Some threads are still running, force exit\n");
        exit (0);
776
    }
777
778

    itti_dump_exit();
779
780
781
}

void itti_send_terminate_message(task_id_t task_id) {
Cedric Roux's avatar
 
Cedric Roux committed
782
783
    MessageDef *terminate_message_p;

784
    terminate_message_p = itti_alloc_new_message (task_id, TERMINATE_MESSAGE);
Cedric Roux's avatar
 
Cedric Roux committed
785

786
    itti_send_broadcast_message (terminate_message_p);
Cedric Roux's avatar
 
Cedric Roux committed
787
}