From ab089f8718e0107276c302a775e2df67ce02c347 Mon Sep 17 00:00:00 2001
From: Bartosz Podrygajlo <bartosz.podrygajlo@openairinterface.org>
Date: Mon, 7 Oct 2024 13:40:01 +0200
Subject: [PATCH] Actor model imlementation

Add actor library which implements the Actor model (see  https://en.wikipedia.org/wiki/Actor_model).
---
 common/utils/CMakeLists.txt             |   1 +
 common/utils/actor/CMakeLists.txt       |   6 +
 common/utils/actor/README.md            |  13 +++
 common/utils/actor/actor.c              |  70 ++++++++++++
 common/utils/actor/actor.h              |  48 ++++++++
 common/utils/actor/tests/CMakeLists.txt |   5 +
 common/utils/actor/tests/test_actor.cpp | 144 ++++++++++++++++++++++++
 common/utils/threadPool/thread-pool.h   |   1 +
 8 files changed, 288 insertions(+)
 create mode 100644 common/utils/actor/CMakeLists.txt
 create mode 100644 common/utils/actor/README.md
 create mode 100644 common/utils/actor/actor.c
 create mode 100644 common/utils/actor/actor.h
 create mode 100644 common/utils/actor/tests/CMakeLists.txt
 create mode 100644 common/utils/actor/tests/test_actor.cpp

diff --git a/common/utils/CMakeLists.txt b/common/utils/CMakeLists.txt
index 050dda0dc78..72104516a26 100644
--- a/common/utils/CMakeLists.txt
+++ b/common/utils/CMakeLists.txt
@@ -17,3 +17,4 @@ add_library(utils utils.c system.c time_meas.c time_stat.c tun_if.c)
 target_include_directories(utils PUBLIC .)
 target_link_libraries(utils PRIVATE ${T_LIB})
 add_subdirectory(barrier)
+add_subdirectory(actor)
diff --git a/common/utils/actor/CMakeLists.txt b/common/utils/actor/CMakeLists.txt
new file mode 100644
index 00000000000..40057d347eb
--- /dev/null
+++ b/common/utils/actor/CMakeLists.txt
@@ -0,0 +1,6 @@
+add_library(actor actor.c)
+target_include_directories(actor PUBLIC ./)
+target_link_libraries(actor PUBLIC thread-pool)
+if (ENABLE_TESTS)
+  add_subdirectory(tests)
+endif()
diff --git a/common/utils/actor/README.md b/common/utils/actor/README.md
new file mode 100644
index 00000000000..13a577a7a1e
--- /dev/null
+++ b/common/utils/actor/README.md
@@ -0,0 +1,13 @@
+# Overview
+
+This is a simple actor implementation (see https://en.wikipedia.org/wiki/Actor_model).
+
+Actor is implemented as a single thread with single producer/consumer queue as input. You can think of it as a single thread threadpool
+
+If you need concurrency, consider allocating more than one actor.
+
+# Thread safety
+
+There is two ways to ensure thread safety between two actors
+ - Use core affinity to set both actor to run on the same core.
+ - Use data separation, like in testcase thread_safety_with_actor_specific_data
diff --git a/common/utils/actor/actor.c b/common/utils/actor/actor.c
new file mode 100644
index 00000000000..2f26b9af4f4
--- /dev/null
+++ b/common/utils/actor/actor.c
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The OpenAirInterface Software Alliance licenses this file to You under
+ * the OAI Public License, Version 1.1  (the "License"); you may not use this file
+ * except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.openairinterface.org/?page_id=698
+ *
+ * Author and copyright: Laurent Thomas, open-cells.com
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *-------------------------------------------------------------------------------
+ * For more information about the OpenAirInterface (OAI) Software Alliance:
+ *      contact@openairinterface.org
+ */
+#include "thread-pool.h"
+#include "actor.h"
+#include "system.h"
+#include "assertions.h"
+
+void *actor_thread(void *arg);
+
+void init_actor(Actor_t *actor, const char *name, int core_affinity)
+{
+  actor->terminate = false;
+  initNotifiedFIFO(&actor->fifo);
+  char actor_name[16];
+  snprintf(actor_name, sizeof(actor_name), "%s%s", name, "_actor");
+  threadCreate(&actor->thread, actor_thread, (void *)actor, actor_name, core_affinity, OAI_PRIORITY_RT_MAX);
+}
+
+/// @brief Main actor thread
+/// @param arg actor pointer
+/// @return NULL
+void *actor_thread(void *arg)
+{
+  Actor_t *actor = arg;
+
+  // Infinite loop to process requests
+  do {
+    notifiedFIFO_elt_t *elt = pullNotifiedFIFO(&actor->fifo);
+    if (elt == NULL) {
+      AssertFatal(actor->terminate, "pullNotifiedFIFO() returned NULL\n");
+      break;
+    }
+
+    elt->processingFunc(NotifiedFifoData(elt));
+    if (elt->reponseFifo) {
+      // Check if the job is still alive, else it has been aborted
+      pushNotifiedFIFO(elt->reponseFifo, elt);
+    } else
+      delNotifiedFIFO_elt(elt);
+  } while (!actor->terminate);
+
+  return NULL;
+}
+
+void destroy_actor(Actor_t *actor)
+{
+  actor->terminate = true;
+  abortNotifiedFIFO(&actor->fifo);
+  pthread_join(actor->thread, NULL);
+}
diff --git a/common/utils/actor/actor.h b/common/utils/actor/actor.h
new file mode 100644
index 00000000000..0265f01d985
--- /dev/null
+++ b/common/utils/actor/actor.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The OpenAirInterface Software Alliance licenses this file to You under
+ * the OAI Public License, Version 1.1  (the "License"); you may not use this file
+ * except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.openairinterface.org/?page_id=698
+ *
+ * Author and copyright: Laurent Thomas, open-cells.com
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *-------------------------------------------------------------------------------
+ * For more information about the OpenAirInterface (OAI) Software Alliance:
+ *      contact@openairinterface.org
+ */
+
+#ifndef ACTOR_H
+#define ACTOR_H
+#include "thread-pool.h"
+
+#define INIT_ACTOR(ptr, name, core_affinity) init_actor((Actor_t *)ptr, name, core_affinity);
+
+#define DESTROY_ACTOR(ptr) destroy_actor((Actor_t *)ptr);
+
+typedef struct Actor_t {
+  notifiedFIFO_t fifo;
+  bool terminate;
+  pthread_t thread;
+} Actor_t;
+
+/// @brief Initialize the actor. Starts actor thread
+/// @param actor
+/// @param name Actor name. Thread name will be derived from it
+/// @param core_affinity Core affinity. Specify -1 for no affinity
+void init_actor(Actor_t *actor, const char *name, int core_affinity);
+
+/// @brief Destroy the actor. Free the memory, stop the thread.
+/// @param actor
+void destroy_actor(Actor_t *actor);
+
+#endif
diff --git a/common/utils/actor/tests/CMakeLists.txt b/common/utils/actor/tests/CMakeLists.txt
new file mode 100644
index 00000000000..dd4a22a848a
--- /dev/null
+++ b/common/utils/actor/tests/CMakeLists.txt
@@ -0,0 +1,5 @@
+add_executable(test_actor test_actor.cpp)
+target_link_libraries(test_actor PRIVATE actor GTest::gtest thread-pool LOG)
+add_dependencies(tests test_actor)
+add_test(NAME test_actor
+  COMMAND ./test_actor)
diff --git a/common/utils/actor/tests/test_actor.cpp b/common/utils/actor/tests/test_actor.cpp
new file mode 100644
index 00000000000..926ff837056
--- /dev/null
+++ b/common/utils/actor/tests/test_actor.cpp
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The OpenAirInterface Software Alliance licenses this file to You under
+ * the OAI Public License, Version 1.1  (the "License"); you may not use this file
+ * except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.openairinterface.org/?page_id=698
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *-------------------------------------------------------------------------------
+ * For more information about the OpenAirInterface (OAI) Software Alliance:
+ *      contact@openairinterface.org
+ */
+
+#include <stdio.h>
+#include <assert.h>
+#include <threads.h>
+#include <stdlib.h>
+#include <gtest/gtest.h>
+#include <unistd.h>
+#include "common/config/config_userapi.h"
+#include "log.h"
+
+extern "C" {
+#include "actor.h"
+configmodule_interface_t *uniqCfg;
+void exit_function(const char *file, const char *function, const int line, const char *s, const int assert)
+{
+  if (assert) {
+    abort();
+  } else {
+    exit(EXIT_SUCCESS);
+  }
+}
+}
+
+int num_processed = 0;
+
+void process(void *arg)
+{
+  num_processed++;
+}
+
+TEST(actor, schedule_one)
+{
+  Actor_t actor;
+  init_actor(&actor, "TEST", -1);
+  notifiedFIFO_elt_t *elt = newNotifiedFIFO_elt(0, 0, NULL, process);
+  pushNotifiedFIFO(&actor.fifo, elt);
+  usleep(10);
+  destroy_actor(&actor);
+  EXPECT_EQ(num_processed, 1);
+}
+
+TEST(actor, schedule_many)
+{
+  num_processed = 0;
+  Actor_t actor;
+  init_actor(&actor, "TEST", -1);
+  for (int i = 0; i < 100; i++) {
+    notifiedFIFO_elt_t *elt = newNotifiedFIFO_elt(0, 0, NULL, process);
+    pushNotifiedFIFO(&actor.fifo, elt);
+  }
+  int num_waits = 0;
+  while (num_processed < 100 && num_waits++ < 10)
+    usleep(10);
+  destroy_actor(&actor);
+  EXPECT_EQ(num_processed, 100);
+}
+
+// Thread safety can be ensured through core affinity - if two actors
+// are running on the same core they are thread safe
+TEST(DISABLED_actor, thread_safety_with_core_affinity)
+{
+  num_processed = 0;
+  int core = 0;
+  Actor_t actor;
+  init_actor(&actor, "TEST", core);
+  Actor_t actor2;
+  init_actor(&actor2, "TEST2", core);
+  for (int i = 0; i < 10000; i++) {
+    Actor_t *actor_ptr = i % 2 == 0 ? &actor : &actor2;
+    notifiedFIFO_elt_t *elt = newNotifiedFIFO_elt(0, 0, NULL, process);
+    pushNotifiedFIFO(&actor_ptr->fifo, elt);
+  }
+  int num_waits = 0;
+  while (num_processed < 10000 && num_waits++ < 1000)
+    usleep(10);
+  destroy_actor(&actor);
+  destroy_actor(&actor2);
+  EXPECT_EQ(num_processed, 10000);
+}
+
+// Thread safety can be ensured through data separation, here using C inheritance-like
+// model
+typedef struct TestActor_t {
+  Actor_t actor;
+  int count;
+} TestActor_t;
+
+void process_thread_safe(void *arg)
+{
+  TestActor_t *actor = static_cast<TestActor_t *>(arg);
+  actor->count += 1;
+}
+
+TEST(actor, thread_safety_with_actor_specific_data)
+{
+  num_processed = 0;
+  int core = 0;
+  TestActor_t actor;
+  INIT_ACTOR(&actor, "TEST", core);
+  actor.count = 0;
+  TestActor_t actor2;
+  INIT_ACTOR(&actor2, "TEST2", core);
+  actor2.count = 0;
+  for (int i = 0; i < 10000; i++) {
+    TestActor_t *actor_ptr = i % 2 == 0 ? &actor : &actor2;
+    notifiedFIFO_elt_t *elt = newNotifiedFIFO_elt(0, 0, NULL, process_thread_safe);
+    elt->msgData = actor_ptr;
+    pushNotifiedFIFO(&actor_ptr->actor.fifo, elt);
+  }
+  int num_waits = 0;
+  while (actor.count + actor2.count < 10000 && num_waits++ < 1000)
+    usleep(10);
+  DESTROY_ACTOR(&actor);
+  DESTROY_ACTOR(&actor2);
+  EXPECT_EQ(actor.count + actor2.count, 10000);
+}
+
+int main(int argc, char **argv)
+{
+  logInit();
+  g_log->log_component[UTIL].level = OAILOG_DEBUG;
+  testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/common/utils/threadPool/thread-pool.h b/common/utils/threadPool/thread-pool.h
index b0e1b9e4e88..31395d7a9b7 100644
--- a/common/utils/threadPool/thread-pool.h
+++ b/common/utils/threadPool/thread-pool.h
@@ -30,6 +30,7 @@
 #include <stdalign.h>
 #include <pthread.h>
 #include <unistd.h>
+#include <string.h>
 #include <sys/syscall.h>
 #include "assertions.h"
 #include "common/utils/time_meas.h"
-- 
GitLab