intertask_interface.c 33.8 KB
Newer Older
1
#define _GNU_SOURCE
Cedric Roux's avatar
   
Cedric Roux committed
2
3
4
5
6
7
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
8
#include <signal.h>
Cedric Roux's avatar
   
Cedric Roux committed
9

10
11
#include <sys/epoll.h>
#include <sys/eventfd.h>
12

13
14
15
16
#ifdef RTAI
# include <rtai_shm.h>
#endif

gauthier's avatar
gauthier committed
17
18
19
20
#if !defined(TRUE)
#define TRUE 1
#endif

21
22
23
24
25
26
#include "liblfds611.h"

#include "assertions.h"
#include "intertask_interface.h"
#include "intertask_interface_dump.h"

27
#if defined(OAI_EMU) || defined(RTAI)
28
# include "memory_pools.h"
29
30
31
# include "vcd_signal_dumper.h"
#endif

32
33
34
35
#if T_TRACER
#include "T.h"
#endif

36
37
38
/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
Cedric Roux's avatar
   
Cedric Roux committed
39
40
41
42
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

43
#include "signals.h"
Cedric Roux's avatar
   
Cedric Roux committed
44
45
#include "timer.h"

46
47
48
49
50
51
52
53
#ifdef RTAI
# include <rtai.h>
# include <rtai_fifos.h>
#    define FIFO_PRINTF_MAX_STRING_SIZE 1000
#    define FIFO_PRINTF_NO              62
#    define FIFO_PRINTF_SIZE            65536
#endif

54
55
56
57
58
59
60
61
62
63
/* ITTI DEBUG groups: one bit per category, tested against itti_debug. */
#define ITTI_DEBUG_POLL             (1<<0)
#define ITTI_DEBUG_SEND             (1<<1)
#define ITTI_DEBUG_EVEN_FD          (1<<2)
#define ITTI_DEBUG_INIT             (1<<3)
#define ITTI_DEBUG_EXIT             (1<<4)
#define ITTI_DEBUG_ISSUES           (1<<5)
#define ITTI_DEBUG_MP_STATISTICS    (1<<6)

/* Mask of the debug groups that are actually printed. */
const int itti_debug = ITTI_DEBUG_ISSUES | ITTI_DEBUG_MP_STATISTICS;

/* ITTI_DEBUG(m, fmt, ...): print a debug trace when group m is enabled.
 * ITTI_ERROR(fmt, ...):    always print an error trace.
 *
 * The macros expand to a single statement WITHOUT a trailing semicolon so
 * that "if (c) ITTI_DEBUG(...); else ..." is valid; the caller supplies the
 * semicolon. stdout is only flushed when something has been printed.
 *
 * Don't flush if using RTAI.
 */
#ifdef RTAI
# define ITTI_DEBUG(m, x, args...)  do { if ((m) & itti_debug) rt_log_debug (x, ##args); } while(0)
#else
# define ITTI_DEBUG(m, x, args...)  do { if ((m) & itti_debug) { fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout); } } while(0)
#endif
#define ITTI_ERROR(x, args...)      do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } while(0)

/* Global message size: ITTI header plus the payload size registered for the message id */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[(mESSAGEiD)].size)

#ifdef RTAI
# define ITTI_MEM_PAGE_SIZE (1024)
# define ITTI_MEM_SIZE      (16 * 1024 * 1024)
#endif

Cedric Roux's avatar
   
Cedric Roux committed
81
/* Life cycle of the thread backing a task. */
typedef enum task_state_s {
  TASK_STATE_NOT_CONFIGURED,  /* No thread has been created yet */
  TASK_STATE_STARTING,        /* Thread creation requested, not ready yet */
  TASK_STATE_READY,           /* Thread marked itself ready, messages can be sent to it */
  TASK_STATE_ENDED,           /* Messages sent to the task are not queued any more */
  TASK_STATE_MAX,             /* Number of states (keep last) */
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
86
typedef struct message_list_s {
87
  MessageDef *msg; ///< Pointer to the message
Cedric Roux's avatar
   
Cedric Roux committed
88

89
90
  message_number_t message_number; ///< Unique message number
  uint32_t message_priority; ///< Message priority
91
} message_list_t;
Cedric Roux's avatar
   
Cedric Roux committed
92

93
typedef struct thread_desc_s {
94
95
  /* pthread associated with the thread */
  pthread_t task_thread;
96

97
98
  /* State of the thread */
  volatile task_state_t task_state;
99

100
101
  /* This fd is used internally by ITTI. */
  int epoll_fd;
102

103
104
  /* The thread fd */
  int task_event_fd;
105

106
107
  /* Number of events to monitor */
  uint16_t nb_events;
108

109

110
111
112
113
114
115
  /* Array of events monitored by the task.
   * By default only one fd is monitored (the one used to received messages
   * from other tasks).
   * More events can be suscribed later by the task itself.
   */
  struct epoll_event *events;
116

117
  int epoll_nb_events;
118

119
120
121
  //#ifdef RTAI
  /* Flag to mark real time thread */
  unsigned real_time;
122

123
124
125
  /* Counter to indicate from RTAI threads that messages are pending for the thread */
  unsigned messages_pending;
  //#endif
126
127
128
} thread_desc_t;

/* Per-task bookkeeping kept by ITTI. */
typedef struct task_desc_s {
  /* Lock-free (liblfds) queue holding the messages addressed to the task */
  struct lfds611_queue_state *message_queue;
} task_desc_t;

133
typedef struct itti_desc_s {
134
135
  thread_desc_t *threads;
  task_desc_t   *tasks;
136

137
138
  /* Current message number. Incremented every call to send_msg_to_task */
  message_number_t message_number __attribute__((aligned(8)));
Cedric Roux's avatar
   
Cedric Roux committed
139

140
141
142
  thread_id_t thread_max;
  task_id_t task_max;
  MessagesIds messages_id_max;
Cedric Roux's avatar
   
Cedric Roux committed
143

144
145
  boolean_t thread_handling_signals;
  pthread_t thread_ref;
146

147
148
  const task_info_t *tasks_info;
  const message_info_t *messages_info;
Cedric Roux's avatar
   
Cedric Roux committed
149

150
  itti_lte_time_t lte_time;
151

152
  int running;
153

154
155
156
  volatile uint32_t created_tasks;
  volatile uint32_t ready_tasks;
  volatile int      wait_tasks;
157
#ifdef RTAI
158
  pthread_t rt_relay_thread;
159
#endif
160
161

#if defined(OAI_EMU) || defined(RTAI)
162
  memory_pools_handle_t memory_pools_handle;
163

164
165
166
  uint64_t vcd_poll_msg;
  uint64_t vcd_receive_msg;
  uint64_t vcd_send_msg;
167
#endif
168
169
170
} itti_desc_t;

static itti_desc_t itti_desc;
Cedric Roux's avatar
   
Cedric Roux committed
171

172
/* Allocate size bytes on behalf of origin_task_id for destination_task_id.
 *
 * With OAI_EMU or RTAI the memory comes from the ITTI memory pools (the pool
 * statistics are printed if the pools are exhausted), otherwise from malloc.
 * A failed allocation trips AssertFatal.
 */
void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size)
{
  void *ptr;

#if defined(OAI_EMU) || defined(RTAI)
  ptr = memory_pools_allocate (itti_desc.memory_pools_handle, size, origin_task_id, destination_task_id);

  if (ptr == NULL) {
    /* Give a hint about what exhausted the pools before asserting below */
    char *pools_report = memory_pools_statistics (itti_desc.memory_pools_handle);

    ITTI_ERROR (" Memory pools statistics:\n%s", pools_report);
    free (pools_report);
  }

#else
  ptr = malloc (size);
#endif

  AssertFatal (ptr != NULL, "Memory allocation of %d bytes failed (%d -> %d)!\n", (int) size, origin_task_id, destination_task_id);

  return ptr;
}

195
/* Release memory obtained from itti_malloc.
 *
 * ptr must not be NULL. Returns EXIT_SUCCESS, or the status reported by the
 * memory pools when they are in use (OAI_EMU or RTAI).
 */
int itti_free(task_id_t task_id, void *ptr)
{
  AssertFatal (ptr != NULL, "Trying to free a NULL pointer (%d)!\n", task_id);

#if defined(OAI_EMU) || defined(RTAI)
  int result = memory_pools_free (itti_desc.memory_pools_handle, ptr, task_id);

  AssertError (result == EXIT_SUCCESS, {}, "Failed to free memory at %p (%d)!\n", ptr, task_id);

  return (result);
#else
  free (ptr);

  return (EXIT_SUCCESS);
#endif
}

211
212
213
214
215
216
217
/* Return the current global message number and advance it by one. */
static inline message_number_t itti_increment_message_number(void)
{
  /* GCC atomic builtin: fetches the current value, then adds 1 to it.
   * No mutex is required.
   */
  message_number_t current = __sync_fetch_and_add (&itti_desc.message_number, 1);

  return current;
}

220
221
222
/* Return the priority registered for message_id (the id must be in range). */
static inline uint32_t itti_get_message_priority(MessagesIds message_id)
{
  const message_info_t *info;

  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  info = &itti_desc.messages_info[message_id];
  return (info->priority);
}

227
228
229
/* Return the name registered for message_id (the id must be in range). */
const char *itti_get_message_name(MessagesIds message_id)
{
  const message_info_t *info;

  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  info = &itti_desc.messages_info[message_id];
  return (info->name);
}

234
/* Return the name registered for task_id.
 *
 * While no task is registered yet (task_max not greater than 0) a fixed
 * placeholder string is returned instead of asserting.
 */
const char *itti_get_task_name(task_id_t task_id)
{
  if (!(itti_desc.task_max > 0)) {
    return ("ITTI NOT INITIALIZED !!!");
  }

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);

  return (itti_desc.tasks_info[task_id].name);
}

245
static task_id_t itti_get_current_task_id(void)
246
{
247
248
249
250
251
252
253
254
255
  task_id_t task_id;
  thread_id_t thread_id;
  pthread_t thread = pthread_self ();

  for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++) {
    thread_id = TASK_GET_THREAD_ID(task_id);

    if (itti_desc.threads[thread_id].task_thread == thread) {
      return task_id;
256
    }
257
  }
258

259
  return TASK_UNKNOWN;
260
261
}

262
263
264
#ifdef RTAI
/* Format a debug trace prefixed with the name of the calling task.
 *
 * The trace is written to stdout when the caller is a known ITTI task and
 * pushed to the printf FIFO (FIFO_PRINTF_NO) otherwise. Traces that do not
 * fit in FIFO_PRINTF_MAX_STRING_SIZE are truncated.
 */
static void rt_log_debug(char *format, ...)
{
  task_id_t   task_id;
  va_list     args;
  char        log_buffer[FIFO_PRINTF_MAX_STRING_SIZE];
  const int   max_len = (int) sizeof(log_buffer) - 1;
  int         len;
  int         body_len;

  task_id = itti_get_current_task_id ();
  len = snprintf(log_buffer, sizeof(log_buffer), "[ITTI][D][%s]", itti_get_task_name(task_id));

  /* snprintf returns the length the text WOULD have had: clamp it to what
   * is really stored so that it stays a valid offset in the buffer.
   */
  if (len < 0) {
    len = 0;
    log_buffer[0] = '\0';
  } else if (len > max_len) {
    len = max_len;
  }

  va_start(args, format);
  body_len = vsnprintf(&log_buffer[len], sizeof(log_buffer) - (size_t) len, format, args);
  va_end (args);

  if (body_len > 0) {
    /* Same clamping for the body: never report more bytes than stored */
    len = (body_len > max_len - len) ? max_len : len + body_len;
  }

  if (task_id != TASK_UNKNOWN)
    fwrite(log_buffer, len, 1, stdout);
  else
    rtf_put (FIFO_PRINTF_NO, log_buffer, len);
}
#endif

283
284
/* Record the current LTE time; it is copied into the header of every
 * message sent afterwards.
 */
void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
  itti_desc.lte_time.slot = slot;
  itti_desc.lte_time.frame = frame;
}

289
290
291
292
293
294
295
296
297
298
/* Send a copy of message_p to every ready thread except the sender's.
 *
 * Each destination receives its own copy (header plus payload); the original
 * message is released before returning. Always returns 0, failures trip
 * AssertFatal.
 */
int itti_send_broadcast_message(MessageDef *message_p)
{
  task_id_t origin_task_id;
  thread_id_t origin_thread_id;
  task_id_t destination_task_id = TASK_FIRST;
  uint32_t thread_id;
  int result;

  AssertFatal (message_p != NULL, "Trying to broadcast a NULL message!\n");

  origin_task_id = message_p->ittiMsgHeader.originTaskId;
  origin_thread_id = TASK_GET_THREAD_ID(origin_task_id);

  for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
    MessageDef *new_message_p;
    size_t size;

    /* Advance to the first task that runs in this thread */
    while (thread_id != TASK_GET_THREAD_ID(destination_task_id)) {
      destination_task_id++;
    }

    /* Skip task that broadcast the message */
    if (thread_id == origin_thread_id) {
      continue;
    }

    /* Skip tasks which are not running */
    if (itti_desc.threads[thread_id].task_state != TASK_STATE_READY) {
      continue;
    }

    size = sizeof(MessageHeader) + message_p->ittiMsgHeader.ittiMsgSize;
    new_message_p = itti_malloc( origin_task_id, destination_task_id, size );
    AssertFatal (new_message_p != NULL, "New message allocation failed!\n");

    memcpy( new_message_p, message_p, size );
    result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
    AssertFatal (result >= 0, "Failed to send message %d to thread %d (task %d)!\n", message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
  }

  /* Only copies have been sent: the original is not needed any more */
  result = itti_free (ITTI_MSG_ORIGIN_ID(message_p), message_p);
  AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);

  return 0;
}

333
/* Allocate a message with a payload of size bytes and fill in its header
 * (message id, origin task, payload size).
 *
 * When origin_task_id is TASK_UNKNOWN the origin is deduced from the calling
 * thread.
 */
MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
  MessageDef *new_message;

  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

#if defined(OAI_EMU) || defined(RTAI)
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, size);
#endif

  /* Try to identify real origin task ID */
  if (origin_task_id == TASK_UNKNOWN) {
    origin_task_id = itti_get_current_task_id();
  }

  new_message = itti_malloc (origin_task_id, TASK_UNKNOWN, sizeof(MessageHeader) + size);

  new_message->ittiMsgHeader.messageId = message_id;
  new_message->ittiMsgHeader.originTaskId = origin_task_id;
  new_message->ittiMsgHeader.ittiMsgSize = size;

#if defined(OAI_EMU) || defined(RTAI)
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, 0);
#endif

  return new_message;
}

361
/* Allocate a message whose payload has the size registered for message_id.
 *
 * See itti_alloc_new_message_sized for the handling of origin_task_id.
 */
MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
  /* The id indexes messages_info below, so it has to be validated here:
   * the check done by itti_alloc_new_message_sized would come too late.
   */
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

366
/* Send a message to a task.
 *
 * The routing part of the header (destination, instance, current LTE time)
 * is filled in, the message is handed to the ITTI dump, then it is queued
 * for the destination task and the destination thread is notified:
 *  - through its event fd for tasks without a parent task,
 *  - through the messages_pending counter when the sender is a real time
 *    thread (RTAI only).
 *
 * Ownership of message is always taken by this function: the message is
 * queued for the destination, or released right away when the destination
 * is TASK_UNKNOWN or has already ended.
 *
 * Always returns 0; failures trip AssertFatal.
 */
int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
{
  thread_id_t destination_thread_id;
  task_id_t origin_task_id;
  message_list_t *new;
  uint32_t priority;
  message_number_t message_number;
  uint32_t message_id;

  /* Validate the arguments first: the task id is used as a shift count and
   * as an array index below.
   */
  AssertFatal (message != NULL, "Message is NULL!\n");
  AssertFatal (destination_task_id < itti_desc.task_max, "Destination task id (%d) is out of range (%d)\n", destination_task_id, itti_desc.task_max);

#if defined(OAI_EMU) || defined(RTAI)
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                          __sync_or_and_fetch (&itti_desc.vcd_send_msg, 1L << destination_task_id));
#endif

  destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
  message->ittiMsgHeader.destinationTaskId = destination_task_id;
  message->ittiMsgHeader.instance = instance;
  message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
  message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
  message_id = message->ittiMsgHeader.messageId;
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  origin_task_id = ITTI_MSG_ORIGIN_ID(message);

  priority = itti_get_message_priority (message_id);

  /* Increment the global message number */
  message_number = itti_increment_message_number ();

  itti_dump_queue_message (origin_task_id, message_number, message, itti_desc.messages_info[message_id].name,
                           sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);

  if (destination_task_id != TASK_UNKNOWN) {
#if defined(OAI_EMU) || defined(RTAI)
    VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_IN);

    memory_pools_set_info (itti_desc.memory_pools_handle, message, 1, destination_task_id);
#endif

    if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED) {
      int result;

      ITTI_DEBUG(ITTI_DEBUG_ISSUES, " Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n",
                 itti_desc.messages_info[message_id].name,
                 message_number,
                 priority,
                 itti_get_task_name(origin_task_id),
                 destination_task_id,
                 itti_get_task_name(destination_task_id));

#if defined(OAI_EMU) || defined(RTAI)
      /* Keep the VCD function trace balanced on this path too */
      VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif

      /* Nobody will ever dequeue this message: release it instead of
       * leaking it.
       */
      result = itti_free (origin_task_id, message);
      AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
    } else {
      /* We cannot send a message if the task is not running */
      AssertFatal (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY,
                   "Task %s Cannot send message %s (%d) to thread %d, it is not in ready state (%d)!\n",
                   itti_get_task_name(origin_task_id),
                   itti_desc.messages_info[message_id].name,
                   message_id,
                   destination_thread_id,
                   itti_desc.threads[destination_thread_id].task_state);

      /* Allocate new list element */
      new = (message_list_t *) itti_malloc (origin_task_id, destination_task_id, sizeof(struct message_list_s));

      /* Fill in members */
      new->msg = message;
      new->message_number = message_number;
      new->message_priority = priority;

      /* Enqueue message in destination task queue */
      if (lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new) == 0) {
        AssertFatal(0, "Error: lfds611_queue_enqueue returns 0, queue is full, exiting\n");
      }

#if defined(OAI_EMU) || defined(RTAI)
      VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif

#ifdef RTAI

      if (itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].real_time) {
        /* This is a RT task, increase destination task messages pending counter */
        __sync_fetch_and_add (&itti_desc.threads[destination_thread_id].messages_pending, 1);
      } else
#endif
      {
        /* Only use event fd for tasks, subtasks will poll the queue */
        if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN) {
          ssize_t write_ret;
          eventfd_t sem_counter = 1;

          /* Call to write for an event fd must be of 8 bytes */
          write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
          AssertFatal (write_ret == sizeof(sem_counter), "Write to task message FD (%d) failed (%d/%d)\n",
                       destination_thread_id, (int) write_ret, (int) sizeof(sem_counter));
        }
      }

      ITTI_DEBUG(ITTI_DEBUG_SEND, " Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
                 itti_desc.messages_info[message_id].name,
                 message_number,
                 priority,
                 itti_get_task_name(origin_task_id),
                 destination_task_id,
                 itti_get_task_name(destination_task_id));
    }
  } else {
    /* This is a debug message to TASK_UNKNOWN, we can safely release it */
    int result = itti_free(origin_task_id, message);
    AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
  }

#if defined(OAI_EMU) || defined(RTAI)
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
                                          __sync_and_and_fetch (&itti_desc.vcd_send_msg, ~(1L << destination_task_id)));
#endif

  return 0;
}

485
486
/* Make the thread of task_id monitor fd (EPOLLIN | EPOLLERR) in addition to
 * the fds it already monitors.
 *
 * The events array of the thread is grown by one entry so that epoll_wait
 * can report all monitored fds at once. Failures trip AssertFatal.
 */
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
  thread_id_t thread_id;
  struct epoll_event event;
  struct epoll_event *new_events;
  uint16_t new_nb_events;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  AssertFatal (fd >= 0, "File descriptor (%d) is invalid!\n", fd);

  thread_id = TASK_GET_THREAD_ID(task_id);
  new_nb_events = itti_desc.threads[thread_id].nb_events + 1;
  AssertFatal (new_nb_events > 0, "Too many events subscribed for task %s!\n", itti_get_task_name(task_id));

  /* Reallocate the events. The result goes through a temporary so that the
   * current array is neither lost nor replaced by NULL on failure.
   */
  new_events = realloc(itti_desc.threads[thread_id].events,
                       new_nb_events * sizeof(struct epoll_event));
  AssertFatal (new_events != NULL, "Failed to allocate %d events for task %s!\n",
               (int) new_nb_events, itti_get_task_name(task_id));

  itti_desc.threads[thread_id].events = new_events;
  itti_desc.threads[thread_id].nb_events = new_nb_events;

  event.events  = EPOLLIN | EPOLLERR;
  event.data.u64 = 0;
  event.data.fd  = fd;

  /* Add the event fd to the list of monitored events */
  if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
                &event) != 0) {
    /* Always assert on this condition */
    AssertFatal (0, "epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s!\n",
                 itti_get_task_name(task_id), fd, strerror(errno));
  }

  ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
}

/* Stop monitoring fd for the thread of task_id and shrink the events array
 * of the thread by one entry. Failures of epoll_ctl trip AssertFatal.
 */
void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
  thread_id_t thread_id;
  uint16_t new_nb_events;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  AssertFatal (fd >= 0, "File descriptor (%d) is invalid!\n", fd);

  thread_id = TASK_GET_THREAD_ID(task_id);

  /* Remove the event fd from the list of monitored events */
  if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0) {
    /* Always assert on this condition */
    AssertFatal (0, "epoll_ctl (EPOLL_CTL_DEL) failed for task %s, fd %d: %s!\n",
                 itti_get_task_name(task_id), fd, strerror(errno));
  }

  /* nb_events is unsigned: decrementing 0 would wrap around */
  AssertFatal (itti_desc.threads[thread_id].nb_events > 0, "No event subscribed for task %s, fd %d!\n",
               itti_get_task_name(task_id), fd);
  new_nb_events = itti_desc.threads[thread_id].nb_events - 1;
  itti_desc.threads[thread_id].nb_events = new_nb_events;

  if (new_nb_events == 0) {
    /* realloc with a size of 0 is not portable: release the array explicitly */
    free (itti_desc.threads[thread_id].events);
    itti_desc.threads[thread_id].events = NULL;
  } else {
    struct epoll_event *new_events = realloc(itti_desc.threads[thread_id].events,
                                             new_nb_events * sizeof(struct epoll_event));

    /* If shrinking fails the previous (larger) array remains valid: keep it */
    if (new_events != NULL) {
      itti_desc.threads[thread_id].events = new_events;
    }
  }
}

/* Give access to the events reported by the last epoll_wait of the thread
 * of task_id.
 *
 * *events is set to the events array of the thread; the return value is the
 * number of events stored by the last wait.
 */
int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
  const thread_desc_t *thread;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);

  thread = &itti_desc.threads[TASK_GET_THREAD_ID(task_id)];
  *events = thread->events;

  return thread->epoll_nb_events;
}

549
550
/* Wait for events on the fds monitored by the thread of task_id and, when
 * the task event fd is readable, dequeue one message.
 *
 * polling       non zero: return immediately when nothing is pending,
 *               zero: block until at least one event is reported.
 * received_msg  set to the dequeued message, or to NULL when the wait
 *               returned without a message for the task (the reported
 *               events are then available through itti_get_events).
 */
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
  thread_desc_t *thread;
  thread_id_t thread_id;
  int epoll_ret = 0;
  int epoll_timeout;
  int i;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  AssertFatal (received_msg != NULL, "Received message is NULL!\n");

  thread_id = TASK_GET_THREAD_ID(task_id);
  thread = &itti_desc.threads[thread_id];
  *received_msg = NULL;

  /* A timeout of 0 makes epoll_wait return immediately (polling mode),
   * a timeout of -1 makes it wait indefinitely.
   */
  epoll_timeout = polling ? 0 : -1;

  /* Restart the wait when it is interrupted by a signal */
  do {
    epoll_ret = epoll_wait(thread->epoll_fd, thread->events, thread->nb_events, epoll_timeout);
  } while (epoll_ret < 0 && errno == EINTR);

  if (epoll_ret < 0) {
    AssertFatal (0, "epoll_wait failed for task %s: %s!\n", itti_get_task_name(task_id), strerror(errno));
  }

  if (epoll_ret == 0 && polling) {
    /* No data to read -> return */
    return;
  }

  thread->epoll_nb_events = epoll_ret;

  for (i = 0; i < epoll_ret; i++) {
    struct epoll_event *event = &thread->events[i];
    struct message_list_s *message = NULL;
    eventfd_t   sem_counter;
    ssize_t     read_ret;
    int         result;

    /* Only a readable task event fd is of interest to ITTI */
    if (!(event->events & EPOLLIN) || (event->data.fd != thread->task_event_fd)) {
      continue;
    }

    /* Consume the notification from the event fd */
    read_ret = read (thread->task_event_fd, &sem_counter, sizeof(sem_counter));
    AssertFatal (read_ret == sizeof(sem_counter), "Read from task message FD (%d) failed (%d/%d)!\n", thread_id, (int) read_ret, (int) sizeof(sem_counter));

    if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
      /* No element in list -> this should not happen */
      AssertFatal (0, "No message in queue for task %d while there are %d events and some for the messages queue!\n", task_id, epoll_ret);
    }

    AssertFatal(message != NULL, "Message from message queue is NULL!\n");

    /* Hand the message over and release the list element that carried it */
    *received_msg = message->msg;
    result = itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
    AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);

    /* Mark that the event has been processed */
    event->events &= ~EPOLLIN;
    return;
  }
}

/* Blocking receive: wait until an event is reported for the thread of
 * task_id. *received_msg is the dequeued message, or NULL when the wait was
 * ended by an event on another subscribed fd.
 *
 * With OAI_EMU or RTAI the bit of the task in the VCD "receive" variable is
 * cleared during the wait and set again afterwards.
 */
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
#if defined(OAI_EMU) || defined(RTAI)
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                          __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
#endif

  /* polling == 0: block until something happens */
  itti_receive_msg_internal_event_fd(task_id, 0, received_msg);

#if defined(OAI_EMU) || defined(RTAI)
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                          __sync_or_and_fetch (&itti_desc.vcd_receive_msg, 1L << task_id));
#endif
}

637
638
639
/* Non blocking receive: take one message directly from the queue of
 * task_id, without going through the event fd.
 *
 * *received_msg is the dequeued message, or NULL when the queue is empty.
 */
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg)
{
  struct message_list_s *message;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);

  *received_msg = NULL;

#if defined(OAI_EMU) || defined(RTAI)
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                          __sync_or_and_fetch (&itti_desc.vcd_poll_msg, 1L << task_id));
#endif

  if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1) {
    int result;

    /* Hand the message over and release the list element that carried it */
    *received_msg = message->msg;
    result = itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
    AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
  }

  if (*received_msg == NULL) {
    ITTI_DEBUG(ITTI_DEBUG_POLL, " No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
  }

#if defined(OAI_EMU) || defined(RTAI)
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
                                          __sync_and_and_fetch (&itti_desc.vcd_poll_msg, ~(1L << task_id)));
#endif
}

670
671
672
673
/* Create the thread running task_id and wait until it is ready.
 *
 * start_routine  entry point of the thread, receives args_p.
 *
 * The thread must not have been configured before. The call returns (always
 * 0) only once the new thread has reached TASK_STATE_READY, i.e. after it
 * called itti_mark_task_ready. Failures trip AssertFatal.
 */
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p)
{
  thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
  int result;

  AssertFatal (start_routine != NULL, "Start routine is NULL!\n");
  AssertFatal (thread_id < itti_desc.thread_max, "Thread id (%d) is out of range (%d)!\n", thread_id, itti_desc.thread_max);
  AssertFatal (itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, "Task %d, thread %d state is not correct (%d)!\n",
               task_id, thread_id, itti_desc.threads[thread_id].task_state);

  itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;

  ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating thread for task %s ...\n", itti_get_task_name(task_id));

  /* pthread_create returns 0 on success and a positive error number on
   * failure (never a negative value): anything but 0 is an error. Without
   * this check a failed creation would make the wait below spin forever.
   */
  result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
  AssertFatal (result == 0, "Thread creation for task %d, thread %d failed (%d)!\n", task_id, thread_id, result);

  /* Name the thread to ease debugging; a failure here is harmless */
  char name[16];
  snprintf( name, sizeof(name), "ITTI %d", thread_id );
  pthread_setname_np( itti_desc.threads[thread_id].task_thread, name );

  itti_desc.created_tasks ++;

  /* Wait till the thread is completely ready */
  while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
    usleep (1000);

  return 0;
}

699
//#ifdef RTAI
700
701
/* Flag the thread of task_id as a real time thread. */
void itti_set_task_real_time(task_id_t task_id)
{
  thread_id_t thread_id;

  thread_id = TASK_GET_THREAD_ID(task_id);

  DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

  itti_desc.threads[thread_id].real_time = TRUE;
}
knopp's avatar
   
knopp committed
708
//#endif
709

710
711
void itti_wait_ready(int wait_tasks)
{
712
  itti_desc.wait_tasks = wait_tasks;
713

714
715
716
717
718
  ITTI_DEBUG(ITTI_DEBUG_INIT,
             " wait for tasks: %s, created tasks %d, ready tasks %d\n",
             itti_desc.wait_tasks ? "yes" : "no",
             itti_desc.created_tasks,
             itti_desc.ready_tasks);
719

720
721
  AssertFatal (itti_desc.created_tasks == itti_desc.ready_tasks, "Number of created tasks (%d) does not match ready tasks (%d), wait task %d!\n",
               itti_desc.created_tasks, itti_desc.ready_tasks, itti_desc.wait_tasks);
722
723
}

724
725
/* Called by a task thread once its initialisation is done.
 *
 * Registers the thread with the ITTI dump and with the message queue of the
 * task, marks the thread ready (which releases itti_create_task), then
 * blocks for as long as tasks are asked to wait (see itti_wait_ready).
 */
void itti_mark_task_ready(task_id_t task_id)
{
  thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

  AssertFatal (thread_id < itti_desc.thread_max, "Thread id (%d) is out of range (%d)!\n", thread_id, itti_desc.thread_max);

  /* Register the thread in itti dump */
  itti_dump_thread_use_ring_buffer();

  /* Mark the thread as using LFDS queue */
  lfds611_queue_use(itti_desc.tasks[task_id].message_queue);

#ifdef RTAI
  /* Assign low priority to created threads */
  {
    struct sched_param sched_param;
    sched_param.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
    sched_setscheduler(0, SCHED_FIFO, &sched_param);
  }
#endif

  /* Count the task BEFORE publishing the ready state: itti_create_task
   * returns as soon as it sees TASK_STATE_READY, and itti_wait_ready then
   * expects ready_tasks to match created_tasks. The increment is atomic
   * because several task threads can get here at the same time.
   */
  __sync_fetch_and_add (&itti_desc.ready_tasks, 1);
  itti_desc.threads[thread_id].task_state = TASK_STATE_READY;

  while (itti_desc.wait_tasks != 0) {
    usleep (10000);
  }

  ITTI_DEBUG(ITTI_DEBUG_INIT, " task %s started\n", itti_get_task_name(task_id));
}

755
756
/* Terminate the calling thread.
 *
 * With OAI_EMU or RTAI the bit of the calling task is first cleared in the
 * VCD "receive" variable, provided the caller is a known task.
 */
void itti_exit_task(void)
{
#if defined(OAI_EMU) || defined(RTAI)
  task_id_t current_task_id = itti_get_current_task_id();

  if (current_task_id > TASK_UNKNOWN) {
    VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << current_task_id)));
  }

#endif
  pthread_exit (NULL);
}

769
770
771
772
/*
 * Requests the termination of all tasks, then ends the calling thread.
 *
 * task_id: task identifier forwarded to itti_send_terminate_message().
 *
 * This function does not return: it always finishes with pthread_exit().
 */
void itti_terminate_tasks(task_id_t task_id)
{
  /* Sends Terminate signals to all tasks. */
  itti_send_terminate_message (task_id);

  /* Send SIGUSR1 to the thread registered in thread_ref, if one handles signals */
  if (itti_desc.thread_handling_signals != 0) {
    pthread_kill (itti_desc.thread_ref, SIGUSR1);
  }

  pthread_exit (NULL);
}

781
782
783
#ifdef RTAI
/*
 * Body of the RT relay thread (RTAI builds only).
 *
 * While itti_desc.running is set, it periodically scans all threads; for each
 * ready, non real time thread it atomically fetches and resets the
 * messages_pending counter and, when not null, writes its value to the event
 * fd of the thread.
 *
 * arg: unused.
 * Returns NULL once itti_desc.running is cleared.
 */
static void *itti_rt_relay_thread(void *arg)
{
  thread_id_t thread_id;
  unsigned nb_pending;

  while (itti_desc.running) {
    usleep (200); // Poll for messages a little more than 2 time by slot to get a small latency between RT and other tasks

    /* RTAI is always defined here, the VCD traces do not need their own guard */
    VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_RELAY_THREAD, VCD_FUNCTION_IN);

    /* Checks for all non real time tasks if they have pending messages */
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
      ssize_t write_ret;
      eventfd_t sem_counter;

      /* Only ready threads are considered */
      if (itti_desc.threads[thread_id].task_state != TASK_STATE_READY) {
        continue;
      }

      /* Real time threads are skipped */
      if (itti_desc.threads[thread_id].real_time != FALSE) {
        continue;
      }

      /* Atomically read the counter and reset it to 0 */
      nb_pending = __sync_fetch_and_and (&itti_desc.threads[thread_id].messages_pending, 0);

      if (nb_pending == 0) {
        continue;
      }

      sem_counter = nb_pending;

      /* Call to write for an event fd must be of 8 bytes */
      write_ret = write (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
      DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, thread_id);
    }

    VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_RELAY_THREAD, VCD_FUNCTION_OUT);
  }

  return NULL;
}
#endif

820
int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name)
{
  task_id_t task_id;
  thread_id_t thread_id;
  int ret;

  itti_desc.message_number = 1;

  ITTI_DEBUG(ITTI_DEBUG_INIT, " Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);

  CHECK_INIT_RETURN(signal_mask());

  /* Saves threads and messages max values */
  itti_desc.task_max = task_max;
  itti_desc.thread_max = thread_max;
  itti_desc.messages_id_max = messages_id_max;
  itti_desc.thread_handling_signals = FALSE;
  itti_desc.tasks_info = tasks_info;
  itti_desc.messages_info = messages_info;

  /* Allocates memory for tasks info */
  itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

  /* Allocates memory for threads info */
  itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));

  /* Initializing each queue and related stuff */
  for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++) {
    ITTI_DEBUG(ITTI_DEBUG_INIT, " Initializing %stask %s%s%s\n",
               itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
               itti_desc.tasks_info[task_id].name,
               itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
               itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
               itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");

    ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);

    ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);

860
    if (0 == ret) {
861
      AssertFatal (0, "lfds611_queue_new failed for task %s!\n", itti_get_task_name(task_id));
862
    }
863
  }
864

865
866
867
  /* Initializing each thread */
  for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
    itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
868

869
    itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);
870

871
872
873
874
    if (itti_desc.threads[thread_id].epoll_fd == -1) {
      /* Always assert on this condition */
      AssertFatal (0, "Failed to create new epoll fd: %s!\n", strerror(errno));
    }
875

876
    itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
877

878
879
880
881
    if (itti_desc.threads[thread_id].task_event_fd == -1) {
      /* Always assert on this condition */
      AssertFatal (0, " eventfd failed: %s!\n", strerror(errno));
    }
882

883
    itti_desc.threads[thread_id].nb_events = 1;
884

885
    itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event));
886

887
888
889
890
891
892
893
894
895
896
897
898
    itti_desc.threads[thread_id].events->events  = EPOLLIN | EPOLLERR;
    itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd;

    /* Add the event fd to the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
                  itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0) {
      /* Always assert on this condition */
      AssertFatal (0, " epoll_ctl (EPOLL_CTL_ADD) failed: %s!\n", strerror(errno));
    }

    ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for thread %d\n",
               itti_desc.threads[thread_id].task_event_fd, thread_id);
899

900
#ifdef RTAI
901
902
    itti_desc.threads[thread_id].real_time = FALSE;
    itti_desc.threads[thread_id].messages_pending = 0;
903
#endif
904
  }
905

906
907
908
909
  itti_desc.running = 1;
  itti_desc.wait_tasks = 0;
  itti_desc.created_tasks = 0;
  itti_desc.ready_tasks = 0;
910
#ifdef RTAI
911
912
  /* Start RT relay thread */
  DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0);
913

914
  rt_global_heap_open();
915
#endif
916

917
#if defined(OAI_EMU) || defined(RTAI)
918
919
920
921
922
923
924
925
926
927
928
929
930
  itti_desc.memory_pools_handle = memory_pools_create (5);
  memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + ITTI_QUEUE_MAX_ELEMENTS,       50);
  memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + (2 * ITTI_QUEUE_MAX_ELEMENTS), 100);
  memory_pools_add_pool (itti_desc.memory_pools_handle, 10000,                                1000);
  memory_pools_add_pool (itti_desc.memory_pools_handle,  400,                                 20050);
  memory_pools_add_pool (itti_desc.memory_pools_handle,  100,                                 30050);

  {
    char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle);

    ITTI_DEBUG(ITTI_DEBUG_MP_STATISTICS, " Memory pools statistics:\n%s", statistics);
    free (statistics);
  }
931
932
#endif

933
#if defined(OAI_EMU) || defined(RTAI)
934
935
936
  itti_desc.vcd_poll_msg = 0;
  itti_desc.vcd_receive_msg = 0;
  itti_desc.vcd_send_msg = 0;
937
938
#endif

939
  itti_dump_init (messages_definition_xml, dump_file_name);
Cedric Roux's avatar
   
Cedric Roux committed
940

941
  CHECK_INIT_RETURN(timer_init ());
Cedric Roux's avatar
   
Cedric Roux committed
942

943
  return 0;
Cedric Roux's avatar
   
Cedric Roux committed
944
945
}

946
947
948
949
950
951
952
953
void itti_wait_tasks_end(void)
{
  int end = 0;
  int thread_id;
  task_id_t task_id;
  int ready_tasks;
  int result;
  int retries = 10;
954

955
956
  itti_desc.thread_handling_signals = TRUE;
  itti_desc.thread_ref=pthread_self ();
957

958
959
960
961
962
963
  /* Handle signals here */
  while (end == 0) {
    signal_handle (&end);
  }

  printf("closing all tasks\n");
964
  sleep(1);
965

966
967
968
969
970
971
972
973
974
975
  do {
    ready_tasks = 0;

    task_id = TASK_FIRST;

    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
      /* Skip tasks which are not running */
      if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
        while (thread_id != TASK_GET_THREAD_ID(task_id)) {
          task_id++;
976
        }
977
978
979
980
981
982
983
984
985
986
987

        result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL);

        ITTI_DEBUG(ITTI_DEBUG_EXIT, " Thread %s join status %d\n", itti_get_task_name(task_id), result);

        if (result == 0) {
          /* Thread has terminated */
          itti_desc.threads[thread_id].task_state = TASK_STATE_ENDED;
        } else {
          /* Thread is still running, count it */
          ready_tasks++;
988
        }
989
990
      }
    }
991

992
993
994
995
    if (ready_tasks > 0) {
      usleep (100 * 1000);
    }
  } while ((ready_tasks > 0) && (retries--)&& (!end) );
996