intertask_interface.c 26.1 KB
Newer Older
Cedric Roux's avatar
 
Cedric Roux committed
1
2
/*******************************************************************************

3
4
 Eurecom OpenAirInterface
 Copyright(c) 1999 - 2012 Eurecom
Cedric Roux's avatar
 
Cedric Roux committed
5

6
7
8
 This program is free software; you can redistribute it and/or modify it
 under the terms and conditions of the GNU General Public License,
 version 2, as published by the Free Software Foundation.
Cedric Roux's avatar
 
Cedric Roux committed
9

10
11
12
13
 This program is distributed in the hope it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 more details.
Cedric Roux's avatar
 
Cedric Roux committed
14

15
16
17
 You should have received a copy of the GNU General Public License along with
 this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
Cedric Roux's avatar
 
Cedric Roux committed
18

19
20
 The full GNU General Public License is included in this distribution in
 the file called "COPYING".
Cedric Roux's avatar
 
Cedric Roux committed
21

22
23
24
25
26
27
 Contact Information
 Openair Admin: openair_admin@eurecom.fr
 Openair Tech : openair_tech@eurecom.fr
 Forums       : http://forums.eurecom.fr/openairinterface
 Address      : EURECOM, Campus SophiaTech, 450 Route des Chappes
 06410 Biot FRANCE
Cedric Roux's avatar
 
Cedric Roux committed
28

29
 *******************************************************************************/
Cedric Roux's avatar
 
Cedric Roux committed
30

31
#define _GNU_SOURCE
Cedric Roux's avatar
 
Cedric Roux committed
32
33
34
35
36
37
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
38
#include <signal.h>
Cedric Roux's avatar
 
Cedric Roux committed
39
40
41
42

#include "queue.h"
#include "assertions.h"

43
44
45
46
47
48
#if defined(ENABLE_EVENT_FD)
# include <sys/epoll.h>
# include <sys/eventfd.h>
# include "liblfds611.h"
#endif

Cedric Roux's avatar
 
Cedric Roux committed
49
50
#include "intertask_interface.h"
#include "intertask_interface_dump.h"
51
52
53
54

/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
Cedric Roux's avatar
 
Cedric Roux committed
55
56
57
58
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

59
#include "signals.h"
Cedric Roux's avatar
 
Cedric Roux committed
60
61
#include "timer.h"

62
63
const int itti_debug = 0;
const int itti_debug_poll = 0;
Cedric Roux's avatar
 
Cedric Roux committed
64

65
#define ITTI_DEBUG(x, args...) do { if (itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout); } \
Cedric Roux's avatar
 
Cedric Roux committed
66
    while(0)
67
#define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } \
Cedric Roux's avatar
 
Cedric Roux committed
68
69
70
71
72
73
    while(0)

/* Global message size */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)

typedef enum task_state_s {
74
    TASK_STATE_NOT_CONFIGURED, TASK_STATE_STARTING, TASK_STATE_READY, TASK_STATE_ENDED, TASK_STATE_MAX,
Cedric Roux's avatar
 
Cedric Roux committed
75
76
77
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
78
typedef struct message_list_s {
79
#if !defined(ENABLE_EVENT_FD)
Cedric Roux's avatar
 
Cedric Roux committed
80
    STAILQ_ENTRY(message_list_s) next_element;
81
#endif
Cedric Roux's avatar
 
Cedric Roux committed
82

83
    MessageDef *msg; ///< Pointer to the message
Cedric Roux's avatar
 
Cedric Roux committed
84

85
86
    message_number_t message_number; ///< Unique message number
    uint32_t message_priority; ///< Message priority
87
} message_list_t;
Cedric Roux's avatar
 
Cedric Roux committed
88

89
90
91
92
93
94
95
typedef struct thread_desc_s {
    /* pthread associated with the thread */
    pthread_t task_thread;
    /* State of the thread */
    volatile task_state_t task_state;
} thread_desc_t;

Cedric Roux's avatar
 
Cedric Roux committed
96
97
typedef struct task_desc_s {
    /* Queue of messages belonging to the task */
98
#if !defined(ENABLE_EVENT_FD)
Cedric Roux's avatar
Cedric Roux committed
99
    STAILQ_HEAD(message_queue_head, message_list_s) message_queue;
Cedric Roux's avatar
 
Cedric Roux committed
100
101

    /* Number of messages in the queue */
102
    volatile uint32_t message_in_queue;
Cedric Roux's avatar
 
Cedric Roux committed
103
    /* Mutex for the message queue */
104
    pthread_mutex_t message_queue_mutex;
Cedric Roux's avatar
 
Cedric Roux committed
105
    /* Conditional var for message queue and task synchro */
106
    pthread_cond_t message_queue_cond_var;
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#else
    struct lfds611_queue_state *message_queue;

    /* This fd is used internally by ITTI. */
    int epoll_fd;

    /* The task fd */
    int task_event_fd;

    /* Number of events to monitor */
    uint16_t nb_events;

    /* Array of events monitored by the task.
     * By default only one fd is monitored (the one used to received messages
     * from other tasks).
     * More events can be suscribed later by the task itself.
     */
    struct epoll_event *events;
125
126

    int epoll_nb_events;
127
#endif
Cedric Roux's avatar
 
Cedric Roux committed
128
129
} task_desc_t;

130
typedef struct itti_desc_s {
131
    thread_desc_t *threads;
Cedric Roux's avatar
 
Cedric Roux committed
132
    task_desc_t *tasks;
133

Cedric Roux's avatar
 
Cedric Roux committed
134
    /* Current message number. Incremented every call to send_msg_to_task */
135
    message_number_t message_number __attribute__((aligned(8)));
Cedric Roux's avatar
 
Cedric Roux committed
136
137

    thread_id_t thread_max;
138
    task_id_t task_max;
Cedric Roux's avatar
 
Cedric Roux committed
139
140
    MessagesIds messages_id_max;

141
142
    pthread_t thread_handling_signals;

143
    const task_info_t *tasks_info;
Cedric Roux's avatar
 
Cedric Roux committed
144
145
    const message_info_t *messages_info;

146
147
148
149
    itti_lte_time_t lte_time;
} itti_desc_t;

static itti_desc_t itti_desc;
Cedric Roux's avatar
 
Cedric Roux committed
150

151
static inline message_number_t itti_increment_message_number(void) {
Cedric Roux's avatar
 
Cedric Roux committed
152
153
154
155
    /* Atomic operation supported by GCC: returns the current message number
     * and then increment it by 1.
     * This can be done without mutex.
     */
156
    return __sync_fetch_and_add (&itti_desc.message_number, 1);
Cedric Roux's avatar
 
Cedric Roux committed
157
158
}

159
static inline uint32_t itti_get_message_priority(MessagesIds message_id) {
Cedric Roux's avatar
 
Cedric Roux committed
160
161
162
163
164
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].priority);
}

165
const char *itti_get_message_name(MessagesIds message_id) {
Cedric Roux's avatar
 
Cedric Roux committed
166
167
168
169
170
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].name);
}

171
const char *itti_get_task_name(task_id_t task_id)
Cedric Roux's avatar
Cedric Roux committed
172
{
173
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
Cedric Roux's avatar
Cedric Roux committed
174

175
    return (itti_desc.tasks_info[task_id].name);
Cedric Roux's avatar
Cedric Roux committed
176
177
}

178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
static task_id_t itti_get_current_task_id()
{
    task_id_t task_id;
    thread_id_t thread_id;
    pthread_t thread = pthread_self ();

    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
    {
        thread_id = TASK_GET_THREAD_ID(task_id);
        if (itti_desc.threads[thread_id].task_thread == thread)
        {
            return task_id;
        }
    }

    return TASK_UNKNOWN;
}

196
197
198
199
200
201
void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
    itti_desc.lte_time.frame = frame;
    itti_desc.lte_time.slot = slot;
}

202
int itti_send_broadcast_message(MessageDef *message_p) {
203
    task_id_t destination_task_id;
204
    thread_id_t origin_thread_id;
205
    uint32_t thread_id;
Cedric Roux's avatar
 
Cedric Roux committed
206
    int ret = 0;
207
    int result;
Cedric Roux's avatar
 
Cedric Roux committed
208

209
    DevAssert(message_p != NULL);
Cedric Roux's avatar
 
Cedric Roux committed
210

Cedric Roux's avatar
Cedric Roux committed
211
    origin_thread_id = TASK_GET_THREAD_ID(message_p->ittiMsgHeader.originTaskId);
Cedric Roux's avatar
 
Cedric Roux committed
212

213
214
    destination_task_id = TASK_FIRST;
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
215
216
        MessageDef *new_message_p;

217
218
219
220
        while (thread_id != TASK_GET_THREAD_ID(destination_task_id))
        {
            destination_task_id++;
        }
221
        /* Skip task that broadcast the message */
222
        if (thread_id != origin_thread_id) {
223
            /* Skip tasks which are not running */
224
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
225
                new_message_p = malloc (sizeof(MessageDef));
226
                DevAssert(message_p != NULL);
227
228

                memcpy (new_message_p, message_p, sizeof(MessageDef));
229
                result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
Cedric Roux's avatar
Cedric Roux committed
230
                DevCheck(result >= 0, message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
231
            }
Cedric Roux's avatar
 
Cedric Roux committed
232
233
        }
    }
234
    free (message_p);
Cedric Roux's avatar
 
Cedric Roux committed
235
236
237
238

    return ret;
}

Cedric Roux's avatar
Cedric Roux committed
239
240
inline MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
Cedric Roux's avatar
 
Cedric Roux committed
241
242
    MessageDef *temp = NULL;

243
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);
Cedric Roux's avatar
 
Cedric Roux committed
244

245
246
247
248
249
250
    if (origin_task_id == TASK_UNKNOWN)
    {
        /* Try to identify real origin task ID */
        origin_task_id = itti_get_current_task_id();
    }

Cedric Roux's avatar
Cedric Roux committed
251
    temp = calloc (1, sizeof(MessageHeader) + size);
252
    DevAssert(temp != NULL);
Cedric Roux's avatar
 
Cedric Roux committed
253

Cedric Roux's avatar
Cedric Roux committed
254
255
256
    temp->ittiMsgHeader.messageId = message_id;
    temp->ittiMsgHeader.originTaskId = origin_task_id;
    temp->ittiMsgHeader.ittiMsgSize = size;
Cedric Roux's avatar
 
Cedric Roux committed
257
258
259
260

    return temp;
}

Cedric Roux's avatar
Cedric Roux committed
261
262
263
264
265
266
267
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
    return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message)
{
268
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
269
    message_list_t *new;
270
271
272
    uint32_t priority;
    message_number_t message_number;
    uint32_t message_id;
Cedric Roux's avatar
 
Cedric Roux committed
273

274
    DevAssert(message != NULL);
275
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
Cedric Roux's avatar
 
Cedric Roux committed
276

Cedric Roux's avatar
Cedric Roux committed
277
278
279
280
281
    message->ittiMsgHeader.destinationTaskId = task_id;
    message->ittiMsgHeader.instance = instance;
    message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
    message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
    message_id = message->ittiMsgHeader.messageId;
Cedric Roux's avatar
 
Cedric Roux committed
282
283
    DevCheck(message_id < itti_desc.messages_id_max, itti_desc.messages_id_max, message_id, 0);

284
    priority = itti_get_message_priority (message_id);
Cedric Roux's avatar
 
Cedric Roux committed
285

286
287
    /* Increment the global message number */
    message_number = itti_increment_message_number ();
Cedric Roux's avatar
 
Cedric Roux committed
288

289
    itti_dump_queue_message (message_number, message, itti_desc.messages_info[message_id].name,
Cedric Roux's avatar
Cedric Roux committed
290
                             sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);
291

292
293
294
295
296
    if (task_id != TASK_UNKNOWN)
    {
        /* We cannot send a message if the task is not running */
        DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_READY, itti_desc.threads[thread_id].task_state,
                 TASK_STATE_READY, thread_id);
Cedric Roux's avatar
 
Cedric Roux committed
297

298
299
300
#if !defined(ENABLE_EVENT_FD)
        /* Lock the mutex to get exclusive access to the list */
        pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
Cedric Roux's avatar
 
Cedric Roux committed
301

302
303
304
305
        /* Check the number of messages in the queue */
        DevCheck(itti_desc.tasks[task_id].message_in_queue < itti_desc.tasks_info[task_id].queue_size,
                 task_id, itti_desc.tasks[task_id].message_in_queue, itti_desc.tasks_info[task_id].queue_size);
#endif
Cedric Roux's avatar
 
Cedric Roux committed
306

307
        /* Allocate new list element */
308
        new = (message_list_t *) malloc (sizeof(struct message_list_s));
309
        DevAssert(new != NULL);
Cedric Roux's avatar
 
Cedric Roux committed
310

311
312
313
314
        /* Fill in members */
        new->msg = message;
        new->message_number = message_number;
        new->message_priority = priority;
Cedric Roux's avatar
 
Cedric Roux committed
315

316
#if defined(ENABLE_EVENT_FD)
317
318
        {
            uint64_t sem_counter = 1;
319

320
            lfds611_queue_enqueue(itti_desc.tasks[task_id].message_queue, new);
321

322
323
324
            /* Call to write for an event fd must be of 8 bytes */
            write(itti_desc.tasks[task_id].task_event_fd, &sem_counter, sizeof(sem_counter));
        }
325
#else
326
327
328
329
        if (STAILQ_EMPTY (&itti_desc.tasks[task_id].message_queue)) {
            STAILQ_INSERT_HEAD (&itti_desc.tasks[task_id].message_queue, new, next_element);
        }
        else {
Cedric Roux's avatar
 
Cedric Roux committed
330
331
332
333
//         struct message_list_s *insert_after = NULL;
//         struct message_list_s *temp;
// 
//         /* This method is inefficient... */
334
//         STAILQ_FOREACH(temp, &itti_desc.tasks[task_id].message_queue, next_element) {
Cedric Roux's avatar
 
Cedric Roux committed
335
336
337
338
339
340
341
342
343
344
345
346
347
//             struct message_list_s *next;
//             next = STAILQ_NEXT(temp, next_element);
//             /* Increment message priority to create a sort of
//              * priority based scheduler */
// //             if (temp->message_priority < TASK_PRIORITY_MAX) {
// //                 temp->message_priority++;
// //             }
//             if (next && next->message_priority < priority) {
//                 insert_after = temp;
//                 break;
//             }
//         }
//         if (insert_after == NULL) {
348
        STAILQ_INSERT_TAIL (&itti_desc.tasks[task_id].message_queue, new, next_element);
Cedric Roux's avatar
 
Cedric Roux committed
349
//         } else {
350
//             STAILQ_INSERT_AFTER(&itti_desc.tasks[task_id].message_queue, insert_after, new,
Cedric Roux's avatar
 
Cedric Roux committed
351
352
//                                 next_element);
//         }
353
        }
Cedric Roux's avatar
 
Cedric Roux committed
354

355
356
357
358
359
360
361
362
        /* Update the number of messages in the queue */
        itti_desc.tasks[task_id].message_in_queue++;
        if (itti_desc.tasks[task_id].message_in_queue == 1) {
            /* Emit a signal to wake up target task thread */
            pthread_cond_signal (&itti_desc.tasks[task_id].message_queue_cond_var);
        }
        /* Release the mutex */
        pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
363
#endif
364
    }
365

366
367
    ITTI_DEBUG(
            "Message %s, number %lu with priority %d successfully sent to queue (%u:%s)\n",
368
            itti_desc.messages_info[message_id].name, message_number, priority, task_id, itti_get_task_name(task_id));
Cedric Roux's avatar
 
Cedric Roux committed
369
370
371
    return 0;
}

372
#if defined(ENABLE_EVENT_FD)
373
374
375
376
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
    struct epoll_event event;

377
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
378
379
    DevCheck(fd >= 0, fd, 0, 0);

380
    itti_desc.tasks[task_id].nb_events++;
381
382

    /* Reallocate the events */
383
384
385
    itti_desc.tasks[task_id].events = realloc(
        itti_desc.tasks[task_id].events,
        itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event));
386

387
    event.events  = EPOLLIN | EPOLLERR;
388
389
390
    event.data.fd = fd;

    /* Add the event fd to the list of monitored events */
391
    if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD, fd,
392
393
394
395
396
397
398
        &event) != 0)
    {
        ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }
399
400

    ITTI_DEBUG("Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
401
402
403
404
}

void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
405
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
406
407
408
    DevCheck(fd >= 0, fd, 0, 0);

    /* Add the event fd to the list of monitored events */
409
    if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
410
411
412
413
414
415
416
    {
        ITTI_ERROR("epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n",
                   itti_get_task_name(task_id), fd, strerror(errno));
        /* Always assert on this condition */
        DevAssert(0 == 1);
    }

417
418
419
420
    itti_desc.tasks[task_id].nb_events--;
    itti_desc.tasks[task_id].events = realloc(
        itti_desc.tasks[task_id].events,
        itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event));
421
422
423
424
}

int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
425
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
426

427
    *events = itti_desc.tasks[task_id].events;
428

429
    return itti_desc.tasks[task_id].epoll_nb_events;
430
431
}

432
433
434
435
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
    int epoll_ret = 0;
    int epoll_timeout = 0;
436
    int i;
437

438
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
    DevAssert(received_msg != NULL);

    *received_msg = NULL;

    if (polling) {
        /* In polling mode we set the timeout to 0 causing epoll_wait to return
         * immediately.
         */
        epoll_timeout = 0;
    } else {
        /* timeout = -1 causes the epoll_wait to wait indefinetely.
         */
        epoll_timeout = -1;
    }

454
    do {
455
456
457
        epoll_ret = epoll_wait(itti_desc.tasks[task_id].epoll_fd,
                               itti_desc.tasks[task_id].events,
                               itti_desc.tasks[task_id].nb_events,
458
459
                               epoll_timeout);
    } while (epoll_ret < 0 && errno == EINTR);
460
461
462

    if (epoll_ret < 0) {
        ITTI_ERROR("epoll_wait failed for task %s: %s\n",
463
                   itti_get_task_name(task_id), strerror(errno));
464
465
466
467
468
469
470
        DevAssert(0 == 1);
    }
    if (epoll_ret == 0 && polling) {
        /* No data to read -> return */
        return;
    }

471
    itti_desc.tasks[task_id].epoll_nb_events = epoll_ret;
472

473
    for (i = 0; i < epoll_ret; i++) {
474
        /* Check if there is an event for ITTI for the event fd */
475
476
        if ((itti_desc.tasks[task_id].events[i].events & EPOLLIN) &&
            (itti_desc.tasks[task_id].events[i].data.fd == itti_desc.tasks[task_id].task_event_fd))
477
        {
478
479
480
481
            struct message_list_s *message;
            uint64_t sem_counter;

            /* Read will always return 1 */
482
            read (itti_desc.tasks[task_id].task_event_fd, &sem_counter, sizeof(sem_counter));
483

484
            if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
485
                /* No element in list -> this should not happen */
486
                DevParam(task_id, epoll_ret, 0);
487
488
            }
            *received_msg = message->msg;
489
            free (message);
490
            return;
491
492
493
494
495
496
497
498
499
500
        }
    }
}
#endif

void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
#if defined(ENABLE_EVENT_FD)
    itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
#else
501
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
Cedric Roux's avatar
 
Cedric Roux committed
502
503
504
    DevAssert(received_msg != NULL);

    // Lock the mutex to get exclusive access to the list
505
    pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
Cedric Roux's avatar
 
Cedric Roux committed
506

507
508
    if (itti_desc.tasks[task_id].message_in_queue == 0) {
        ITTI_DEBUG("Message in queue[(%u:%s)] == 0, waiting\n", task_id, itti_get_task_name(task_id));
Cedric Roux's avatar
 
Cedric Roux committed
509
        // Wait while list == 0
510
511
512
513
        pthread_cond_wait (&itti_desc.tasks[task_id].message_queue_cond_var,
                           &itti_desc.tasks[task_id].message_queue_mutex);
        ITTI_DEBUG("Receiver queue[(%u:%s)] got new message notification\n",
                   task_id, itti_get_task_name(task_id));
Cedric Roux's avatar
 
Cedric Roux committed
514
515
    }

516
    if (!STAILQ_EMPTY (&itti_desc.tasks[task_id].message_queue)) {
517
        message_list_t *temp = STAILQ_FIRST (&itti_desc.tasks[task_id].message_queue);
Cedric Roux's avatar
 
Cedric Roux committed
518
519
520
521
522

        /* Update received_msg reference */
        *received_msg = temp->msg;

        /* Remove message from queue */
523
        STAILQ_REMOVE_HEAD (&itti_desc.tasks[task_id].message_queue, next_element);
524
        free (temp);
525
        itti_desc.tasks[task_id].message_in_queue--;
Cedric Roux's avatar
 
Cedric Roux committed
526
527
    }
    // Release the mutex
528
    pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
529
#endif
Cedric Roux's avatar
 
Cedric Roux committed
530
531
}

532
533
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
    DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
534
    DevAssert(received_msg != NULL);
Cedric Roux's avatar
 
Cedric Roux committed
535
536
537

    *received_msg = NULL;

538
539
540
#if defined(ENABLE_EVENT_FD)
    itti_receive_msg_internal_event_fd(task_id, 1, received_msg);
#else
541
    if (itti_desc.tasks[task_id].message_in_queue != 0) {
542
        message_list_t *temp;
Cedric Roux's avatar
 
Cedric Roux committed
543
544

        // Lock the mutex to get exclusive access to the list
545
        pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
Cedric Roux's avatar
 
Cedric Roux committed
546

547
        STAILQ_FOREACH (temp, &itti_desc.tasks[task_id].message_queue, next_element)
Cedric Roux's avatar
 
Cedric Roux committed
548
        {
549
550
551
552
553
554
555
556
557
            /* Update received_msg reference */
            *received_msg = temp->msg;

            /* Remove message from queue */
            STAILQ_REMOVE (&itti_desc.tasks[task_id].message_queue, temp, message_list_s, next_element);
            free (temp);
            itti_desc.tasks[task_id].message_in_queue--;

            ITTI_DEBUG(
Cedric Roux's avatar
Cedric Roux committed
558
559
                       "Receiver queue[(%u:%s)] got new message %s, number %lu\n",
                       task_id, itti_get_task_name(task_id), itti_desc.messages_info[temp->msg->ittiMsgHeader.messageId].name, temp->message_number);
560
            break;
Cedric Roux's avatar
 
Cedric Roux committed
561
562
563
        }

        // Release the mutex
564
        pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
Cedric Roux's avatar
 
Cedric Roux committed
565
    }
566
#endif
Cedric Roux's avatar
 
Cedric Roux committed
567

568
    if ((itti_debug_poll) && (*received_msg == NULL)) {
569
        ITTI_DEBUG("No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
Cedric Roux's avatar
 
Cedric Roux committed
570
571
572
    }
}

573
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
Cedric Roux's avatar
 
Cedric Roux committed
574
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
575
    int result;
Cedric Roux's avatar
 
Cedric Roux committed
576
577

    DevAssert(start_routine != NULL);
578
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
579
580
    DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, task_id, thread_id,
             itti_desc.threads[thread_id].task_state);
Cedric Roux's avatar
 
Cedric Roux committed
581

582
    itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;
Cedric Roux's avatar
 
Cedric Roux committed
583

584
    result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
585
    DevCheck(result >= 0, task_id, thread_id, result);
Cedric Roux's avatar
 
Cedric Roux committed
586
587

    /* Wait till the thread is completely ready */
588
    while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
589
        ;
Cedric Roux's avatar
 
Cedric Roux committed
590
591
592
    return 0;
}

593
void itti_mark_task_ready(task_id_t task_id) {
Cedric Roux's avatar
 
Cedric Roux committed
594
595
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

596
597
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

598
#if !defined(ENABLE_EVENT_FD)
Cedric Roux's avatar
 
Cedric Roux committed
599
    // Lock the mutex to get exclusive access to the list
600
    pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
601
602
#endif

603
    itti_desc.threads[thread_id].task_state = TASK_STATE_READY;
604

605
#if !defined(ENABLE_EVENT_FD)
Cedric Roux's avatar
 
Cedric Roux committed
606
    // Release the mutex
607
    pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
608
#endif
609
610
}

611
612
613
614
void itti_exit_task(void) {
    pthread_exit (NULL);
}

615
void itti_terminate_tasks(task_id_t task_id) {
Cedric Roux's avatar
Cedric Roux committed
616
    // Sends Terminate signals to all tasks.
617
618
619
620
621
622
623
    itti_send_terminate_message (task_id);

    if (itti_desc.thread_handling_signals >= 0) {
        pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
    }

    pthread_exit (NULL);
Cedric Roux's avatar
 
Cedric Roux committed
624
625
}

626
int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
627
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) {
628
629
    task_id_t task_id;
    thread_id_t thread_id;
630
    itti_desc.message_number = 1;
Cedric Roux's avatar
 
Cedric Roux committed
631

632
    ITTI_DEBUG("Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);
633

634
635
    CHECK_INIT_RETURN(signal_init());

Cedric Roux's avatar
 
Cedric Roux committed
636
    /* Saves threads and messages max values */
637
    itti_desc.task_max = task_max;
Cedric Roux's avatar
 
Cedric Roux committed
638
639
    itti_desc.thread_max = thread_max;
    itti_desc.messages_id_max = messages_id_max;
640
    itti_desc.thread_handling_signals = -1;
641
    itti_desc.tasks_info = tasks_info;
Cedric Roux's avatar
 
Cedric Roux committed
642
643
644
    itti_desc.messages_info = messages_info;

    /* Allocates memory for tasks info */
645
646
647
648
    itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

    /* Allocates memory for threads info */
    itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));
Cedric Roux's avatar
 
Cedric Roux committed
649
650

    /* Initializing each queue and related stuff */
651
    for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
652
653
    {
#if defined(ENABLE_EVENT_FD)
654
655
        ITTI_DEBUG("Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);
        if (lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size) < 0)
656
        {
657
            ITTI_ERROR("lfds611_queue_new failed for task %u\n", task_id);
658
659
660
            DevAssert(0 == 1);
        }

661
662
        itti_desc.tasks[task_id].epoll_fd = epoll_create1(0);
        if (itti_desc.tasks[task_id].epoll_fd == -1) {
663
664
665
666
667
            ITTI_ERROR("Failed to create new epoll fd: %s\n", strerror(errno));
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

668
        itti_desc.tasks[task_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
669
670
        if (itti_desc.tasks[task_id].task_event_fd == -1)
        {
671
672
673
674
675
            ITTI_ERROR("eventfd failed: %s\n", strerror(errno));
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }

676
        itti_desc.tasks[task_id].nb_events = 1;
677

678
        itti_desc.tasks[task_id].events = malloc(sizeof(struct epoll_event));
679

680
        itti_desc.tasks[task_id].events->events  = EPOLLIN | EPOLLERR;
681
        itti_desc.tasks[task_id].events->data.fd = itti_desc.tasks[task_id].task_event_fd;
682
683

        /* Add the event fd to the list of monitored events */
684
685
        if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD,
            itti_desc.tasks[task_id].task_event_fd, itti_desc.tasks[task_id].events) != 0)
686
        {
687
            ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno));
688
689
690
            /* Always assert on this condition */
            DevAssert(0 == 1);
        }
691
692
693

        ITTI_DEBUG("Successfully subscribed fd %d for task %s\n",
                   itti_desc.tasks[task_id].task_event_fd, itti_get_task_name(task_id));
694
#else
695
696
        STAILQ_INIT (&itti_desc.tasks[task_id].message_queue);
        itti_desc.tasks[task_id].message_in_queue = 0;
697

Cedric Roux's avatar
 
Cedric Roux committed
698
        // Initialize mutexes
699
        pthread_mutex_init (&itti_desc.tasks[task_id].message_queue_mutex, NULL);
700

Cedric Roux's avatar
 
Cedric Roux committed
701
        // Initialize Cond vars
702
        pthread_cond_init (&itti_desc.tasks[task_id].message_queue_cond_var, NULL);
703
#endif
704
    }
705

706
    /* Initializing each thread */
707
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
708
    {
709
        itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
Cedric Roux's avatar
 
Cedric Roux committed
710
    }
711

712
    itti_dump_init (messages_definition_xml, dump_file_name);
Cedric Roux's avatar
 
Cedric Roux committed
713

714
    CHECK_INIT_RETURN(timer_init ());
Cedric Roux's avatar
 
Cedric Roux committed
715
716
717
718

    return 0;
}

719
720
void itti_wait_tasks_end(void) {
    int end = 0;
721
722
    int thread_id;
    task_id_t task_id;
723
724
725
    int ready_tasks;
    int result;
    int retries = 10;
726
727
728
729
730
731
732
733

    itti_desc.thread_handling_signals = pthread_self ();

    /* Handle signals here */
    while (end == 0) {
        signal_handle (&end);
    }

734
735
736
    do {
        ready_tasks = 0;

737
738
        task_id = TASK_FIRST;
        for (thread_id = THREAD_FIRST; thread_id < itti_desc.task_max; thread_id++) {
739
            /* Skip tasks which are not running */
740
741
742
743
744
            if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
                while (thread_id != TASK_GET_THREAD_ID(task_id))
                {
                    task_id++;
                }
745

746
                result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL);
747

748
                ITTI_DEBUG("Thread %s join status %d\n", itti_get_task_name(task_id), result);
749
750
751

                if (result == 0) {
                    /* Thread has terminated */
752
                    itti_desc.threads[thread_id].task_state = TASK_STATE_ENDED;
753
754
755
756
757
758
759
760
761
                }
                else {
                    /* Thread is still running, count it */
                    ready_tasks++;
                }
            }
        }
        if (ready_tasks > 0) {
            usleep (100 * 1000);
762
        }
763
764
765
766
767
    } while ((ready_tasks > 0) && (retries--));

    if (ready_tasks > 0) {
        ITTI_DEBUG("Some threads are still running, force exit\n");
        exit (0);
768
    }
769
770

    itti_dump_exit();
771
772
773
}

void itti_send_terminate_message(task_id_t task_id) {
Cedric Roux's avatar
 
Cedric Roux committed
774
775
    MessageDef *terminate_message_p;

776
    terminate_message_p = itti_alloc_new_message (task_id, TERMINATE_MESSAGE);
Cedric Roux's avatar
 
Cedric Roux committed
777

778
    itti_send_broadcast_message (terminate_message_p);
Cedric Roux's avatar
 
Cedric Roux committed
779
}