diff --git a/common/utils/threadPool/thread-pool.c b/common/utils/threadPool/thread-pool.c index 8a706d9246d0836fcdab4aa665a9ac75882151dd..4cc56fc9b8126f8a5f88d03ce8be780563b7d116 100644 --- a/common/utils/threadPool/thread-pool.c +++ b/common/utils/threadPool/thread-pool.c @@ -10,7 +10,9 @@ #include <fcntl.h> #include <string.h> #include <unistd.h> -#include <thread-pool.h> +#include <ctype.h> +#include <sys/sysinfo.h> +#include <threadPool/thread-pool.h> void displayList(notifiedFIFO_t *nf) { int n=0; @@ -46,12 +48,16 @@ static inline notifiedFIFO_elt_t *pullNotifiedFifoRemember( notifiedFIFO_t *nf, void *one_thread(void *arg) { struct one_thread *myThread=(struct one_thread *) arg; struct thread_pool *tp=myThread->pool; + // configure the thread core assignment // TBD: reserve the core for us exclusively - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(myThread->coreID, &cpuset); - pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + if ( myThread->coreID >= 0 && myThread->coreID < get_nprocs_conf()) { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(myThread->coreID, &cpuset); + pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + } + //Configure the thread scheduler policy for Linux struct sched_param sparam= {0}; sparam.sched_priority = sched_get_priority_max(SCHED_RR); @@ -112,22 +118,27 @@ void initTpool(char *params,tpool_t *pool, bool performanceMeas) { curptr=strtok_r(params,",",&saveptr); while ( curptr!=NULL ) { - if (curptr[0] == 'u' || curptr[0] == 'U') { - pool->restrictRNTI=true; - } else if ( curptr[0]>='0' && curptr[0]<='9' ) { - struct one_thread *tmp=pool->allthreads; - pool->allthreads=(struct one_thread *)malloc(sizeof(struct one_thread)); - pool->allthreads->next=tmp; - printf("create a thread for core %d\n", atoi(curptr)); - pool->allthreads->coreID=atoi(curptr); - pool->allthreads->id=pool->nbThreads; - pool->allthreads->pool=pool; - pthread_create(&pool->allthreads->threadID, NULL, one_thread, (void *)pool->allthreads); - pool->nbThreads++; - } else if (curptr[0] == 'n' || curptr[0] == 'N') { - pool->activated=false; - } else - printf("Error in options for thread pool: %s\n",curptr); + int c=toupper(curptr[0]); + + switch (c) { + case 'U': + pool->restrictRNTI=true; + break; + + case 'N': + pool->activated=false; + break; + + default: + pool->allthreads=(struct one_thread *)malloc(sizeof(struct one_thread)); + pool->allthreads->next=pool->allthreads; + printf("create a thread for core %d\n", atoi(curptr)); + pool->allthreads->coreID=atoi(curptr); + pool->allthreads->id=pool->nbThreads; + pool->allthreads->pool=pool; + pthread_create(&pool->allthreads->threadID, NULL, one_thread, (void *)pool->allthreads); + pool->nbThreads++; + } curptr=strtok_r(NULL,",",&saveptr); } diff --git a/common/utils/threadPool/thread-pool.h b/common/utils/threadPool/thread-pool.h index 5c36364fc96dd779757d5d8c0a169f6ead4304e2..6f34508c5a9023b23d3b15e4be9b036c09b2b06a 100644 --- a/common/utils/threadPool/thread-pool.h +++ b/common/utils/threadPool/thread-pool.h @@ -10,7 +10,7 @@ #include <pthread.h> #include <sys/syscall.h> #include <assertions.h> -#include <log.h> +#include <LOG/log.h> #ifdef DEBUG #define THREADINIT PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP @@ -52,8 +52,7 @@ static inline notifiedFIFO_elt_t *newNotifiedFIFO_elt(int size, notifiedFIFO_t *reponseFifo, void (*processingFunc)(void *)) { notifiedFIFO_elt_t *ret; - size_t sz=sizeof(notifiedFIFO_elt_t)+size; - AssertFatal( NULL != (ret=(notifiedFIFO_elt_t *) malloc((sz/32+1)*32)), ""); + AssertFatal( NULL != (ret=(notifiedFIFO_elt_t *) malloc(sizeof(notifiedFIFO_elt_t)+size+32)), ""); ret->next=NULL; ret->key=key; ret->reponseFifo=reponseFifo; @@ -69,8 +68,6 @@ static inline void *NotifiedFifoData(notifiedFIFO_elt_t *elt) { } static inline void delNotifiedFIFO_elt(notifiedFIFO_elt_t *elt) { - bool tmp=elt->malloced; - if (elt->malloced) { elt->malloced=false; free(elt); @@ -121,9 +118,10 @@ static inline notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf) { static inline notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) { int tmp=mutextrylock(nf->lockF); + if (tmp != 0 ) return NULL; - + notifiedFIFO_elt_t *ret=nf->outF; if (ret!=NULL) @@ -194,21 +192,22 @@ static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_ msg->returnTime=rdtsc(); if (t->traceFd) - (void)write(t->traceFd, msg, sizeof(*msg)); + if(write(t->traceFd, msg, sizeof(*msg))); return msg; } static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) { - notifiedFIFO_elt_t *msg= pullNotifiedFIFO(responseFifo); + notifiedFIFO_elt_t *msg= pollNotifiedFIFO(responseFifo); + if (msg == NULL) return NULL; - + if (t->measurePerf) msg->returnTime=rdtsc(); if (t->traceFd) - (void)write(t->traceFd, msg, sizeof(*msg)); + if(write(t->traceFd, msg, sizeof(*msg))); return msg; } @@ -240,5 +239,6 @@ static inline void abortTpool(tpool_t *t, uint64_t key) { mutexunlock(nf->lockF); } +void initTpool(char *params,tpool_t *pool, bool performanceMeas); #endif diff --git a/common/utils/threadPool/thread-pool.md b/common/utils/threadPool/thread-pool.md index 92cc21b975d72473196d491835336b40adea1552..2b6336fb6093e7789d82aeb5ccef6cd5815b5f27 100644 --- a/common/utils/threadPool/thread-pool.md +++ b/common/utils/threadPool/thread-pool.md @@ -11,7 +11,7 @@ All the thread pool functions are thread safe, nevertheless the working function ## license Author: Laurent Thomas, Open cells project -The owner share the code usage to Openairsoftware alliance as per OSA license terms +The owner share this piece code to Openairsoftware alliance as per OSA license terms # jobs @@ -41,6 +41,8 @@ abort_notifiedFIFO() allows the customer to delete all waiting jobs that match w The clients can create one or more thread pools with init_tpool() the params string structure: describes a list of cores, separated by "," that run a worker thread +If the core exists on the CPU, the thread pool initialization sets the affinity between this thread and the related code (use negative values is allowed, so the thread will never be mapped on a specific core). + The threads are all Linux real time scheduler, their name is set automatically is "Tpool_<core id>" ## adding jobs @@ -66,4 +68,4 @@ A performance measurement is integrated: the pool will automacillay fill timesta if you set the environement variable: thread-pool-measurements to a valid file name These measurements will be wrote to this Linux pipe. -A tool to read the linux fifo and display it in ascii will be provided (TBD) +A tool to read the linux fifo and display it in ascii is provided: see the local directory Makefile for this tool and to compile the thread pool unitary tests.