From 34db5f2003120719f56c5f45626b64433fc9d184 Mon Sep 17 00:00:00 2001
From: Xenofon Foukas <x.foukas@sms.ed.ac.uk>
Date: Wed, 9 Mar 2016 12:19:16 +0000
Subject: [PATCH] Moved agent async interface to lock-free implementation using
 lock-free ringbuffer

---
 cmake_targets/CMakeLists.txt              |   1 +
 openair2/ENB_APP/enb_agent.c              |  36 +++---
 openair2/ENB_APP/enb_agent_async.c        |   6 +-
 openair2/UTIL/ASYNC_IF/link_manager.c     |  14 ++-
 openair2/UTIL/ASYNC_IF/link_manager.h     |   3 +-
 openair2/UTIL/ASYNC_IF/ringbuffer_queue.c | 143 ++++++++++++++++++++++
 openair2/UTIL/ASYNC_IF/ringbuffer_queue.h |  60 +++++++++
 7 files changed, 237 insertions(+), 26 deletions(-)
 create mode 100644 openair2/UTIL/ASYNC_IF/ringbuffer_queue.c
 create mode 100644 openair2/UTIL/ASYNC_IF/ringbuffer_queue.h

diff --git a/cmake_targets/CMakeLists.txt b/cmake_targets/CMakeLists.txt
index 18eaf0f8b6f..ea0cabe88f3 100644
--- a/cmake_targets/CMakeLists.txt
+++ b/cmake_targets/CMakeLists.txt
@@ -832,6 +832,7 @@ if (ENB_AGENT_SB_IF)
     ${OPENAIR2_DIR}/UTIL/ASYNC_IF/socket_link.c
     ${OPENAIR2_DIR}/UTIL/ASYNC_IF/link_manager.c
     ${OPENAIR2_DIR}/UTIL/ASYNC_IF/message_queue.c
+    ${OPENAIR2_DIR}/UTIL/ASYNC_IF/ringbuffer_queue.c
     )
   set(ASYNC_IF_LIB ASYNC_IF)
   include_directories(${OPENAIR2_DIR}/UTIL/ASYNC_IF)
diff --git a/openair2/ENB_APP/enb_agent.c b/openair2/ENB_APP/enb_agent.c
index a21e2f8e160..81d856fa976 100644
--- a/openair2/ENB_APP/enb_agent.c
+++ b/openair2/ENB_APP/enb_agent.c
@@ -135,32 +135,34 @@ void *receive_thread(void *args) {
   Protocol__ProgranMessage *msg;
   
   while (1) {
-    if (enb_agent_msg_recv(d->enb_id, ENB_AGENT_DEFAULT, &data, &size, &priority)) {
-      err_code = PROTOCOL__PROGRAN_ERR__MSG_DEQUEUING;
-      goto error;
-    }
+    //if (enb_agent_msg_recv(d->enb_id, ENB_AGENT_DEFAULT, &data, &size, &priority)) {
+    //  err_code = PROTOCOL__PROGRAN_ERR__MSG_DEQUEUING;
+    //  goto error;
+    //}
 
-    LOG_D(ENB_AGENT,"received message with size %d\n", size);
+    while (enb_agent_msg_recv(d->enb_id, ENB_AGENT_DEFAULT, &data, &size, &priority) == 0) {
+      
+      LOG_D(ENB_AGENT,"received message with size %d\n", size);
   
     
-    msg=enb_agent_handle_message(d->enb_id, data, size);
+      msg=enb_agent_handle_message(d->enb_id, data, size);
 
-    free(data);
+      free(data);
     
-    // check if there is something to send back to the controller
-    if (msg != NULL){
-      data=enb_agent_pack_message(msg,&size);
+      // check if there is something to send back to the controller
+      if (msg != NULL){
+	data=enb_agent_pack_message(msg,&size);
 
-      if (enb_agent_msg_send(d->enb_id, ENB_AGENT_DEFAULT, data, size, priority)) {
-	err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING;
-	goto error;
-      }
+	if (enb_agent_msg_send(d->enb_id, ENB_AGENT_DEFAULT, data, size, priority)) {
+	  err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING;
+	  goto error;
+	}
       
-      LOG_D(ENB_AGENT,"sent message with size %d\n", size);
+	LOG_D(ENB_AGENT,"sent message with size %d\n", size);
+      } 
     }
-    
   }
-
+    
   return NULL;
 
 error:
diff --git a/openair2/ENB_APP/enb_agent_async.c b/openair2/ENB_APP/enb_agent_async.c
index d6389ea65f8..a626566a8de 100644
--- a/openair2/ENB_APP/enb_agent_async.c
+++ b/openair2/ENB_APP/enb_agent_async.c
@@ -60,10 +60,10 @@ enb_agent_async_channel_t * enb_agent_async_channel_info(mid_t mod_id, char *dst
    /* 
    * create a message queue
    */ 
-  
-  channel->send_queue = new_message_queue();
+  // Set size of queues statically for now
+  channel->send_queue = new_message_queue(500);
   if (channel->send_queue == NULL) goto error;
-  channel->receive_queue = new_message_queue();
+  channel->receive_queue = new_message_queue(500);
   if (channel->receive_queue == NULL) goto error;
   
    /* 
diff --git a/openair2/UTIL/ASYNC_IF/link_manager.c b/openair2/UTIL/ASYNC_IF/link_manager.c
index 9b31fc043bd..a528577fd86 100644
--- a/openair2/UTIL/ASYNC_IF/link_manager.c
+++ b/openair2/UTIL/ASYNC_IF/link_manager.c
@@ -53,11 +53,15 @@ static void *link_manager_sender_thread(void *_manager)
   LOG_D(MAC, "starting link manager sender thread\n");
 
   while (manager->run) {
-    if (message_get(manager->send_queue, &data, &size, &priority))
-      goto error;
-    if (link_send_packet(manager->socket_link, data, size))
-      goto error;
-    free(data);
+    while (message_get(manager->send_queue, &data, &size, &priority) == 0) {
+      link_send_packet(manager->socket_link, data, size);
+      free(data);
+    }
+    //    if (message_get(manager->send_queue, &data, &size, &priority))
+    //  goto error;
+    //if (link_send_packet(manager->socket_link, data, size))
+    //  goto error;
+    //free(data);
   }
 
   LOG_D(MAC, "link manager sender thread quits\n");
diff --git a/openair2/UTIL/ASYNC_IF/link_manager.h b/openair2/UTIL/ASYNC_IF/link_manager.h
index 3e24b8679ab..4f0628c0ee4 100644
--- a/openair2/UTIL/ASYNC_IF/link_manager.h
+++ b/openair2/UTIL/ASYNC_IF/link_manager.h
@@ -38,7 +38,8 @@
 #ifndef LINK_MANAGER_H
 #define LINK_MANAGER_H
 
-#include "message_queue.h"
+//#include "message_queue.h"
+#include "ringbuffer_queue.h"
 #include "socket_link.h"
 
 #include <pthread.h>
diff --git a/openair2/UTIL/ASYNC_IF/ringbuffer_queue.c b/openair2/UTIL/ASYNC_IF/ringbuffer_queue.c
new file mode 100644
index 00000000000..0d63e41431a
--- /dev/null
+++ b/openair2/UTIL/ASYNC_IF/ringbuffer_queue.c
@@ -0,0 +1,143 @@
+/*******************************************************************************
+    OpenAirInterface
+    Copyright(c) 1999 - 2015 Eurecom
+
+    OpenAirInterface is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+
+    OpenAirInterface is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with OpenAirInterface.The full GNU General Public License is
+    included in this distribution in the file called "COPYING". If not,
+    see <http://www.gnu.org/licenses/>.
+
+  Contact Information
+  OpenAirInterface Admin: openair_admin@eurecom.fr
+  OpenAirInterface Tech : openair_tech@eurecom.fr
+  OpenAirInterface Dev  : openair4g-devel@eurecom.fr
+
+  Address      : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
+
+*******************************************************************************/
+/*! \file ringbuffer_queue.c
+ * \brief Lock-free ringbuffer used for async message passing of agent
+ * \author Xenofon Foukas
+ * \date March 2016
+ * \version 1.0
+ * \email: x.foukas@sms.ed.ac.uk
+ * @ingroup _mac
+ */
+
+#include "ringbuffer_queue.h"
+#include "log.h"
+
+message_queue_t * new_message_queue(int size) {
+  
+  message_queue_t *ret = NULL;
+
+  ret = calloc(1, sizeof(message_queue_t));
+  if (ret == NULL)
+    goto error;
+
+  lfds700_misc_library_init_valid_on_current_logical_core();
+  lfds700_misc_prng_init(&(ret->ps));
+  ret->ringbuffer_array = malloc(sizeof(struct lfds700_ringbuffer_element) * size);
+  lfds700_ringbuffer_init_valid_on_current_logical_core(&(ret->ringbuffer_state),
+							ret->ringbuffer_array,
+							size,
+							&(ret->ps),
+							NULL);
+
+  return ret;
+  
+ error:
+  LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
+  if (ret != NULL) {
+    free(ret->ringbuffer_array);
+    memset(ret, 0, sizeof(message_queue_t));
+    free(ret);
+  }
+  return NULL;
+}
+
+int message_put(message_queue_t *queue, void *data, int size, int priority) {
+
+  struct lfds700_misc_prng_state ls;
+  enum lfds700_misc_flag overwrite_occurred_flag;
+  message_t *overwritten_msg;
+  message_t *m = NULL;
+
+  LFDS700_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
+  lfds700_misc_prng_init(&ls);
+  
+  m = calloc(1, sizeof(message_t));
+  if (m == NULL)
+    goto error;
+
+  m->data = data;
+  m->size = size;
+  m->priority = priority;
+
+  lfds700_ringbuffer_write(&(queue->ringbuffer_state),
+			   NULL,
+			   (void *) m,
+			   &overwrite_occurred_flag,
+			   NULL,
+			   (void **) &overwritten_msg,
+			   &ls);
+
+  if (overwrite_occurred_flag == LFDS700_MISC_FLAG_RAISED) {
+    free(overwritten_msg->data);
+    free(overwritten_msg);
+  }
+
+  return 0;
+
+ error:
+  free(m);
+  LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
+  return -1;
+}
+
+int message_get(message_queue_t *queue, void **data, int *size, int *priority) {
+  message_t *m;
+  struct lfds700_misc_prng_state ls;
+  
+  LFDS700_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
+  lfds700_misc_prng_init(&ls);
+
+  if (lfds700_ringbuffer_read(&(queue->ringbuffer_state), NULL, (void **) &m, &ls) == 0) {
+    return -1;
+  }
+
+  *data = m->data;
+  *size = m->size;
+  *priority = m->priority;
+  free(m);
+  return 0;
+}
+
+message_queue_t destroy_message_queue(message_queue_t *queue) {
+  struct lfds700_misc_prng_state ls;
+
+  message_t *m;
+
+  LFDS700_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
+  lfds700_misc_prng_init(&ls);
+
+  while (lfds700_ringbuffer_read(&(queue->ringbuffer_state), NULL, (void **) &m, &ls) != 0) {
+    free(m->data);
+    memset(m, 0, sizeof(message_t));
+    free(m);
+  }
+  free(queue->ringbuffer_array);
+  memset(queue, 0, sizeof(message_queue_t));
+  free(queue);
+}
diff --git a/openair2/UTIL/ASYNC_IF/ringbuffer_queue.h b/openair2/UTIL/ASYNC_IF/ringbuffer_queue.h
new file mode 100644
index 00000000000..756a35ae4a7
--- /dev/null
+++ b/openair2/UTIL/ASYNC_IF/ringbuffer_queue.h
@@ -0,0 +1,60 @@
+/*******************************************************************************
+    OpenAirInterface
+    Copyright(c) 1999 - 2015 Eurecom
+
+    OpenAirInterface is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+
+    OpenAirInterface is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with OpenAirInterface.The full GNU General Public License is
+    included in this distribution in the file called "COPYING". If not,
+    see <http://www.gnu.org/licenses/>.
+
+  Contact Information
+  OpenAirInterface Admin: openair_admin@eurecom.fr
+  OpenAirInterface Tech : openair_tech@eurecom.fr
+  OpenAirInterface Dev  : openair4g-devel@eurecom.fr
+
+  Address      : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
+
+*******************************************************************************/
+/*! \file ringbuffer_queue.h
+ * \brief Lock-free ringbuffer used for async message passing of agent
+ * \author Xenofon Foukas
+ * \date March 2016
+ * \version 1.0
+ * \email: x.foukas@sms.ed.ac.uk
+ * @ingroup _mac
+ */
+
+#ifndef RINGBUFFER_QUEUE_H
+#define RINGBUFFER_QUEUE_H
+
+#include "liblfds700.h"
+
+typedef struct message_s {
+  void *data;
+  int size;
+  int priority;
+} message_t;
+
+typedef struct {
+  struct lfds700_misc_prng_state ps;
+  struct lfds700_ringbuffer_element *ringbuffer_array;
+  struct lfds700_ringbuffer_state ringbuffer_state;
+} message_queue_t;
+
+message_queue_t * new_message_queue(int size);
+int message_put(message_queue_t *queue, void *data, int size, int priority);
+int message_get(message_queue_t *queue, void **data, int *size, int *priority);
+message_queue_t destroy_message_queue(message_queue_t *queue);
+
+#endif /* RINGBUFFER_QUEUE_H */
-- 
GitLab