Newer
Older
#/*
# * Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
# * contributor license agreements. See the NOTICE file distributed with
# * this work for additional information regarding copyright ownership.
# * The OpenAirInterface Software Alliance licenses this file to You under
# * the OAI Public License, Version 1.1 (the "License"); you may not use this file
# * except in compliance with the License.
# * You may obtain a copy of the License at
# *
# * http://www.openairinterface.org/?page_id=698
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# *-------------------------------------------------------------------------------
# * For more information about the OpenAirInterface (OAI) Software Alliance:
# * contact@openairinterface.org
# */
#---------------------------------------------------------------------
# Python for CI of OAI-eNB + COTS-UE
#
# Required Python Version
# Python 3.x
#
# Required Python Package
# pexpect
#---------------------------------------------------------------------
#-----------------------------------------------------------
# Version
#-----------------------------------------------------------
Version = '0.1'
#-----------------------------------------------------------
# Constants
#-----------------------------------------------------------
ALL_PROCESSES_OK = 0
ENB_PROCESS_FAILED = -1
ENB_PROCESS_OK = +1
ENB_PROCESS_SEG_FAULT = -11
ENB_PROCESS_ASSERTION = -12
ENB_PROCESS_REALTIME_ISSUE = -13
HSS_PROCESS_FAILED = -2
HSS_PROCESS_OK = +2
MME_PROCESS_FAILED = -3
MME_PROCESS_OK = +3
SPGW_PROCESS_FAILED = -4
SPGW_PROCESS_OK = +4
UE_IP_ADDRESS_ISSUE = -5
#-----------------------------------------------------------
# Import
#-----------------------------------------------------------
import sys # arg
import re # reg
import pexpect # pexpect
import time # sleep
import os
import xml.etree.ElementTree as ET
import logging
import datetime
import signal
logging.basicConfig(
level=logging.DEBUG,
format="[%(asctime)s] %(name)s:%(levelname)s: %(message)s"
)
#-----------------------------------------------------------
# Class Declaration
#-----------------------------------------------------------
class SSHConnection():
def __init__(self):
self.eNBIPAddress = ''
self.eNBRepository = ''
self.eNBBranch = ''
self.eNBCommitID = ''
self.eNBTargetBranch = ''
self.eNBUserName = ''
self.eNBPassword = ''
self.eNBSourceCodePath = ''
self.EPCIPAddress = ''
self.EPCUserName = ''
self.EPCPassword = ''
self.EPCSourceCodePath = ''
self.EPCType = ''
self.ADBIPAddress = ''
self.ADBUserName = ''
self.ADBPassword = ''
self.testCase_id = ''
self.desc = ''
self.Build_eNB_args = ''
self.Initialize_eNB_args = ''
self.ping_args = ''
self.ping_packetloss_threshold = ''
self.iperf_args = ''
self.iperf_packetloss_threshold = ''
self.UEDevices = []
self.UEIPAddresses = []
self.htmlFile = ''
self.htmlHeaderCreated = False
self.htmlFooterCreated = False
self.htmlUEConnected = 0
self.htmleNBFailureMsg = ''
self.picocom_closure = False
def open(self, ipaddress, username, password):
count = 0
connect_status = False
while count < 4:
self.ssh = pexpect.spawn('ssh', [username + '@' + ipaddress], timeout = 5)
self.sshresponse = self.ssh.expect(['Are you sure you want to continue connecting (yes/no)?', 'password:', 'Last login', pexpect.EOF, pexpect.TIMEOUT])
if self.sshresponse == 0:
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
self.ssh.sendline('yes')
self.ssh.expect('password:')
self.ssh.sendline(password)
self.sshresponse = self.ssh.expect(['\$', 'Permission denied', 'password:', pexpect.EOF, pexpect.TIMEOUT])
if self.sshresponse == 0:
count = 10
connect_status = True
else:
logging.debug('self.sshresponse = ' + str(self.sshresponse))
elif self.sshresponse == 1:
self.ssh.sendline(password)
self.sshresponse = self.ssh.expect(['\$', 'Permission denied', 'password:', pexpect.EOF, pexpect.TIMEOUT])
if self.sshresponse == 0:
count = 10
connect_status = True
else:
logging.debug('self.sshresponse = ' + str(self.sshresponse))
elif self.sshresponse == 2:
# Checking if we are really on the remote client defined by its IP address
self.command('stdbuf -o0 ifconfig | egrep --color=never "inet addr:"', '\$', 5)
result = re.search(str(ipaddress), str(self.ssh.before))
if result is None:
self.close()
else:
count = 10
connect_status = True
# debug output
logging.debug(str(self.ssh.before))
logging.debug('self.sshresponse = ' + str(self.sshresponse))
# adding a tempo when failure
if not connect_status:
time.sleep(1)
count += 1
if connect_status:
sys.exit('SSH Connection Failed')
def command(self, commandline, expectedline, timeout):
logging.debug(commandline)
self.ssh.timeout = timeout
self.ssh.sendline(commandline)
self.sshresponse = self.ssh.expect([expectedline, pexpect.EOF, pexpect.TIMEOUT])
if self.sshresponse == 0:
elif self.sshresponse == 1:
logging.debug('\u001B[1;37;41m Unexpected EOF \u001B[0m')
logging.debug('Expected Line : ' + expectedline)
sys.exit(self.sshresponse)
elif self.sshresponse == 2:
logging.debug('\u001B[1;37;41m Unexpected TIMEOUT \u001B[0m')
logging.debug('Expected Line : ' + expectedline)
result = re.search('ping |iperf |picocom', str(commandline))
logging.debug(str(self.ssh.before))
else:
logging.debug('\u001B[1;37;41m Unexpected Others \u001B[0m')
logging.debug('Expected Line : ' + expectedline)
sys.exit(self.sshresponse)
def close(self):
self.ssh.timeout = 5
self.ssh.sendline('exit')
self.sshresponse = self.ssh.expect([pexpect.EOF, pexpect.TIMEOUT])
if self.sshresponse == 0:
pass
elif self.sshresponse == 1:
if not self.picocom_closure:
logging.debug('\u001B[1;37;41m Unexpected TIMEOUT during closing\u001B[0m')
logging.debug('\u001B[1;37;41m Unexpected Others during closing\u001B[0m')
def copyin(self, ipaddress, username, password, source, destination):
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
logging.debug('scp '+ username + '@' + ipaddress + ':' + source + ' ' + destination)
scp_spawn = pexpect.spawn('scp '+ username + '@' + ipaddress + ':' + source + ' ' + destination, timeout = 5)
scp_response = scp_spawn.expect(['Are you sure you want to continue connecting (yes/no)?', 'password:', pexpect.EOF, pexpect.TIMEOUT])
if scp_response == 0:
scp_spawn.sendline('yes')
scp_spawn.expect('password:')
scp_spawn.sendline(password)
scp_response = scp_spawn.expect(['\$', 'Permission denied', 'password:', pexpect.EOF, pexpect.TIMEOUT])
if scp_response == 0:
pass
else:
logging.debug('1 - scp_response = ' + str(scp_response))
sys.exit('SCP failed')
elif scp_response == 1:
scp_spawn.sendline(password)
scp_response = scp_spawn.expect(['\$', 'Permission denied', 'password:', pexpect.EOF, pexpect.TIMEOUT])
if scp_response == 0 or scp_response == 3:
pass
else:
logging.debug('2 - scp_response = ' + str(scp_response))
sys.exit('SCP failed')
elif scp_response == 2:
pass
else:
logging.debug('3 - scp_response = ' + str(scp_response))
sys.exit('SCP failed')
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def copyout(self, ipaddress, username, password, source, destination):
logging.debug('scp ' + source + ' ' + username + '@' + ipaddress + ':' + destination)
scp_spawn = pexpect.spawn('scp ' + source + ' ' + username + '@' + ipaddress + ':' + destination, timeout = 5)
scp_response = scp_spawn.expect(['Are you sure you want to continue connecting (yes/no)?', 'password:', pexpect.EOF, pexpect.TIMEOUT])
if scp_response == 0:
scp_spawn.sendline('yes')
scp_spawn.expect('password:')
scp_spawn.sendline(password)
scp_response = scp_spawn.expect(['\$', 'Permission denied', 'password:', pexpect.EOF, pexpect.TIMEOUT])
if scp_response == 0:
pass
else:
logging.debug('1 - scp_response = ' + str(scp_response))
sys.exit('SCP failed')
elif scp_response == 1:
scp_spawn.sendline(password)
scp_response = scp_spawn.expect(['\$', 'Permission denied', 'password:', pexpect.EOF, pexpect.TIMEOUT])
if scp_response == 0 or scp_response == 3:
pass
else:
logging.debug('2 - scp_response = ' + str(scp_response))
sys.exit('SCP failed')
elif scp_response == 2:
pass
else:
logging.debug('3 - scp_response = ' + str(scp_response))
sys.exit('SCP failed')
def BuildeNB(self):
if self.eNBIPAddress == '' or self.eNBRepository == '' or self.eNBBranch == '' or self.eNBUserName == '' or self.eNBPassword == '' or self.eNBSourceCodePath == '':
Usage()
sys.exit('Insufficient Parameter')
self.open(self.eNBIPAddress, self.eNBUserName, self.eNBPassword)
self.command('mkdir -p ' + self.eNBSourceCodePath, '\$', 5)
self.command('cd ' + self.eNBSourceCodePath, '\$', 5)
self.command('if [ ! -e .git ]; then stdbuf -o0 git clone ' + self.eNBRepository + ' .; else stdbuf -o0 git fetch; fi', '\$', 600)
# Raphael: here add a check if git clone or git fetch went smoothly
self.command('git config user.email "jenkins@openairinterface.org"', '\$', 5)
self.command('git config user.name "OAI Jenkins"', '\$', 5)
self.command('echo ' + self.eNBPassword + ' | sudo -S git clean -x -d -ff', '\$', 30)
# if the commit ID is provided use it to point to it
if self.eNBCommitID != '':
self.command('git checkout -f ' + self.eNBCommitID, '\$', 5)
# if the branch is not develop, then it is a merge request and we need to do
# the potential merge. Note that merge conflicts should already been checked earlier
if self.eNBTargetBranch == '':
if (self.eNBBranch != 'develop') and (self.eNBBranch != 'origin/develop'):
self.command('git merge --ff origin/develop -m "Temporary merge for CI"', '\$', 5)
else:
logging.debug('Merging with the target branch: ' + self.eNBTargetBranch)
self.command('git merge --ff origin/' + self.eNBTargetBranch + ' -m "Temporary merge for CI"', '\$', 5)
self.command('source oaienv', '\$', 5)
self.command('cd cmake_targets', '\$', 5)
self.command('mkdir -p log', '\$', 5)
self.command('chmod 777 log', '\$', 5)
# no need to remove in log (git clean did the trick)
self.command('stdbuf -o0 ./build_oai ' + self.Build_eNB_args + ' 2>&1 | stdbuf -o0 tee -a compile_oai_enb.log', 'Bypassing the Tests', 600)
self.command('mkdir -p build_log_' + SSH.testCase_id, '\$', 5)
self.command('mv log/* ' + 'build_log_' + SSH.testCase_id, '\$', 5)
self.command('mv compile_oai_enb.log ' + 'build_log_' + SSH.testCase_id, '\$', 5)
# Workaround to run with develop-nr
self.command('if [ -e ran_build ]; then cp -rf ran_build lte_build_oai; fi', '\$', 30)
self.CreateHtmlTestRow(self.Build_eNB_args, 'OK', ALL_PROCESSES_OK)
def InitializeHSS(self):
if self.EPCIPAddress == '' or self.EPCUserName == '' or self.EPCPassword == '' or self.EPCSourceCodePath == '' or self.EPCType == '':
Usage()
sys.exit('Insufficient Parameter')
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
if re.match('OAI', self.EPCType, re.IGNORECASE):
logging.debug('Using the OAI EPC HSS')
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('source oaienv', '\$', 5)
self.command('cd scripts', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S ./run_hss 2>&1 | stdbuf -o0 awk \'{ print strftime("[%Y/%m/%d %H:%M:%S] ",systime()) $0 }\' | stdbuf -o0 tee -a hss_' + SSH.testCase_id + '.log &', 'Core state: 2 -> 3', 35)
else:
logging.debug('Using the ltebox simulated HSS')
self.command('if [ -d ' + self.EPCSourceCodePath + '/scripts ]; then echo ' + self.eNBPassword + ' | sudo -S rm -Rf ' + self.EPCSourceCodePath + '/scripts ; fi', '\$', 5)
self.command('mkdir -p ' + self.EPCSourceCodePath + '/scripts', '\$', 5)
self.command('cd /opt/hss_sim0609', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S rm -f hss.log daemon.log', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S echo "Starting sudo session" && sudo daemon --unsafe --name=simulated_hss --chdir=/opt/hss_sim0609 ./starthss_real ', '\$', 5)
self.close()
self.CreateHtmlTestRow(self.EPCType, 'OK', ALL_PROCESSES_OK)
def InitializeMME(self):
if self.EPCIPAddress == '' or self.EPCUserName == '' or self.EPCPassword == '' or self.EPCSourceCodePath == '' or self.EPCType == '':
Usage()
sys.exit('Insufficient Parameter')
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
if re.match('OAI', self.EPCType, re.IGNORECASE):
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('source oaienv', '\$', 5)
self.command('cd scripts', '\$', 5)
self.command('stdbuf -o0 hostname', '\$', 5)
result = re.search('hostname\\\\r\\\\n(?P<host_name>[a-zA-Z0-9\-\_]+)\\\\r\\\\n', str(self.ssh.before))
if result is None:
logging.debug('\u001B[1;37;41m Hostname Not Found! \u001B[0m')
sys.exit(1)
host_name = result.group('host_name')
self.command('echo ' + self.EPCPassword + ' | sudo -S ./run_mme 2>&1 | stdbuf -o0 tee -a mme_' + SSH.testCase_id + '.log &', 'MME app initialization complete', 100)
else:
self.command('cd /opt/ltebox/tools', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S ./start_mme', '\$', 5)
self.close()
self.CreateHtmlTestRow(self.EPCType, 'OK', ALL_PROCESSES_OK)
def InitializeSPGW(self):
if self.EPCIPAddress == '' or self.EPCUserName == '' or self.EPCPassword == '' or self.EPCSourceCodePath == '' or self.EPCType == '':
Usage()
sys.exit('Insufficient Parameter')
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
if re.match('OAI', self.EPCType, re.IGNORECASE):
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('source oaienv', '\$', 5)
self.command('cd scripts', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S ./run_spgw 2>&1 | stdbuf -o0 tee -a spgw_' + SSH.testCase_id + '.log &', 'Initializing SPGW-APP task interface: DONE', 30)
else:
self.command('cd /opt/ltebox/tools', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S ./start_xGw', '\$', 5)
self.close()
self.CreateHtmlTestRow(self.EPCType, 'OK', ALL_PROCESSES_OK)
def InitializeeNB(self):
if self.eNBIPAddress == '' or self.eNBUserName == '' or self.eNBPassword == '' or self.eNBSourceCodePath == '':
Usage()
sys.exit('Insufficient Parameter')
initialize_eNB_flag = True
pStatus = self.CheckProcessExist(initialize_eNB_flag)
if (pStatus < 0):
self.CreateHtmlTestRow(self.Initialize_eNB_args, 'KO', pStatus)
# If tracer options is on, running tshark on EPC side and capture traffic b/ EPC and eNB
result = re.search('T_stdout', str(self.Initialize_eNB_args))
if result is not None:
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('ip addr show | awk -f /tmp/active_net_interfaces.awk | egrep -v "lo|tun"', '\$', 5)
result = re.search('interfaceToUse=(?P<eth_interface>[a-zA-Z0-9\-\_]+)done', str(self.ssh.before))
if result is not None:
eth_interface = result.group('eth_interface')
logging.debug('\u001B[1m Launching tshark on interface ' + eth_interface + '\u001B[0m')
self.command('echo ' + self.EPCPassword + ' | sudo -S rm -f /tmp/enb_' + SSH.testCase_id + '_s1log.pcap', '\$', 5)
self.command('echo $USER; nohup sudo tshark -f "host ' + self.eNBIPAddress +'" -i ' + eth_interface + ' -w /tmp/enb_' + SSH.testCase_id + '_s1log.pcap > /tmp/tshark.log 2>&1 &', self.EPCUserName, 5)
self.close()
self.open(self.eNBIPAddress, self.eNBUserName, self.eNBPassword)
self.command('cd ' + self.eNBSourceCodePath, '\$', 5)
# Initialize_eNB_args usually start with -O and followed by the location in repository
full_config_file = self.Initialize_eNB_args.replace('-O ','')
extIdx = full_config_file.find('.conf')
if (extIdx > 0):
extra_options = full_config_file[extIdx + 5:]
# if tracer options is on, compiling and running T Tracer
result = re.search('T_stdout', str(extra_options))
if result is not None:
logging.debug('\u001B[1m Compiling and launching T Tracer\u001B[0m')
self.command('cd common/utils/T/tracer', '\$', 5)
self.command('make', '\$', 10)
self.command('echo $USER; nohup ./record -d ../T_messages.txt -o ' + self.eNBSourceCodePath + '/cmake_targets/enb_' + SSH.testCase_id + '_record.raw -ON -off VCD -off HEAVY -off LEGACY_GROUP_TRACE -off LEGACY_GROUP_DEBUG > ' + self.eNBSourceCodePath + '/cmake_targets/enb_' + SSH.testCase_id + '_record.log 2>&1 &', self.eNBUserName, 5)
self.command('cd ' + self.eNBSourceCodePath, '\$', 5)
full_config_file = full_config_file[:extIdx + 5]
config_path, config_file = os.path.split(full_config_file)
else:
sys.exit('Insufficient Parameter')
ci_full_config_file = config_path + '/ci-' + config_file
rruCheck = False
result = re.search('rru', str(config_file))
if result is not None:
rruCheck = True
# Make a copy and adapt to EPC / eNB IP addresses
self.command('cp ' + full_config_file + ' ' + ci_full_config_file, '\$', 5)
self.command('sed -i -e \'s/CI_MME_IP_ADDR/' + self.EPCIPAddress + '/\' ' + ci_full_config_file, '\$', 2);
self.command('sed -i -e \'s/CI_ENB_IP_ADDR/' + self.eNBIPAddress + '/\' ' + ci_full_config_file, '\$', 2);
# Launch eNB with the modified config file
self.command('source oaienv', '\$', 5)
self.command('cd cmake_targets', '\$', 5)
self.command('echo "ulimit -c unlimited && ./lte_build_oai/build/lte-softmodem -O ' + self.eNBSourceCodePath + '/' + ci_full_config_file + extra_options + '" > ./my-lte-softmodem-run' + str(SSH.eNB_instance) + '.sh ', '\$', 5)
self.command('chmod 775 ./my-lte-softmodem-run' + str(SSH.eNB_instance) + '.sh ', '\$', 5)
self.command('echo ' + self.eNBPassword + ' | sudo -S rm -Rf enb_' + SSH.testCase_id + '.log', '\$', 5)
self.command('echo ' + self.eNBPassword + ' | sudo -S -E daemon --inherit --unsafe --name=enb' + str(SSH.eNB_instance) + '_daemon --chdir=' + self.eNBSourceCodePath + '/cmake_targets -o ' + self.eNBSourceCodePath + '/cmake_targets/enb_' + SSH.testCase_id + '.log ./my-lte-softmodem-run' + str(SSH.eNB_instance) + '.sh', '\$', 5)
if not rruCheck:
self.eNBLogFile = 'enb_' + SSH.testCase_id + '.log'
time.sleep(6)
doLoop = True
loopCounter = 10
while (doLoop):
loopCounter = loopCounter - 1
if (loopCounter == 0):
doLoop = False
logging.error('\u001B[1;37;41m eNB logging system did not show got sync! \u001B[0m')
self.CreateHtmlTestRow('-O ' + config_file + extra_options, 'KO', ALL_PROCESSES_OK)
self.command('stdbuf -o0 cat enb_' + SSH.testCase_id + '.log | egrep --color=never -i "wait|sync"', '\$', 4)
if rruCheck:
result = re.search('wait RUs', str(self.ssh.before))
else:
result = re.search('got sync', str(self.ssh.before))
if result is None:
time.sleep(6)
else:
doLoop = False
self.CreateHtmlTestRow('-O ' + config_file + extra_options, 'OK', ALL_PROCESSES_OK)
logging.debug('\u001B[1m Initialize eNB Completed\u001B[0m')
self.close()
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def InitializeUE_common(self, device_id):
logging.debug('send adb commands')
try:
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
# The following commands are deprecated since we no longer work on Android 7+
# self.command('stdbuf -o0 adb -s ' + device_id + ' shell settings put global airplane_mode_on 1', '\$', 10)
# self.command('stdbuf -o0 adb -s ' + device_id + ' shell am broadcast -a android.intent.action.AIRPLANE_MODE --ez state true', '\$', 60)
# a dedicated script has to be installed inside the UE
# airplane mode on means call /data/local/tmp/off
self.command('stdbuf -o0 adb -s ' + device_id + ' shell /data/local/tmp/off', '\$', 60)
#airplane mode off means call /data/local/tmp/on
logging.debug('\u001B[1mUE (' + device_id + ') Initialize Completed\u001B[0m')
self.close()
except:
os.kill(os.getppid(),signal.SIGUSR1)
def InitializeUE(self):
if self.ADBIPAddress == '' or self.ADBUserName == '' or self.ADBPassword == '':
Usage()
sys.exit('Insufficient Parameter')
multi_jobs = []
for device_id in self.UEDevices:
p = Process(target = SSH.InitializeUE_common, args = (device_id,))
p.daemon = True
p.start()
multi_jobs.append(p)
for job in multi_jobs:
job.join()
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
def checkDevTTYisUnlocked(self):
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
count = 0
while count < 5:
self.command('echo ' + self.ADBPassword + ' | sudo -S lsof | grep ttyUSB0', '\$', 10)
result = re.search('picocom', str(self.ssh.before))
if result is None:
count = 10
else:
time.sleep(5)
count = count + 1
self.close()
def InitializeCatM(self):
if self.ADBIPAddress == '' or self.ADBUserName == '' or self.ADBPassword == '':
Usage()
sys.exit('Insufficient Parameter')
self.picocom_closure = True
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
# dummy call to start a sudo session. The picocom command does NOT handle well the `sudo -S`
self.command('echo ' + self.ADBPassword + ' | sudo -S ls', '\$', 10)
self.command('sudo picocom --baud 921600 --flow n --databits 8 /dev/ttyUSB0', 'Terminal ready', 10)
time.sleep(1)
# Calling twice AT to clear all buffers
self.command('AT', 'OK|ERROR', 5)
self.command('AT', 'OK', 5)
# Disabling the Radio
self.command('AT+CFUN=0', 'OK', 5)
logging.debug('\u001B[1m Cellular Functionality disabled\u001B[0m')
# Checking if auto-attach is enabled
self.command('AT^AUTOATT?', 'OK', 5)
result = re.search('AUTOATT: (?P<state>[0-9\-]+)', str(self.ssh.before))
if result is not None:
if result.group('state') is not None:
autoAttachState = int(result.group('state'))
if autoAttachState is not None:
if autoAttachState == 0:
self.command('AT^AUTOATT=1', 'OK', 5)
logging.debug('\u001B[1m Auto-Attach enabled\u001B[0m')
else:
logging.debug('\u001B[1;37;41m Could not check Auto-Attach! \u001B[0m')
# Force closure of picocom but device might still be locked
self.close()
self.picocom_closure = False
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
self.checkDevTTYisUnlocked()
def TerminateCatM(self):
if self.ADBIPAddress == '' or self.ADBUserName == '' or self.ADBPassword == '':
Usage()
sys.exit('Insufficient Parameter')
self.picocom_closure = True
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
# dummy call to start a sudo session. The picocom command does NOT handle well the `sudo -S`
self.command('echo ' + self.ADBPassword + ' | sudo -S ls', '\$', 10)
self.command('sudo picocom --baud 921600 --flow n --databits 8 /dev/ttyUSB0', 'Terminal ready', 10)
time.sleep(1)
# Calling twice AT to clear all buffers
self.command('AT', 'OK|ERROR', 5)
self.command('AT', 'OK', 5)
# Disabling the Radio
self.command('AT+CFUN=0', 'OK', 5)
logging.debug('\u001B[1m Cellular Functionality disabled\u001B[0m')
self.close()
self.picocom_closure = False
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
self.checkDevTTYisUnlocked()
def AttachCatM(self):
if self.ADBIPAddress == '' or self.ADBUserName == '' or self.ADBPassword == '':
Usage()
sys.exit('Insufficient Parameter')
self.picocom_closure = True
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
# dummy call to start a sudo session. The picocom command does NOT handle well the `sudo -S`
self.command('echo ' + self.ADBPassword + ' | sudo -S ls', '\$', 10)
self.command('sudo picocom --baud 921600 --flow n --databits 8 /dev/ttyUSB0', 'Terminal ready', 10)
time.sleep(1)
# Calling twice AT to clear all buffers
self.command('AT', 'OK|ERROR', 5)
self.command('AT', 'OK', 5)
# Enabling the Radio
self.command('AT+CFUN=1', 'SIMSTORE,READY', 5)
logging.debug('\u001B[1m Cellular Functionality enabled\u001B[0m')
time.sleep(4)
# We should check if we register
count = 0
while count < 3:
self.command('AT+CEREG?', 'OK', 5)
result = re.search('CEREG: 2,(?P<state>[0-9\-]+)', str(self.ssh.before))
if result is not None:
mDataConnectionState = int(result.group('state'))
if mDataConnectionState is not None:
logging.debug('+CEREG: 2,' + str(mDataConnectionState))
else:
logging.debug(str(self.ssh.before))
count = count + 1
time.sleep(1)
self.close()
self.picocom_closure = False
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
self.checkDevTTYisUnlocked()
try:
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
self.command('stdbuf -o0 adb -s ' + device_id + ' shell /data/local/tmp/on', '\$', 60)
time.sleep(2)
while count > 0:
self.command('stdbuf -o0 adb -s ' + device_id + ' shell dumpsys telephony.registry | grep mDataConnectionState', '\$', 15)
result = re.search('mDataConnectionState.*=(?P<state>[0-9\-]+)', str(self.ssh.before))
if result is None:
logging.debug('\u001B[1;37;41m mDataConnectionState Not Found! \u001B[0m')
lock.acquire()
statusQueue.put(-1)
statusQueue.put(device_id)
statusQueue.put('mDataConnectionState Not Found!')
lock.release()
break
mDataConnectionState = int(result.group('state'))
if mDataConnectionState == 2:
logging.debug('\u001B[1mUE (' + device_id + ') Attach Completed\u001B[0m')
lock.acquire()
statusQueue.put(max_count - count)
statusQueue.put(device_id)
statusQueue.put('Attach Completed')
lock.release()
break
count = count - 1
if count == 15 or count == 30:
logging.debug('\u001B[1;30;43m Retry UE (' + device_id + ') Flight Mode Off \u001B[0m')
self.command('stdbuf -o0 adb -s ' + device_id + ' shell /data/local/tmp/off', '\$', 60)
time.sleep(0.5)
self.command('stdbuf -o0 adb -s ' + device_id + ' shell /data/local/tmp/on', '\$', 60)
time.sleep(0.5)
logging.debug('\u001B[1mWait UE (' + device_id + ') a second until mDataConnectionState=2 (' + str(max_count-count) + ' times)\u001B[0m')
time.sleep(1)
if count == 0:
logging.debug('\u001B[1;37;41m UE (' + device_id + ') Attach Failed \u001B[0m')
lock.acquire()
statusQueue.put(-1)
statusQueue.put(device_id)
statusQueue.put('Attach Failed')
lock.release()
self.close()
except:
os.kill(os.getppid(),signal.SIGUSR1)
def AttachUE(self):
if self.ADBIPAddress == '' or self.ADBUserName == '' or self.ADBPassword == '':
Usage()
sys.exit('Insufficient Parameter')
initialize_eNB_flag = False
pStatus = self.CheckProcessExist(initialize_eNB_flag)
if (pStatus < 0):
self.CreateHtmlTestRow('N/A', 'KO', pStatus)
for device_id in self.UEDevices:
if (self.nbMaxUEtoAttach == -1) or (nb_ue_to_connect < self.nbMaxUEtoAttach):
p = Process(target = SSH.AttachUE_common, args = (device_id, status_queue, lock,))
p.daemon = True
p.start()
multi_jobs.append(p)
nb_ue_to_connect = nb_ue_to_connect + 1
for job in multi_jobs:
job.join()
self.CreateHtmlTestRow('N/A', 'KO', ALL_PROCESSES_OK)
sys.exit(1)
else:
attach_status = True
html_queue = SimpleQueue()
while (not status_queue.empty()):
count = status_queue.get()
if (count < 0):
attach_status = False
device_id = status_queue.get()
message = status_queue.get()
if (count < 0):
html_cell = "<pre>UE (" + device_id + ")\n" + message + "</pre>"
else:
html_cell = "<pre>UE (" + device_id + ")\n" + message + ' in ' + str(count + 2) + ' seconds</pre>'
html_queue.put(html_cell)
if (attach_status):
self.CreateHtmlTestRowQueue('N/A', 'OK', len(self.UEDevices), html_queue)
else:
self.CreateHtmlTestRowQueue('N/A', 'KO', len(self.UEDevices), html_queue)
def DetachUE_common(self, device_id):
try:
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
self.command('stdbuf -o0 adb -s ' + device_id + ' shell /data/local/tmp/off', '\$', 60)
logging.debug('\u001B[1mUE (' + device_id + ') Detach Completed\u001B[0m')
self.close()
except:
os.kill(os.getppid(),signal.SIGUSR1)
def DetachUE(self):
if self.ADBIPAddress == '' or self.ADBUserName == '' or self.ADBPassword == '':
Usage()
sys.exit('Insufficient Parameter')
initialize_eNB_flag = False
pStatus = self.CheckProcessExist(initialize_eNB_flag)
if (pStatus < 0):
self.CreateHtmlTestRow('N/A', 'KO', pStatus)
multi_jobs = []
for device_id in self.UEDevices:
p = Process(target = SSH.DetachUE_common, args = (device_id,))
p.daemon = True
p.start()
multi_jobs.append(p)
for job in multi_jobs:
job.join()
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
def RebootUE_common(self, device_id):
try:
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
previousmDataConnectionStates = []
# Save mDataConnectionState
self.command('stdbuf -o0 adb -s ' + device_id + ' shell dumpsys telephony.registry | grep mDataConnectionState', '\$', 15)
self.command('stdbuf -o0 adb -s ' + device_id + ' shell dumpsys telephony.registry | grep mDataConnectionState', '\$', 15)
result = re.search('mDataConnectionState.*=(?P<state>[0-9\-]+)', str(self.ssh.before))
if result is None:
logging.debug('\u001B[1;37;41m mDataConnectionState Not Found! \u001B[0m')
sys.exit(1)
previousmDataConnectionStates.append(int(result.group('state')))
# Reboot UE
self.command('stdbuf -o0 adb -s ' + device_id + ' shell reboot', '\$', 10)
time.sleep(60)
previousmDataConnectionState = previousmDataConnectionStates.pop(0)
count = 180
while count > 0:
count = count - 1
self.command('stdbuf -o0 adb -s ' + device_id + ' shell dumpsys telephony.registry | grep mDataConnectionState', '\$', 15)
result = re.search('mDataConnectionState.*=(?P<state>[0-9\-]+)', str(self.ssh.before))
if result is None:
mDataConnectionState = None
else:
mDataConnectionState = int(result.group('state'))
logging.debug('mDataConnectionState = ' + result.group('state'))
if mDataConnectionState is None or (previousmDataConnectionState == 2 and mDataConnectionState != 2):
logging.debug('\u001B[1mWait UE (' + device_id + ') a second until reboot completion (' + str(180-count) + ' times)\u001B[0m')
time.sleep(1)
else:
logging.debug('\u001B[1mUE (' + device_id + ') Reboot Completed\u001B[0m')
break
if count == 0:
logging.debug('\u001B[1;37;41m UE (' + device_id + ') Reboot Failed \u001B[0m')
sys.exit(1)
self.close()
except:
os.kill(os.getppid(),signal.SIGUSR1)
def RebootUE(self):
if self.ADBIPAddress == '' or self.ADBUserName == '' or self.ADBPassword == '':
Usage()
sys.exit('Insufficient Parameter')
initialize_eNB_flag = False
pStatus = self.CheckProcessExist(initialize_eNB_flag)
if (pStatus < 0):
self.CreateHtmlTestRow('N/A', 'KO', pStatus)
multi_jobs = []
for device_id in self.UEDevices:
p = Process(target = SSH.RebootUE_common, args = (device_id,))
p.daemon = True
p.start()
multi_jobs.append(p)
for job in multi_jobs:
job.join()
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
def GetAllUEDevices(self, terminate_ue_flag):
if self.ADBIPAddress == '' or self.ADBUserName == '' or self.ADBPassword == '':
Usage()
sys.exit('Insufficient Parameter')
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
self.command('adb devices', '\$', 15)
self.UEDevices = re.findall("\\\\r\\\\n([A-Za-z0-9]+)\\\\tdevice",str(self.ssh.before))
if terminate_ue_flag == False:
if len(self.UEDevices) == 0:
logging.debug('\u001B[1;37;41m UE Not Found! \u001B[0m')
sys.exit(1)
self.close()
def GetAllCatMDevices(self, terminate_ue_flag):
if self.ADBIPAddress == '' or self.ADBUserName == '' or self.ADBPassword == '':
Usage()
sys.exit('Insufficient Parameter')
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
self.command('lsusb | egrep "Future Technology Devices International, Ltd FT2232C" | sed -e "s#:.*##" -e "s# #_#g"', '\$', 15)
self.CatMDevices = re.findall("\\\\r\\\\n([A-Za-z0-9_]+)",str(self.ssh.before))
if terminate_ue_flag == False:
if len(self.CatMDevices) == 0:
logging.debug('\u001B[1;37;41m CAT-M UE Not Found! \u001B[0m')
sys.exit(1)
self.close()
def GetAllUEIPAddresses(self):
if self.ADBIPAddress == '' or self.ADBUserName == '' or self.ADBPassword == '':
Usage()
sys.exit('Insufficient Parameter')
self.UEIPAddresses = []
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
for device_id in self.UEDevices:
count = 0
while count < 4:
self.command('stdbuf -o0 adb -s ' + device_id + ' shell ip addr show | grep rmnet', '\$', 15)
result = re.search('inet (?P<ueipaddress>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)\/[0-9]+[0-9a-zA-Z\.\s]+', str(self.ssh.before))
if result is None:
logging.debug('\u001B[1;37;41m UE IP Address Not Found! \u001B[0m')
count += 1
else:
count = 10
if count < 9:
ue_ip_status -= 1
continue
UE_IPAddress = result.group('ueipaddress')
logging.debug('\u001B[1mUE (' + device_id + ') IP Address is ' + UE_IPAddress + '\u001B[0m')
for ueipaddress in self.UEIPAddresses:
if ueipaddress == UE_IPAddress:
logging.debug('\u001B[1mUE (' + device_id + ') IP Address ' + UE_IPAddress + 'has been existed!' + '\u001B[0m')
ue_ip_status -= 1
continue
self.UEIPAddresses.append(UE_IPAddress)
def ping_iperf_wrong_exit(self, lock, UE_IPAddress, device_id, statusQueue, message):
lock.acquire()
statusQueue.put(-1)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
statusQueue.put(message)
lock.release()
def Ping_common(self, lock, UE_IPAddress, device_id, statusQueue):
try:
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('cd scripts', '\$', 5)
ping_time = re.findall("-c (\d+)",str(self.ping_args))
ping_status = self.command('stdbuf -o0 ping ' + self.ping_args + ' ' + UE_IPAddress + ' 2>&1 | stdbuf -o0 tee -a ping_' + SSH.testCase_id + '_' + device_id + '.log', '\$', int(ping_time[0])*1.5)
# TIMEOUT CASE
if ping_status < 0:
message = 'Ping with UE (' + str(UE_IPAddress) + ') crashed due to TIMEOUT!'
logging.debug('\u001B[1;37;41m ' + message + ' \u001B[0m')
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, message)
return
result = re.search(', (?P<packetloss>[0-9\.]+)% packet loss, time [0-9\.]+ms', str(self.ssh.before))
if result is None:
message = 'Packet Loss Not Found!'
logging.debug('\u001B[1;37;41m ' + message + ' \u001B[0m')
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, message)
packetloss = result.group('packetloss')
if float(packetloss) == 100:
message = 'Packet Loss is 100%'
logging.debug('\u001B[1;37;41m ' + message + ' \u001B[0m')
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, message)
result = re.search('rtt min\/avg\/max\/mdev = (?P<rtt_min>[0-9\.]+)\/(?P<rtt_avg>[0-9\.]+)\/(?P<rtt_max>[0-9\.]+)\/[0-9\.]+ ms', str(self.ssh.before))
if result is None:
message = 'Ping RTT_Min RTT_Avg RTT_Max Not Found!'
logging.debug('\u001B[1;37;41m ' + message + ' \u001B[0m')
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, message)
rtt_min = result.group('rtt_min')
rtt_avg = result.group('rtt_avg')
rtt_max = result.group('rtt_max')
pal_msg = 'Packet Loss : ' + packetloss + '%'
min_msg = 'RTT(Min) : ' + rtt_min + ' ms'
avg_msg = 'RTT(Avg) : ' + rtt_avg + ' ms'
max_msg = 'RTT(Max) : ' + rtt_max + ' ms'
lock.acquire()
logging.debug('\u001B[1;37;44m ping result (' + UE_IPAddress + ') \u001B[0m')
logging.debug('\u001B[1;34m ' + pal_msg + '\u001B[0m')
logging.debug('\u001B[1;34m ' + min_msg + '\u001B[0m')
logging.debug('\u001B[1;34m ' + avg_msg + '\u001B[0m')
logging.debug('\u001B[1;34m ' + max_msg + '\u001B[0m')
qMsg = pal_msg + '\n' + min_msg + '\n' + avg_msg + '\n' + max_msg
packetLossOK = True
if packetloss is not None:
if float(packetloss) > float(self.ping_packetloss_threshold):
logging.debug('\u001B[1;37;41m Packet Loss too high \u001B[0m')
elif float(packetloss) > 0:
logging.debug('\u001B[1;30;43m Packet Loss is not 0% \u001B[0m')
if (packetLossOK):
statusQueue.put(0)
else:
statusQueue.put(-1)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
statusQueue.put(qMsg)
lock.release()
self.close()
except:
os.kill(os.getppid(),signal.SIGUSR1)
def Ping(self):
if self.EPCIPAddress == '' or self.EPCUserName == '' or self.EPCPassword == '' or self.EPCSourceCodePath == '':
Usage()
sys.exit('Insufficient Parameter')
initialize_eNB_flag = False
pStatus = self.CheckProcessExist(initialize_eNB_flag)
if (pStatus < 0):
self.CreateHtmlTestRow(self.ping_args, 'KO', pStatus)
ueIpStatus = self.GetAllUEIPAddresses()
if (ueIpStatus < 0):
self.CreateHtmlTestRow(self.ping_args, 'KO', UE_IP_ADDRESS_ISSUE)
multi_jobs = []
i = 0
lock = Lock()
for UE_IPAddress in self.UEIPAddresses:
device_id = self.UEDevices[i]
p = Process(target = SSH.Ping_common, args = (lock,UE_IPAddress,device_id,status_queue,))
p.daemon = True
p.start()
multi_jobs.append(p)
i = i + 1
for job in multi_jobs:
job.join()
self.CreateHtmlTestRow(self.ping_args, 'KO', ALL_PROCESSES_OK)
sys.exit(1)
else:
ping_status = True
html_queue = SimpleQueue()
while (not status_queue.empty()):
count = status_queue.get()
if (count < 0):
ping_status = False
device_id = status_queue.get()
ip_addr = status_queue.get()
message = status_queue.get()
html_cell = "<pre>UE (" + device_id + ")\nIP Address : " + ip_addr + "\n" + message + "</pre>"
html_queue.put(html_cell)
if (ping_status):
self.CreateHtmlTestRowQueue(self.ping_args, 'OK', len(self.UEDevices), html_queue)
else:
self.CreateHtmlTestRowQueue(self.ping_args, 'KO', len(self.UEDevices), html_queue)
def Iperf_ComputeTime(self):
result = re.search('-t (?P<iperf_time>\d+)', str(self.iperf_args))
if result is None:
logging.debug('\u001B[1;37;41m Iperf time Not Found! \u001B[0m')
sys.exit(1)
return result.group('iperf_time')
def Iperf_ComputeModifiedBW(self, idx, ue_num):
result = re.search('-b (?P<iperf_bandwidth>[0-9\.]+)[KMG]', str(self.iperf_args))
if result is None:
logging.debug('\u001B[1;37;41m Iperf bandwidth Not Found! \u001B[0m')
sys.exit(1)
iperf_bandwidth = result.group('iperf_bandwidth')
if SSH.iperf_profile == 'balanced':
iperf_bandwidth_new = float(iperf_bandwidth)/ue_num
if SSH.iperf_profile == 'single-ue':
iperf_bandwidth_new = float(iperf_bandwidth)
if SSH.iperf_profile == 'unbalanced':
# residual is 2% of max bw
residualBW = float(iperf_bandwidth) / 50
if idx == 0:
iperf_bandwidth_new = float(iperf_bandwidth) - ((ue_num - 1) * residualBW)
else:
iperf_bandwidth_new = residualBW
iperf_bandwidth_str = '-b ' + iperf_bandwidth
iperf_bandwidth_str_new = '-b ' + ('%.2f' % iperf_bandwidth_new)
result = re.sub(iperf_bandwidth_str, iperf_bandwidth_str_new, str(self.iperf_args))
if result is None:
logging.debug('\u001B[1;37;41m Calculate Iperf bandwidth Failed! \u001B[0m')
sys.exit(1)
return result
def Iperf_analyzeV2TCPOutput(self, lock, UE_IPAddress, device_id, statusQueue, iperf_real_options):
self.command('awk -f /tmp/tcp_iperf_stats.awk /tmp/CI-eNB/scripts/iperf_' + SSH.testCase_id + '_' + device_id + '.log', '\$', 5)
result = re.search('Avg Bitrate : (?P<average>[0-9\.]+ Mbits\/sec) Max Bitrate : (?P<maximum>[0-9\.]+ Mbits\/sec) Min Bitrate : (?P<minimum>[0-9\.]+ Mbits\/sec)', str(self.ssh.before))
if result is not None:
avgbitrate = result.group('average')
maxbitrate = result.group('maximum')
minbitrate = result.group('minimum')
lock.acquire()
logging.debug('\u001B[1;37;44m TCP iperf result (' + UE_IPAddress + ') \u001B[0m')
msg = 'TCP Stats :\n'
if avgbitrate is not None:
logging.debug('\u001B[1;34m Avg Bitrate : ' + avgbitrate + '\u001B[0m')
msg += 'Avg Bitrate : ' + avgbitrate + '\n'
if maxbitrate is not None:
logging.debug('\u001B[1;34m Max Bitrate : ' + maxbitrate + '\u001B[0m')
msg += 'Max Bitrate : ' + maxbitrate + '\n'
if minbitrate is not None:
logging.debug('\u001B[1;34m Min Bitrate : ' + minbitrate + '\u001B[0m')
msg += 'Min Bitrate : ' + minbitrate + '\n'
statusQueue.put(0)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
statusQueue.put(msg)
lock.release()