intertask_interface.c 27.7 KB
Newer Older
1 2 3 4 5
/*
 * Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The OpenAirInterface Software Alliance licenses this file to You under
 * the OAI Public License, Version 1.1  (the "License"); you may not use this file
 * except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.openairinterface.org/?page_id=698
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *-------------------------------------------------------------------------------
 * For more information about the OpenAirInterface (OAI) Software Alliance:
 *      contact@openairinterface.org
 */

22
#define _GNU_SOURCE
23 24 25 26 27 28
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
29
#include <signal.h>
30

31 32
#include <sys/epoll.h>
#include <sys/eventfd.h>
33

gauthier's avatar
gauthier committed
34 35 36 37
#if !defined(TRUE)
#define TRUE 1
#endif

38 39 40 41 42 43
#include "liblfds611.h"

#include "assertions.h"
#include "intertask_interface.h"
#include "intertask_interface_dump.h"

44 45 46 47
#if T_TRACER
#include "T.h"
#endif

48 49 50
/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
51 52 53 54
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

55
#include "signals.h"
56 57
#include "timer.h"

58 59 60 61 62 63 64 65 66
/* ITTI debug groups: one bit per trace category, OR-ed together in itti_debug */
#define ITTI_DEBUG_POLL             (1<<0)
#define ITTI_DEBUG_SEND             (1<<1)
#define ITTI_DEBUG_EVEN_FD          (1<<2)
#define ITTI_DEBUG_INIT             (1<<3)
#define ITTI_DEBUG_EXIT             (1<<4)
#define ITTI_DEBUG_ISSUES           (1<<5)
#define ITTI_DEBUG_MP_STATISTICS    (1<<6)

/* Mask of the debug groups whose traces are actually printed */
const int itti_debug = (ITTI_DEBUG_ISSUES | ITTI_DEBUG_MP_STATISTICS);

/* Print a trace on stdout when group m is enabled in itti_debug.
 * x must be a string literal: it is concatenated with the "[ITTI][D]" prefix.
 * The expansion deliberately has no trailing ';' so that the macro behaves as
 * a single statement (e.g. in "if (c) ITTI_DEBUG(...); else ...").
 */
#define ITTI_DEBUG(m, x, args...)  do { if ((m) & itti_debug) {fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout);} } while(0)

/* Unconditionally print an error trace on stdout (same conventions as ITTI_DEBUG) */
#define ITTI_ERROR(x, args...)      do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } while(0)

/* Global message size: ITTI header plus the payload size registered for the message id */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)

/* Life-cycle state of an ITTI thread (stored in thread_desc_t.task_state) */
typedef enum task_state_s {
  TASK_STATE_NOT_CONFIGURED, ///< Set by itti_init() and by itti_exit_task()
  TASK_STATE_STARTING,       ///< Set by itti_create_task() before the thread is spawned
  TASK_STATE_READY,          ///< Set by itti_mark_task_ready()
  TASK_STATE_ENDED,          ///< Set by itti_wait_tasks_end() once the thread has been joined
  TASK_STATE_MAX,            ///< Number of states, not a valid state
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
80
typedef struct message_list_s {
81
  MessageDef *msg; ///< Pointer to the message
82

83 84
  message_number_t message_number; ///< Unique message number
  uint32_t message_priority; ///< Message priority
85
} message_list_t;
86

87
typedef struct thread_desc_s {
88 89
  /* pthread associated with the thread */
  pthread_t task_thread;
90

91 92
  /* State of the thread */
  volatile task_state_t task_state;
93

94 95
  /* This fd is used internally by ITTI. */
  int epoll_fd;
96

97 98
  /* The thread fd */
  int task_event_fd;
99

100 101
  /* Number of events to monitor */
  uint16_t nb_events;
102

103

104 105 106 107 108 109
  /* Array of events monitored by the task.
   * By default only one fd is monitored (the one used to received messages
   * from other tasks).
   * More events can be suscribed later by the task itself.
   */
  struct epoll_event *events;
110

111
  int epoll_nb_events;
112

113 114
  /* Flag to mark real time thread */
  unsigned real_time;
115

116
  /* Counter to indicate that messages are pending for the thread */
117
  unsigned messages_pending;
118 119 120
} thread_desc_t;

/* Per-task bookkeeping */
typedef struct task_desc_s {
  /* Queue of messages belonging to the task (lock-free liblfds queue) */
  struct lfds611_queue_state *message_queue;
} task_desc_t;

125
typedef struct itti_desc_s {
126 127
  thread_desc_t *threads;
  task_desc_t   *tasks;
128

129 130
  /* Current message number. Incremented every call to send_msg_to_task */
  message_number_t message_number __attribute__((aligned(8)));
131

132 133 134
  thread_id_t thread_max;
  task_id_t task_max;
  MessagesIds messages_id_max;
135

136 137
  boolean_t thread_handling_signals;
  pthread_t thread_ref;
138

139 140
  const task_info_t *tasks_info;
  const message_info_t *messages_info;
141

142
  itti_lte_time_t lte_time;
143

144
  int running;
145

146 147 148
  volatile uint32_t created_tasks;
  volatile uint32_t ready_tasks;
  volatile int      wait_tasks;
149 150 151
} itti_desc_t;

static itti_desc_t itti_desc;
152

153
/* Allocate size bytes of zero-filled memory for an ITTI object.
 *
 * origin_task_id and destination_task_id are only used in the diagnostic
 * reported through AssertFatal when the allocation fails.
 *
 * Returns the allocated buffer; it must be released with itti_free().
 */
void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size)
{
  void *ptr = malloc (size);

  if (ptr != NULL) {
    memset (ptr, 0, size);
  }

  AssertFatal (ptr != NULL, "Memory allocation of %d bytes failed (%d -> %d)!\n", (int) size, origin_task_id, destination_task_id);

  return ptr;
}

165
/* Release a buffer obtained from itti_malloc().
 *
 * task_id is only used in the diagnostic; a NULL ptr is reported through
 * AssertFatal.
 *
 * Returns EXIT_SUCCESS.
 */
int itti_free(task_id_t task_id, void *ptr)
{
  AssertFatal (ptr != NULL, "Trying to free a NULL pointer (%d)!\n", task_id);

  free (ptr);

  return EXIT_SUCCESS;
}

175 176 177 178 179 180 181
/* Return the current global message number and increment it by 1.
 *
 * Uses the GCC atomic builtin __sync_fetch_and_add, so no mutex is needed
 * when several threads send messages concurrently.
 */
static inline message_number_t itti_increment_message_number(void)
{
  return __sync_fetch_and_add (&itti_desc.message_number, 1);
}

184 185 186
/* Return the priority registered in messages_info for message_id.
 * An out-of-range message_id is reported through AssertFatal.
 */
static inline uint32_t itti_get_message_priority(MessagesIds message_id)
{
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  return (itti_desc.messages_info[message_id].priority);
}

191 192 193
/* Return the name registered in messages_info for message_id.
 * An out-of-range message_id is reported through AssertFatal.
 */
const char *itti_get_message_name(MessagesIds message_id)
{
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  return (itti_desc.messages_info[message_id].name);
}

198
/* Return the name registered in tasks_info for task_id.
 *
 * While task_max is still 0 (itti_init() not called yet) a fixed placeholder
 * string is returned instead; afterwards an out-of-range task_id is reported
 * through AssertFatal.
 */
const char *itti_get_task_name(task_id_t task_id)
{
  if (itti_desc.task_max <= 0) {
    return ("ITTI NOT INITIALIZED !!!");
  }

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);

  return (itti_desc.tasks_info[task_id].name);
}

209
static task_id_t itti_get_current_task_id(void)
210
{
211 212 213 214 215 216 217 218 219
  task_id_t task_id;
  thread_id_t thread_id;
  pthread_t thread = pthread_self ();

  for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++) {
    thread_id = TASK_GET_THREAD_ID(task_id);

    if (itti_desc.threads[thread_id].task_thread == thread) {
      return task_id;
220
    }
221
  }
222

223
  return TASK_UNKNOWN;
224 225
}

226 227
/* Record the current frame and slot; itti_send_msg_to_task() copies them in
 * the header of every message sent afterwards.
 */
void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
  itti_desc.lte_time.frame = frame;
  itti_desc.lte_time.slot = slot;
}

232 233 234 235 236 237 238 239 240 241
/* Send a copy of message_p to every thread in READY state, except the thread
 * of the task that originated the message.
 *
 * For each thread the copy is addressed to the first task id (scanning upwards
 * from TASK_FIRST) that maps to that thread. message_p itself is released
 * before returning, so the caller must not use it afterwards.
 *
 * Returns 0; every failure is reported through AssertFatal.
 */
int itti_send_broadcast_message(MessageDef *message_p)
{
  task_id_t destination_task_id;
  task_id_t origin_task_id;
  thread_id_t origin_thread_id;
  uint32_t thread_id;
  int result;

  AssertFatal (message_p != NULL, "Trying to broadcast a NULL message!\n");

  origin_task_id = message_p->ittiMsgHeader.originTaskId;
  origin_thread_id = TASK_GET_THREAD_ID(origin_task_id);

  destination_task_id = TASK_FIRST;

  for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
    MessageDef *new_message_p;

    /* Advance to the task mapped to this thread, without ever looking up a
     * task id beyond the configured range
     */
    while ((destination_task_id < itti_desc.task_max) && (thread_id != TASK_GET_THREAD_ID(destination_task_id))) {
      destination_task_id++;
    }

    AssertFatal (destination_task_id < itti_desc.task_max, "No task found for thread %d!\n", thread_id);

    /* Skip task that broadcast the message */
    if (thread_id != origin_thread_id) {
      /* Skip tasks which are not running */
      if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
        size_t size = sizeof(MessageHeader) + message_p->ittiMsgHeader.ittiMsgSize;

        /* itti_malloc asserts on failure, no further check is needed */
        new_message_p = itti_malloc( origin_task_id, destination_task_id, size );

        memcpy( new_message_p, message_p, size );
        result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
        AssertFatal (result >= 0, "Failed to send message %d to thread %d (task %d)!\n", message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
      }
    }
  }

  /* Only copies were sent: the original message is no longer needed */
  result = itti_free (ITTI_MSG_ORIGIN_ID(message_p), message_p);
  AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);

  return 0;
}

276
/* Allocate a zero-filled message made of the ITTI header followed by size
 * payload bytes, and fill in its id, origin task and payload size.
 *
 * When origin_task_id is TASK_UNKNOWN, the task of the calling thread is
 * looked up and used instead (it may still be TASK_UNKNOWN if the caller is
 * not an ITTI thread). An out-of-range message_id is reported through
 * AssertFatal.
 */
MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
  MessageDef *temp = NULL;

  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  if (origin_task_id == TASK_UNKNOWN) {
    /* Try to identify real origin task ID */
    origin_task_id = itti_get_current_task_id();
  }

  temp = itti_malloc (origin_task_id, TASK_UNKNOWN, sizeof(MessageHeader) + size);

  temp->ittiMsgHeader.messageId = message_id;
  temp->ittiMsgHeader.originTaskId = origin_task_id;
  temp->ittiMsgHeader.ittiMsgSize = size;

  return temp;
}

296
/* Allocate a message whose payload has the size registered in messages_info
 * for message_id. See itti_alloc_new_message_sized() for the other details.
 */
MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
  /* Validate the id before it is used to index messages_info */
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

301
/* Queue message for destination_task_id and wake up its thread.
 *
 * The header is completed with the destination, the instance and the last
 * recorded LTE time, the message is given the next global message number and
 * is passed to the ITTI dump. Ownership of message is always taken over:
 *  - destination is TASK_UNKNOWN: the message is only dumped, then released;
 *  - destination thread is in ENDED state: the message is dropped and released;
 *  - otherwise the destination thread must be READY (AssertFatal); the message
 *    is enqueued and, when the destination is not a sub-task, the thread event
 *    fd is written to signal one more pending message.
 *
 * Returns 0; every failure is reported through AssertFatal.
 */
int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
{
  thread_id_t destination_thread_id;
  task_id_t origin_task_id;
  message_list_t *new;
  uint32_t priority;
  message_number_t message_number;
  uint32_t message_id;
  int result;

  AssertFatal (message != NULL, "Message is NULL!\n");
  AssertFatal (destination_task_id < itti_desc.task_max, "Destination task id (%d) is out of range (%d)\n", destination_task_id, itti_desc.task_max);

  destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
  message->ittiMsgHeader.destinationTaskId = destination_task_id;
  message->ittiMsgHeader.instance = instance;
  message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
  message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
  message_id = message->ittiMsgHeader.messageId;
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  origin_task_id = ITTI_MSG_ORIGIN_ID(message);

  priority = itti_get_message_priority (message_id);

  /* Increment the global message number */
  message_number = itti_increment_message_number ();

  itti_dump_queue_message (origin_task_id, message_number, message, itti_desc.messages_info[message_id].name,
                           sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);

  if (destination_task_id != TASK_UNKNOWN) {

    if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED) {
      ITTI_DEBUG(ITTI_DEBUG_ISSUES, " Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n",
                 itti_desc.messages_info[message_id].name,
                 message_number,
                 priority,
                 itti_get_task_name(origin_task_id),
                 destination_task_id,
                 itti_get_task_name(destination_task_id));

      /* Nobody will ever dequeue this message: release it here, otherwise it
       * would be leaked since the sender no longer owns it
       */
      result = itti_free (origin_task_id, message);
      AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
    } else {
      /* We cannot send a message if the task is not running */
      AssertFatal (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY,
                   "Task %s Cannot send message %s (%d) to thread %d, it is not in ready state (%d)!\n",
                   itti_get_task_name(origin_task_id),
                   itti_desc.messages_info[message_id].name,
                   message_id,
                   destination_thread_id,
                   itti_desc.threads[destination_thread_id].task_state);

      /* Allocate new list element */
      new = (message_list_t *) itti_malloc (origin_task_id, destination_task_id, sizeof(struct message_list_s));

      /* Fill in members */
      new->msg = message;
      new->message_number = message_number;
      new->message_priority = priority;

      /* Enqueue message in destination task queue */
      if (lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new) == 0) {
        AssertFatal(0, "Error: lfds611_queue_enqueue returns 0, queue is full, exiting\n");
      }

      /* Only use event fd for tasks, subtasks will poll the queue */
      if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN) {
        ssize_t write_ret;
        eventfd_t sem_counter = 1;

        /* Call to write for an event fd must be of 8 bytes */
        write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
        AssertFatal (write_ret == sizeof(sem_counter), "Write to task message FD (%d) failed (%d/%d)\n",
                     destination_thread_id, (int) write_ret, (int) sizeof(sem_counter));
      }

      ITTI_DEBUG(ITTI_DEBUG_SEND, " Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
                 itti_desc.messages_info[message_id].name,
                 message_number,
                 priority,
                 itti_get_task_name(origin_task_id),
                 destination_task_id,
                 itti_get_task_name(destination_task_id));
    }
  } else {
    /* This is a debug message to TASK_UNKNOWN, we can safely release it */
    result = itti_free(origin_task_id, message);
    AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
  }

  return 0;
}

394 395
/* Add fd to the set of file descriptors monitored by the thread of task_id.
 *
 * The thread events array is grown by one slot and fd is registered on the
 * thread epoll instance for EPOLLIN and EPOLLERR. Every failure is reported
 * through AssertFatal.
 */
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
  thread_id_t thread_id;
  struct epoll_event event;
  struct epoll_event *events;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  AssertFatal (fd >= 0, "File descriptor (%d) is invalid!\n", fd);

  thread_id = TASK_GET_THREAD_ID(task_id);

  /* Reallocate the events through a temporary pointer, so that the current
   * array is not lost if the reallocation fails
   */
  events = realloc(itti_desc.threads[thread_id].events,
                   (itti_desc.threads[thread_id].nb_events + 1) * sizeof(struct epoll_event));
  AssertFatal (events != NULL, "Failed to reallocate events for task %s!\n", itti_get_task_name(task_id));

  itti_desc.threads[thread_id].events = events;
  itti_desc.threads[thread_id].nb_events++;

  event.events  = EPOLLIN | EPOLLERR;
  /* data is a union: clear it entirely before setting the fd member */
  event.data.u64 = 0;
  event.data.fd  = fd;

  /* Add the event fd to the list of monitored events */
  if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
                &event) != 0) {
    /* Always assert on this condition */
    AssertFatal (0, "epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s!\n",
                 itti_get_task_name(task_id), fd, strerror(errno));
  }

  ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
}

/* Remove fd from the set of file descriptors monitored by the thread of
 * task_id and shrink the thread events array by one slot.
 *
 * An invalid task_id or fd, an epoll_ctl failure and an unsubscription
 * without any monitored event are reported through AssertFatal.
 */
void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
  thread_id_t thread_id;
  struct epoll_event *events;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  AssertFatal (fd >= 0, "File descriptor (%d) is invalid!\n", fd);

  thread_id = TASK_GET_THREAD_ID(task_id);

  /* Remove the event fd from the list of monitored events */
  if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0) {
    /* Always assert on this condition */
    AssertFatal (0, "epoll_ctl (EPOLL_CTL_DEL) failed for task %s, fd %d: %s!\n",
                 itti_get_task_name(task_id), fd, strerror(errno));
  }

  /* nb_events is unsigned: refuse to wrap around */
  AssertFatal (itti_desc.threads[thread_id].nb_events > 0, "No event is monitored for task %s!\n", itti_get_task_name(task_id));
  itti_desc.threads[thread_id].nb_events--;

  /* Shrink the array, except down to 0 elements: realloc with a size of 0
   * may free the block and return NULL. A failed shrink is harmless, the
   * current (larger) array simply stays in use.
   */
  if (itti_desc.threads[thread_id].nb_events > 0) {
    events = realloc(itti_desc.threads[thread_id].events,
                     itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));

    if (events != NULL) {
      itti_desc.threads[thread_id].events = events;
    }
  }
}

/* Give access to the events array of the thread of task_id.
 *
 * *events is set to the array filled by the thread epoll_wait calls.
 * Returns the number of events recorded by itti_receive_msg() for that thread
 * (the value returned by its epoll_wait).
 */
int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
  thread_id_t thread_id;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);

  thread_id = TASK_GET_THREAD_ID(task_id);
  *events = itti_desc.threads[thread_id].events;

  return itti_desc.threads[thread_id].epoll_nb_events;
}

458 459
/* Wait on the epoll instance of the thread of task_id and, if its message
 * event fd is readable, dequeue one message for task_id.
 *
 * polling: non-zero makes epoll_wait return immediately, zero makes it block
 *          until an event occurs (interruptions by signals are retried).
 * received_msg: set to the dequeued message, or to NULL when the wake-up was
 *          not caused by an ITTI message (the events can then be read with
 *          itti_get_events()).
 *
 * Every failure is reported through AssertFatal.
 */
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
  thread_id_t thread_id;
  int epoll_ret = 0;
  int epoll_timeout = 0;
  int i;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  AssertFatal (received_msg != NULL, "Received message is NULL!\n");

  thread_id = TASK_GET_THREAD_ID(task_id);
  *received_msg = NULL;

  if (polling) {
    /* In polling mode we set the timeout to 0 causing epoll_wait to return
     * immediately.
     */
    epoll_timeout = 0;
  } else {
    /* timeout = -1 causes the epoll_wait to wait indefinitely.
     */
    epoll_timeout = -1;
  }

  do {
    epoll_ret = epoll_wait(itti_desc.threads[thread_id].epoll_fd,
                           itti_desc.threads[thread_id].events,
                           itti_desc.threads[thread_id].nb_events,
                           epoll_timeout);
  } while (epoll_ret < 0 && errno == EINTR);

  if (epoll_ret < 0) {
    AssertFatal (0, "epoll_wait failed for task %s: %s!\n", itti_get_task_name(task_id), strerror(errno));
  }

  /* Record the count before any return, so that itti_get_events() never
   * reports the events of a previous call
   */
  itti_desc.threads[thread_id].epoll_nb_events = epoll_ret;

  if (epoll_ret == 0 && polling) {
    /* No data to read -> return */
    return;
  }

  for (i = 0; i < epoll_ret; i++) {
    /* Check if there is an event for ITTI for the event fd */
    if ((itti_desc.threads[thread_id].events[i].events & EPOLLIN) &&
        (itti_desc.threads[thread_id].events[i].data.fd == itti_desc.threads[thread_id].task_event_fd)) {
      struct message_list_s *message = NULL;
      eventfd_t   sem_counter;
      ssize_t     read_ret;
      int         result;

      /* The event fd is created with EFD_SEMAPHORE: each read consumes
       * exactly one of the notifications written by the senders
       */
      read_ret = read (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
      AssertFatal (read_ret == sizeof(sem_counter), "Read from task message FD (%d) failed (%d/%d)!\n", thread_id, (int) read_ret, (int) sizeof(sem_counter));

      if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
        /* No element in list -> this should not happen */
        AssertFatal (0, "No message in queue for task %d while there are %d events and some for the messages queue!\n", task_id, epoll_ret);
      }

      AssertFatal(message != NULL, "Message from message queue is NULL!\n");
      *received_msg = message->msg;

      /* Only the list element is released, the message now belongs to the caller */
      result = itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
      AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);

      /* Mark that the event has been processed */
      itti_desc.threads[thread_id].events[i].events &= ~EPOLLIN;
      return;
    }
  }
}

/* Block until the thread of task_id is woken up by one of its monitored file
 * descriptors. *received_msg is set to the received message, or to NULL when
 * the wake-up was not caused by an ITTI message.
 */
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
  /* Second argument 0: blocking (non-polling) mode */
  itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
}

540 541 542
/* Try to dequeue one message for task_id without blocking.
 *
 * The task queue is read directly, the thread event fd is not involved.
 * *received_msg is set to the dequeued message, or to NULL when the queue is
 * empty.
 */
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg)
{
  struct message_list_s *message;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  AssertFatal (received_msg != NULL, "Received message is NULL!\n");

  *received_msg = NULL;

  if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1) {
    int result;

    *received_msg = message->msg;

    /* Only the list element is released, the message now belongs to the caller */
    result = itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
    AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
  }

  if (*received_msg == NULL) {
    ITTI_DEBUG(ITTI_DEBUG_POLL, " No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
  }
}

564 565 566 567
/* Create the thread running task_id and wait until it is ready.
 *
 * start_routine and args_p are passed unchanged to pthread_create. The thread
 * is named "ITTI <thread id>". The call returns only once the new thread has
 * called itti_mark_task_ready() (its state is polled every millisecond).
 *
 * Returns 0; every failure is reported through AssertFatal.
 */
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p)
{
  thread_id_t thread_id;
  int result;
  char name[16];

  /* Validate the task id before it is used to look up the thread id */
  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);

  thread_id = TASK_GET_THREAD_ID(task_id);

  AssertFatal (start_routine != NULL, "Start routine is NULL!\n");
  AssertFatal (thread_id < itti_desc.thread_max, "Thread id (%d) is out of range (%d)!\n", thread_id, itti_desc.thread_max);
  AssertFatal (itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, "Task %d, thread %d state is not correct (%d)!\n",
               task_id, thread_id, itti_desc.threads[thread_id].task_state);

  itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;

  ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating thread for task %s ...\n", itti_get_task_name(task_id));

  /* pthread_create returns 0 on success and a positive error number on
   * failure, so only 0 means that the thread exists
   */
  result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
  AssertFatal (result == 0, "Thread creation for task %d, thread %d failed (%d)!\n", task_id, thread_id, result);

  snprintf( name, sizeof(name), "ITTI %d", thread_id );
  pthread_setname_np( itti_desc.threads[thread_id].task_thread, name );

  itti_desc.created_tasks ++;

  /* Wait till the thread is completely ready */
  while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY) {
    usleep (1000);
  }

  return 0;
}

593 594
/* Flag the thread of task_id as a real time thread.
 * The thread id is validated with DevCheck against thread_max.
 */
void itti_set_task_real_time(task_id_t task_id)
{
  thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

  DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

  itti_desc.threads[thread_id].real_time = TRUE;
}

602 603
void itti_wait_ready(int wait_tasks)
{
604
  itti_desc.wait_tasks = wait_tasks;
605

606 607 608 609 610
  ITTI_DEBUG(ITTI_DEBUG_INIT,
             " wait for tasks: %s, created tasks %d, ready tasks %d\n",
             itti_desc.wait_tasks ? "yes" : "no",
             itti_desc.created_tasks,
             itti_desc.ready_tasks);
611

612 613
  AssertFatal (itti_desc.created_tasks == itti_desc.ready_tasks, "Number of created tasks (%d) does not match ready tasks (%d), wait task %d!\n",
               itti_desc.created_tasks, itti_desc.ready_tasks, itti_desc.wait_tasks);
614 615
}

616 617
/* Declare the thread of task_id ready to receive messages.
 *
 * Registers the calling thread with the ITTI dump and with the task message
 * queue, switches the thread state to READY (which releases the creator
 * blocked in itti_create_task()) and then waits, polling every 10 ms, for as
 * long as itti_desc.wait_tasks is non-zero.
 */
void itti_mark_task_ready(task_id_t task_id)
{
  thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

  AssertFatal (thread_id < itti_desc.thread_max, "Thread id (%d) is out of range (%d)!\n", thread_id, itti_desc.thread_max);

  /* Register the thread in itti dump */
  itti_dump_thread_use_ring_buffer();

  /* Mark the thread as using LFDS queue */
  lfds611_queue_use(itti_desc.tasks[task_id].message_queue);

  itti_desc.threads[thread_id].task_state = TASK_STATE_READY;
  itti_desc.ready_tasks ++;

  while (itti_desc.wait_tasks != 0) {
    usleep (10000);
  }

  ITTI_DEBUG(ITTI_DEBUG_INIT, " task %s started\n", itti_get_task_name(task_id));
}

638 639 640
void itti_exit_task(void)
{
  task_id_t task_id = itti_get_current_task_id();
641
  thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
642

643
#if defined(OAI_EMU) || defined(RTAI)
644
  if (task_id > TASK_UNKNOWN) {
gauthier's avatar
gauthier committed
645
    VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
646 647
                                            __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
  }
648
#endif
649 650 651 652

  itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
  itti_desc.created_tasks--;
  ITTI_DEBUG(ITTI_DEBUG_EXIT, "Thread for task %s (%d) exits\n", itti_get_task_name(task_id), task_id);
653
  pthread_exit (NULL);
654 655
}

656 657 658 659
/* Request the termination of all tasks on behalf of task_id.
 *
 * A terminate message is broadcast, the thread handling signals (if one has
 * been registered by itti_wait_tasks_end()) is sent SIGUSR1, and the calling
 * thread exits through pthread_exit. This function does not return.
 */
void itti_terminate_tasks(task_id_t task_id)
{
  // Sends Terminate signals to all tasks.
  itti_send_terminate_message (task_id);

  if (itti_desc.thread_handling_signals) {
    pthread_kill (itti_desc.thread_ref, SIGUSR1);
  }

  pthread_exit (NULL);
}

668
int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name)
{
  task_id_t task_id;
  thread_id_t thread_id;
  int ret;

  itti_desc.message_number = 1;

  ITTI_DEBUG(ITTI_DEBUG_INIT, " Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);

  CHECK_INIT_RETURN(signal_mask());

  /* Saves threads and messages max values */
  itti_desc.task_max = task_max;
  itti_desc.thread_max = thread_max;
  itti_desc.messages_id_max = messages_id_max;
  itti_desc.thread_handling_signals = FALSE;
  itti_desc.tasks_info = tasks_info;
  itti_desc.messages_info = messages_info;

  /* Allocates memory for tasks info */
  itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

  /* Allocates memory for threads info */
  itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));

  /* Initializing each queue and related stuff */
  for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++) {
    ITTI_DEBUG(ITTI_DEBUG_INIT, " Initializing %stask %s%s%s\n",
               itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
               itti_desc.tasks_info[task_id].name,
               itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
               itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
               itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");

    ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);

    ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);

708
    if (0 == ret) {
709
      AssertFatal (0, "lfds611_queue_new failed for task %s!\n", itti_get_task_name(task_id));
710
    }
711
  }
712

713 714 715
  /* Initializing each thread */
  for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
    itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
716

717
    itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);
718

719 720 721 722
    if (itti_desc.threads[thread_id].epoll_fd == -1) {
      /* Always assert on this condition */
      AssertFatal (0, "Failed to create new epoll fd: %s!\n", strerror(errno));
    }
723

724
    itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
725

726 727 728 729
    if (itti_desc.threads[thread_id].task_event_fd == -1) {
      /* Always assert on this condition */
      AssertFatal (0, " eventfd failed: %s!\n", strerror(errno));
    }
730

731
    itti_desc.threads[thread_id].nb_events = 1;
732

733
    itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event));
734

735 736 737 738 739 740 741 742 743 744 745 746
    itti_desc.threads[thread_id].events->events  = EPOLLIN | EPOLLERR;
    itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd;

    /* Add the event fd to the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
                  itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0) {
      /* Always assert on this condition */
      AssertFatal (0, " epoll_ctl (EPOLL_CTL_ADD) failed: %s!\n", strerror(errno));
    }

    ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for thread %d\n",
               itti_desc.threads[thread_id].task_event_fd, thread_id);
747

748
  }
749

750 751 752 753 754
  itti_desc.running = 1;
  itti_desc.wait_tasks = 0;
  itti_desc.created_tasks = 0;
  itti_desc.ready_tasks = 0;
  itti_dump_init (messages_definition_xml, dump_file_name);
755

756
  CHECK_INIT_RETURN(timer_init ());
757

758
  return 0;
759 760
}

761 762 763 764 765 766 767 768
void itti_wait_tasks_end(void)
{
  int end = 0;
  int thread_id;
  task_id_t task_id;
  int ready_tasks;
  int result;
  int retries = 10;
769

770 771
  itti_desc.thread_handling_signals = TRUE;
  itti_desc.thread_ref=pthread_self ();
772

773 774 775 776 777 778
  /* Handle signals here */
  while (end == 0) {
    signal_handle (&end);
  }

  printf("closing all tasks\n");
779
  sleep(1);
780

781 782 783 784 785 786 787 788 789 790
  do {
    ready_tasks = 0;

    task_id = TASK_FIRST;

    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
      /* Skip tasks which are not running */
      if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
        while (thread_id != TASK_GET_THREAD_ID(task_id)) {
          task_id++;
791
        }
792 793 794 795 796 797 798 799 800 801 802

        result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL);

        ITTI_DEBUG(ITTI_DEBUG_EXIT, " Thread %s join status %d\n", itti_get_task_name(task_id), result);

        if (result == 0) {
          /* Thread has terminated */
          itti_desc.threads[thread_id].task_state = TASK_STATE_ENDED;
        } else {
          /* Thread is still running, count it */
          ready_tasks++;
803
        }
804 805
      }
    }
806

807 808 809 810
    if (ready_tasks > 0) {
      usleep (100 * 1000);
    }
  } while ((ready_tasks > 0) && (retries--)&& (!end) );
811

812 813 814
  printf("ready_tasks %d\n",ready_tasks);

  itti_desc.running = 0;
815

816 817 818 819
  if (ready_tasks > 0) {
    ITTI_DEBUG(ITTI_DEBUG_ISSUES, " Some threads are still running, force exit\n");
    exit (0);
  }
820

821
  itti_dump_exit();
822 823
}

824 825 826
/* Broadcast a TERMINATE_MESSAGE originated by task_id to all ready threads
 * other than the thread of task_id.
 */
void itti_send_terminate_message(task_id_t task_id)
{
  MessageDef *terminate_message_p;

  terminate_message_p = itti_alloc_new_message (task_id, TERMINATE_MESSAGE);

  /* The broadcast releases the message itself */
  itti_send_broadcast_message (terminate_message_p);
}