Skip to content
Snippets Groups Projects
intertask_interface.cpp 18.8 KiB
Newer Older
Thomas Laurent's avatar
Thomas Laurent committed
/*
Thomas Laurent's avatar
Thomas Laurent committed
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1  (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.openairinterface.org/?page_id=698
*
* Author and copyright: Laurent Thomas, open-cells.com
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
*      contact@openairinterface.org
Thomas Laurent's avatar
Thomas Laurent committed
*/
Thomas Laurent's avatar
Thomas Laurent committed
#include <vector>
#include <map>
#include <sys/eventfd.h>
Thomas Laurent's avatar
Thomas Laurent committed
extern "C" {
Thomas Laurent's avatar
Thomas Laurent committed
#include <intertask_interface.h>
Thomas Laurent's avatar
Thomas Laurent committed
#include <common/utils/system.h>
#include "executables/softmodem-common.h"
typedef struct timer_elm_s {
  timer_type_t type; ///< Timer type
  long instance;
  long duration;
  uint64_t timeout;
  void *timer_arg; ///< Optional argument that will be passed when timer expires
} timer_elm_t;

  typedef struct task_list_s {
    task_info_t admin;
    ittiTask_parms_t task_parms;
    pthread_t thread;
    pthread_mutex_t queue_cond_lock;
    std::vector<MessageDef *> message_queue;
    std::map<long,timer_elm_t> timer_map;
    uint64_t next_timer=UINT64_MAX;
    int nb_fd_epoll=0;
    int epoll_fd=-1;
    int sem_fd=-1;
    size_t last_log_size = 0;
  } task_list_t;

  int timer_expired(int fd);
  static task_list_t **tasks=NULL;
  static int nb_queues=0;
  static pthread_mutex_t lock_nb_queues;
Thomas Laurent's avatar
Thomas Laurent committed

  void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size) {
    void *ptr = NULL;
Thomas Laurent's avatar
Thomas Laurent committed
    AssertFatal ((ptr=calloc (size, 1)) != NULL, "Memory allocation of %zu bytes failed (%d -> %d)!\n",
Thomas Laurent's avatar
Thomas Laurent committed
                 size, origin_task_id, destination_task_id);
    return ptr;
  }

  int itti_free(task_id_t task_id, void *ptr) {
    AssertFatal (ptr != NULL, "Trying to free a NULL pointer (%d)!\n", task_id);
    free (ptr);
    return (EXIT_SUCCESS);
  }

  // in the two following functions, the +32 in malloc is there to deal with gcc memory alignment
  // because a struct size can be larger than sum(sizeof(struct components))
  // We should remove the itti principle of a huge union for all types of messages in paramter "msg_t ittiMsg"
  // to use a more C classical pointer casting "void * ittiMsg", later casted in the right struct
  // but we would have to change all legacy macros, as per this example
  // #define S1AP_REGISTER_ENB_REQ(mSGpTR)           (mSGpTR)->ittiMsg.s1ap_register_enb_req
  // would become
  // #define S1AP_REGISTER_ENB_REQ(mSGpTR)           (s1ap_register_enb_req) mSGpTR)->ittiMsg
  MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, instance_t originInstance, MessagesIds message_id, MessageHeaderSize size) {
    MessageDef *temp = (MessageDef *)itti_malloc (origin_task_id, TASK_UNKNOWN, sizeof(MessageHeader) +32 + size);
Thomas Laurent's avatar
Thomas Laurent committed
    temp->ittiMsgHeader.messageId = message_id;
    temp->ittiMsgHeader.originTaskId = origin_task_id;
    temp->ittiMsgHeader.ittiMsgSize = size;
    return temp;
  }

  MessageDef *itti_alloc_new_message(task_id_t origin_task_id, instance_t originInstance, MessagesIds message_id) {
    int size=sizeof(MessageHeader) + 32 + messages_info[message_id].size;
Thomas Laurent's avatar
Thomas Laurent committed
    MessageDef *temp = (MessageDef *)itti_malloc (origin_task_id, TASK_UNKNOWN, size);
    temp->ittiMsgHeader.messageId = message_id;
    temp->ittiMsgHeader.originTaskId = origin_task_id;
    temp->ittiMsgHeader.ittiMsgSize = size;
Thomas Laurent's avatar
Thomas Laurent committed
    temp->ittiMsgHeader.destinationTaskId=TASK_UNKNOWN;
    temp->ittiMsgHeader.originInstance=originInstance;
    temp->ittiMsgHeader.destinationInstance=0;
    temp->ittiMsgHeader.lte_time= {0};
Thomas Laurent's avatar
Thomas Laurent committed
    return temp;
    //return itti_alloc_new_message_sized(origin_task_id, message_id, messages_info[message_id].size);
  }

  static inline int itti_send_msg_to_task_locked(task_id_t destination_task_id, instance_t destinationInstance, MessageDef *message) {
    task_list_t *t=tasks[destination_task_id];
Thomas Laurent's avatar
Thomas Laurent committed
    message->ittiMsgHeader.destinationTaskId = destination_task_id;
    message->ittiMsgHeader.destinationInstance = destinationInstance;
Thomas Laurent's avatar
Thomas Laurent committed
    message->ittiMsgHeader.lte_time.frame = 0;
    message->ittiMsgHeader.lte_time.slot = 0;
    int message_id = message->ittiMsgHeader.messageId;
    size_t s=t->message_queue.size();

    // to reduce the number of logs, we give a message each increase of 25%
    if ((s > t->last_log_size * 1.25) && (s > t->admin.queue_size / 10)) {
      if (s > t->admin.queue_size) {
        LOG_E(TMR, "Queue for %s task contains %ld messages\n", itti_get_task_name(destination_task_id), s);
      } else {
        LOG_I(ITTI,
              "Queue for %s task size: %ld (last message %s)\n",
              itti_get_task_name(destination_task_id),
              s + 1,
              ITTI_MSG_NAME(message));
      }
Thomas Laurent's avatar
Thomas Laurent committed
      t->last_log_size = s;
    } else if (t->last_log_size && s < t->admin.queue_size / 10) {
      // Inform when the queue decreases
      LOG_I(ITTI, "Queue for %s task size is back under 10%% of max size\n", itti_get_task_name(destination_task_id));
      t->last_log_size = 0;
    }
Thomas Laurent's avatar
Thomas Laurent committed

    t->message_queue.insert(t->message_queue.begin(), message);
    eventfd_t sem_counter = 1;
    AssertFatal ( sizeof(sem_counter) == write(t->sem_fd, &sem_counter, sizeof(sem_counter)), "");
laurent's avatar
laurent committed
    LOG_D(ITTI, "sent messages id=%s messages_info to %s\n", messages_info[message_id].name, t->admin.name);
Thomas Laurent's avatar
Thomas Laurent committed
    return 0;
  }

  int itti_send_msg_to_task(task_id_t destination_task_id, instance_t destinationInstance, MessageDef *message) {
    task_list_t *t=tasks[destination_task_id];
Thomas Laurent's avatar
Thomas Laurent committed
    pthread_mutex_lock (&t->queue_cond_lock);
    int ret=itti_send_msg_to_task_locked(destination_task_id, destinationInstance, message);
    while (t->message_queue.size() > 0 && t->task_parms.shortcut_func != NULL) {
Thomas Laurent's avatar
Thomas Laurent committed
      if (t->message_queue.size()>1)
        LOG_W(ITTI,"queue in no thread mode is %ld\n", t->message_queue.size());
Thomas Laurent's avatar
Thomas Laurent committed
      pthread_mutex_unlock (&t->queue_cond_lock);
      t->task_parms.shortcut_func(NULL);
      pthread_mutex_lock (&t->queue_cond_lock);
    pthread_mutex_unlock (&t->queue_cond_lock);
Thomas Laurent's avatar
Thomas Laurent committed
    return ret;
  }

  void itti_subscribe_event_fd(task_id_t task_id, int fd) {
    struct epoll_event event;
    task_list_t *t=tasks[task_id];
Thomas Laurent's avatar
Thomas Laurent committed
    t->nb_fd_epoll++;
    event.events  = EPOLLIN | EPOLLERR;
    event.data.u64 = 0;
    event.data.fd  = fd;
    AssertFatal(epoll_ctl(t->epoll_fd, EPOLL_CTL_ADD, fd, &event) == 0,
                "epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s!\n",
                itti_get_task_name(task_id), fd, strerror(errno));
    eventfd_t sem_counter = 1;
    AssertFatal ( sizeof(sem_counter) == write(t->sem_fd, &sem_counter, sizeof(sem_counter)), "");
Thomas Laurent's avatar
Thomas Laurent committed
  }

  void itti_unsubscribe_event_fd(task_id_t task_id, int fd) {
    task_list_t *t=tasks[task_id];
Thomas Laurent's avatar
Thomas Laurent committed
    AssertFatal (epoll_ctl(t->epoll_fd, EPOLL_CTL_DEL, fd, NULL) == 0,
                 "epoll_ctl (EPOLL_CTL_DEL) failed for task %s, fd %d: %s!\n",
                 itti_get_task_name(task_id), fd, strerror(errno));
    t->nb_fd_epoll--;
  }

  static inline int itti_get_events_locked(task_id_t task_id, struct epoll_event *events, int max_events) {
    task_list_t *t=tasks[task_id];
Thomas Laurent's avatar
Thomas Laurent committed
    uint64_t current_time=0;
    int nb_events;
Thomas Laurent's avatar
Thomas Laurent committed
    do {
      if ( t->next_timer != UINT64_MAX ) {
        struct timespec tp;
        clock_gettime(CLOCK_MONOTONIC, &tp);
        current_time=(uint64_t)tp.tv_sec*1000+tp.tv_nsec/(1000*1000);

        if ( t->next_timer < current_time) {
          t->next_timer=UINT64_MAX;

          // Proceed expired timer
          for ( auto it=t->timer_map.begin(), next_it = it; it != t->timer_map.end() ; it = next_it ) {
Thomas Laurent's avatar
Thomas Laurent committed
            ++next_it;
Thomas Laurent's avatar
Thomas Laurent committed
            if ( it->second.timeout < current_time ) {
              MessageDef *message = itti_alloc_new_message(TASK_TIMER, it->second.instance,TIMER_HAS_EXPIRED);
Thomas Laurent's avatar
Thomas Laurent committed
              message->ittiMsg.timer_has_expired.timer_id=it->first;
              message->ittiMsg.timer_has_expired.arg=it->second.timer_arg;

              if (itti_send_msg_to_task_locked(task_id, it->second.instance, message) < 0) {
                LOG_W(ITTI,"Failed to send msg TIMER_HAS_EXPIRED to task %u\n", task_id);
Thomas Laurent's avatar
Thomas Laurent committed
                free(message);
                t->timer_map.erase(it);
                return -1;
              }

              if ( it->second.type==TIMER_PERIODIC ) {
                it->second.timeout+=it->second.duration;

                if (it->second.timeout < t->next_timer)
                  t->next_timer=it->second.timeout;
              } else
                t->timer_map.erase(it);
            } else if (it->second.timeout < t->next_timer)
              t->next_timer=it->second.timeout;
          }
        }
      }

      int epoll_timeout = -1;

      if ( t->next_timer != UINT64_MAX )
        epoll_timeout = t->next_timer-current_time;

      pthread_mutex_unlock(&t->queue_cond_lock);
      LOG_D(ITTI,"enter blocking wait for %s, timeout: %d ms\n", itti_get_task_name(task_id),  epoll_timeout);
      nb_events = epoll_wait(t->epoll_fd, events, max_events, epoll_timeout);
      if ( nb_events  < 0 && (errno == EINTR || errno == EAGAIN ) )
        pthread_mutex_lock(&t->queue_cond_lock);
    } while (nb_events  < 0 && (errno == EINTR || errno == EAGAIN ) );
    AssertFatal (nb_events >=0,
Thomas Laurent's avatar
Thomas Laurent committed
                 "epoll_wait failed for task %s, nb fds %d, timeout %lu: %s!\n",
                 itti_get_task_name(task_id), t->nb_fd_epoll, 
                 t->next_timer != UINT64_MAX ? t->next_timer-current_time : -1, 
                 strerror(errno));
    LOG_D(ITTI,"receive on %d descriptors for %s\n", nb_events, itti_get_task_name(task_id));
    if (nb_events == 0)
Thomas Laurent's avatar
Thomas Laurent committed
      /* No data to read -> return */
      return 0;

    for (int i = 0; i < nb_events; i++) {
Thomas Laurent's avatar
Thomas Laurent committed
      /* Check if there is an event for ITTI for the event fd */
      if ((events[i].events & EPOLLIN) &&
          (events[i].data.fd == t->sem_fd)) {
Thomas Laurent's avatar
Thomas Laurent committed
        eventfd_t   sem_counter;
        /* Read will always return 1 */
        AssertFatal( sizeof(sem_counter) == read (t->sem_fd, &sem_counter, sizeof(sem_counter)), "");
        /* Mark that the event has been processed */
        events[i].events &= ~EPOLLIN;
    return nb_events;
  int itti_get_events(task_id_t task_id, struct epoll_event *events, int nb_evts) {
    pthread_mutex_lock(&tasks[task_id]->queue_cond_lock);
    return itti_get_events_locked(task_id, events, nb_evts);
Thomas Laurent's avatar
Thomas Laurent committed
  }

  void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) {
    // Reception of one message, blocking caller
    task_list_t *t=tasks[task_id];
Thomas Laurent's avatar
Thomas Laurent committed
    pthread_mutex_lock(&t->queue_cond_lock);

    struct epoll_event events[t->nb_fd_epoll];
Thomas Laurent's avatar
Thomas Laurent committed
    // Weird condition to deal with crap legacy itti interface
    if (t->message_queue.empty()) {
      do {
        itti_get_events_locked(task_id, events, t->nb_fd_epoll);
Thomas Laurent's avatar
Thomas Laurent committed
        pthread_mutex_lock(&t->queue_cond_lock);
      }
      while (t->message_queue.empty() && t->nb_fd_epoll == 1);
Thomas Laurent's avatar
Thomas Laurent committed
    }

    // Legacy design: we return even if we have no message
    // in this case, *received_msg is NULL
    if (t->message_queue.empty()) {
      *received_msg=NULL;
      LOG_D(ITTI,"task %s received even from other fd (total fds: %d), returning msg NULL\n",t->admin.name, t->nb_fd_epoll);
Thomas Laurent's avatar
Thomas Laurent committed
    } else {
      *received_msg=t->message_queue.back();
      t->message_queue.pop_back();
      LOG_D(ITTI,"task %s received a message\n",t->admin.name);
Thomas Laurent's avatar
Thomas Laurent committed
    }

    pthread_mutex_unlock (&t->queue_cond_lock);
  }

  void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
Thomas Laurent's avatar
Thomas Laurent committed
    //reception of one message, non-blocking
    task_list_t *t=tasks[task_id];
Thomas Laurent's avatar
Thomas Laurent committed
    pthread_mutex_lock(&t->queue_cond_lock);

    if (!t->message_queue.empty()) {
      LOG_D(ITTI,"task %s received a message in polling mode\n",t->admin.name);
Thomas Laurent's avatar
Thomas Laurent committed
      *received_msg=t->message_queue.back();
      t->message_queue.pop_back();
    } else
      *received_msg=NULL;

    pthread_mutex_unlock (&t->queue_cond_lock);
  }

  int itti_create_task(const task_id_t task_id, void *(*start_routine)(void *), const ittiTask_parms_t *parms)
  {
    task_list_t *t=tasks[task_id];
    if (get_softmodem_params()->no_itti && task_id < sizeofArray(tasks_info) && parms && parms->shortcut_func) {
      LOG_W(ITTI, "not starting the thread for %s, the msg processing will be done in place\n", tasks_info[task_id].name);
      t->task_parms.shortcut_func = parms->shortcut_func;
      return 0;
    }
    threadCreate(&t->thread,
                 start_routine,
                 parms ? parms->args_to_start_routine : NULL,
                 (char *)itti_get_task_name(task_id),
                 -1,
                 OAI_PRIORITY_RT);
    LOG_D(ITTI,"Created Posix thread %s\n",  itti_get_task_name(task_id) );
Thomas Laurent's avatar
Thomas Laurent committed
    return 0;
  }

  void itti_exit_task(void) {
    pthread_exit (NULL);
  }

  void itti_terminate_tasks(task_id_t task_id) {
Thomas Laurent's avatar
Thomas Laurent committed
    // Sends Terminate signals to all tasks.
    itti_send_terminate_message (task_id);
    usleep(100*1000); // Allow the tasks to receive the message before going returning to main thread
Laurent Thomas's avatar
Laurent Thomas committed
  int itti_create_queue(const task_info_t *taskInfo) {
    pthread_mutex_lock (&lock_nb_queues);
    int newQueue=nb_queues++;
    task_list_t **new_tasks = (task_list_t **)realloc(tasks, nb_queues * sizeof(*tasks));
    AssertFatal(new_tasks != NULL, "could not realloc() tasks list");
    tasks = new_tasks;
    tasks[newQueue]= new task_list_t;
    tasks[newQueue]->task_parms = {0};
    pthread_mutex_unlock (&lock_nb_queues);
    LOG_D(ITTI, "Starting itti queue: %s as task %d\n", taskInfo->name, newQueue);
    pthread_mutex_init(&tasks[newQueue]->queue_cond_lock, NULL);
Laurent Thomas's avatar
Laurent Thomas committed
    memcpy(&tasks[newQueue]->admin, taskInfo, sizeof(task_info_t));
    AssertFatal( ( tasks[newQueue]->epoll_fd = epoll_create1(0) ) >=0, "");
    AssertFatal( ( tasks[newQueue]->sem_fd = eventfd(0, EFD_SEMAPHORE) ) >=0, "");
    itti_subscribe_event_fd((task_id_t)newQueue, tasks[newQueue]->sem_fd);

Laurent Thomas's avatar
Laurent Thomas committed
                const task_info_t *tasks
               ) {
    pthread_mutex_init(&lock_nb_queues, NULL);
    nb_queues=0;
Thomas Laurent's avatar
Thomas Laurent committed

    for(int i=0; i<task_max; ++i) {
Laurent Thomas's avatar
Laurent Thomas committed
      itti_create_queue(&tasks[i]);
Thomas Laurent's avatar
Thomas Laurent committed
    }

    return 0;
  }

  int timer_setup(
    uint32_t      interval_sec,
    uint32_t      interval_us,
    task_id_t     task_id,
Thomas Laurent's avatar
Thomas Laurent committed
    timer_type_t  type,
    void         *timer_arg,
    task_list_t *t=tasks[task_id];
Thomas Laurent's avatar
Thomas Laurent committed

    do {
      // set the taskid in the timer id to keep compatible with the legacy API
      // timer_remove() takes only the timer id as parameter
      *timer_id=(random()%UINT16_MAX) << 16 | task_id ;
    } while ( t->timer_map.find(*timer_id) != t->timer_map.end());

    /* Allocate new timer list element */
    timer_elm_t timer;
    struct timespec tp;
    clock_gettime(CLOCK_MONOTONIC, &tp);

    if (interval_us%1000 != 0)
      LOG_W(ITTI, "Can't set timer precision below 1ms, rounding it\n");
Thomas Laurent's avatar
Thomas Laurent committed

    timer.duration  = interval_sec*1000+interval_us/1000;
    timer.timeout= ((uint64_t)tp.tv_sec*1000+tp.tv_nsec/(1000*1000)+timer.duration);
    timer.instance  = instance;
    timer.type      = type;
    timer.timer_arg = timer_arg;
    pthread_mutex_lock (&t->queue_cond_lock);
    t->timer_map[*timer_id]= timer;

    if (timer.timeout < t->next_timer)
      t->next_timer=timer.timeout;

    eventfd_t sem_counter = 1;
    AssertFatal ( sizeof(sem_counter) == write(t->sem_fd, &sem_counter, sizeof(sem_counter)), "");
    pthread_mutex_unlock (&t->queue_cond_lock);
    return 0;
  }

  int timer_remove(long timer_id) {
Thomas Laurent's avatar
Thomas Laurent committed
    task_id_t task_id=(task_id_t)(timer_id&0xffff);
    int ret;
    pthread_mutex_lock (&tasks[task_id]->queue_cond_lock);
    ret=tasks[task_id]->timer_map.erase(timer_id);
    pthread_mutex_unlock (&tasks[task_id]->queue_cond_lock);
Thomas Laurent's avatar
Thomas Laurent committed

    if (ret==1)
      return 0;
    else {
      LOG_W(ITTI, "tried to remove a non existing timer\n");
Thomas Laurent's avatar
Thomas Laurent committed
      return 1;
    }
  }

  const char *itti_get_message_name(MessagesIds message_id) {
    return messages_info[message_id].name;
  }

  const char *itti_get_task_name(task_id_t task_id) {
    return tasks[task_id]->admin.name;
Thomas Laurent's avatar
Thomas Laurent committed
  }

  // void for compatibility
  void itti_send_terminate_message(task_id_t task_id) {
  }

  static sem_t itti_sem_block;
  void itti_wait_tasks_unblock()
  {
    int rc = sem_post(&itti_sem_block);
    AssertFatal(rc == 0, "error in sem_post(): %d %s\n", errno, strerror(errno));
  }

  static void catch_sigterm(int) {
    static const char msg[] = "\n** Caught SIGTERM, shutting down\n";
    __attribute__((unused))
    int unused = write(STDOUT_FILENO, msg, sizeof(msg) - 1);
  void itti_wait_tasks_end(void (*handler)(int))
  {
    int rc = sem_init(&itti_sem_block, 0, 0);
    AssertFatal(rc == 0, "error in sem_init(): %d %s\n", errno, strerror(errno));
    if (handler == NULL) /* no handler given: install default */
      handler = catch_sigterm;
    signal(SIGTERM, handler);
    signal(SIGINT, handler);
    rc = sem_wait(&itti_sem_block);
    AssertFatal(rc == 0, "error in sem_wait(): %d %s\n", errno, strerror(errno));
Thomas Laurent's avatar
Thomas Laurent committed
  }

  void itti_update_lte_time(uint32_t frame, uint8_t slot) {}
  void itti_set_task_real_time(task_id_t task_id) {}
  void itti_mark_task_ready(task_id_t task_id) {
    // Function meaning is clear, but legacy implementation is wrong
    // keep it void is fine: today implementation accepts messages in the queue before task is ready
  }
  void itti_wait_ready(int wait_tasks) {
    // Stupid function, kept for compatibility (the parameter is meaningless!!!)
  }
  int signal_mask(void) {
    return 0;
  }

void log_scheduler(const char* label)
{
    int policy = sched_getscheduler(0);
    struct sched_param param;
    if (sched_getparam(0, &param) == -1)
    {
        LOG_E(HW, "sched_getparam: %s\n", strerror(errno));
        abort();
    }

    cpu_set_t cpu_set;
    if (sched_getaffinity(0, sizeof(cpu_set), &cpu_set) == -1)
    {
        LOG_E(HW, "sched_getaffinity: %s\n", strerror(errno));
        abort();
    }
    int num_cpus = sysconf(_SC_NPROCESSORS_ONLN);
    if (num_cpus < 1)
    {
        LOG_E(HW, "sysconf(_SC_NPROCESSORS_ONLN): %s\n", strerror(errno));
        abort();
    }
    char buffer[num_cpus];
    for (int i = 0; i < num_cpus; i++)
    {
        buffer[i] = CPU_ISSET(i, &cpu_set) ? 'Y' : '-';
    }

    LOG_A(HW, "Scheduler policy=%d priority=%d affinity=[%d]%.*s label=%s\n",
          policy,
          param.sched_priority,
          num_cpus,
          num_cpus,
          buffer,
          label);
Thomas Laurent's avatar
Thomas Laurent committed
}
Melissa's avatar
Melissa committed
} // extern "C"