From edb209f6e57bddc46e58cd9091e06b625f912cf7 Mon Sep 17 00:00:00 2001
From: Cedric Roux <cedric.roux@eurecom.fr>
Date: Sat, 1 Jul 2017 18:39:44 +0200
Subject: [PATCH] much better mobipass integration

with this commit I get very stable connection
over long period of time (more than 30 minutes)

I had 0 fake_rrh disconnet/reconnect messages.

I had very rarely timeouts on data from mobipass
(uplink).

Almost 0 nacks. I had some uplink nacks, which were
most certainly due to the timeouts above.

I was doing full udp DL (33Mb/s) and tcp UL (was going
around 15Mb/s) at the same time, with 1 UE.

Next step is to cleanup, remove globals, integrate properly
as a driver and put this in develop.
---
 targets/ARCH/mobipass/main.c     | 15 +++---
 targets/ARCH/mobipass/mobipass.c | 22 +++++---
 targets/ARCH/mobipass/mobipass.h |  3 ++
 targets/ARCH/mobipass/queues.c   | 91 ++++++++++++++++++++++++++------
 4 files changed, 103 insertions(+), 28 deletions(-)

diff --git a/targets/ARCH/mobipass/main.c b/targets/ARCH/mobipass/main.c
index 85150f81b..ed91b6657 100644
--- a/targets/ARCH/mobipass/main.c
+++ b/targets/ARCH/mobipass/main.c
@@ -27,7 +27,7 @@ struct mobipass_header {
   uint32_t timestamp;
 } __attribute__((__packed__));
 
-int trx_eth_start(openair0_device *device) { return 0;}
+int trx_eth_start(openair0_device *device) { init_mobipass(); return 0;}
 int trx_eth_request(openair0_device *device, void *msg, ssize_t msg_len) { abort(); return 0;}
 int trx_eth_reply(openair0_device *device, void *msg, ssize_t msg_len) { abort(); return 0;}
 int trx_eth_get_stats(openair0_device* device) { return(0); }
@@ -37,10 +37,12 @@ int trx_eth_stop(openair0_device *device) { return(0); }
 int trx_eth_set_freq(openair0_device* device, openair0_config_t *openair0_cfg,int exmimo_dump_config) { return(0); }
 int trx_eth_set_gains(openair0_device* device, openair0_config_t *openair0_cfg) { return(0); }
 
-int da__write(openair0_device *device, openair0_timestamp timestamp, void **buff, int nsamps, int cc, int flags) {
+int mobipass_write(openair0_device *device, openair0_timestamp timestamp, void **buff, int nsamps, int cc, int flags) {
   struct mobipass_header *mh = (struct mobipass_header *)(((char *)buff[0]) + 14);
   static uint32_t last_timestamp = 4*7680*2-640;
   last_timestamp += 640;
+  last_timestamp %= SAMPLES_PER_1024_FRAMES;
+  mh->timestamp = htonl(ntohl(mh->timestamp) % SAMPLES_PER_1024_FRAMES);
   if (last_timestamp != ntohl(mh->timestamp)) { printf("bad timestamp wanted %d got %d\n", last_timestamp, ntohl(mh->timestamp)); exit(1); }
 //printf("__write nsamps %d timestamps %ld seqno %d (packet timestamp %d)\n", nsamps, timestamp, mh->seqno, ntohl(mh->timestamp));
   if (nsamps != 640) abort();
@@ -48,12 +50,13 @@ int da__write(openair0_device *device, openair0_timestamp timestamp, void **buff
   return nsamps;
 }
 
-int da__read(openair0_device *device, openair0_timestamp *timestamp, void **buff, int nsamps, int cc) {
+int mobipass_read(openair0_device *device, openair0_timestamp *timestamp, void **buff, int nsamps, int cc) {
   static uint32_t ts = 0;
   static unsigned char seqno = 0;
 //printf("__read nsamps %d return timestamp %d\n", nsamps, ts);
   *timestamp = htonl(ts);
   ts += nsamps;
+  ts %= SAMPLES_PER_1024_FRAMES;
   if (nsamps != 640) { printf("bad nsamps %d, should be 640\n", nsamps); fflush(stdout); abort(); }
 
   dequeue_from_mobipass(ntohl(*timestamp), buff[0]);
@@ -74,7 +77,7 @@ int da__read(openair0_device *device, openair0_timestamp *timestamp, void **buff
 int transport_init(openair0_device *device, openair0_config_t *openair0_cfg,
         eth_params_t * eth_params )
 {
-  init_mobipass();
+  //init_mobipass();
 
   eth_state_t *eth = (eth_state_t*)malloc(sizeof(eth_state_t));
   memset(eth, 0, sizeof(eth_state_t));
@@ -96,8 +99,8 @@ int transport_init(openair0_device *device, openair0_config_t *openair0_cfg,
   device->trx_stop_func        = trx_eth_stop;
   device->trx_set_freq_func = trx_eth_set_freq;
   device->trx_set_gains_func = trx_eth_set_gains;
-  device->trx_write_func   = da__write;
-  device->trx_read_func    = da__read;
+  device->trx_write_func   = mobipass_write;
+  device->trx_read_func    = mobipass_read;
 
   eth->if_name = eth_params->local_if_name;
   device->priv = eth;
diff --git a/targets/ARCH/mobipass/mobipass.c b/targets/ARCH/mobipass/mobipass.c
index 9e143f50e..2a860a5db 100644
--- a/targets/ARCH/mobipass/mobipass.c
+++ b/targets/ARCH/mobipass/mobipass.c
@@ -11,6 +11,7 @@
 #include <pthread.h>
 
 #include "queues.h"
+#include "mobipass.h"
 
 /******************************************************************/
 /* time begin                                                     */
@@ -36,6 +37,11 @@ static void init_time(void)
 
 static void synch_time(uint32_t ts)
 {
+  static uint32_t last_ts = 0;
+  static uint64_t mega_ts = 0;
+  if (ts < last_ts) mega_ts++;
+  last_ts = ts;
+
   struct timespec now;
   if (clock_gettime(CLOCK_MONOTONIC_RAW, &now)) abort();
 
@@ -47,16 +53,16 @@ static void synch_time(uint32_t ts)
   /* 15360000 samples/second, in nanoseconds:
    *  = 15360000 / 1000000000 = 1536 / 100000 = 48 / 3125*/
 
-  uint64_t ts_ns = (uint64_t)ts * (uint64_t)3125 / (uint64_t)48;
+  uint64_t ts_ns = ((uint64_t)ts + mega_ts * (uint64_t)SAMPLES_PER_1024_FRAMES) * (uint64_t)3125 / (uint64_t)48;
 
-//printf("tnow %lu t0 %lu ts %u ts_ns %lu\n", tnow, t0, ts, ts_ns);
+//printf("tnow %lu t0 %lu ts %u cur %lu ts_ns %lu mega_ts %lu\n", tnow, t0, ts, cur, ts_ns, mega_ts);
   if (cur >= ts_ns) return;
 
   uint64_t delta = ts_ns - cur;
   /* don't sleep more than 1 ms */
   if (delta > 1000*1000) delta = 1000*1000;
   delta = delta/1000;
-printf("ts %u delta %lu\n", ts, delta);
+//printf("ts %u delta %lu\n", ts, delta);
   if (delta) usleep(delta);
 }
 
@@ -87,14 +93,15 @@ static void receive(int sock, unsigned char *b)
 {
   if (recv(sock, b, 14+14+1280, 0) != 14+14+1280) { perror("recv"); exit(1); }
   struct mobipass_header *mh = (struct mobipass_header *)(b+14);
-  mh->timestamp = htonl(ntohl(mh->timestamp)-45378/*40120*/);
+  mh->timestamp = htonl((ntohl(mh->timestamp)-45378/*40120*/) % SAMPLES_PER_1024_FRAMES);
+//printf("recv timestamp %u\n", ntohl(mh->timestamp));
 }
 
 void mobipass_send(void *data)
 {
 //printf("SEND seqno %d ts %d\n", seqno, ts);
   struct ethernet_header *eh = (struct ethernet_header *)data;
-  //struct mobipass_header *mh = (struct mobipass_header *)(data+14);
+//struct mobipass_header *mh = (struct mobipass_header *)(data+14);
 //printf("SEND seqno %d ts %d\n", mh->seqno, ntohl(mh->timestamp));
 
   eh->dst[0] = 0x00;
@@ -169,10 +176,12 @@ static void *receiver(void *_)
 
 void dosend(int sock, int seqno, uint32_t ts)
 {
-//printf("SEND seqno %d ts %d\n", seqno, ts);
   struct ethernet_header *eh = (struct ethernet_header *)packet;
   struct mobipass_header *mh = (struct mobipass_header *)(packet+14);
 
+  ts %= SAMPLES_PER_1024_FRAMES;
+//printf("SEND seqno %d ts %d\n", seqno, ts);
+
   eh->dst[0] = 0x00;
   eh->dst[1] = 0x21;
   eh->dst[2] = 0x5e;
@@ -220,6 +229,7 @@ void *sender(void *_)
     dosend(sock, seqno, ts);
     seqno++;
     ts += 640;
+    ts %= SAMPLES_PER_1024_FRAMES;
   }
 }
 
diff --git a/targets/ARCH/mobipass/mobipass.h b/targets/ARCH/mobipass/mobipass.h
index b02c80879..31f2c4b6d 100644
--- a/targets/ARCH/mobipass/mobipass.h
+++ b/targets/ARCH/mobipass/mobipass.h
@@ -4,4 +4,7 @@
 void mobipass_send(void *data);
 void init_mobipass(void);
 
+/* TODO: following variable only works for 10MHz */
+#define SAMPLES_PER_1024_FRAMES (7680*2*10*1024)
+
 #endif /* _MOBIPASS_H_ */
diff --git a/targets/ARCH/mobipass/queues.c b/targets/ARCH/mobipass/queues.c
index e17a5ff3a..9b95da807 100644
--- a/targets/ARCH/mobipass/queues.c
+++ b/targets/ARCH/mobipass/queues.c
@@ -1,10 +1,14 @@
 #include "queues.h"
+#include "mobipass.h"
 
 #include <pthread.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <arpa/inet.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/time.h>
 
 #define QSIZE 10000
 
@@ -43,6 +47,11 @@ static void enqueue(void *data, struct queue *q)
   memcpy(q->buf[pos], data, 14+14+640*2);
   q->len++;
 
+//if (q == &from_mobipass) {
+//struct mobipass_header *mh = (struct mobipass_header *)((char*)data+14);
+//printf("recv timestamp %u in pos %d\n", ntohl(mh->timestamp), pos);
+//}
+
 done:
   if (pthread_cond_signal(&q->cond)) abort();
   if (pthread_mutex_unlock(&q->mutex)) abort();
@@ -70,10 +79,23 @@ void dequeue_to_mobipass(uint32_t timestamp, void *data)
 void enqueue_from_mobipass(void *data)
 {
   struct mobipass_header *mh = (struct mobipass_header *)((char*)data+14);
+//printf("from mobipass! timestamp %u seqno %d\n", ntohl(mh->timestamp), mh->seqno);
+  mh->timestamp = htonl(ntohl(mh->timestamp) % SAMPLES_PER_1024_FRAMES);
 //printf("from mobipass! timestamp %u seqno %d\n", ntohl(mh->timestamp), mh->seqno);
   enqueue(data, &from_mobipass);
 }
 
+static int cmp_timestamps(uint32_t a, uint32_t b)
+{
+  if (a == b) return 0;
+  if (a < b) {
+    if (b-a > SAMPLES_PER_1024_FRAMES/2) return 1;
+    return -1;
+  }
+  if (a-b > SAMPLES_PER_1024_FRAMES/2) return -1;
+  return 1;
+}
+
 /* to be called with lock on */
 static void get_sample_from_mobipass(char *I, char *Q, uint32_t timestamp)
 {
@@ -82,19 +104,37 @@ static void get_sample_from_mobipass(char *I, char *Q, uint32_t timestamp)
   struct mobipass_header *mh = NULL;
   uint32_t packet_timestamp = 0;
 
+uint32_t old_start = from_mobipass.start;
+uint32_t old_len = from_mobipass.len;
+b = from_mobipass.buf[from_mobipass.start];
+mh = (struct mobipass_header *)(b+14);
+uint32_t old_pts = from_mobipass.len ? ntohl(mh->timestamp) : -1;
+b=NULL;
+mh=NULL;
+
   while (from_mobipass.len) {
     b = from_mobipass.buf[from_mobipass.start];
     mh = (struct mobipass_header *)(b+14);
     data = b + 14*2;
     packet_timestamp = ntohl(mh->timestamp);
-    if (timestamp < packet_timestamp) goto nodata;
-    if (timestamp < packet_timestamp+640) break;
+//printf("cmp A %u pt %u start %d\n", timestamp, packet_timestamp, from_mobipass.start);
+    if (cmp_timestamps(timestamp, packet_timestamp) < 0) goto nodata;
+//printf("cmp B %u pt %u\n", timestamp, packet_timestamp);
+    if (cmp_timestamps(timestamp, (packet_timestamp+640) % SAMPLES_PER_1024_FRAMES) < 0) break;
+//printf("cmp C %u pt %u\n", timestamp, packet_timestamp);
     from_mobipass.len--;
     from_mobipass.start = (from_mobipass.start+1) % QSIZE;
   }
 
   if (from_mobipass.len == 0) goto nodata;
 
+  if (timestamp == (packet_timestamp + 639) % SAMPLES_PER_1024_FRAMES) {
+    from_mobipass.len--;
+    from_mobipass.start = (from_mobipass.start+1) % QSIZE;
+  }
+
+  if (timestamp < packet_timestamp) timestamp += SAMPLES_PER_1024_FRAMES;
+
   *I = data[(timestamp - packet_timestamp) * 2];
   *Q = data[(timestamp - packet_timestamp) * 2 + 1];
 
@@ -103,16 +143,31 @@ static void get_sample_from_mobipass(char *I, char *Q, uint32_t timestamp)
 nodata:
   *I = 0;
   *Q = 0;
+printf("no sample timestamp %u pt %u start %d old_start %d old_pt %u len %d old len %d\n", timestamp, packet_timestamp, from_mobipass.start, old_start, old_pts, from_mobipass.len, old_len);
+}
+
+/* doesn't work with delay more than 1s */
+static void wait_for_data(pthread_cond_t *cond, pthread_mutex_t *mutex, int delay_us)
+{
+  struct timeval now;
+  struct timespec target;
+  gettimeofday(&now, NULL);
+  target.tv_sec = now.tv_sec;
+  target.tv_nsec = (now.tv_usec + delay_us) * 1000;
+  if (target.tv_nsec >= 1000 * 1000 * 1000) { target.tv_nsec -= 1000 * 1000 * 1000; target.tv_sec++; }
+  int err = pthread_cond_timedwait(cond, mutex, &target);
+  if (err != 0 && err != ETIMEDOUT) { printf("pthread_cond_timedwait: err (%d) %s\n", err, strerror(err)); abort(); }
 }
 
 void dequeue_from_mobipass(uint32_t timestamp, void *data)
 {
   int i;
-  int ts = timestamp;
-  int not_empty;
+//  int ts = timestamp;
+  int waiting_allowed;
 
-#if 0
   if (pthread_mutex_lock(&from_mobipass.mutex)) abort();
+
+#if 0
 printf("want dequeue ts %u queue (start %d len %d): [", timestamp, from_mobipass.start, from_mobipass.len);
 for (i = 0; i < from_mobipass.len; i++) {
   unsigned char *b = NULL;
@@ -126,23 +181,27 @@ printf("]\n");
 #endif
 
   if (from_mobipass.len == 0) {
-    if (pthread_mutex_unlock(&from_mobipass.mutex)) abort();
-    usleep(1000/3);
-    if (pthread_mutex_lock(&from_mobipass.mutex)) abort();
+    //if (pthread_mutex_unlock(&from_mobipass.mutex)) abort();
+printf("sleep 1\n");
+    //usleep(1000/3);
+    //if (pthread_mutex_lock(&from_mobipass.mutex)) abort();
+    wait_for_data(&from_mobipass.cond, &from_mobipass.mutex, 2000); //1000/3);
   }
 
-  not_empty = from_mobipass.len != 0;
+  waiting_allowed = from_mobipass.len != 0;
 
   for (i = 0; i < 640*2; i+=2) {
-    if (from_mobipass.len == 0 && not_empty) {
-      if (pthread_mutex_unlock(&from_mobipass.mutex)) abort();
-      usleep(1000/3);
-      if (pthread_mutex_lock(&from_mobipass.mutex)) abort();
-      not_empty = from_mobipass.len != 0;
+    if (from_mobipass.len == 0 && waiting_allowed) {
+      //if (pthread_mutex_unlock(&from_mobipass.mutex)) abort();
+//printf("sleep 2\n");
+      //usleep(1000/3);
+      //if (pthread_mutex_lock(&from_mobipass.mutex)) abort();
+      wait_for_data(&from_mobipass.cond, &from_mobipass.mutex, 2000); //1000/3);
+      waiting_allowed = from_mobipass.len != 0;
     }
 
-    get_sample_from_mobipass((char*)data + 14*2 + i, (char*)data + 14*2 + i+1, ts);
-    ts++;
+    get_sample_from_mobipass((char*)data + 14*2 + i, (char*)data + 14*2 + i+1, timestamp % SAMPLES_PER_1024_FRAMES);
+    timestamp++;
   }
 
   if (pthread_mutex_unlock(&from_mobipass.mutex)) abort();
-- 
GitLab