From 0f9fbef24dabbd877f3cac3f730935b0b4c04773 Mon Sep 17 00:00:00 2001
From: laurent <laurent.thomas@open-cells.com>
Date: Mon, 25 Mar 2019 10:09:04 +0100
Subject: [PATCH] add thread pool feature

---
 common/utils/threadPool/thread-pool.c  | 53 ++++++++++++++++----------
 common/utils/threadPool/thread-pool.h  | 20 +++++-----
 common/utils/threadPool/thread-pool.md |  6 ++-
 3 files changed, 46 insertions(+), 33 deletions(-)

diff --git a/common/utils/threadPool/thread-pool.c b/common/utils/threadPool/thread-pool.c
index 8a706d9246d..4cc56fc9b81 100644
--- a/common/utils/threadPool/thread-pool.c
+++ b/common/utils/threadPool/thread-pool.c
@@ -10,7 +10,9 @@
 #include <fcntl.h>
 #include <string.h>
 #include <unistd.h>
-#include <thread-pool.h>
+#include <ctype.h>
+#include <sys/sysinfo.h>
+#include <threadPool/thread-pool.h>
 
 void displayList(notifiedFIFO_t *nf) {
   int n=0;
@@ -46,12 +48,16 @@ static inline  notifiedFIFO_elt_t *pullNotifiedFifoRemember( notifiedFIFO_t *nf,
 void *one_thread(void *arg) {
   struct  one_thread *myThread=(struct  one_thread *) arg;
   struct  thread_pool *tp=myThread->pool;
+
   // configure the thread core assignment
   // TBD: reserve the core for us exclusively
-  cpu_set_t cpuset;
-  CPU_ZERO(&cpuset);
-  CPU_SET(myThread->coreID, &cpuset);
-  pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
+  if ( myThread->coreID >= 0 &&  myThread->coreID < get_nprocs_conf()) {
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+    CPU_SET(myThread->coreID, &cpuset);
+    pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
+  }
+
   //Configure the thread scheduler policy for Linux
   struct sched_param sparam= {0};
   sparam.sched_priority = sched_get_priority_max(SCHED_RR);
@@ -112,22 +118,27 @@ void initTpool(char *params,tpool_t *pool, bool performanceMeas) {
   curptr=strtok_r(params,",",&saveptr);
 
   while ( curptr!=NULL ) {
-    if (curptr[0] == 'u' || curptr[0] == 'U') {
-      pool->restrictRNTI=true;
-    } else if ( curptr[0]>='0' && curptr[0]<='9' ) {
-      struct one_thread *tmp=pool->allthreads;
-      pool->allthreads=(struct one_thread *)malloc(sizeof(struct one_thread));
-      pool->allthreads->next=tmp;
-      printf("create a thread for core %d\n", atoi(curptr));
-      pool->allthreads->coreID=atoi(curptr);
-      pool->allthreads->id=pool->nbThreads;
-      pool->allthreads->pool=pool;
-      pthread_create(&pool->allthreads->threadID, NULL, one_thread, (void *)pool->allthreads);
-      pool->nbThreads++;
-    } else if (curptr[0] == 'n' || curptr[0] == 'N') {
-      pool->activated=false;
-    } else
-      printf("Error in options for thread pool: %s\n",curptr);
+    int c=toupper(curptr[0]);
+
+    switch (c) {
+      case 'U':
+        pool->restrictRNTI=true;
+        break;
+
+      case 'N':
+        pool->activated=false;
+        break;
+
+      default:
+        pool->allthreads=(struct one_thread *)malloc(sizeof(struct one_thread));
+        pool->allthreads->next=pool->allthreads;
+        printf("create a thread for core %d\n", atoi(curptr));
+        pool->allthreads->coreID=atoi(curptr);
+        pool->allthreads->id=pool->nbThreads;
+        pool->allthreads->pool=pool;
+        pthread_create(&pool->allthreads->threadID, NULL, one_thread, (void *)pool->allthreads);
+        pool->nbThreads++;
+    }
 
     curptr=strtok_r(NULL,",",&saveptr);
   }
diff --git a/common/utils/threadPool/thread-pool.h b/common/utils/threadPool/thread-pool.h
index 5c36364fc96..6f34508c5a9 100644
--- a/common/utils/threadPool/thread-pool.h
+++ b/common/utils/threadPool/thread-pool.h
@@ -10,7 +10,7 @@
 #include <pthread.h>
 #include <sys/syscall.h>
 #include <assertions.h>
-#include <log.h>
+#include <LOG/log.h>
 
 #ifdef DEBUG
   #define THREADINIT   PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
@@ -52,8 +52,7 @@ static inline notifiedFIFO_elt_t *newNotifiedFIFO_elt(int size,
     notifiedFIFO_t *reponseFifo,
     void (*processingFunc)(void *)) {
   notifiedFIFO_elt_t *ret;
-  size_t sz=sizeof(notifiedFIFO_elt_t)+size;
-  AssertFatal( NULL != (ret=(notifiedFIFO_elt_t *) malloc((sz/32+1)*32)), "");
+  AssertFatal( NULL != (ret=(notifiedFIFO_elt_t *) malloc(sizeof(notifiedFIFO_elt_t)+size+32)), "");
   ret->next=NULL;
   ret->key=key;
   ret->reponseFifo=reponseFifo;
@@ -69,8 +68,6 @@ static inline void *NotifiedFifoData(notifiedFIFO_elt_t *elt) {
 }
 
 static inline void delNotifiedFIFO_elt(notifiedFIFO_elt_t *elt) {
-  bool tmp=elt->malloced;
-
   if (elt->malloced) {
     elt->malloced=false;
     free(elt);
@@ -121,9 +118,10 @@ static inline  notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf) {
 
 static inline  notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) {
   int tmp=mutextrylock(nf->lockF);
+
   if (tmp != 0 )
     return NULL;
-  
+
   notifiedFIFO_elt_t *ret=nf->outF;
 
   if (ret!=NULL)
@@ -194,21 +192,22 @@ static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_
     msg->returnTime=rdtsc();
 
   if (t->traceFd)
-    (void)write(t->traceFd, msg, sizeof(*msg));
+    if(write(t->traceFd, msg, sizeof(*msg)));
 
   return msg;
 }
 
 static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) {
-  notifiedFIFO_elt_t *msg= pullNotifiedFIFO(responseFifo);
+  notifiedFIFO_elt_t *msg= pollNotifiedFIFO(responseFifo);
+
   if (msg == NULL)
     return NULL;
-  
+
   if (t->measurePerf)
     msg->returnTime=rdtsc();
 
   if (t->traceFd)
-    (void)write(t->traceFd, msg, sizeof(*msg));
+    if(write(t->traceFd, msg, sizeof(*msg)));
 
   return msg;
 }
@@ -240,5 +239,6 @@ static inline void abortTpool(tpool_t *t, uint64_t key) {
 
   mutexunlock(nf->lockF);
 }
+void initTpool(char *params,tpool_t *pool, bool performanceMeas);
 
 #endif
diff --git a/common/utils/threadPool/thread-pool.md b/common/utils/threadPool/thread-pool.md
index 92cc21b975d..2b6336fb609 100644
--- a/common/utils/threadPool/thread-pool.md
+++ b/common/utils/threadPool/thread-pool.md
@@ -11,7 +11,7 @@ All the thread pool functions are thread safe, nevertheless the working function
 
 ## license
 Author: Laurent Thomas, Open cells project 
-The owner share the code usage to Openairsoftware alliance as per OSA license terms
+The owner share this piece code to Openairsoftware alliance as per OSA license terms
 
 # jobs
 
@@ -41,6 +41,8 @@ abort_notifiedFIFO() allows the customer to delete all waiting jobs that match w
 The clients can create one or more thread pools with init_tpool()
 the params string structure: describes a list of cores, separated by "," that run a worker thread
 
+If the core exists on the CPU, the thread pool initialization sets the affinity between this thread and the related code (use negative values is allowed, so the thread will never be mapped on a specific core).
+
 The threads are all Linux real time scheduler, their name is set automatically is "Tpool_<core id>"
 
 ## adding jobs
@@ -66,4 +68,4 @@ A performance measurement is integrated: the pool will automacillay fill timesta
 if you set the environement variable: thread-pool-measurements to a valid file name
 These measurements will be wrote to this Linux pipe.
 
-A tool to read the linux fifo and display it in ascii will be provided (TBD)
+A tool to read the linux fifo and display it in ascii is provided: see the local directory Makefile for this tool and to compile the thread pool unitary tests.
-- 
GitLab