Something went wrong on our end
-
Bartosz Podrygajlo authored
- removed some whitespace noise. - cleaned up CMakeLists.txt - reintegrated task_manager.c into thread-pool.c - cleaned up some unnecessary code Co-authored-by:
Cedric Roux <cedric.roux@eurecom.fr> Co-authored-by:
Mikel Irazabal <mikel.irazabal@openairinterface.com>
Bartosz Podrygajlo authored- removed some whitespace noise. - cleaned up CMakeLists.txt - reintegrated task_manager.c into thread-pool.c - cleaned up some unnecessary code Co-authored-by:
Cedric Roux <cedric.roux@eurecom.fr> Co-authored-by:
Mikel Irazabal <mikel.irazabal@openairinterface.com>
thread-pool.c 5.02 KiB
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
#define _GNU_SOURCE
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <ctype.h>
#include "thread-pool.h"
#include "bounded_notified_fifo.h"
#include <sys/sysinfo.h>
typedef struct {
tpool_t* tpool;
int idx;
} task_thread_args_t;
void pushTpool(tpool_t* tpool, task_t task)
{
DevAssert(tpool != NULL);
if (tpool->len_thr == 0) {
task.func(task.args);
return;
}
size_t const index = tpool->index++;
size_t const len_thr = tpool->len_thr;
not_q_t* q_arr = (not_q_t*)tpool->q_arr;
for (size_t i = 0; i < len_thr; ++i) {
if (try_push_not_q(&q_arr[(i + index) % len_thr], task)) {
return;
}
}
push_not_q(&q_arr[index % len_thr], task);
}
static void* worker_thread(void* arg)
{
DevAssert(arg != NULL);
task_thread_args_t* args = (task_thread_args_t*)arg;
int const idx = args->idx;
tpool_t* tpool = args->tpool;
uint32_t const len = tpool->len_thr;
uint32_t const num_it = 2 * (tpool->len_thr + idx);
not_q_t* q_arr = (not_q_t*)tpool->q_arr;
init_not_q(&q_arr[idx], idx);
// Synchronize all threads
pthread_barrier_wait(&tpool->barrier);
for (;;) {
ret_try_t ret = {.success = false};
for (uint32_t i = idx; i < num_it; ++i) {
ret = try_pop_not_q(&q_arr[i % len]);
if (ret.success == true)
break;
}
if (ret.success == false) {
if (pop_not_q(&q_arr[idx], &ret) == false)
break;
}
if (ret.t.func == NULL && ret.t.args == NULL) {
pushTpool(tpool, (task_t){.args = NULL, .func = NULL});
break;
}
ret.t.func(ret.t.args);
}
free(args);
return NULL;
}
void initNamedTpool(char* params, tpool_t* tpool, bool performanceMeas, char* name)
{
(void)performanceMeas;
DevAssert(tpool != NULL);
memset(tpool, 0, sizeof(*tpool));
char* tname = (name == NULL ? "Tpool" : name);
char *saveptr, *curptr;
char* parms_cpy = strdup(params);
curptr = strtok_r(parms_cpy, ",", &saveptr);
int core_id[128] = {0};
int num_workers = 0;
while (curptr != NULL) {
int c = toupper(curptr[0]);
switch (c) {
case 'N':
break;
default:
core_id[num_workers++] = atoi(curptr);
}
curptr = strtok_r(NULL, ",", &saveptr);
}
free(parms_cpy);
if (num_workers) {
tpool->q_arr = calloc(num_workers, sizeof(not_q_t));
AssertFatal(tpool->q_arr != NULL, "Memory exhausted");
tpool->t_arr = calloc(num_workers, sizeof(pthread_t));
AssertFatal(tpool->t_arr != NULL, "Memory exhausted");
}
tpool->len_thr = num_workers;
tpool->index = 0;
const pthread_barrierattr_t* barrier_attr = NULL;
int rc = pthread_barrier_init(&tpool->barrier, barrier_attr, num_workers + 1);
DevAssert(rc == 0);
for (size_t i = 0; i < num_workers; ++i) {
task_thread_args_t* args = malloc(sizeof(task_thread_args_t));
AssertFatal(args != NULL, "Memory exhausted");
args->idx = i;
args->tpool = tpool;
char name[64];
sprintf(name, "%s%ld_%d", tname, i, core_id[i]);
threadCreate(&tpool->t_arr[i], worker_thread, args, name, core_id[i], OAI_PRIORITY_RT_MAX);
}
// Syncronize thread pool threads. All the threads started
pthread_barrier_wait(&tpool->barrier);
}
void initFloatingCoresTpool(int nbThreads, tpool_t* pool, bool performanceMeas, char* name)
{
char threads[1024] = "n";
if (nbThreads) {
strcpy(threads, "-1");
for (int i = 1; i < nbThreads; i++)
strncat(threads, ",-1", sizeof(threads) - 1);
}
threads[sizeof(threads) - 1] = 0;
initNamedTpool(threads, pool, performanceMeas, name);
}
void abortTpool(tpool_t* tpool)
{
if (tpool->len_thr > 0) {
not_q_t* q_arr = (not_q_t*)tpool->q_arr;
pushTpool(tpool, (task_t){.args = NULL, .func = NULL});
for (uint32_t i = 0; i < tpool->len_thr; ++i) {
int rc = pthread_join(tpool->t_arr[i], NULL);
DevAssert(rc == 0);
}
for (uint32_t i = 0; i < tpool->len_thr; ++i) {
free_not_q(&q_arr[i]);
}
free(tpool->q_arr);
free(tpool->t_arr);
}
int rc = pthread_barrier_destroy(&tpool->barrier);
DevAssert(rc == 0);
}