diff --git a/common/utils/threadPool/thread-pool.c b/common/utils/threadPool/thread-pool.c index 5398fe2e7e0f9fb48b7c57b565ff47f3e4239d59..aa80fae829cded7a9522a70af50d8c173b78f3b0 100644 --- a/common/utils/threadPool/thread-pool.c +++ b/common/utils/threadPool/thread-pool.c @@ -48,9 +48,14 @@ void displayList(notifiedFIFO_t *nf) { static inline notifiedFIFO_elt_t *pullNotifiedFifoRemember( notifiedFIFO_t *nf, struct one_thread *thr) { mutexlock(nf->lockF); - while(!nf->outF) + while(!nf->outF && !thr->terminate) condwait(nf->notifF, nf->lockF); + if (thr->terminate) { + mutexunlock(nf->lockF); + return NULL; + } + notifiedFIFO_elt_t *ret=nf->outF; nf->outF=nf->outF->next; @@ -59,7 +64,7 @@ static inline notifiedFIFO_elt_t *pullNotifiedFifoRemember( notifiedFIFO_t *nf, // For abort feature thr->runningOnKey=ret->key; - thr->abortFlag=false; + thr->dropJob = false; mutexunlock(nf->lockF); return ret; } @@ -71,6 +76,10 @@ void *one_thread(void *arg) { // Infinite loop to process requests do { notifiedFIFO_elt_t *elt=pullNotifiedFifoRemember(&tp->incomingFifo, myThread); + if (elt == NULL) { + AssertFatal(myThread->terminate, "pullNotifiedFifoRemember() returned NULL although thread not aborted\n"); + break; + } if (tp->measurePerf) elt->startProcessingTime=rdtsc_oai(); @@ -82,14 +91,15 @@ void *one_thread(void *arg) { // Check if the job is still alive, else it has been aborted mutexlock(tp->incomingFifo.lockF); - if (myThread->abortFlag) + if (myThread->dropJob) delNotifiedFIFO_elt(elt); else pushNotifiedFIFO(elt->reponseFifo, elt); myThread->runningOnKey=-1; mutexunlock(tp->incomingFifo.lockF); } - } while (true); + } while (!myThread->terminate); + return NULL; } void initNamedTpool(char *params,tpool_t *pool, bool performanceMeas, char *name) { @@ -137,6 +147,8 @@ void initNamedTpool(char *params,tpool_t *pool, bool performanceMeas, char *name pool->allthreads->coreID=atoi(curptr); pool->allthreads->id=pool->nbThreads; pool->allthreads->pool=pool; + pool->allthreads->dropJob = false; + pool->allthreads->terminate = false; //Configure the thread scheduler policy for Linux // set the thread name for debugging sprintf(pool->allthreads->name,"%s%d_%d",tname,pool->nbThreads,pool->allthreads->coreID); diff --git a/common/utils/threadPool/thread-pool.h b/common/utils/threadPool/thread-pool.h index 17bc87f701b1e0de574943283d9bdd65ca066edb..c07a6a073fed233f856a7d51af6099d7a363565e 100644 --- a/common/utils/threadPool/thread-pool.h +++ b/common/utils/threadPool/thread-pool.h @@ -72,6 +72,7 @@ typedef struct notifiedFIFO_s { notifiedFIFO_elt_t *inF; pthread_mutex_t lockF; pthread_cond_t notifF; + bool abortFIFO; // if set, the FIFO always returns NULL -> abort condition } notifiedFIFO_t; // You can use this allocator or use any piece of memory @@ -107,6 +108,7 @@ static inline void delNotifiedFIFO_elt(notifiedFIFO_elt_t *elt) { static inline void initNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf) { nf->inF=NULL; nf->outF=NULL; + nf->abortFIFO = false; } static inline void initNotifiedFIFO(notifiedFIFO_t *nf) { mutexinit(nf->lockF); @@ -152,9 +154,9 @@ static inline notifiedFIFO_elt_t *pullNotifiedFIFO_nothreadSafe(notifiedFIFO_t static inline notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf) { mutexlock(nf->lockF); - notifiedFIFO_elt_t *ret; + notifiedFIFO_elt_t *ret = NULL; - while((ret=pullNotifiedFIFO_nothreadSafe(nf)) == NULL) + while((ret=pullNotifiedFIFO_nothreadSafe(nf)) == NULL && !nf->abortFIFO) condwait(nf->notifF, nf->lockF); mutexunlock(nf->lockF); @@ -167,6 +169,11 @@ static inline notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) { if (tmp != 0 ) return NULL; + if (nf->abortFIFO) { + mutexunlock(nf->lockF); + return NULL; + } + notifiedFIFO_elt_t *ret=pullNotifiedFIFO_nothreadSafe(nf); mutexunlock(nf->lockF); return ret; @@ -217,13 +224,14 @@ struct one_thread { int coreID; char name[256]; uint64_t runningOnKey; - bool abortFlag; + bool dropJob; + bool terminate; struct thread_pool *pool; struct one_thread *next; }; typedef struct thread_pool { - int activated; + bool activated; bool measurePerf; int traceFd; int dummyTraceFd; @@ -256,6 +264,8 @@ static inline void pushTpool(tpool_t *t, notifiedFIFO_elt_t *msg) { static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) { notifiedFIFO_elt_t *msg= pullNotifiedFIFO(responseFifo); + if (msg == NULL) + return NULL; AssertFatal(t->traceFd, "Thread pool used while not initialized"); if (t->measurePerf) msg->returnTime=rdtsc_oai(); @@ -284,6 +294,7 @@ static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpo static inline int abortTpoolJob(tpool_t *t, uint64_t key) { int nbRemoved=0; notifiedFIFO_t *nf=&t->incomingFifo; + mutexlock(nf->lockF); notifiedFIFO_elt_t **start=&nf->outF; @@ -300,20 +311,62 @@ static inline int abortTpoolJob(tpool_t *t, uint64_t key) { if (t->incomingFifo.outF==NULL) t->incomingFifo.inF=NULL; - struct one_thread *ptr=t->allthreads; - - while(ptr!=NULL) { - if (ptr->runningOnKey==key) { - ptr->abortFlag=true; + struct one_thread *thread = t->allthreads; + while (thread != NULL) { + if (thread->runningOnKey == key) { + thread->dropJob = true; nbRemoved++; } - ptr=ptr->next; + thread = thread->next; } mutexunlock(nf->lockF); return nbRemoved; } +static inline int abortTpool(tpool_t *t) { + int nbRemoved=0; + /* disables threading: if a message comes in now, we cannot have a race below + * as each thread will simply execute the message itself */ + t->activated = false; + notifiedFIFO_t *nf=&t->incomingFifo; + mutexlock(nf->lockF); + nf->abortFIFO = true; + notifiedFIFO_elt_t **start=&nf->outF; + + /* mark threads to abort them */ + struct one_thread *thread = t->allthreads; + while (thread != NULL) { + thread->dropJob = true; + thread->terminate = true; + nbRemoved++; + thread = thread->next; + } + + /* clear FIFOs */ + while(*start!=NULL) { + notifiedFIFO_elt_t **request=start; + *start=(*start)->next; + delNotifiedFIFO_elt(*request); + *request = NULL; + nbRemoved++; + } + + if (t->incomingFifo.outF==NULL) + t->incomingFifo.inF=NULL; + + condbroadcast(t->incomingFifo.notifF); + mutexunlock(nf->lockF); + + /* join threads that are still runing */ + thread = t->allthreads; + while (thread != NULL) { + pthread_cancel(thread->threadID); + thread = thread->next; + } + + return nbRemoved; +} void initNamedTpool(char *params,tpool_t *pool, bool performanceMeas, char *name); #define initTpool(PARAMPTR,TPOOLPTR, MEASURFLAG) initNamedTpool(PARAMPTR,TPOOLPTR, MEASURFLAG, NULL) #endif diff --git a/executables/nr-gnb.c b/executables/nr-gnb.c index 8d9c6b1a7e6f97385f06eeedebef53f1727f12b1..3e195e0046fe37f3bba595586340948baf90c989 100644 --- a/executables/nr-gnb.c +++ b/executables/nr-gnb.c @@ -467,6 +467,11 @@ void init_gNB_Tpool(int inst) { } +void term_gNB_Tpool(int inst) { + PHY_VARS_gNB *gNB = RC.gNB[inst]; + abortTpool(&gNB->threadPool); +} + /*! * \brief Terminate gNB TX and RX threads. */ @@ -609,6 +614,7 @@ void init_gNB(int single_thread_flag,int wait_for_sync) { void stop_gNB(int nb_inst) { for (int inst=0; inst<nb_inst; inst++) { LOG_I(PHY,"Killing gNB %d processing threads\n",inst); + term_gNB_Tpool(inst); kill_gNB_proc(inst); } } diff --git a/executables/nr-ru.c b/executables/nr-ru.c index fe0f9fc783a141e3050c3010c969ee897dcfc001..31c555e61d867159e11230c0255209d6c43d494e 100644 --- a/executables/nr-ru.c +++ b/executables/nr-ru.c @@ -1271,13 +1271,6 @@ void *ru_thread( void *param ) { else LOG_I(PHY,"RU %d rf device stopped\n",ru->idx); } - res = pullNotifiedFIFO(&gNB->resp_L1); - delNotifiedFIFO_elt(res); - res = pullNotifiedFIFO(&gNB->L1_tx_free); - delNotifiedFIFO_elt(res); - res = pullNotifiedFIFO(&gNB->L1_tx_free); - delNotifiedFIFO_elt(res); - ru_thread_status = 0; return &ru_thread_status; }