/*******************************************************************************

 Eurecom OpenAirInterface
 Copyright(c) 1999 - 2012 Eurecom

 This program is free software; you can redistribute it and/or modify it
 under the terms and conditions of the GNU General Public License,
 version 2, as published by the Free Software Foundation.

 This program is distributed in the hope it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 more details.

 You should have received a copy of the GNU General Public License along with
 this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.

 The full GNU General Public License is included in this distribution in
 the file called "COPYING".

 Contact Information
 Openair Admin: openair_admin@eurecom.fr
 Openair Tech : openair_tech@eurecom.fr
 Forums       : http://forums.eurecom.fr/openairinterface
 Address      : EURECOM, Campus SophiaTech, 450 Route des Chappes
 06410 Biot FRANCE

 *******************************************************************************/

#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <signal.h>

#include "queue.h"
#include "assertions.h"

#include "intertask_interface.h"
#include "intertask_interface_dump.h"
/* Includes "intertask_interface_init.h" to check prototype coherence, but disable threads and messages information generation */
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

#include "signals.h"
#include "timer.h"

/* Run-time switch for ITTI_DEBUG traces: any non-zero value enables them. */
int itti_debug = 1;

/* Debug trace: prints to stdout, prefixed with "[ITTI][D]", only when
 * itti_debug is non-zero. `x` must be a string literal because it is
 * concatenated with the prefix at compile time. Uses the GNU named variadic
 * macro form (`args...` / `##args`).
 */
#define ITTI_DEBUG(x, args...) do { if (itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); } \
    while(0)
/* Error trace: always printed, prefixed with "[ITTI][E]". Note that it is
 * written to stdout, not stderr. `x` must be a string literal.
 */
#define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); } \
    while(0)

/* Total size in bytes of a message with the given id: the common
 * MessageHeader plus the per-message size taken from the messages_info table.
 * The id is used to index the table without any bounds check, and the macro
 * reads the file-scope itti_desc, so it can only be expanded where itti_desc
 * is visible.
 */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)

/* Life-cycle state of an ITTI task, as tracked in task_desc_t.task_state. */
typedef enum task_state_s {
    TASK_STATE_NOT_CONFIGURED, /* Initial state, set for every task by itti_init */
    TASK_STATE_STARTING,       /* Set by itti_create_task just before the thread is created */
    TASK_STATE_READY,          /* Set by itti_mark_task_ready; required to receive messages */
    TASK_STATE_ENDED,          /* Set by itti_wait_tasks_end once the thread has been joined */
    TASK_STATE_MAX,            /* Number of states; not a valid state itself */
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
struct message_list_s {
    STAILQ_ENTRY(message_list_s) next_element;

70
    MessageDef *msg; ///< Pointer to the message
71

72 73
    message_number_t message_number; ///< Unique message number
    uint32_t message_priority; ///< Message priority
74 75 76 77
};

typedef struct task_desc_s {
    /* Queue of messages belonging to the task */
78 79
    STAILQ_HEAD(message_queue_head, message_list_s)
    message_queue;
80 81

    /* Number of messages in the queue */
82
    volatile uint32_t message_in_queue;
83
    /* Mutex for the message queue */
84
    pthread_mutex_t message_queue_mutex;
85
    /* Conditional var for message queue and task synchro */
86 87
    pthread_cond_t message_queue_cond_var;
    pthread_t task_thread;
88 89 90 91 92 93
    volatile task_state_t task_state;
} task_desc_t;

struct itti_desc_s {
    task_desc_t *tasks;
    /* Current message number. Incremented every call to send_msg_to_task */
94
    message_number_t message_number __attribute__((aligned(8)));
95 96 97 98

    thread_id_t thread_max;
    MessagesIds messages_id_max;

99 100
    pthread_t thread_handling_signals;

101 102 103 104 105 106
    const char * const *threads_name;
    const message_info_t *messages_info;
};

static struct itti_desc_s itti_desc;

107
static inline message_number_t itti_increment_message_number(void) {
108 109 110 111
    /* Atomic operation supported by GCC: returns the current message number
     * and then increment it by 1.
     * This can be done without mutex.
     */
112
    return __sync_fetch_and_add (&itti_desc.message_number, 1);
113 114
}

115
/* Returns the priority recorded for message_id in the messages_info table.
 *
 * message_id is checked against messages_id_max with DevCheck before the
 * table is indexed (DevCheck comes from assertions.h; presumably it aborts on
 * failure - confirm there).
 */
static inline uint32_t itti_get_message_priority(MessagesIds message_id) {
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].priority);
}

121
/* Returns the name recorded for message_id in the messages_info table.
 *
 * message_id is checked against messages_id_max with DevCheck before the
 * table is indexed. The returned pointer refers to the table supplied to
 * itti_init; this function allocates nothing.
 */
char *itti_get_message_name(MessagesIds message_id) {
    DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0);

    return (itti_desc.messages_info[message_id].name);
}

127 128
/* Sends a private copy of message_p to every task in state TASK_STATE_READY,
 * except the thread the message originates from.
 *
 * Ownership: a non-NULL message_p is always freed before returning, whether
 * the broadcast succeeded or not. Each receiver gets its own heap copy.
 *
 * Returns 0 on success. Returns -1 if message_p is NULL, if its message id is
 * out of range or if a copy cannot be allocated (the broadcast stops at that
 * point). If delivery to a task fails, the remaining tasks are still served
 * and the last negative result of itti_send_msg_to_task is returned.
 */
int itti_send_broadcast_message(MessageDef *message_p) {
    thread_id_t origin_thread_id;
    uint32_t i;
    size_t message_size;
    int ret = 0;

    if (message_p == NULL) {
        ITTI_ERROR("Message to broadcast is NULL (%s:%d)\n", __FILE__, __LINE__);
        return -1;
    }

    /* The id indexes messages_info below, so reject out-of-range values first */
    if (message_p->header.messageId >= itti_desc.messages_id_max) {
        ITTI_ERROR("Invalid message id %u (%s:%d)\n", (unsigned int) message_p->header.messageId, __FILE__, __LINE__);
        free (message_p);
        return -1;
    }

    /* Copy exactly the size used by itti_alloc_new_message for this id.
     * Copying sizeof(MessageDef) could read past the end of the source buffer
     * if MessageDef is larger than this particular message.
     */
    message_size = MESSAGE_SIZE(message_p->header.messageId);

    origin_thread_id = TASK_GET_THREAD_ID(message_p->header.originTaskId);

    for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) {
        MessageDef *new_message_p;
        int result;

        /* Skip task that broadcast the message */
        if (i == origin_thread_id) {
            continue;
        }
        /* Skip tasks which are not running */
        if (itti_desc.tasks[i].task_state != TASK_STATE_READY) {
            continue;
        }

        new_message_p = malloc (message_size);
        if (new_message_p == NULL) {
            ITTI_ERROR("Failed to allocate memory (%s:%d)\n", __FILE__, __LINE__);
            /* Leave through the common exit so message_p is not leaked */
            ret = -1;
            break;
        }
        memcpy (new_message_p, message_p, message_size);

        result = itti_send_msg_to_task (TASK_SHIFT_THREAD_ID(i), INSTANCE_DEFAULT, new_message_p);
        if (result < 0) {
            ITTI_ERROR("Failed to send broadcast message (%s) to queue (%u:%s)\n",
                       itti_desc.messages_info[message_p->header.messageId].name, i, itti_desc.threads_name[i]);
            ret = result;
            /* The copy was not queued, so it is still ours to release */
            free (new_message_p);
        }
    }

    free (message_p);

    return ret;
}

169
/* Allocates a zero-filled message of type message_id and fills in its header.
 *
 * The buffer is MESSAGE_SIZE(message_id) bytes long (common header plus the
 * per-message size from messages_info). The header's messageId, originTaskId
 * and size fields are set; everything else is left zeroed by calloc.
 *
 * Returns the new message, or NULL if message_id is out of range or the
 * allocation fails. This function does not free the message.
 *
 * NOTE(review): the definition is plain `inline` with external linkage; under
 * C99 inline semantics this emits no external definition unless a non-inline
 * or extern declaration is visible - confirm against the header's prototype.
 */
inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id) {
    MessageDef *new_message_p;

    if (message_id >= itti_desc.messages_id_max) {
        ITTI_ERROR("Invalid message id %d (%s:%d)\n", message_id, __FILE__, __LINE__);
        return NULL;
    }

    new_message_p = calloc (1, MESSAGE_SIZE(message_id));
    if (new_message_p == NULL) {
        ITTI_ERROR("Cannot allocate memory for new message (%s:%d)\n", __FILE__, __LINE__);
        return NULL;
    }

    new_message_p->header.messageId = message_id;
    new_message_p->header.originTaskId = origin_task_id;
    new_message_p->header.size = itti_desc.messages_info[message_id].size;

    return new_message_p;
}

191 192 193 194 195 196
/* Queues `message` for the task identified by task_id and wakes the receiving
 * thread if its queue was empty.
 *
 * The message header's destinationTaskId and instance fields are overwritten
 * with the given arguments. Messages are appended at the tail of the target
 * queue (plain FIFO); the priority is stored in the list element but is not
 * used for ordering.
 *
 * Returns 0 when the message has been queued; the message pointer is then
 * handed to the receiver. Returns -1 if the thread id derived from task_id is
 * out of range or the list element cannot be allocated; in that case the
 * message has not been queued and is not freed here.
 *
 * A NULL message, an out-of-range message id, a target task that is not
 * TASK_STATE_READY or a full queue are reported through DevAssert/DevCheck.
 */
int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
    struct message_list_s *new_element;
    uint32_t priority;
    message_number_t message_number;
    uint32_t message_id;

    if (thread_id >= itti_desc.thread_max) {
        return -1;
    }

    /* Must be verified before the header is touched below */
    DevAssert(message != NULL);

    message->header.destinationTaskId = task_id;
    message->header.instance = instance;
    message_id = message->header.messageId;

    DevCheck(message_id < itti_desc.messages_id_max, itti_desc.messages_id_max, message_id, 0);

    priority = itti_get_message_priority (message_id);

    /* Allocate the list element before taking the queue mutex, so that an
     * allocation failure can never return with the mutex still held.
     */
    new_element = malloc (sizeof(*new_element));
    if (new_element == NULL) {
        ITTI_ERROR("Cannot allocate memory for new message (%s:%d)\n", __FILE__, __LINE__);
        return -1;
    }

    /* Lock the mutex to get exclusive access to the list */
    pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);

    /* We cannot send a message if the task is not running */
    DevCheck(itti_desc.tasks[thread_id].task_state == TASK_STATE_READY, itti_desc.tasks[thread_id].task_state,
             TASK_STATE_READY, thread_id);

    /* Check the number of messages in the queue */
    DevCheck((itti_desc.tasks[thread_id].message_in_queue * sizeof(MessageDef)) < ITTI_QUEUE_SIZE_PER_TASK,
             (itti_desc.tasks[thread_id].message_in_queue * sizeof(MessageDef)), ITTI_QUEUE_SIZE_PER_TASK,
             itti_desc.tasks[thread_id].message_in_queue);

    /* Increment the global message number */
    message_number = itti_increment_message_number ();

    /* Fill in members */
    new_element->msg = message;
    new_element->message_number = message_number;
    new_element->message_priority = priority;

    itti_dump_queue_message (message_number, message, itti_desc.messages_info[message_id].name,
                             MESSAGE_SIZE(message_id));

    /* Plain FIFO: priority-ordered insertion is not implemented, so the new
     * element always goes at the tail (or at the head of an empty queue).
     */
    if (STAILQ_EMPTY (&itti_desc.tasks[thread_id].message_queue)) {
        STAILQ_INSERT_HEAD (&itti_desc.tasks[thread_id].message_queue, new_element, next_element);
    }
    else {
        STAILQ_INSERT_TAIL (&itti_desc.tasks[thread_id].message_queue, new_element, next_element);
    }

    /* Update the number of messages in the queue */
    itti_desc.tasks[thread_id].message_in_queue++;
    if (itti_desc.tasks[thread_id].message_in_queue == 1) {
        /* Emit a signal to wake up target task thread */
        pthread_cond_signal (&itti_desc.tasks[thread_id].message_queue_cond_var);
    }
    /* Release the mutex */
    pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);

    ITTI_DEBUG(
            "Message %s, number %lu with priority %d successfully sent to queue (%u:%s)\n",
            itti_desc.messages_info[message_id].name, message_number, priority, thread_id, itti_desc.threads_name[thread_id]);
    return 0;
}

284
/* Blocks until a message is available for the thread that serves task_id,
 * then removes the oldest message from that thread's queue and stores it in
 * *received_msg.
 *
 * The list element that carried the message is freed here. The message itself
 * is handed to the caller and is not freed by this function.
 *
 * received_msg must not be NULL and the thread id derived from task_id must
 * be below thread_max (checked with DevAssert/DevCheck).
 */
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0);
    DevAssert(received_msg != NULL);

    // Lock the mutex to get exclusive access to the list
    pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);

    /* Re-test the predicate after every wakeup: pthread_cond_wait may return
     * spuriously, and a single `if` would then fall through with an empty
     * queue and leave *received_msg unset.
     */
    while (itti_desc.tasks[thread_id].message_in_queue == 0) {
        ITTI_DEBUG("Message in queue[(%u:%s)] == 0, waiting\n", thread_id, itti_desc.threads_name[thread_id]);
        // Wait while list == 0
        pthread_cond_wait (&itti_desc.tasks[thread_id].message_queue_cond_var,
                           &itti_desc.tasks[thread_id].message_queue_mutex);
        ITTI_DEBUG("Receiver queue[(%u:%s)] got new message notification for task %x\n",
                   thread_id, itti_desc.threads_name[thread_id], task_id);
    }

    if (!STAILQ_EMPTY (&itti_desc.tasks[thread_id].message_queue)) {
        struct message_list_s *temp = STAILQ_FIRST (&itti_desc.tasks[thread_id].message_queue);

        /* Update received_msg reference */
        *received_msg = temp->msg;

        /* Remove message from queue */
        STAILQ_REMOVE_HEAD (&itti_desc.tasks[thread_id].message_queue, next_element);
        free (temp);
        itti_desc.tasks[thread_id].message_in_queue--;
    }
    // Release the mutex
    pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
}

317
/* Non-blocking receive: looks for the first queued message whose destination
 * is task_id and whose instance matches `instance` (INSTANCE_ALL matches any
 * instance), removes it from the queue and stores it in *received_msg.
 *
 * *received_msg is set to NULL when no matching message is queued. The list
 * element that carried the message is freed here; the message itself is handed
 * to the caller.
 *
 * received_msg must not be NULL and the thread id derived from task_id must
 * be below thread_max (checked with DevAssert/DevCheck).
 */
void itti_poll_msg(task_id_t task_id, instance_t instance, MessageDef **received_msg) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0);
    DevAssert(received_msg != NULL);

    *received_msg = NULL;

    /* The counter is read without the mutex here, so an empty queue costs no
     * lock; the queue itself is only walked with the mutex held.
     */
    if (itti_desc.tasks[thread_id].message_in_queue != 0) {
        struct message_list_s *temp;

        // Lock the mutex to get exclusive access to the list
        pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);

        STAILQ_FOREACH (temp, &itti_desc.tasks[thread_id].message_queue, next_element)
        {
            if ((temp->msg->header.destinationTaskId == task_id)
                    && ((instance == INSTANCE_ALL) || (temp->msg->header.instance == instance))) {
                /* Update received_msg reference */
                *received_msg = temp->msg;

                /* Log while the list element is still valid: it is freed just
                 * below and must not be read afterwards.
                 */
                ITTI_DEBUG(
                        "Receiver queue[(%u:%s)] got new message %s, number %lu for task %x\n",
                        thread_id, itti_desc.threads_name[thread_id], itti_desc.messages_info[temp->msg->header.messageId].name, temp->message_number, task_id);

                /* Remove message from queue */
                STAILQ_REMOVE (&itti_desc.tasks[thread_id].message_queue, temp, message_list_s, next_element);
                free (temp);
                itti_desc.tasks[thread_id].message_in_queue--;

                /* Stop iterating: the current element no longer exists */
                break;
            }
        }

        // Release the mutex
        pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
    }

    if (*received_msg == NULL) {
        ITTI_DEBUG("No message in queue[(%u:%s)] for task %x\n", thread_id, itti_desc.threads_name[thread_id], task_id);
    }
}

359
/* Starts the thread that serves task_id and waits until it reports ready.
 *
 * The task must be in TASK_STATE_NOT_CONFIGURED. It is moved to
 * TASK_STATE_STARTING, the thread is created with start_routine(args_p), and
 * the caller then waits until the task state becomes TASK_STATE_READY (set by
 * itti_mark_task_ready). If the state never becomes READY this function does
 * not return.
 *
 * Returns 0 once the task is ready, -1 if the task was already configured or
 * the thread could not be created. After a failed creation the task is put
 * back in TASK_STATE_NOT_CONFIGURED.
 */
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
    int result;

    DevAssert(start_routine != NULL);
    DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0);

    if (itti_desc.tasks[thread_id].task_state != TASK_STATE_NOT_CONFIGURED) {
        ITTI_ERROR("You are attempting to start an already configured thread"
                   " for %s thread\n",
                   itti_desc.threads_name[thread_id]);
        return -1;
    }

    itti_desc.tasks[thread_id].task_state = TASK_STATE_STARTING;

    /* pthread_create reports failure through its return value (a positive
     * error number); it neither returns a negative value nor sets errno.
     */
    result = pthread_create (&itti_desc.tasks[thread_id].task_thread, NULL, start_routine, args_p);
    if (result != 0) {
        ITTI_ERROR("Failed to initialize %s thread: %s:%d\n",
                   itti_desc.threads_name[thread_id], strerror(result), result);
        /* No thread exists, so do not leave the task stuck in STARTING */
        itti_desc.tasks[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
        return -1;
    }

    /* Wait till the thread is completely ready, giving up the CPU between
     * checks instead of spinning at full speed.
     */
    while (itti_desc.tasks[thread_id].task_state != TASK_STATE_READY) {
        sched_yield ();
    }
    return 0;
}

386
/* Marks the task identified by task_id as TASK_STATE_READY.
 *
 * The state is changed while holding the task's message queue mutex, the same
 * mutex under which itti_send_msg_to_task checks the state. itti_create_task
 * waits for this state before returning.
 */
void itti_mark_task_ready(task_id_t task_id) {
    thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

    DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0);

    // Lock the mutex to get exclusive access to the list
    pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);
    itti_desc.tasks[thread_id].task_state = TASK_STATE_READY;
    // Release the mutex
    pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
}

/* Asks every task to terminate, wakes the signal-handling thread, and ends
 * the calling thread.
 *
 * A terminate message originating from task_id is broadcast, SIGUSR1 is sent
 * to the thread recorded in thread_handling_signals (if one has been
 * recorded), and the caller then exits through pthread_exit, so this function
 * does not return.
 */
void itti_terminate_tasks(task_id_t task_id) {
    // Sends Terminate signals to all tasks.
    itti_send_terminate_message (task_id);

    /* itti_init stores -1 in thread_handling_signals until
     * itti_wait_tasks_end records a real thread. Compare against that
     * sentinel explicitly: a `>= 0` test is always true when pthread_t is an
     * unsigned type and would let pthread_kill run on an invalid thread id.
     */
    if (itti_desc.thread_handling_signals != (pthread_t) -1) {
        pthread_kill (itti_desc.thread_handling_signals, SIGUSR1);
    }

    pthread_exit (NULL);
}

408 409
/* Initialises the inter-task interface.
 *
 * Resets the global message number, initialises signal handling, records the
 * thread/message limits and description tables, allocates one task descriptor
 * per thread and initialises each queue, mutex and condition variable, then
 * initialises the message dump facility and the timers.
 *
 * threads_name and messages_info are stored by pointer, not copied, so they
 * must stay valid for as long as the interface is used.
 * messages_definition_xml is passed on to itti_dump_init.
 *
 * Returns 0 on success and -1 if the task descriptors cannot be allocated.
 * Failures of signal_init/timer_init are handled by CHECK_INIT_RETURN
 * (defined elsewhere; presumably returns early - confirm).
 */
int itti_init(thread_id_t thread_max, MessagesIds messages_id_max, const char * const *threads_name,
              const message_info_t *messages_info, const char * const messages_definition_xml) {
    uint32_t i;

    itti_desc.message_number = 0;

    CHECK_INIT_RETURN(signal_init());

    /* Saves threads and messages max values */
    itti_desc.thread_max = thread_max;
    itti_desc.messages_id_max = messages_id_max;
    /* Sentinel meaning "no signal-handling thread recorded yet" */
    itti_desc.thread_handling_signals = (pthread_t) -1;
    itti_desc.threads_name = threads_name;
    itti_desc.messages_info = messages_info;

    /* Allocates memory for tasks info */
    itti_desc.tasks = calloc (itti_desc.thread_max, sizeof(task_desc_t));
    if (itti_desc.tasks == NULL) {
        /* Every other entry point indexes this array, so fail here instead */
        ITTI_ERROR("Cannot allocate memory for tasks info (%s:%d)\n", __FILE__, __LINE__);
        return -1;
    }

    /* Initializing each queue and related stuff */
    for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) {
        STAILQ_INIT (&itti_desc.tasks[i].message_queue);
        itti_desc.tasks[i].message_in_queue = 0;
        // Initialize mutexes
        pthread_mutex_init (&itti_desc.tasks[i].message_queue_mutex, NULL);
        // Initialize Cond vars
        pthread_cond_init (&itti_desc.tasks[i].message_queue_cond_var, NULL);
        itti_desc.tasks[i].task_state = TASK_STATE_NOT_CONFIGURED;
    }
    itti_dump_init (messages_definition_xml);

    CHECK_INIT_RETURN(timer_init ());

    return 0;
}

442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464
void itti_wait_tasks_end(void) {
    int end = 0;
    int i;

    itti_desc.thread_handling_signals = pthread_self ();

    /* Handle signals here */
    while (end == 0) {
        signal_handle (&end);
    }

    for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) {
        /* Skip tasks which are not running */
        if (itti_desc.tasks[i].task_state == TASK_STATE_READY) {
            ITTI_DEBUG("Waiting end of thread %s\n", itti_desc.threads_name[i]);

            pthread_join (itti_desc.tasks[i].task_thread, NULL);
            itti_desc.tasks[i].task_state = TASK_STATE_ENDED;
        }
    }
}

/* Broadcasts a TERMINATE_MESSAGE originating from task_id.
 *
 * The message is allocated with itti_alloc_new_message and handed to
 * itti_send_broadcast_message. A failed allocation yields NULL, which the
 * broadcast function rejects with an error trace. The broadcast result is not
 * reported to the caller.
 */
void itti_send_terminate_message(task_id_t task_id) {
    MessageDef *terminate_message_p = itti_alloc_new_message (task_id, TERMINATE_MESSAGE);

    itti_send_broadcast_message (terminate_message_p);
}