Something went wrong on our end
-
Robert Schmidt authoredRobert Schmidt authored
intertask_interface.cpp 18.76 KiB
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Author and copyright: Laurent Thomas, open-cells.com
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
#include <vector>
#include <map>
#include <sys/eventfd.h>
#include <semaphore.h>
extern "C" {
#include <intertask_interface.h>
#include <common/utils/system.h>
#include "executables/softmodem-common.h"
typedef struct timer_elm_s {
timer_type_t type; ///< Timer type
long instance;
long duration;
uint64_t timeout;
void *timer_arg; ///< Optional argument that will be passed when timer expires
} timer_elm_t;
typedef struct task_list_s {
task_info_t admin;
ittiTask_parms_t task_parms;
pthread_t thread;
pthread_mutex_t queue_cond_lock;
std::vector<MessageDef *> message_queue;
std::map<long,timer_elm_t> timer_map;
uint64_t next_timer=UINT64_MAX;
int nb_fd_epoll=0;
int epoll_fd=-1;
int sem_fd=-1;
size_t last_log_size = 0;
} task_list_t;
int timer_expired(int fd);
static task_list_t **tasks=NULL;
static int nb_queues=0;
static pthread_mutex_t lock_nb_queues;
void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size) {
void *ptr = NULL;
AssertFatal ((ptr=calloc (size, 1)) != NULL, "Memory allocation of %zu bytes failed (%d -> %d)!\n",
size, origin_task_id, destination_task_id);
return ptr;
}
int itti_free(task_id_t task_id, void *ptr) {
AssertFatal (ptr != NULL, "Trying to free a NULL pointer (%d)!\n", task_id);
free (ptr);
return (EXIT_SUCCESS);
}
// in the two following functions, the +32 in malloc is there to deal with gcc memory alignment
// because a struct size can be larger than sum(sizeof(struct components))
// We should remove the itti principle of a huge union for all types of messages in paramter "msg_t ittiMsg"
// to use a more C classical pointer casting "void * ittiMsg", later casted in the right struct
// but we would have to change all legacy macros, as per this example
// #define S1AP_REGISTER_ENB_REQ(mSGpTR) (mSGpTR)->ittiMsg.s1ap_register_enb_req
// would become
// #define S1AP_REGISTER_ENB_REQ(mSGpTR) (s1ap_register_enb_req) mSGpTR)->ittiMsg
MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, instance_t originInstance, MessagesIds message_id, MessageHeaderSize size) {
MessageDef *temp = (MessageDef *)itti_malloc (origin_task_id, TASK_UNKNOWN, sizeof(MessageHeader) +32 + size);
temp->ittiMsgHeader.messageId = message_id;
temp->ittiMsgHeader.originTaskId = origin_task_id;
temp->ittiMsgHeader.ittiMsgSize = size;
return temp;
}
MessageDef *itti_alloc_new_message(task_id_t origin_task_id, instance_t originInstance, MessagesIds message_id) {
int size=sizeof(MessageHeader) + 32 + messages_info[message_id].size;
MessageDef *temp = (MessageDef *)itti_malloc (origin_task_id, TASK_UNKNOWN, size);
temp->ittiMsgHeader.messageId = message_id;
temp->ittiMsgHeader.originTaskId = origin_task_id;
temp->ittiMsgHeader.ittiMsgSize = size;
temp->ittiMsgHeader.destinationTaskId=TASK_UNKNOWN;
temp->ittiMsgHeader.originInstance=originInstance;
temp->ittiMsgHeader.destinationInstance=0;
temp->ittiMsgHeader.lte_time= {0};
return temp;
//return itti_alloc_new_message_sized(origin_task_id, message_id, messages_info[message_id].size);
}
static inline int itti_send_msg_to_task_locked(task_id_t destination_task_id, instance_t destinationInstance, MessageDef *message) {
task_list_t *t=tasks[destination_task_id];
message->ittiMsgHeader.destinationTaskId = destination_task_id;
message->ittiMsgHeader.destinationInstance = destinationInstance;
message->ittiMsgHeader.lte_time.frame = 0;
message->ittiMsgHeader.lte_time.slot = 0;
int message_id = message->ittiMsgHeader.messageId;
size_t s=t->message_queue.size();
// to reduce the number of logs, we give a message each increase of 25%
if ((s > t->last_log_size * 1.25) && (s > t->admin.queue_size / 10)) {
if (s > t->admin.queue_size) {
LOG_E(TMR, "Queue for %s task contains %ld messages\n", itti_get_task_name(destination_task_id), s);
} else {
LOG_I(ITTI,
"Queue for %s task size: %ld (last message %s)\n",
itti_get_task_name(destination_task_id),
s + 1,
ITTI_MSG_NAME(message));
}
t->last_log_size = s;
} else if (t->last_log_size && s < t->admin.queue_size / 10) {
// Inform when the queue decreases
LOG_I(ITTI, "Queue for %s task size is back under 10%% of max size\n", itti_get_task_name(destination_task_id));
t->last_log_size = 0;
}
t->message_queue.insert(t->message_queue.begin(), message);
eventfd_t sem_counter = 1;
AssertFatal ( sizeof(sem_counter) == write(t->sem_fd, &sem_counter, sizeof(sem_counter)), "");
LOG_D(ITTI, "sent messages id=%s messages_info to %s\n", messages_info[message_id].name, t->admin.name);
return 0;
}
int itti_send_msg_to_task(task_id_t destination_task_id, instance_t destinationInstance, MessageDef *message) {
task_list_t *t=tasks[destination_task_id];
pthread_mutex_lock (&t->queue_cond_lock);
int ret=itti_send_msg_to_task_locked(destination_task_id, destinationInstance, message);
while (t->message_queue.size() > 0 && t->task_parms.shortcut_func != NULL) {
if (t->message_queue.size()>1)
LOG_W(ITTI,"queue in no thread mode is %ld\n", t->message_queue.size());
pthread_mutex_unlock (&t->queue_cond_lock);
t->task_parms.shortcut_func(NULL);
pthread_mutex_lock (&t->queue_cond_lock);
}
pthread_mutex_unlock (&t->queue_cond_lock);
return ret;
}
void itti_subscribe_event_fd(task_id_t task_id, int fd) {
struct epoll_event event;
task_list_t *t=tasks[task_id];
t->nb_fd_epoll++;
event.events = EPOLLIN | EPOLLERR;
event.data.u64 = 0;
event.data.fd = fd;
AssertFatal(epoll_ctl(t->epoll_fd, EPOLL_CTL_ADD, fd, &event) == 0,
"epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s!\n",
itti_get_task_name(task_id), fd, strerror(errno));
eventfd_t sem_counter = 1;
AssertFatal ( sizeof(sem_counter) == write(t->sem_fd, &sem_counter, sizeof(sem_counter)), "");
}
void itti_unsubscribe_event_fd(task_id_t task_id, int fd) {
task_list_t *t=tasks[task_id];
AssertFatal (epoll_ctl(t->epoll_fd, EPOLL_CTL_DEL, fd, NULL) == 0,
"epoll_ctl (EPOLL_CTL_DEL) failed for task %s, fd %d: %s!\n",
itti_get_task_name(task_id), fd, strerror(errno));
t->nb_fd_epoll--;
}
static inline int itti_get_events_locked(task_id_t task_id, struct epoll_event *events, int max_events) {
task_list_t *t=tasks[task_id];
uint64_t current_time=0;
int nb_events;
do {
if ( t->next_timer != UINT64_MAX ) {
struct timespec tp;
clock_gettime(CLOCK_MONOTONIC, &tp);
current_time=(uint64_t)tp.tv_sec*1000+tp.tv_nsec/(1000*1000);
if ( t->next_timer < current_time) {
t->next_timer=UINT64_MAX;
// Proceed expired timer
for ( auto it=t->timer_map.begin(), next_it = it; it != t->timer_map.end() ; it = next_it ) {
++next_it;
if ( it->second.timeout < current_time ) {
MessageDef *message = itti_alloc_new_message(TASK_TIMER, it->second.instance,TIMER_HAS_EXPIRED);
message->ittiMsg.timer_has_expired.timer_id=it->first;
message->ittiMsg.timer_has_expired.arg=it->second.timer_arg;
if (itti_send_msg_to_task_locked(task_id, it->second.instance, message) < 0) {
LOG_W(ITTI,"Failed to send msg TIMER_HAS_EXPIRED to task %u\n", task_id);
free(message);
t->timer_map.erase(it);
return -1;
}
if ( it->second.type==TIMER_PERIODIC ) {
it->second.timeout+=it->second.duration;
if (it->second.timeout < t->next_timer)
t->next_timer=it->second.timeout;
} else
t->timer_map.erase(it);
} else if (it->second.timeout < t->next_timer)
t->next_timer=it->second.timeout;
}
}
}
int epoll_timeout = -1;
if ( t->next_timer != UINT64_MAX )
epoll_timeout = t->next_timer-current_time;
pthread_mutex_unlock(&t->queue_cond_lock);
LOG_D(ITTI,"enter blocking wait for %s, timeout: %d ms\n", itti_get_task_name(task_id), epoll_timeout);
nb_events = epoll_wait(t->epoll_fd, events, max_events, epoll_timeout);
if ( nb_events < 0 && (errno == EINTR || errno == EAGAIN ) )
pthread_mutex_lock(&t->queue_cond_lock);
} while (nb_events < 0 && (errno == EINTR || errno == EAGAIN ) );
AssertFatal (nb_events >=0,
"epoll_wait failed for task %s, nb fds %d, timeout %lu: %s!\n",
itti_get_task_name(task_id), t->nb_fd_epoll,
t->next_timer != UINT64_MAX ? t->next_timer-current_time : -1,
strerror(errno));
LOG_D(ITTI,"receive on %d descriptors for %s\n", nb_events, itti_get_task_name(task_id));
if (nb_events == 0)
/* No data to read -> return */
return 0;
for (int i = 0; i < nb_events; i++) {
/* Check if there is an event for ITTI for the event fd */
if ((events[i].events & EPOLLIN) &&
(events[i].data.fd == t->sem_fd)) {
eventfd_t sem_counter;
/* Read will always return 1 */
AssertFatal( sizeof(sem_counter) == read (t->sem_fd, &sem_counter, sizeof(sem_counter)), "");
/* Mark that the event has been processed */
events[i].events &= ~EPOLLIN;
}
}
return nb_events;
}
int itti_get_events(task_id_t task_id, struct epoll_event *events, int nb_evts) {
pthread_mutex_lock(&tasks[task_id]->queue_cond_lock);
return itti_get_events_locked(task_id, events, nb_evts);
}
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) {
// Reception of one message, blocking caller
task_list_t *t=tasks[task_id];
pthread_mutex_lock(&t->queue_cond_lock);
struct epoll_event events[t->nb_fd_epoll];
// Weird condition to deal with crap legacy itti interface
if (t->message_queue.empty()) {
do {
itti_get_events_locked(task_id, events, t->nb_fd_epoll);
pthread_mutex_lock(&t->queue_cond_lock);
}
while (t->message_queue.empty() && t->nb_fd_epoll == 1);
}
// Legacy design: we return even if we have no message
// in this case, *received_msg is NULL
if (t->message_queue.empty()) {
*received_msg=NULL;
LOG_D(ITTI,"task %s received even from other fd (total fds: %d), returning msg NULL\n",t->admin.name, t->nb_fd_epoll);
} else {
*received_msg=t->message_queue.back();
t->message_queue.pop_back();
LOG_D(ITTI,"task %s received a message\n",t->admin.name);
}
pthread_mutex_unlock (&t->queue_cond_lock);
}
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
//reception of one message, non-blocking
task_list_t *t=tasks[task_id];
pthread_mutex_lock(&t->queue_cond_lock);
if (!t->message_queue.empty()) {
LOG_D(ITTI,"task %s received a message in polling mode\n",t->admin.name);
*received_msg=t->message_queue.back();
t->message_queue.pop_back();
} else
*received_msg=NULL;
pthread_mutex_unlock (&t->queue_cond_lock);
}
int itti_create_task(const task_id_t task_id, void *(*start_routine)(void *), const ittiTask_parms_t *parms)
{
task_list_t *t=tasks[task_id];
if (get_softmodem_params()->no_itti && task_id < sizeofArray(tasks_info) && parms && parms->shortcut_func) {
LOG_W(ITTI, "not starting the thread for %s, the msg processing will be done in place\n", tasks_info[task_id].name);
t->task_parms.shortcut_func = parms->shortcut_func;
return 0;
}
threadCreate(&t->thread,
start_routine,
parms ? parms->args_to_start_routine : NULL,
(char *)itti_get_task_name(task_id),
-1,
OAI_PRIORITY_RT);
LOG_I(ITTI,"Created Posix thread %s\n", itti_get_task_name(task_id) );
return 0;
}
void itti_exit_task(void) {
pthread_exit (NULL);
}
void itti_terminate_tasks(task_id_t task_id) {
// Sends Terminate signals to all tasks.
itti_send_terminate_message (task_id);
usleep(100*1000); // Allow the tasks to receive the message before going returning to main thread
}
int itti_create_queue(const task_info_t *taskInfo) {
pthread_mutex_lock (&lock_nb_queues);
int newQueue=nb_queues++;
task_list_t **new_tasks = (task_list_t **)realloc(tasks, nb_queues * sizeof(*tasks));
AssertFatal(new_tasks != NULL, "could not realloc() tasks list");
tasks = new_tasks;
tasks[newQueue]= new task_list_t;
tasks[newQueue]->task_parms = {0};
pthread_mutex_unlock (&lock_nb_queues);
LOG_D(ITTI, "Starting itti queue: %s as task %d\n", taskInfo->name, newQueue);
pthread_mutex_init(&tasks[newQueue]->queue_cond_lock, NULL);
memcpy(&tasks[newQueue]->admin, taskInfo, sizeof(task_info_t));
AssertFatal( ( tasks[newQueue]->epoll_fd = epoll_create1(0) ) >=0, "");
AssertFatal( ( tasks[newQueue]->sem_fd = eventfd(0, EFD_SEMAPHORE) ) >=0, "");
itti_subscribe_event_fd((task_id_t)newQueue, tasks[newQueue]->sem_fd);
return newQueue;
}
int itti_init(task_id_t task_max,
const task_info_t *tasks
) {
pthread_mutex_init(&lock_nb_queues, NULL);
nb_queues=0;
for(int i=0; i<task_max; ++i) {
itti_create_queue(&tasks[i]);
}
return 0;
}
int timer_setup(
uint32_t interval_sec,
uint32_t interval_us,
task_id_t task_id,
instance_t instance,
timer_type_t type,
void *timer_arg,
long *timer_id) {
task_list_t *t=tasks[task_id];
do {
// set the taskid in the timer id to keep compatible with the legacy API
// timer_remove() takes only the timer id as parameter
*timer_id=(random()%UINT16_MAX) << 16 | task_id ;
} while ( t->timer_map.find(*timer_id) != t->timer_map.end());
/* Allocate new timer list element */
timer_elm_t timer;
struct timespec tp;
clock_gettime(CLOCK_MONOTONIC, &tp);
if (interval_us%1000 != 0)
LOG_W(ITTI, "Can't set timer precision below 1ms, rounding it\n");
timer.duration = interval_sec*1000+interval_us/1000;
timer.timeout= ((uint64_t)tp.tv_sec*1000+tp.tv_nsec/(1000*1000)+timer.duration);
timer.instance = instance;
timer.type = type;
timer.timer_arg = timer_arg;
pthread_mutex_lock (&t->queue_cond_lock);
t->timer_map[*timer_id]= timer;
if (timer.timeout < t->next_timer)
t->next_timer=timer.timeout;
eventfd_t sem_counter = 1;
AssertFatal ( sizeof(sem_counter) == write(t->sem_fd, &sem_counter, sizeof(sem_counter)), "");
pthread_mutex_unlock (&t->queue_cond_lock);
return 0;
}
int timer_remove(long timer_id) {
task_id_t task_id=(task_id_t)(timer_id&0xffff);
int ret;
pthread_mutex_lock (&tasks[task_id]->queue_cond_lock);
ret=tasks[task_id]->timer_map.erase(timer_id);
pthread_mutex_unlock (&tasks[task_id]->queue_cond_lock);
if (ret==1)
return 0;
else {
LOG_W(ITTI, "tried to remove a non existing timer\n");
return 1;
}
}
const char *itti_get_message_name(MessagesIds message_id) {
return messages_info[message_id].name;
}
const char *itti_get_task_name(task_id_t task_id) {
return tasks[task_id]->admin.name;
}
// void for compatibility
void itti_send_terminate_message(task_id_t task_id) {
}
sem_t itti_sem_block;
void itti_wait_tasks_unblock()
{
int rc = sem_post(&itti_sem_block);
AssertFatal(rc == 0, "error in sem_post(): %d %s\n", errno, strerror(errno));
}
static void catch_sigterm(int) {
static const char msg[] = "\n** Caught SIGTERM, shutting down\n";
__attribute__((unused))
int unused = write(STDOUT_FILENO, msg, sizeof(msg) - 1);
itti_wait_tasks_unblock();
}
void itti_wait_tasks_end(void (*handler)(int))
{
int rc = sem_init(&itti_sem_block, 0, 0);
AssertFatal(rc == 0, "error in sem_init(): %d %s\n", errno, strerror(errno));
if (handler == NULL) /* no handler given: install default */
handler = catch_sigterm;
signal(SIGTERM, handler);
signal(SIGINT, handler);
rc = sem_wait(&itti_sem_block);
AssertFatal(rc == 0, "error in sem_wait(): %d %s\n", errno, strerror(errno));
}
void itti_update_lte_time(uint32_t frame, uint8_t slot) {}
void itti_set_task_real_time(task_id_t task_id) {}
void itti_mark_task_ready(task_id_t task_id) {
// Function meaning is clear, but legacy implementation is wrong
// keep it void is fine: today implementation accepts messages in the queue before task is ready
}
void itti_wait_ready(int wait_tasks) {
// Stupid function, kept for compatibility (the parameter is meaningless!!!)
}
int signal_mask(void) {
return 0;
}
void log_scheduler(const char* label)
{
int policy = sched_getscheduler(0);
struct sched_param param;
if (sched_getparam(0, ¶m) == -1)
{
LOG_E(HW, "sched_getparam: %s\n", strerror(errno));
abort();
}
cpu_set_t cpu_set;
if (sched_getaffinity(0, sizeof(cpu_set), &cpu_set) == -1)
{
LOG_E(HW, "sched_getaffinity: %s\n", strerror(errno));
abort();
}
int num_cpus = sysconf(_SC_NPROCESSORS_ONLN);
if (num_cpus < 1)
{
LOG_E(HW, "sysconf(_SC_NPROCESSORS_ONLN): %s\n", strerror(errno));
abort();
}
char buffer[num_cpus];
for (int i = 0; i < num_cpus; i++)
{
buffer[i] = CPU_ISSET(i, &cpu_set) ? 'Y' : '-';
}
LOG_A(HW, "Scheduler policy=%d priority=%d affinity=[%d]%.*s label=%s\n",
policy,
param.sched_priority,
num_cpus,
num_cpus,
buffer,
label);
}
} // extern "C"