Newer
Older
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
bitperf += '%'
jitter = '%.2f ms' % (ji_sum)
if (ps_sum > 0):
pl = float(100 * pl_sum / ps_sum)
packetloss = '%2.1f ' % (pl)
packetloss += '%'
else:
packetloss = 'unknown'
lock.acquire()
if (br_loss < 90):
statusQueue.put(1)
else:
statusQueue.put(0)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
req_msg = 'Req Bitrate : ' + req_bandwidth
bir_msg = 'Bitrate : ' + bitrate
brl_msg = 'Bitrate Perf: ' + bitperf
jit_msg = 'Jitter : ' + jitter
pal_msg = 'Packet Loss : ' + packetloss
statusQueue.put(req_msg + '\n' + bir_msg + '\n' + brl_msg + '\n' + jit_msg + '\n' + pal_msg + '\n')
logging.debug('\u001B[1;37;45m iperf result (' + UE_IPAddress + ') \u001B[0m')
logging.debug('\u001B[1;35m ' + req_msg + '\u001B[0m')
logging.debug('\u001B[1;35m ' + bir_msg + '\u001B[0m')
logging.debug('\u001B[1;35m ' + brl_msg + '\u001B[0m')
logging.debug('\u001B[1;35m ' + jit_msg + '\u001B[0m')
logging.debug('\u001B[1;35m ' + pal_msg + '\u001B[0m')
lock.release()
else:
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, 'Could not analyze from server log')
def Iperf_analyzeV3Output(self, lock, UE_IPAddress, device_id, statusQueue):
result = re.search('(?P<bitrate>[0-9\.]+ [KMG]bits\/sec) +(?:|[0-9\.]+ ms +\d+\/\d+ \((?P<packetloss>[0-9\.]+)%\)) +(?:|receiver)\\\\r\\\\n(?:|\[ *\d+\] Sent \d+ datagrams)\\\\r\\\\niperf Done\.', str(self.ssh.before))
if result is None:
result = re.search('(?P<error>iperf: error - [a-zA-Z0-9 :]+)', str(self.ssh.before))
lock.acquire()
statusQueue.put(-1)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
if result is not None:
logging.debug('\u001B[1;37;41m ' + result.group('error') + ' \u001B[0m')
else:
logging.debug('\u001B[1;37;41m Bitrate and/or Packet Loss Not Found! \u001B[0m')
statusQueue.put('Bitrate and/or Packet Loss Not Found!')
lock.release()
bitrate = result.group('bitrate')
packetloss = result.group('packetloss')
lock.acquire()
logging.debug('\u001B[1;37;44m iperf result (' + UE_IPAddress + ') \u001B[0m')
logging.debug('\u001B[1;34m Bitrate : ' + bitrate + '\u001B[0m')
if packetloss is not None:
logging.debug('\u001B[1;34m Packet Loss : ' + packetloss + '%\u001B[0m')
if float(packetloss) > float(self.iperf_packetloss_threshold):
logging.debug('\u001B[1;37;41m Packet Loss too high \u001B[0m')
msg += 'Packet Loss too high!\n'
iperfStatus = False
if (iperfStatus):
statusQueue.put(0)
else:
statusQueue.put(-1)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
def Iperf_UL_common(self, lock, UE_IPAddress, device_id, idx, ue_num, statusQueue):
udpIperf = True
result = re.search('-u', str(self.iperf_args))
if result is None:
udpIperf = False
ipnumbers = UE_IPAddress.split('.')
if (len(ipnumbers) == 4):
ipnumbers[3] = '1'
EPC_Iperf_UE_IPAddress = ipnumbers[0] + '.' + ipnumbers[1] + '.' + ipnumbers[2] + '.' + ipnumbers[3]
# Launch iperf server on EPC side
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath + '/scripts', '\$', 5)
self.command('rm -f iperf_server_' + SSH.testCase_id + '_' + device_id + '.log', '\$', 5)
port = 5001 + idx
if udpIperf:
self.command('echo $USER; nohup iperf -u -s -i 1 -p ' + str(port) + ' > iperf_server_' + SSH.testCase_id + '_' + device_id + '.log &', self.EPCUserName, 5)
else:
self.command('echo $USER; nohup iperf -s -i 1 -p ' + str(port) + ' > iperf_server_' + SSH.testCase_id + '_' + device_id + '.log &', self.EPCUserName, 5)
time.sleep(0.5)
self.close()
# Launch iperf client on UE
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
self.command('cd ' + self.EPCSourceCodePath + '/scripts', '\$', 5)
iperf_time = self.Iperf_ComputeTime()
time.sleep(0.5)
if udpIperf:
modified_options = self.Iperf_ComputeModifiedBW(idx, ue_num)
else:
modified_options = str(self.iperf_args)
time.sleep(0.5)
self.command('rm -f iperf_' + SSH.testCase_id + '_' + device_id + '.log', '\$', 5)
iperf_status = self.command('stdbuf -o0 adb -s ' + device_id + ' shell "/data/local/tmp/iperf -c ' + EPC_Iperf_UE_IPAddress + ' ' + modified_options + ' -p ' + str(port) + '" 2>&1 | stdbuf -o0 tee -a iperf_' + SSH.testCase_id + '_' + device_id + '.log', '\$', int(iperf_time)*5.0)
# TIMEOUT Case
if iperf_status < 0:
self.close()
message = 'iperf on UE (' + str(UE_IPAddress) + ') crashed due to TIMEOUT !'
logging.debug('\u001B[1;37;41m ' + message + ' \u001B[0m')
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, message)
return
clientStatus = self.Iperf_analyzeV2Output(lock, UE_IPAddress, device_id, statusQueue, modified_options)
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('killall --signal SIGKILL iperf', self.EPCUserName, 5)
self.close()
if (clientStatus == -1):
time.sleep(1)
if (os.path.isfile('iperf_server_' + SSH.testCase_id + '_' + device_id + '.log')):
os.remove('iperf_server_' + SSH.testCase_id + '_' + device_id + '.log')
self.copyin(self.EPCIPAddress, self.EPCUserName, self.EPCPassword, self.EPCSourceCodePath + '/scripts/iperf_server_' + SSH.testCase_id + '_' + device_id + '.log', '.')
self.Iperf_analyzeV2Server(lock, UE_IPAddress, device_id, statusQueue, modified_options)
def Iperf_common(self, lock, UE_IPAddress, device_id, idx, ue_num, statusQueue):
# Single-UE profile -- iperf only on one UE
if SSH.iperf_profile == 'single-ue' and idx != 0:
return
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
# if by chance ADB server and EPC are on the same remote host, at least log collection will take care of it
self.command('if [ ! -d ' + self.EPCSourceCodePath + '/scripts ]; then mkdir -p ' + self.EPCSourceCodePath + '/scripts ; fi', '\$', 5)
self.command('cd ' + self.EPCSourceCodePath + '/scripts', '\$', 5)
# Checking if iperf / iperf3 are installed
self.command('adb -s ' + device_id + ' shell "ls /data/local/tmp"', '\$', 5)
result = re.search('iperf3', str(self.ssh.before))
if result is None:
result = re.search('iperf', str(self.ssh.before))
if result is None:
message = 'Neither iperf nor iperf3 installed on UE!'
logging.debug('\u001B[1;37;41m ' + message + ' \u001B[0m')
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, message)
else:
useIperf3 = True
# in case of iperf, UL has its own function
if (not useIperf3):
result = re.search('-R', str(self.iperf_args))
if result is not None:
self.close()
self.Iperf_UL_common(lock, UE_IPAddress, device_id, idx, ue_num, statusQueue)
if (useIperf3):
self.command('stdbuf -o0 adb -s ' + device_id + ' shell /data/local/tmp/iperf3 -s &', '\$', 5)
else:
self.command('rm -f iperf_server_' + SSH.testCase_id + '_' + device_id + '.log', '\$', 5)
result = re.search('-u', str(self.iperf_args))
if result is None:
self.command('echo $USER; nohup adb -s ' + device_id + ' shell "/data/local/tmp/iperf -s -i 1" > iperf_server_' + SSH.testCase_id + '_' + device_id + '.log &', self.ADBUserName, 5)
udpIperf = False
else:
self.command('echo $USER; nohup adb -s ' + device_id + ' shell "/data/local/tmp/iperf -u -s -i 1" > iperf_server_' + SSH.testCase_id + '_' + device_id + '.log &', self.ADBUserName, 5)
time.sleep(0.5)
self.close()
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath + '/scripts', '\$', 5)
iperf_time = self.Iperf_ComputeTime()
if udpIperf:
modified_options = self.Iperf_ComputeModifiedBW(idx, ue_num)
else:
modified_options = str(self.iperf_args)
self.command('rm -f iperf_' + SSH.testCase_id + '_' + device_id + '.log', '\$', 5)
if (useIperf3):
self.command('stdbuf -o0 iperf3 -c ' + UE_IPAddress + ' ' + modified_options + ' 2>&1 | stdbuf -o0 tee -a iperf_' + SSH.testCase_id + '_' + device_id + '.log', '\$', int(iperf_time)*5.0)
self.Iperf_analyzeV3Output(lock, UE_IPAddress, device_id, statusQueue)
iperf_status = self.command('stdbuf -o0 iperf -c ' + UE_IPAddress + ' ' + modified_options + ' 2>&1 | stdbuf -o0 tee -a iperf_' + SSH.testCase_id + '_' + device_id + '.log', '\$', int(iperf_time)*5.0)
if iperf_status < 0:
self.close()
message = 'iperf on UE (' + str(UE_IPAddress) + ') crashed due to TIMEOUT !'
logging.debug('\u001B[1;37;41m ' + message + ' \u001B[0m')
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, message)
return
clientStatus = self.Iperf_analyzeV2Output(lock, UE_IPAddress, device_id, statusQueue, modified_options)
self.close()
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
self.command('stdbuf -o0 adb -s ' + device_id + ' shell ps | grep --color=never iperf | grep -v grep', '\$', 5)
result = re.search('shell +(?P<pid>\d+)', str(self.ssh.before))
if result is not None:
pid_iperf = result.group('pid')
self.command('stdbuf -o0 adb -s ' + device_id + ' shell kill -KILL ' + pid_iperf, '\$', 5)
self.close()
if (clientStatus == -1):
time.sleep(1)
if (os.path.isfile('iperf_server_' + SSH.testCase_id + '_' + device_id + '.log')):
os.remove('iperf_server_' + SSH.testCase_id + '_' + device_id + '.log')
self.copyin(self.ADBIPAddress, self.ADBUserName, self.ADBPassword, self.EPCSourceCodePath + '/scripts/iperf_server_' + SSH.testCase_id + '_' + device_id + '.log', '.')
self.Iperf_analyzeV2Server(lock, UE_IPAddress, device_id, statusQueue, modified_options)
except:
os.kill(os.getppid(),signal.SIGUSR1)
def Iperf(self):
if self.EPCIPAddress == '' or self.EPCUserName == '' or self.EPCPassword == '' or self.EPCSourceCodePath == '' or self.ADBIPAddress == '' or self.ADBUserName == '' or self.ADBPassword == '':
Usage()
sys.exit('Insufficient Parameter')
initialize_eNB_flag = False
pStatus = self.CheckProcessExist(initialize_eNB_flag)
if (pStatus < 0):
self.CreateHtmlTestRow(self.iperf_args, 'KO', pStatus)
ueIpStatus = self.GetAllUEIPAddresses()
if (ueIpStatus < 0):
self.CreateHtmlTestRow(self.iperf_args, 'KO', UE_IP_ADDRESS_ISSUE)
multi_jobs = []
i = 0
ue_num = len(self.UEIPAddresses)
lock = Lock()
for UE_IPAddress in self.UEIPAddresses:
device_id = self.UEDevices[i]
p = Process(target = SSH.Iperf_common, args = (lock,UE_IPAddress,device_id,i,ue_num,status_queue,))
p.daemon = True
p.start()
multi_jobs.append(p)
i = i + 1
for job in multi_jobs:
job.join()
self.CreateHtmlTestRow(self.iperf_args, 'KO', ALL_PROCESSES_OK)
html_queue = SimpleQueue()
while (not status_queue.empty()):
count = status_queue.get()
if (count < 0):
iperf_status = False
device_id = status_queue.get()
ip_addr = status_queue.get()
message = status_queue.get()
html_cell = "<pre>UE (" + device_id + ")\nIP Address : " + ip_addr + "\n" + message + "</pre>"
html_queue.put(html_cell)
if (iperf_noperf and iperf_status):
self.CreateHtmlTestRowQueue(self.iperf_args, 'PERF NOT MET', len(self.UEDevices), html_queue)
elif (iperf_status):
self.CreateHtmlTestRowQueue(self.iperf_args, 'OK', len(self.UEDevices), html_queue)
else:
self.CreateHtmlTestRowQueue(self.iperf_args, 'KO', len(self.UEDevices), html_queue)
def CheckProcessExist(self, initialize_eNB_flag):
multi_jobs = []
status_queue = SimpleQueue()
p = Process(target = SSH.CheckHSSProcess, args = (status_queue,))
p.daemon = True
p.start()
multi_jobs.append(p)
p = Process(target = SSH.CheckMMEProcess, args = (status_queue,))
p.daemon = True
p.start()
multi_jobs.append(p)
p = Process(target = SSH.CheckSPGWProcess, args = (status_queue,))
p.daemon = True
p.start()
multi_jobs.append(p)
if initialize_eNB_flag == False:
p = Process(target = SSH.CheckeNBProcess, args = (status_queue,))
p.daemon = True
p.start()
multi_jobs.append(p)
for job in multi_jobs:
job.join()
if (status_queue.empty()):
return -15
else:
result = 0
while (not status_queue.empty()):
status = status_queue.get()
if (status < 0):
result = status
if result == ENB_PROCESS_FAILED:
fileCheck = re.search('enb_', str(self.eNBLogFile))
if fileCheck is not None:
self.copyin(self.eNBIPAddress, self.eNBUserName, self.eNBPassword, self.eNBSourceCodePath + '/cmake_targets/' + self.eNBLogFile, '.')
logStatus = self.AnalyzeLogFile_eNB()
if logStatus < 0:
result = logStatus
return result
def CheckeNBProcess(self, status_queue):
try:
self.open(self.eNBIPAddress, self.eNBUserName, self.eNBPassword)
self.command('stdbuf -o0 ps -aux | grep -v grep | grep --color=never lte-softmodem', '\$', 5)
result = re.search('lte-softmodem', str(self.ssh.before))
if result is None:
logging.debug('\u001B[1;37;41m eNB Process Not Found! \u001B[0m')
status_queue.put(ENB_PROCESS_FAILED)
status_queue.put(ENB_PROCESS_OK)
self.close()
except:
os.kill(os.getppid(),signal.SIGUSR1)
try:
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('stdbuf -o0 ps -aux | grep -v grep | grep --color=never hss', '\$', 5)
if re.match('OAI', self.EPCType, re.IGNORECASE):
result = re.search('\/bin\/bash .\/run_', str(self.ssh.before))
else:
result = re.search('hss_sim s6as diam_hss', str(self.ssh.before))
if result is None:
logging.debug('\u001B[1;37;41m HSS Process Not Found! \u001B[0m')
status_queue.put(HSS_PROCESS_FAILED)
status_queue.put(HSS_PROCESS_OK)
self.close()
except:
os.kill(os.getppid(),signal.SIGUSR1)
try:
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('stdbuf -o0 ps -aux | grep -v grep | grep --color=never mme', '\$', 5)
if re.match('OAI', self.EPCType, re.IGNORECASE):
result = re.search('\/bin\/bash .\/run_', str(self.ssh.before))
else:
result = re.search('mme', str(self.ssh.before))
if result is None:
logging.debug('\u001B[1;37;41m MME Process Not Found! \u001B[0m')
status_queue.put(MME_PROCESS_FAILED)
status_queue.put(MME_PROCESS_OK)
self.close()
except:
os.kill(os.getppid(),signal.SIGUSR1)
try:
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
if re.match('OAI', self.EPCType, re.IGNORECASE):
self.command('stdbuf -o0 ps -aux | grep -v grep | grep --color=never spgw', '\$', 5)
result = re.search('\/bin\/bash .\/run_', str(self.ssh.before))
else:
self.command('stdbuf -o0 ps -aux | grep -v grep | grep --color=never xGw', '\$', 5)
result = re.search('xGw', str(self.ssh.before))
if result is None:
logging.debug('\u001B[1;37;41m SPGW Process Not Found! \u001B[0m')
status_queue.put(SPGW_PROCESS_FAILED)
status_queue.put(SPGW_PROCESS_OK)
self.close()
except:
os.kill(os.getppid(),signal.SIGUSR1)
def AnalyzeLogFile_eNB(self):
if (not os.path.isfile('./' + SSH.eNBLogFile)):
return -1
enb_log_file = open('./' + SSH.eNBLogFile, 'r')
foundAssertion = False
msgAssertion = ''
msgLine = 0
foundSegFault = False
foundRealTimeIssue = False
rrcSetupRequest = 0
rrcSetupComplete = 0
rrcReleaseRequest = 0
rrcReconfigRequest = 0
rrcReconfigComplete = 0
rrcReestablishRequest = 0
rrcReestablishComplete = 0
rrcReestablishReject = 0
uciStatMsgCount = 0
for line in enb_log_file.readlines():
result = re.search('[Ss]egmentation [Ff]ault', str(line))
if result is not None:
foundSegFault = True
result = re.search('[Cc]ore [dD]ump', str(line))
if result is not None:
foundSegFault = True
result = re.search('[Aa]ssertion', str(line))
if result is not None:
foundAssertion = True
result = re.search('LLL', str(line))
if result is not None:
foundRealTimeIssue = True
if foundAssertion and (msgLine < 3):
msgLine += 1
msgAssertion += str(line)
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
result = re.search('Generating RRCConnectionSetup', str(line))
if result is not None:
rrcSetupRequest += 1
result = re.search('RRCConnectionSetupComplete from UE', str(line))
if result is not None:
rrcSetupComplete += 1
result = re.search('Generate RRCConnectionRelease', str(line))
if result is not None:
rrcReleaseRequest += 1
result = re.search('Generate RRCConnectionReconfiguration', str(line))
if result is not None:
rrcReconfigRequest += 1
result = re.search('RRCConnectionReconfigurationComplete from UE rnti', str(line))
if result is not None:
rrcReconfigComplete += 1
result = re.search('RRCConnectionReestablishmentRequest', str(line))
if result is not None:
rrcReestablishRequest += 1
result = re.search('RRCConnectionReestablishmentComplete', str(line))
if result is not None:
rrcReestablishComplete += 1
result = re.search('RRCConnectionReestablishmentReject', str(line))
if result is not None:
rrcReestablishReject += 1
result = re.search('uci->stat', str(line))
if result is not None:
uciStatMsgCount += 1
result = re.search('PDCP.*Out of Resources.*reason', str(line))
if result is not None:
pdcpFailure += 1
result = re.search('ULSCH in error in round', str(line))
if result is not None:
ulschFailure += 1
self.htmleNBFailureMsg = ''
if uciStatMsgCount > 0:
statMsg = 'eNB showed ' + str(uciStatMsgCount) + ' "uci->stat" message(s)'
logging.debug('\u001B[1;30;43m ' + statMsg + ' \u001B[0m')
self.htmleNBFailureMsg += statMsg + '\n'
if pdcpFailure > 0:
statMsg = 'eNB showed ' + str(pdcpFailure) + ' "PDCP Out of Resources" message(s)'
logging.debug('\u001B[1;30;43m ' + statMsg + ' \u001B[0m')
self.htmleNBFailureMsg += statMsg + '\n'
if ulschFailure > 0:
statMsg = 'eNB showed ' + str(ulschFailure) + ' "ULSCH in error in round" message(s)'
logging.debug('\u001B[1;30;43m ' + statMsg + ' \u001B[0m')
self.htmleNBFailureMsg += statMsg + '\n'
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
if rrcSetupRequest > 0 or rrcSetupComplete > 0:
rrcMsg = 'eNB requested ' + str(rrcSetupRequest) + ' RRC Connection Setup(s)'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
rrcMsg = ' -- ' + str(rrcSetupComplete) + ' were completed'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
if rrcReleaseRequest > 0:
rrcMsg = 'eNB requested ' + str(rrcReleaseRequest) + ' RRC Connection Release(s)'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
if rrcReconfigRequest > 0 or rrcReconfigComplete > 0:
rrcMsg = 'eNB requested ' + str(rrcReconfigRequest) + ' RRC Connection Reconfiguration(s)'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
rrcMsg = ' -- ' + str(rrcReconfigComplete) + ' were completed'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
if rrcReestablishRequest > 0 or rrcReestablishComplete > 0 or rrcReestablishReject > 0:
rrcMsg = 'eNB requested ' + str(rrcReestablishRequest) + ' RRC Connection Reestablishment(s)'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
rrcMsg = ' -- ' + str(rrcReestablishComplete) + ' were completed'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
rrcMsg = ' -- ' + str(rrcReestablishReject) + ' were rejected'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
if foundSegFault:
logging.debug('\u001B[1;37;41m eNB ended with a Segmentation Fault! \u001B[0m')
return ENB_PROCESS_SEG_FAULT
if foundAssertion:
logging.debug('\u001B[1;37;41m eNB ended with an assertion! \u001B[0m')
return ENB_PROCESS_ASSERTION
if foundRealTimeIssue:
logging.debug('\u001B[1;37;41m eNB faced real time issues! \u001B[0m')
return ENB_PROCESS_REALTIME_ISSUE
return 0
def TerminateeNB(self):
self.open(self.eNBIPAddress, self.eNBUserName, self.eNBPassword)
self.command('cd ' + self.eNBSourceCodePath + '/cmake_targets', '\$', 5)
self.command('echo ' + self.eNBPassword + ' | sudo -S daemon --name=enb' + str(SSH.eNB_instance) + '_daemon --stop', '\$', 5)
self.command('rm -f my-lte-softmodem-run' + str(SSH.eNB_instance) + '.sh', '\$', 5)
self.command('echo ' + self.eNBPassword + ' | sudo -S killall --signal SIGINT lte-softmodem || true', '\$', 5)
time.sleep(5)
self.command('stdbuf -o0 ps -aux | grep -v grep | grep lte-softmodem', '\$', 5)
result = re.search('lte-softmodem', str(self.ssh.before))
if result is not None:
self.command('echo ' + self.eNBPassword + ' | sudo -S killall --signal SIGKILL lte-softmodem || true', '\$', 5)
self.close()
result = re.search('enb_', str(self.eNBLogFile))
if result is not None:
self.copyin(self.eNBIPAddress, self.eNBUserName, self.eNBPassword, self.eNBSourceCodePath + '/cmake_targets/' + self.eNBLogFile, '.')
logStatus = self.AnalyzeLogFile_eNB()
if (logStatus < 0):
self.CreateHtmlTestRow('N/A', 'KO', logStatus)
sys.exit(1)
else:
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
else:
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
def TerminateHSS(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
if re.match('OAI', self.EPCType, re.IGNORECASE):
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGINT run_hss oai_hss || true', '\$', 5)
time.sleep(2)
self.command('stdbuf -o0 ps -aux | grep -v grep | grep hss', '\$', 5)
result = re.search('\/bin\/bash .\/run_', str(self.ssh.before))
if result is not None:
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGKILL run_hss oai_hss || true', '\$', 5)
else:
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('cd scripts', '\$', 5)
self.command('rm -f ./kill_hss.sh', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S daemon --name=simulated_hss --stop', '\$', 5)
time.sleep(2)
self.command('ps -aux | egrep --color=never "hss_sim|simulated_hss" | grep -v grep | awk \'BEGIN{n=0}{pidId[n]=$2;n=n+1}END{print "kill -9 " pidId[0] " " pidId[1]}\' > ./kill_hss.sh', '\$', 5)
self.command('chmod 755 ./kill_hss.sh', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S ./kill_hss.sh', '\$', 5)
self.command('rm ./kill_hss.sh', '\$', 5)
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
def TerminateMME(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
if re.match('OAI', self.EPCType, re.IGNORECASE):
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGINT run_mme mme || true', '\$', 5)
time.sleep(2)
self.command('stdbuf -o0 ps -aux | grep -v grep | grep mme', '\$', 5)
result = re.search('\/bin\/bash .\/run_', str(self.ssh.before))
if result is not None:
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGKILL run_mme mme || true', '\$', 5)
else:
self.command('cd /opt/ltebox/tools', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S ./stop_mme', '\$', 5)
self.close()
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
def TerminateSPGW(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
if re.match('OAI', self.EPCType, re.IGNORECASE):
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGINT run_spgw spgw || true', '\$', 5)
time.sleep(2)
self.command('stdbuf -o0 ps -aux | grep -v grep | grep spgw', '\$', 5)
result = re.search('\/bin\/bash .\/run_', str(self.ssh.before))
if result is not None:
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGKILL run_spgw spgw || true', '\$', 5)
else:
self.command('cd /opt/ltebox/tools', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S ./stop_xGw', '\$', 5)
self.close()
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
def TerminateUE_common(self, device_id):
try:
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)
self.command('stdbuf -o0 adb -s ' + device_id + ' shell /data/local/tmp/off', '\$', 60)
logging.debug('\u001B[1mUE (' + device_id + ') Detach Completed\u001B[0m')
self.command('stdbuf -o0 adb -s ' + device_id + ' shell ps | grep --color=never iperf | grep -v grep', '\$', 5)
result = re.search('shell +(?P<pid>\d+)', str(self.ssh.before))
if result is not None:
pid_iperf = result.group('pid')
self.command('stdbuf -o0 adb -s ' + device_id + ' shell kill -KILL ' + pid_iperf, '\$', 5)
self.close()
except:
os.kill(os.getppid(),signal.SIGUSR1)
def TerminateUE(self):
terminate_ue_flag = True
SSH.GetAllUEDevices(terminate_ue_flag)
multi_jobs = []
for device_id in self.UEDevices:
p = Process(target= SSH.TerminateUE_common, args = (device_id,))
p.daemon = True
p.start()
multi_jobs.append(p)
for job in multi_jobs:
job.join()
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
def AutoTerminateUEandeNB(self):
self.testCase_id = 'AUTO-KILL-UE'
self.desc = 'Automatic Termination of UE'
self.ShowTestID()
self.TerminateUE()
self.testCase_id = 'AUTO-KILL-eNB'
self.desc = 'Automatic Termination of eNB'
self.ShowTestID()
self.eNB_instance = '0'
self.TerminateeNB()
def LogCollectBuild(self):
self.open(self.eNBIPAddress, self.eNBUserName, self.eNBPassword)
self.command('cd ' + self.eNBSourceCodePath, '\$', 5)
self.command('cd cmake_targets', '\$', 5)
self.command('zip build.log.zip build_log_*/*', '\$', 60)
self.command('echo ' + self.eNBPassword + ' | sudo -S rm -rf build_log_*', '\$', 5)
def LogCollecteNB(self):
self.open(self.eNBIPAddress, self.eNBUserName, self.eNBPassword)
self.command('cd ' + self.eNBSourceCodePath, '\$', 5)
self.command('cd cmake_targets', '\$', 5)
self.command('echo ' + self.eNBPassword + ' | sudo -S rm -f enb.log.zip', '\$', 5)
self.command('echo ' + self.eNBPassword + ' | sudo -S zip enb.log.zip enb*.log core*', '\$', 60)
self.command('echo ' + self.eNBPassword + ' | sudo -S rm enb*.log core*', '\$', 5)
def LogCollectPing(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('cd scripts', '\$', 5)
self.command('zip ping.log.zip ping*.log', '\$', 60)
self.command('rm ping*.log', '\$', 5)
self.close()
def LogCollectIperf(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('cd scripts', '\$', 5)
self.command('zip iperf.log.zip iperf*.log', '\$', 60)
self.command('rm iperf*.log', '\$', 5)
self.close()
def LogCollectHSS(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('cd scripts', '\$', 5)
if re.match('OAI', self.EPCType, re.IGNORECASE):
self.command('zip hss.log.zip hss*.log', '\$', 60)
self.command('rm hss*.log', '\$', 5)
else:
self.command('cp /opt/hss_sim0609/hss.log .', '\$', 60)
self.command('zip hss.log.zip hss.log', '\$', 60)
def LogCollectMME(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('cd scripts', '\$', 5)
if re.match('OAI', self.EPCType, re.IGNORECASE):
self.command('zip mme.log.zip mme*.log', '\$', 60)
self.command('rm mme*.log', '\$', 5)
else:
self.command('cp /opt/ltebox/var/log/*Log.0 .', '\$', 5)
self.command('zip mme.log.zip mmeLog.0 s1apcLog.0 s1apsLog.0 s11cLog.0 libLog.0 s1apCodecLog.0', '\$', 60)
def LogCollectSPGW(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('cd scripts', '\$', 5)
if re.match('OAI', self.EPCType, re.IGNORECASE):
self.command('zip spgw.log.zip spgw*.log', '\$', 60)
self.command('rm spgw*.log', '\$', 5)
else:
self.command('cp /opt/ltebox/var/log/xGwLog.0 .', '\$', 5)
self.command('zip spgw.log.zip xGwLog.0', '\$', 60)
self.close()
#-----------------------------------------------------------
# HTML Reporting....
#-----------------------------------------------------------
def CreateHtmlHeader(self):
if (not self.htmlHeaderCreated):
self.htmlFile = open('test_results.html', 'w')
self.htmlFile.write('<!DOCTYPE html>\n')
self.htmlFile.write('<html class="no-js" lang="en-US">\n')
self.htmlFile.write('<head>\n')
self.htmlFile.write(' <title>Test Results for TEMPLATE_JOB_NAME job build #TEMPLATE_BUILD_ID</title>\n')
self.htmlFile.write('</head>\n')
self.htmlFile.write('<body>\n')
self.htmlFile.write(' <table style="border-collapse: collapse; border: none;">\n')
self.htmlFile.write(' <tr style="border-collapse: collapse; border: none;">\n')
self.htmlFile.write(' <td style="border-collapse: collapse; border: none;">\n')
self.htmlFile.write(' <a href="http://www.openairinterface.org/">\n')
self.htmlFile.write(' <img src="http://www.openairinterface.org/wp-content/uploads/2016/03/cropped-oai_final_logo2.png" alt="" border="none" height=50 width=150>\n')
self.htmlFile.write(' </img>\n')
self.htmlFile.write(' </a>\n')
self.htmlFile.write(' </td>\n')
self.htmlFile.write(' <td style="border-collapse: collapse; border: none; vertical-align: center;">\n')
self.htmlFile.write(' <b><font size = "6">Job Summary -- Job: TEMPLATE_JOB_NAME -- Build-ID: TEMPLATE_BUILD_ID</font></b>\n')
self.htmlFile.write(' </td>\n')
self.htmlFile.write(' </tr>\n')
self.htmlFile.write(' </table>\n')
self.htmlFile.write(' <br>\n')
self.htmlFile.write(' <table border = "1">\n')
self.htmlFile.write(' <tr>\n')
self.htmlFile.write(' <td bgcolor = "lightcyan" >Build Start Time (UTC)</td>\n')
self.htmlFile.write(' <td>TEMPLATE_BUILD_TIME</td>\n')
self.htmlFile.write(' </tr>\n')
self.htmlFile.write(' <tr>\n')
self.htmlFile.write(' <td bgcolor = "lightcyan" >GIT Repository</td>\n')
self.htmlFile.write(' <td><a href="' + SSH.eNBRepository + '">' + SSH.eNBRepository + '</a></td>\n')
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
self.htmlFile.write(' </tr>\n')
self.htmlFile.write(' <tr>\n')
self.htmlFile.write(' <td bgcolor = "lightcyan" >Job Trigger</td>\n')
if (SSH.eNB_AllowMerge):
self.htmlFile.write(' <td>Merge-Request</td>\n')
else:
self.htmlFile.write(' <td>Push to Branch</td>\n')
self.htmlFile.write(' </tr>\n')
self.htmlFile.write(' <tr>\n')
if (SSH.eNB_AllowMerge):
self.htmlFile.write(' <td bgcolor = "lightcyan" >Source Branch</td>\n')
else:
self.htmlFile.write(' <td bgcolor = "lightcyan" >Branch</td>\n')
self.htmlFile.write(' <td>' + SSH.eNBBranch + '</td>\n')
self.htmlFile.write(' </tr>\n')
self.htmlFile.write(' <tr>\n')
if (SSH.eNB_AllowMerge):
self.htmlFile.write(' <td bgcolor = "lightcyan" >Source Commit ID</td>\n')
else:
self.htmlFile.write(' <td bgcolor = "lightcyan" >Commit ID</td>\n')
self.htmlFile.write(' <td>' + SSH.eNBCommitID + '</td>\n')
self.htmlFile.write(' </tr>\n')
if (SSH.eNB_AllowMerge):
self.htmlFile.write(' <tr>\n')
self.htmlFile.write(' <td bgcolor = "lightcyan" >Target Branch</td>\n')
self.htmlFile.write(' <td>develop</td>\n')
self.htmlFile.write(' </tr>\n')
self.htmlFile.write(' </table>\n')
terminate_ue_flag = True
SSH.GetAllUEDevices(terminate_ue_flag)
self.htmlUEConnected = len(self.UEDevices)
self.htmlFile.write(' <h2><a href="#FinalStatus">Jump to Final Status</a></h2>\n')
self.htmlFile.write(' <h2>' + str(self.htmlUEConnected) + ' UE(s) is(are) connected to ADB bench server</h2>\n')
self.htmlFile.write(' <br>\n')
self.htmlFile.write(' <h2>Test Summary for ' + SSH.testXMLfile + '</h2>\n')
self.htmlFile.write(' <table border = "1">\n')
self.htmlFile.write(' <tr bgcolor = "#33CCFF" >\n')
self.htmlFile.write(' <th>Test Id</th>\n')
self.htmlFile.write(' <th>Test Desc</th>\n')
self.htmlFile.write(' <th>Test Options</th>\n')
self.htmlFile.write(' <th>Test Status</th>\n')
i = 0
while (i < self.htmlUEConnected):
self.htmlFile.write(' <th>UE' + str(i) + ' Status</th>\n')
i += 1
self.htmlFile.write(' </tr>\n')
self.htmlHeaderCreated = True
def CreateHtmlFooter(self, passStatus):
if ((not self.htmlFooterCreated) and (self.htmlHeaderCreated)):
self.htmlFile.write(' <tr id="FinalStatus">\n')
self.htmlFile.write(' <th bgcolor = "#33CCFF" colspan=2>Final Status</th>\n')
if passStatus:
self.htmlFile.write(' <th bgcolor = "green" colspan=' + str(2 + self.htmlUEConnected) + '><font color="white">PASS</font></th>\n')
else:
self.htmlFile.write(' <th bgcolor = "red" colspan=' + str(2 + self.htmlUEConnected) + '><font color="white">FAIL</font></th>\n')
self.htmlFile.write(' </tr>\n')
self.htmlFile.write(' </table>\n')
self.htmlFile.write('</body>\n')
self.htmlFile.write('</html>\n')
self.htmlFile.close()
self.htmlFooterCreated = False
def CreateHtmlTestRow(self, options, status, processesStatus):
if ((not self.htmlFooterCreated) and (self.htmlHeaderCreated)):
self.htmlFile.write(' <tr>\n')
self.htmlFile.write(' <td bgcolor = "lightcyan" >' + SSH.testCase_id + '</td>\n')
self.htmlFile.write(' <td>' + SSH.desc + '</td>\n')
self.htmlFile.write(' <td>' + str(options) + '</td>\n')
if (str(status) == 'OK'):
self.htmlFile.write(' <td bgcolor = "lightgreen" >' + str(status) + '</td>\n')
elif (str(status) == 'KO'):
if (processesStatus == 0):
self.htmlFile.write(' <td bgcolor = "lightcoral" >' + str(status) + '</td>\n')
elif (processesStatus == ENB_PROCESS_FAILED):
self.htmlFile.write(' <td bgcolor = "lightcoral" >KO - eNB process not found</td>\n')
elif (processesStatus == ENB_PROCESS_SEG_FAULT):
self.htmlFile.write(' <td bgcolor = "lightcoral" >KO - eNB process ended in Segmentation Fault</td>\n')
elif (processesStatus == ENB_PROCESS_ASSERTION):
self.htmlFile.write(' <td bgcolor = "lightcoral" >KO - eNB process ended in Assertion</td>\n')
elif (processesStatus == ENB_PROCESS_REALTIME_ISSUE):
self.htmlFile.write(' <td bgcolor = "lightcoral" >KO - eNB process faced Real Time issue(s)/td>\n')
elif (processesStatus == HSS_PROCESS_FAILED):
self.htmlFile.write(' <td bgcolor = "lightcoral" >KO - HSS process not found</td>\n')
elif (processesStatus == MME_PROCESS_FAILED):
self.htmlFile.write(' <td bgcolor = "lightcoral" >KO - MME process not found</td>\n')
elif (processesStatus == SPGW_PROCESS_FAILED):
self.htmlFile.write(' <td bgcolor = "lightcoral" >KO - SPGW process not found</td>\n')
elif (processesStatus == UE_IP_ADDRESS_ISSUE):
self.htmlFile.write(' <td bgcolor = "lightcoral" >KO - Could not retrieve UE IP address</td>\n')
else:
self.htmlFile.write(' <td bgcolor = "lightcoral" >' + str(status) + '</td>\n')
else:
self.htmlFile.write(' <td bgcolor = "orange" >' + str(status) + '</td>\n')
cellBgColor = 'white'
result = re.search('ended with|faced real time issues', self.htmleNBFailureMsg)
if result is not None:
cellBgColor = 'red'
else:
result = re.search('showed|Reestablishment', self.htmleNBFailureMsg)
if result is not None:
cellBgColor = 'orange'
self.htmlFile.write(' <td bgcolor = "' + cellBgColor + '" colspan=' + str(self.htmlUEConnected) + '><pre>' + self.htmleNBFailureMsg + '</pre></td>\n')
else:
i = 0
while (i < self.htmlUEConnected):
self.htmlFile.write(' <td>-</td>\n')
i += 1
def CreateHtmlTestRowQueue(self, options, status, ue_status, ue_queue):
if ((not self.htmlFooterCreated) and (self.htmlHeaderCreated)):
self.htmlFile.write(' <tr>\n')
self.htmlFile.write(' <td bgcolor = "lightcyan" >' + SSH.testCase_id + '</td>\n')
self.htmlFile.write(' <td>' + SSH.desc + '</td>\n')
self.htmlFile.write(' <td>' + str(options) + '</td>\n')
if (str(status) == 'OK'):
self.htmlFile.write(' <td bgcolor = "lightgreen" >' + str(status) + '</td>\n')
elif (str(status) == 'KO'):
self.htmlFile.write(' <td bgcolor = "lightcoral" >' + str(status) + '</td>\n')
else:
self.htmlFile.write(' <td bgcolor = "orange" >' + str(status) + '</td>\n')
i = 0
while (i < self.htmlUEConnected):
if (i < ue_status):
if (not ue_queue.empty()):
if (addOrangeBK):
self.htmlFile.write(' <td bgcolor = "orange" >' + str(ue_queue.get()) + '</td>\n')
else:
self.htmlFile.write(' <td>' + str(ue_queue.get()) + '</td>\n')
else:
self.htmlFile.write(' <td>-</td>\n')
else:
self.htmlFile.write(' <td>-</td>\n')
i += 1
self.htmlFile.write(' </tr>\n')
#-----------------------------------------------------------
# ShowTestID()
#-----------------------------------------------------------
def ShowTestID(self):
logging.debug('\u001B[1m----------------------------------------\u001B[0m')
logging.debug('\u001B[1mTest ID:' + self.testCase_id + '\u001B[0m')
logging.debug('\u001B[1m' + self.desc + '\u001B[0m')
logging.debug('\u001B[1m----------------------------------------\u001B[0m')
#-----------------------------------------------------------
# Usage()
#-----------------------------------------------------------
def Usage():
print('------------------------------------------------------------')
print('main.py Ver:' + Version)
print('------------------------------------------------------------')
print('Usage: python main.py [options]')
print(' --help Show this help.')
print(' --mode=[Mode]')
print(' TesteNB')
print(' TerminateeNB, TerminateUE, TerminateHSS, TerminateMME, TerminateSPGW')
print(' LogCollectBuild, LogCollecteNB, LogCollectHSS, LogCollectMME, LogCollectSPGW, LogCollectPing, LogCollectIperf')
print(' --eNBIPAddress=[eNB\'s IP Address]')
print(' --eNBRepository=[eNB\'s Repository URL]')
print(' --eNBBranch=[eNB\'s Branch Name]')
print(' --eNBCommitID=[eNB\'s Commit Number]')
print(' --eNBUserName=[eNB\'s Login User Name]')
print(' --eNBPassword=[eNB\'s Login Password]')
print(' --eNBSourceCodePath=[eNB\'s Source Code Path]')
print(' --EPCIPAddress=[EPC\'s IP Address]')
print(' --EPCUserName=[EPC\'s Login User Name]')
print(' --EPCPassword=[EPC\'s Login Password]')
print(' --EPCSourceCodePath=[EPC\'s Source Code Path]')
print(' --EPCType=[EPC\'s Type: OAI or ltebox]')
print(' --ADBIPAddress=[ADB\'s IP Address]')
print(' --ADBUserName=[ADB\'s Login User Name]')
print(' --ADBPassword=[ADB\'s Login Password]')
print(' --XMLTestFile=[XML Test File to be run]')
print('------------------------------------------------------------')
def CheckClassValidity(action,id):
if action != 'Build_eNB' and action != 'Initialize_eNB' and action != 'Terminate_eNB' and action != 'Initialize_UE' and action != 'Terminate_UE' and action != 'Attach_UE' and action != 'Detach_UE' and action != 'Ping' and action != 'Iperf' and action != 'Reboot_UE' and action != 'Initialize_HSS' and action != 'Terminate_HSS' and action != 'Initialize_MME' and action != 'Terminate_MME' and action != 'Initialize_SPGW' and action != 'Terminate_SPGW':
logging.debug('ERROR: test-case ' + id + ' has wrong class ' + action)
return False
return True
def GetParametersFromXML(action):
if action == 'Build_eNB':
SSH.Build_eNB_args = test.findtext('Build_eNB_args')
if action == 'Initialize_eNB':
SSH.Initialize_eNB_args = test.findtext('Initialize_eNB_args')
SSH.eNB_instance = test.findtext('eNB_instance')
if (SSH.eNB_instance is None):
SSH.eNB_instance = '0'
if action == 'Terminate_eNB':
SSH.eNB_instance = test.findtext('eNB_instance')
if (SSH.eNB_instance is None):
SSH.eNB_instance = '0'
if action == 'Ping':
SSH.ping_args = test.findtext('ping_args')
SSH.ping_packetloss_threshold = test.findtext('ping_packetloss_threshold')
if action == 'Iperf':
SSH.iperf_args = test.findtext('iperf_args')
SSH.iperf_packetloss_threshold = test.findtext('iperf_packetloss_threshold')
SSH.iperf_profile = test.findtext('iperf_profile')
if (SSH.iperf_profile is None):
SSH.iperf_profile = 'balanced'
else:
if SSH.iperf_profile != 'balanced' and SSH.iperf_profile != 'unbalanced' and SSH.iperf_profile != 'single-ue':
logging.debug('ERROR: test-case has wrong profile ' + SSH.iperf_profile)
SSH.iperf_profile = 'balanced'
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
#check if given test is in list
#it is in list if one of the strings in 'list' is at the beginning of 'test'
def test_in_list(test, list):
for check in list:
check=check.replace('+','')
if (test.startswith(check)):
return True
return False
def receive_signal(signum, frame):
sys.exit(1)
#-----------------------------------------------------------
# Parameter Check
#-----------------------------------------------------------
mode = ''
SSH = SSHConnection()
argvs = sys.argv
argc = len(argvs)
while len(argvs) > 1:
myArgv = argvs.pop(1) # 0th is this file's name
if re.match('^\-\-help$', myArgv, re.IGNORECASE):
Usage()
sys.exit(0)
elif re.match('^\-\-mode=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-mode=(.+)$', myArgv, re.IGNORECASE)
mode = matchReg.group(1)
elif re.match('^\-\-eNBIPAddress=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNBIPAddress=(.+)$', myArgv, re.IGNORECASE)
SSH.eNBIPAddress = matchReg.group(1)
elif re.match('^\-\-eNBRepository=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNBRepository=(.+)$', myArgv, re.IGNORECASE)
SSH.eNBRepository = matchReg.group(1)
elif re.match('^\-\-eNB_AllowMerge=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNB_AllowMerge=(.+)$', myArgv, re.IGNORECASE)
doMerge = matchReg.group(1)
if ((doMerge == 'true') or (doMerge == 'True')):
SSH.eNB_AllowMerge = True
elif re.match('^\-\-eNBBranch=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNBBranch=(.+)$', myArgv, re.IGNORECASE)