intertask_interface.c 17.1 KB
Newer Older
Cedric Roux's avatar
   
Cedric Roux committed
1
2
/*******************************************************************************

 Eurecom OpenAirInterface
 Copyright(c) 1999 - 2012 Eurecom

 This program is free software; you can redistribute it and/or modify it
 under the terms and conditions of the GNU General Public License,
 version 2, as published by the Free Software Foundation.

 This program is distributed in the hope it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 more details.

 You should have received a copy of the GNU General Public License along with
 this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.

 The full GNU General Public License is included in this distribution in
 the file called "COPYING".

 Contact Information
 Openair Admin: openair_admin@eurecom.fr
 Openair Tech : openair_tech@eurecom.fr
 Forums       : http://forums.eurecom.fr/openairinterface
 Address      : EURECOM, Campus SophiaTech, 450 Route des Chappes
 06410 Biot FRANCE

 *******************************************************************************/
Cedric Roux's avatar
   
Cedric Roux committed
30

31
#define _GNU_SOURCE
Cedric Roux's avatar
   
Cedric Roux committed
32
33
34
35
36
37
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
38
#include <signal.h>
Cedric Roux's avatar
   
Cedric Roux committed
39
40
41
42
43
44
45
46
47
48
49

#include "queue.h"
#include "assertions.h"

#include "intertask_interface.h"
#include "intertask_interface_dump.h"
/* Includes "intertask_interface_init.h" to check prototype coherence, but disable threads and messages information generation */
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

50
#include "signals.h"
Cedric Roux's avatar
   
Cedric Roux committed
51
52
#include "timer.h"

53
/* Run-time switch for the ITTI_DEBUG traces: they are printed only when this is non-zero. */
int itti_debug = 0;
Cedric Roux's avatar
   
Cedric Roux committed
54
55
56
57
58
59
60
61
62
63

#define ITTI_DEBUG(x, args...) do { if (itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); } \
    while(0)
#define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); } \
    while(0)

/* Global message size */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)

/* Life cycle of a task as tracked in task_desc_t.task_state. */
typedef enum task_state_s {
    TASK_STATE_NOT_CONFIGURED, /* Set for every task by itti_init */
    TASK_STATE_STARTING,       /* Set by itti_create_task just before the thread is created */
    TASK_STATE_READY,          /* Set by itti_mark_task_ready; required to receive messages */
    TASK_STATE_ENDED,          /* Set by itti_wait_tasks_end once the thread has been joined */
    TASK_STATE_MAX,
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
struct message_list_s {
    STAILQ_ENTRY(message_list_s) next_element;

71
    MessageDef *msg; ///< Pointer to the message
Cedric Roux's avatar
   
Cedric Roux committed
72

73
74
    message_number_t message_number; ///< Unique message number
    uint32_t message_priority; ///< Message priority
Cedric Roux's avatar
   
Cedric Roux committed
75
76
77
78
};

typedef struct task_desc_s {
    /* Queue of messages belonging to the task */
Cedric Roux's avatar
Cedric Roux committed
79
    STAILQ_HEAD(message_queue_head, message_list_s) message_queue;
Cedric Roux's avatar
   
Cedric Roux committed
80
81

    /* Number of messages in the queue */
82
    volatile uint32_t message_in_queue;
Cedric Roux's avatar
   
Cedric Roux committed
83
    /* Mutex for the message queue */
84
    pthread_mutex_t message_queue_mutex;
Cedric Roux's avatar
   
Cedric Roux committed
85
    /* Conditional var for message queue and task synchro */
86
    pthread_cond_t message_queue_cond_var;
Cedric Roux's avatar
Cedric Roux committed
87
    pthread_t      task_thread;
Cedric Roux's avatar
   
Cedric Roux committed
88
89
90
91
92
93
    volatile task_state_t task_state;
} task_desc_t;

struct itti_desc_s {
    task_desc_t *tasks;
    /* Current message number. Incremented every call to send_msg_to_task */
94
    message_number_t message_number __attribute__((aligned(8)));
Cedric Roux's avatar
   
Cedric Roux committed
95
96
97
98

    thread_id_t thread_max;
    MessagesIds messages_id_max;

99
100
    pthread_t thread_handling_signals;

Cedric Roux's avatar
   
Cedric Roux committed
101
102
103
104
105
106
    const char * const *threads_name;
    const message_info_t *messages_info;
};

static struct itti_desc_s itti_desc;

107
static inline message_number_t itti_increment_message_number(void) {
Cedric Roux's avatar
   
Cedric Roux committed
108
109
110
111
    /* Atomic operation supported by GCC: returns the current message number
     * and then increment it by 1.
     * This can be done without mutex.
     */
112
    return __sync_fetch_and_add (&itti_desc.message_number, 1);
Cedric Roux's avatar
   
Cedric Roux committed
113
114
}

115
/* Returns the priority registered in itti_desc.messages_info for message_id.
 * message_id is checked against itti_desc.messages_id_max with DevCheck first. */
static inline uint32_t itti_get_message_priority(MessagesIds message_id) {
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].priority);
}

121
/* Returns the name registered in itti_desc.messages_info for message_id.
 * message_id is checked against itti_desc.messages_id_max with DevCheck first.
 * The returned string belongs to the table given to itti_init; do not free it. */
const char *itti_get_message_name(MessagesIds message_id) {
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].name);
}

127
/* Returns the name of the thread hosting task_id, taken from itti_desc.threads_name.
 * The thread id derived from task_id is checked against itti_desc.thread_max first.
 * The returned string belongs to the table given to itti_init; do not free it. */
const char *itti_get_task_name(task_id_t task_id)
{
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    return (itti_desc.threads_name[thread_id]);
}

136
137
/* Sends a copy of message_p to every thread in TASK_STATE_READY, except the
 * thread of the task that originated the message.
 *
 * message_p: message to broadcast, must not be NULL. It is freed before this
 *            function returns, so the caller must not use it afterwards.
 * Returns ret, which is never modified here and is therefore always 0.
 */
int itti_send_broadcast_message(MessageDef *message_p) {
    thread_id_t origin_thread_id;
    uint32_t i;
    int ret = 0;
    int result;

    DevAssert(message_p != NULL);

    origin_thread_id = TASK_GET_THREAD_ID(message_p->header.originTaskId);

    for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) {
        MessageDef *new_message_p;

        /* Skip task that broadcast the message */
        if (i != origin_thread_id) {
            /* Skip tasks which are not running */
            if (itti_desc.tasks[i].task_state == TASK_STATE_READY) {
                new_message_p = malloc (sizeof(MessageDef));
                /* Validate the fresh allocation (the source message was already checked above) */
                DevAssert(new_message_p != NULL);

                /* NOTE(review): itti_alloc_new_message allocates MESSAGE_SIZE(id) bytes; if that is
                 * smaller than sizeof(MessageDef) this copy reads past the source buffer - confirm. */
                memcpy (new_message_p, message_p, sizeof(MessageDef));
                /* Ownership of the copy is handed over to the destination queue */
                result = itti_send_msg_to_task (TASK_SHIFT_THREAD_ID(i), INSTANCE_DEFAULT, new_message_p);
                DevCheck(result >= 0, message_p->header.messageId, i, 0);
            }
        }
    }
    free (message_p);

    return ret;
}

167
/* Allocates a zero-filled message of MESSAGE_SIZE(message_id) bytes and fills in
 * its header (message id, origin task id and payload size).
 *
 * message_id is checked against itti_desc.messages_id_max, and the allocation
 * result is checked with DevAssert.
 * Returns the new message; the queueing functions of this file take ownership
 * of it when it is sent.
 */
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id) {
    MessageDef *temp = NULL;

    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    temp = calloc (1, MESSAGE_SIZE(message_id));
    DevAssert(temp != NULL);

    temp->header.messageId = message_id;
    temp->header.originTaskId = origin_task_id;
    temp->header.size = itti_desc.messages_info[message_id].size;

    return temp;
}

182
183
184
185
186
187
/* Queues message for the thread hosting task_id and wakes that thread up if its
 * queue was empty.
 *
 * task_id:  destination task; stored in the message header.
 * instance: destination instance; stored in the message header.
 * message:  message to send, must not be NULL. Its pointer is stored in the
 *           queue as is (no copy), so the caller must not free it.
 * The destination thread must be in TASK_STATE_READY and its queue must be
 * below ITTI_QUEUE_SIZE_PER_TASK; both are enforced with DevCheck.
 * Returns 0.
 */
int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
    struct message_list_s *new;
    uint32_t priority;
    message_number_t message_number;
    uint32_t message_id;

    DevAssert(message != NULL);
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    message->header.destinationTaskId = task_id;
    message->header.instance = instance;
    message_id = message->header.messageId;
    DevCheck(message_id < itti_desc.messages_id_max, itti_desc.messages_id_max, message_id, 0);

    priority = itti_get_message_priority (message_id);

    /* Lock the mutex to get exclusive access to the list */
    pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);

    /* We cannot send a message if the task is not running */
    DevCheck(itti_desc.tasks[thread_id].task_state == TASK_STATE_READY, itti_desc.tasks[thread_id].task_state,
             TASK_STATE_READY, thread_id);

    /* Check the number of messages in the queue */
    DevCheck((itti_desc.tasks[thread_id].message_in_queue * sizeof(MessageDef)) < ITTI_QUEUE_SIZE_PER_TASK,
             (itti_desc.tasks[thread_id].message_in_queue * sizeof(MessageDef)), ITTI_QUEUE_SIZE_PER_TASK,
             itti_desc.tasks[thread_id].message_in_queue);

    /* Allocate new list element */
    new = (struct message_list_s *) malloc (sizeof(struct message_list_s));
    DevAssert(new != NULL);

    /* Increment the global message number */
    message_number = itti_increment_message_number ();

    /* Fill in members */
    new->msg = message;
    new->message_number = message_number;
    new->message_priority = priority;

    itti_dump_queue_message (message_number, message, itti_desc.messages_info[message_id].name,
                             MESSAGE_SIZE(message_id));

    /* The priority is recorded in the element but not used for ordering:
     * messages are queued in plain FIFO order. */
    if (STAILQ_EMPTY (&itti_desc.tasks[thread_id].message_queue)) {
        STAILQ_INSERT_HEAD (&itti_desc.tasks[thread_id].message_queue, new, next_element);
    }
    else {
        STAILQ_INSERT_TAIL (&itti_desc.tasks[thread_id].message_queue, new, next_element);
    }

    /* Update the number of messages in the queue */
    itti_desc.tasks[thread_id].message_in_queue++;
    if (itti_desc.tasks[thread_id].message_in_queue == 1) {
        /* Emit a signal to wake up target task thread */
        pthread_cond_signal (&itti_desc.tasks[thread_id].message_queue_cond_var);
    }
    /* Release the mutex */
    pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
    /* priority is a uint32_t, hence %u */
    ITTI_DEBUG(
            "Message %s, number %lu with priority %u successfully sent to queue (%u:%s)\n",
            itti_desc.messages_info[message_id].name, message_number, priority, thread_id, itti_desc.threads_name[thread_id]);
    return 0;
}

269
/* Blocks until a message is available for the thread hosting task_id, then
 * removes the first message of the queue and returns it through received_msg.
 *
 * task_id:      receiving task; only its thread id is used to select the queue.
 * received_msg: output, must not be NULL. Receives the message pointer that was
 *               queued by itti_send_msg_to_task; the list element is freed
 *               here, the message itself is not.
 */
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
    DevAssert(received_msg != NULL);

    // Lock the mutex to get exclusive access to the list
    pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);

    /* Re-test the predicate after every wakeup: pthread_cond_wait may return
     * spuriously, and returning with an empty queue would leave *received_msg unset. */
    while (itti_desc.tasks[thread_id].message_in_queue == 0) {
        ITTI_DEBUG("Message in queue[(%u:%s)] == 0, waiting\n", thread_id, itti_desc.threads_name[thread_id]);
        // Wait while list == 0
        pthread_cond_wait (&itti_desc.tasks[thread_id].message_queue_cond_var,
                           &itti_desc.tasks[thread_id].message_queue_mutex);
        ITTI_DEBUG("Receiver queue[(%u:%s)] got new message notification for task %x\n",
                   thread_id, itti_desc.threads_name[thread_id], task_id);
    }

    if (!STAILQ_EMPTY (&itti_desc.tasks[thread_id].message_queue)) {
        struct message_list_s *temp = STAILQ_FIRST (&itti_desc.tasks[thread_id].message_queue);

        /* Update received_msg reference */
        *received_msg = temp->msg;

        /* Remove message from queue */
        STAILQ_REMOVE_HEAD (&itti_desc.tasks[thread_id].message_queue, next_element);
        free (temp);
        itti_desc.tasks[thread_id].message_in_queue--;
    }
    // Release the mutex
    pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
}

302
/* Non-blocking receive: looks in the queue of the thread hosting task_id for the
 * first message addressed to task_id and matching instance, and removes it.
 *
 * task_id:      receiving task; must equal the message's destinationTaskId.
 * instance:     instance to match, or INSTANCE_ALL to accept any instance.
 * received_msg: output, must not be NULL. Set to the matching message, or to
 *               NULL when none is found. The list element is freed here, the
 *               message itself is not.
 */
void itti_poll_msg(task_id_t task_id, instance_t instance, MessageDef **received_msg) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
    DevAssert(received_msg != NULL);

    *received_msg = NULL;

    /* Unlocked pre-check of the counter, so the mutex is not taken when the queue looks empty */
    if (itti_desc.tasks[thread_id].message_in_queue != 0) {
        struct message_list_s *temp;

        // Lock the mutex to get exclusive access to the list
        pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);

        STAILQ_FOREACH (temp, &itti_desc.tasks[thread_id].message_queue, next_element)
        {
            if ((temp->msg->header.destinationTaskId == task_id)
                    && ((instance == INSTANCE_ALL) || (temp->msg->header.instance == instance))) {
                /* Update received_msg reference */
                *received_msg = temp->msg;

                /* Remove message from queue */
                STAILQ_REMOVE (&itti_desc.tasks[thread_id].message_queue, temp, message_list_s, next_element);
                itti_desc.tasks[thread_id].message_in_queue--;

                /* Trace before releasing the element: the trace reads temp's fields */
                ITTI_DEBUG(
                        "Receiver queue[(%u:%s)] got new message %s, number %lu for task %x\n",
                        thread_id, itti_desc.threads_name[thread_id], itti_desc.messages_info[temp->msg->header.messageId].name, temp->message_number, task_id);

                free (temp);
                /* temp is gone: leave the loop before STAILQ_FOREACH advances from it */
                break;
            }
        }

        // Release the mutex
        pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
    }

    if (*received_msg == NULL) {
        ITTI_DEBUG("No message in queue[(%u:%s)] for task %x\n", thread_id, itti_desc.threads_name[thread_id], task_id);
    }
}

344
/* Creates the thread of task_id and waits until that thread declares itself
 * ready through itti_mark_task_ready.
 *
 * task_id:       task to start; its thread must be in TASK_STATE_NOT_CONFIGURED.
 * start_routine: thread entry point, must not be NULL.
 * args_p:        argument passed unchanged to start_routine.
 * Returns 0.
 */
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
    int result;

    DevAssert(start_routine != NULL);
    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
    DevCheck(itti_desc.tasks[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, task_id, thread_id,
             itti_desc.tasks[thread_id].task_state);

    itti_desc.tasks[thread_id].task_state = TASK_STATE_STARTING;

    result = pthread_create (&itti_desc.tasks[thread_id].task_thread, NULL, start_routine, args_p);
    /* pthread_create returns 0 on success and a positive error number on failure */
    DevCheck(result == 0, task_id, thread_id, result);

    /* Wait till the thread is completely ready */
    while (itti_desc.tasks[thread_id].task_state != TASK_STATE_READY) {
        /* Poll every millisecond instead of spinning at full CPU load */
        usleep (1000);
    }
    return 0;
}

364
/* Moves the thread hosting task_id to TASK_STATE_READY, under the queue mutex.
 * itti_create_task waits for this state before returning. */
void itti_mark_task_ready(task_id_t task_id) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

    // Lock the mutex to get exclusive access to the list
    pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);
    itti_desc.tasks[thread_id].task_state = TASK_STATE_READY;

    // Release the mutex
    pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
}

377
378
379
380
/* Terminates the calling thread with a NULL exit value; does not return. */
void itti_exit_task(void) {
    pthread_exit (NULL);
}

381
/* Broadcasts the terminate message on behalf of task_id, sends SIGUSR1 to the
 * signal handling thread if one has been registered, then terminates the
 * calling thread. Does not return. */
void itti_terminate_tasks(task_id_t task_id) {
    // Sends Terminate signals to all tasks.
    itti_send_terminate_message (task_id);

    /* itti_init stores -1 until itti_wait_tasks_end registers its thread. Compare
     * against that exact sentinel: a ">= 0" test is always true when pthread_t is
     * unsigned and would pass the sentinel to pthread_kill. */
    if (itti_desc.thread_handling_signals != (pthread_t) -1) {
        pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
    }

    pthread_exit (NULL);
}

392
int itti_init(thread_id_t thread_max, MessagesIds messages_id_max, const char * const *threads_name,
393
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) {
Cedric Roux's avatar
   
Cedric Roux committed
394
    int i;
395
    itti_desc.message_number = 1;
Cedric Roux's avatar
   
Cedric Roux committed
396

397
398
    ITTI_DEBUG( "Init: %d threads, %d messages\n", thread_max, messages_id_max);

399
400
    CHECK_INIT_RETURN(signal_init());

Cedric Roux's avatar
   
Cedric Roux committed
401
402
403
    /* Saves threads and messages max values */
    itti_desc.thread_max = thread_max;
    itti_desc.messages_id_max = messages_id_max;
404
    itti_desc.thread_handling_signals = -1;
Cedric Roux's avatar
   
Cedric Roux committed
405
406
407
408
    itti_desc.threads_name = threads_name;
    itti_desc.messages_info = messages_info;

    /* Allocates memory for tasks info */
409
    itti_desc.tasks = calloc (itti_desc.thread_max, sizeof(task_desc_t));
Cedric Roux's avatar
   
Cedric Roux committed
410
411
412

    /* Initializing each queue and related stuff */
    for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) {
413
        STAILQ_INIT (&itti_desc.tasks[i].message_queue);
Cedric Roux's avatar
   
Cedric Roux committed
414
        itti_desc.tasks[i].message_in_queue = 0;
415

Cedric Roux's avatar
   
Cedric Roux committed
416
        // Initialize mutexes
417
        pthread_mutex_init (&itti_desc.tasks[i].message_queue_mutex, NULL);
418

Cedric Roux's avatar
   
Cedric Roux committed
419
        // Initialize Cond vars
420
        pthread_cond_init (&itti_desc.tasks[i].message_queue_cond_var, NULL);
Cedric Roux's avatar
   
Cedric Roux committed
421
422
        itti_desc.tasks[i].task_state = TASK_STATE_NOT_CONFIGURED;
    }
423
    itti_dump_init (messages_definition_xml, dump_file_name);
Cedric Roux's avatar
   
Cedric Roux committed
424

425
    CHECK_INIT_RETURN(timer_init ());
Cedric Roux's avatar
   
Cedric Roux committed
426
427
428
429

    return 0;
}

430
431
432
void itti_wait_tasks_end(void) {
    int end = 0;
    int i;
433
434
435
    int ready_tasks;
    int result;
    int retries = 10;
436
437
438
439
440
441
442
443

    itti_desc.thread_handling_signals = pthread_self ();

    /* Handle signals here */
    while (end == 0) {
        signal_handle (&end);
    }

444
445
446
447
448
449
    do {
        ready_tasks = 0;

        for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) {
            /* Skip tasks which are not running */
            if (itti_desc.tasks[i].task_state == TASK_STATE_READY) {
450

451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
                result = pthread_tryjoin_np (itti_desc.tasks[i].task_thread, NULL);

                ITTI_DEBUG("Thread %s join status %d\n", itti_desc.threads_name[i], result);

                if (result == 0) {
                    /* Thread has terminated */
                    itti_desc.tasks[i].task_state = TASK_STATE_ENDED;
                }
                else {
                    /* Thread is still running, count it */
                    ready_tasks++;
                }
            }
        }
        if (ready_tasks > 0) {
            usleep (100 * 1000);
467
        }
468
469
470
471
472
    } while ((ready_tasks > 0) && (retries--));

    if (ready_tasks > 0) {
        ITTI_DEBUG("Some threads are still running, force exit\n");
        exit (0);
473
474
475
476
    }
}

/* Allocates a TERMINATE_MESSAGE originating from task_id and broadcasts it.
 * itti_send_broadcast_message frees the allocated message. */
void itti_send_terminate_message(task_id_t task_id) {
    MessageDef *terminate_message_p;

    terminate_message_p = itti_alloc_new_message (task_id, TERMINATE_MESSAGE);

    itti_send_broadcast_message (terminate_message_p);
}