Skip to content
Snippets Groups Projects
Commit ab089f87 authored by Bartosz Podrygajlo's avatar Bartosz Podrygajlo
Browse files

Actor model imlementation

Add actor library which implements the Actor model (see  https://en.wikipedia.org/wiki/Actor_model).
parent 5ecf6da1
No related branches found
No related tags found
No related merge requests found
This commit is part of merge request !3025. Comments created here will be created in the context of that merge request.
......@@ -17,3 +17,4 @@ add_library(utils utils.c system.c time_meas.c time_stat.c tun_if.c)
target_include_directories(utils PUBLIC .)
target_link_libraries(utils PRIVATE ${T_LIB})
add_subdirectory(barrier)
add_subdirectory(actor)
add_library(actor actor.c)
target_include_directories(actor PUBLIC ./)
target_link_libraries(actor PUBLIC thread-pool)
if (ENABLE_TESTS)
add_subdirectory(tests)
endif()
# Overview
This is a simple actor implementation (see https://en.wikipedia.org/wiki/Actor_model).
Actor is implemented as a single thread with single producer/consumer queue as input. You can think of it as a single thread threadpool
If you need concurrency, consider allocating more than one actor.
# Thread safety
There is two ways to ensure thread safety between two actors
- Use core affinity to set both actor to run on the same core.
- Use data separation, like in testcase thread_safety_with_actor_specific_data
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Author and copyright: Laurent Thomas, open-cells.com
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
#include "thread-pool.h"
#include "actor.h"
#include "system.h"
#include "assertions.h"
void *actor_thread(void *arg);
void init_actor(Actor_t *actor, const char *name, int core_affinity)
{
actor->terminate = false;
initNotifiedFIFO(&actor->fifo);
char actor_name[16];
snprintf(actor_name, sizeof(actor_name), "%s%s", name, "_actor");
threadCreate(&actor->thread, actor_thread, (void *)actor, actor_name, core_affinity, OAI_PRIORITY_RT_MAX);
}
/// @brief Main actor thread
/// @param arg actor pointer
/// @return NULL
void *actor_thread(void *arg)
{
Actor_t *actor = arg;
// Infinite loop to process requests
do {
notifiedFIFO_elt_t *elt = pullNotifiedFIFO(&actor->fifo);
if (elt == NULL) {
AssertFatal(actor->terminate, "pullNotifiedFIFO() returned NULL\n");
break;
}
elt->processingFunc(NotifiedFifoData(elt));
if (elt->reponseFifo) {
// Check if the job is still alive, else it has been aborted
pushNotifiedFIFO(elt->reponseFifo, elt);
} else
delNotifiedFIFO_elt(elt);
} while (!actor->terminate);
return NULL;
}
void destroy_actor(Actor_t *actor)
{
actor->terminate = true;
abortNotifiedFIFO(&actor->fifo);
pthread_join(actor->thread, NULL);
}
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Author and copyright: Laurent Thomas, open-cells.com
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
#ifndef ACTOR_H
#define ACTOR_H
#include "thread-pool.h"
#define INIT_ACTOR(ptr, name, core_affinity) init_actor((Actor_t *)ptr, name, core_affinity);
#define DESTROY_ACTOR(ptr) destroy_actor((Actor_t *)ptr);
typedef struct Actor_t {
notifiedFIFO_t fifo;
bool terminate;
pthread_t thread;
} Actor_t;
/// @brief Initialize the actor. Starts actor thread
/// @param actor
/// @param name Actor name. Thread name will be derived from it
/// @param core_affinity Core affinity. Specify -1 for no affinity
void init_actor(Actor_t *actor, const char *name, int core_affinity);
/// @brief Destroy the actor. Free the memory, stop the thread.
/// @param actor
void destroy_actor(Actor_t *actor);
#endif
add_executable(test_actor test_actor.cpp)
target_link_libraries(test_actor PRIVATE actor GTest::gtest thread-pool LOG)
add_dependencies(tests test_actor)
add_test(NAME test_actor
COMMAND ./test_actor)
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
#include <stdio.h>
#include <assert.h>
#include <threads.h>
#include <stdlib.h>
#include <gtest/gtest.h>
#include <unistd.h>
#include "common/config/config_userapi.h"
#include "log.h"
extern "C" {
#include "actor.h"
configmodule_interface_t *uniqCfg;
void exit_function(const char *file, const char *function, const int line, const char *s, const int assert)
{
if (assert) {
abort();
} else {
exit(EXIT_SUCCESS);
}
}
}
int num_processed = 0;
void process(void *arg)
{
num_processed++;
}
TEST(actor, schedule_one)
{
Actor_t actor;
init_actor(&actor, "TEST", -1);
notifiedFIFO_elt_t *elt = newNotifiedFIFO_elt(0, 0, NULL, process);
pushNotifiedFIFO(&actor.fifo, elt);
usleep(10);
destroy_actor(&actor);
EXPECT_EQ(num_processed, 1);
}
TEST(actor, schedule_many)
{
num_processed = 0;
Actor_t actor;
init_actor(&actor, "TEST", -1);
for (int i = 0; i < 100; i++) {
notifiedFIFO_elt_t *elt = newNotifiedFIFO_elt(0, 0, NULL, process);
pushNotifiedFIFO(&actor.fifo, elt);
}
int num_waits = 0;
while (num_processed < 100 && num_waits++ < 10)
usleep(10);
destroy_actor(&actor);
EXPECT_EQ(num_processed, 100);
}
// Thread safety can be ensured through core affinity - if two actors
// are running on the same core they are thread safe
TEST(DISABLED_actor, thread_safety_with_core_affinity)
{
num_processed = 0;
int core = 0;
Actor_t actor;
init_actor(&actor, "TEST", core);
Actor_t actor2;
init_actor(&actor2, "TEST2", core);
for (int i = 0; i < 10000; i++) {
Actor_t *actor_ptr = i % 2 == 0 ? &actor : &actor2;
notifiedFIFO_elt_t *elt = newNotifiedFIFO_elt(0, 0, NULL, process);
pushNotifiedFIFO(&actor_ptr->fifo, elt);
}
int num_waits = 0;
while (num_processed < 10000 && num_waits++ < 1000)
usleep(10);
destroy_actor(&actor);
destroy_actor(&actor2);
EXPECT_EQ(num_processed, 10000);
}
// Thread safety can be ensured through data separation, here using C inheritance-like
// model
typedef struct TestActor_t {
Actor_t actor;
int count;
} TestActor_t;
void process_thread_safe(void *arg)
{
TestActor_t *actor = static_cast<TestActor_t *>(arg);
actor->count += 1;
}
TEST(actor, thread_safety_with_actor_specific_data)
{
num_processed = 0;
int core = 0;
TestActor_t actor;
INIT_ACTOR(&actor, "TEST", core);
actor.count = 0;
TestActor_t actor2;
INIT_ACTOR(&actor2, "TEST2", core);
actor2.count = 0;
for (int i = 0; i < 10000; i++) {
TestActor_t *actor_ptr = i % 2 == 0 ? &actor : &actor2;
notifiedFIFO_elt_t *elt = newNotifiedFIFO_elt(0, 0, NULL, process_thread_safe);
elt->msgData = actor_ptr;
pushNotifiedFIFO(&actor_ptr->actor.fifo, elt);
}
int num_waits = 0;
while (actor.count + actor2.count < 10000 && num_waits++ < 1000)
usleep(10);
DESTROY_ACTOR(&actor);
DESTROY_ACTOR(&actor2);
EXPECT_EQ(actor.count + actor2.count, 10000);
}
int main(int argc, char **argv)
{
logInit();
g_log->log_component[UTIL].level = OAILOG_DEBUG;
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......@@ -30,6 +30,7 @@
#include <stdalign.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <sys/syscall.h>
#include "assertions.h"
#include "common/utils/time_meas.h"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment