Newer
Older
if re.match('OAI-Rel14-CUPS', self.EPCType, re.IGNORECASE):
self.command('stdbuf -o0 ps -aux | grep --color=never spgw | grep -v grep', '\$', 5)
result = re.search('spgwu -c ', str(self.ssh.before))
elif re.match('OAI', self.EPCType, re.IGNORECASE):
self.command('stdbuf -o0 ps -aux | grep --color=never spgw | grep -v grep', '\$', 5)
result = re.search('\/bin\/bash .\/run_', str(self.ssh.before))
elif re.match('ltebox', self.EPCType, re.IGNORECASE):
self.command('stdbuf -o0 ps -aux | grep --color=never xGw | grep -v grep', '\$', 5)
result = re.search('xGw', str(self.ssh.before))
else:
logging.error('This should not happen!')
if result is None:
logging.debug('\u001B[1;37;41m SPGW Process Not Found! \u001B[0m')
status_queue.put(SPGW_PROCESS_FAILED)
status_queue.put(SPGW_PROCESS_OK)
self.close()
except:
os.kill(os.getppid(),signal.SIGUSR1)
def AnalyzeLogFile_eNB(self, eNBlogFile):
if (not os.path.isfile('./' + eNBlogFile)):
enb_log_file = open('./' + eNBlogFile, 'r')
exitSignalReceived = False
foundAssertion = False
msgAssertion = ''
msgLine = 0
foundSegFault = False
foundRealTimeIssue = False
rrcSetupComplete = 0
rrcReleaseRequest = 0
rrcReconfigRequest = 0
rrcReconfigComplete = 0
rrcReestablishRequest = 0
rrcReestablishComplete = 0
rrcReestablishReject = 0
rlcDiscardBuffer = 0
rachCanceledProcedure = 0
cdrxActivationMessageCount = 0
self.htmleNBFailureMsg = ''

Raphael Defosseux
committed
isRRU = False
isSlave = False
slaveReceivesFrameResyncCmd = False
X2HO_state = X2_HO_REQ_STATE__IDLE
X2HO_inNbProcedures = 0
X2HO_outNbProcedures = 0
for line in enb_log_file.readlines():
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
3076
3077
3078
if X2HO_state == X2_HO_REQ_STATE__IDLE:
result = re.search('target eNB Receives X2 HO Req X2AP_HANDOVER_REQ', str(line))
if result is not None:
X2HO_state = X2_HO_REQ_STATE__TARGET_RECEIVES_REQ
result = re.search('source eNB receives the X2 HO ACK X2AP_HANDOVER_REQ_ACK', str(line))
if result is not None:
X2HO_state = X2_HO_REQ_STATE__SOURCE_RECEIVES_REQ_ACK
if X2HO_state == X2_HO_REQ_STATE__TARGET_RECEIVES_REQ:
result = re.search('Received LTE_RRCConnectionReconfigurationComplete from UE', str(line))
if result is not None:
X2HO_state = X2_HO_REQ_STATE__TARGET_RRC_RECFG_COMPLETE
if X2HO_state == X2_HO_REQ_STATE__TARGET_RRC_RECFG_COMPLETE:
result = re.search('issue rrc_eNB_send_PATH_SWITCH_REQ', str(line))
if result is not None:
X2HO_state = X2_HO_REQ_STATE__TARGET_SENDS_SWITCH_REQ
if X2HO_state == X2_HO_REQ_STATE__TARGET_SENDS_SWITCH_REQ:
result = re.search('received path switch ack S1AP_PATH_SWITCH_REQ_ACK', str(line))
if result is not None:
X2HO_state = X2_HO_REQ_STATE__IDLE
X2HO_inNbProcedures += 1
if X2HO_state == X2_HO_REQ_STATE__SOURCE_RECEIVES_REQ_ACK:
result = re.search('source eNB receives the X2 UE CONTEXT RELEASE X2AP_UE_CONTEXT_RELEASE', str(line))
if result is not None:
X2HO_state = X2_HO_REQ_STATE__IDLE
X2HO_outNbProcedures += 1

Raphael Defosseux
committed
if self.eNBOptions[int(self.eNB_instance)] != '':
res1 = re.search('max_rxgain (?P<requested_option>[0-9]+)', self.eNBOptions[int(self.eNB_instance)])
res2 = re.search('max_rxgain (?P<applied_option>[0-9]+)', str(line))
if res1 is not None and res2 is not None:
requested_option = int(res1.group('requested_option'))
applied_option = int(res2.group('applied_option'))
if requested_option == applied_option:

Raphael Defosseux
committed
self.htmleNBFailureMsg += '<span class="glyphicon glyphicon-ok-circle"></span> Command line option(s) correctly applied <span class="glyphicon glyphicon-arrow-right"></span> ' + self.eNBOptions[int(self.eNB_instance)] + '\n\n'

Raphael Defosseux
committed
self.htmleNBFailureMsg += '<span class="glyphicon glyphicon-ban-circle"></span> Command line option(s) NOT applied <span class="glyphicon glyphicon-arrow-right"></span> ' + self.eNBOptions[int(self.eNB_instance)] + '\n\n'
result = re.search('Exiting OAI softmodem', str(line))
exitSignalReceived = True
result = re.search('[Ss]egmentation [Ff]ault', str(line))
if result is not None and not exitSignalReceived:
foundSegFault = True
result = re.search('[Cc]ore [dD]ump', str(line))
if result is not None and not exitSignalReceived:
foundSegFault = True
result = re.search('./lte_build_oai/build/lte-softmodem', str(line))
if result is not None and not exitSignalReceived:
foundSegFault = True
result = re.search('[Aa]ssertion', str(line))
if result is not None and not exitSignalReceived:
foundAssertion = True
result = re.search('LLL', str(line))
if result is not None and not exitSignalReceived:
foundRealTimeIssue = True
if foundAssertion and (msgLine < 3):
msgLine += 1
msgAssertion += str(line)

Raphael Defosseux
committed
result = re.search('Setting function for RU', str(line))
if result is not None:
isRRU = True
if isRRU:
result = re.search('RU 0 is_slave=yes', str(line))
if result is not None:
isSlave = True
if isSlave:
result = re.search('Received RRU_frame_resynch command', str(line))
if result is not None:
slaveReceivesFrameResyncCmd = True
result = re.search('LTE_RRCConnectionSetupComplete from UE', str(line))

Raphael Defosseux
committed
result = re.search('Generate LTE_RRCConnectionRelease|Generate RRCConnectionRelease', str(line))
if result is not None:
rrcReleaseRequest += 1
result = re.search('Generate LTE_RRCConnectionReconfiguration', str(line))
if result is not None:
rrcReconfigRequest += 1
result = re.search('LTE_RRCConnectionReconfigurationComplete from UE rnti', str(line))
if result is not None:
rrcReconfigComplete += 1
result = re.search('LTE_RRCConnectionReestablishmentRequest', str(line))
if result is not None:
rrcReestablishRequest += 1
result = re.search('LTE_RRCConnectionReestablishmentComplete', str(line))
if result is not None:
rrcReestablishComplete += 1
result = re.search('LTE_RRCConnectionReestablishmentReject', str(line))
if result is not None:
rrcReestablishReject += 1
result = re.search('CDRX configuration activated after RRC Connection', str(line))
if result is not None:
cdrxActivationMessageCount += 1
result = re.search('uci->stat', str(line))
if result is not None:
uciStatMsgCount += 1
result = re.search('PDCP.*Out of Resources.*reason', str(line))
if result is not None:
pdcpFailure += 1
result = re.search('ULSCH in error in round', str(line))
if result is not None:
ulschFailure += 1
result = re.search('BAD all_segments_received', str(line))
if result is not None:
rlcDiscardBuffer += 1
result = re.search('Canceled RA procedure for UE rnti', str(line))
if result is not None:
rachCanceledProcedure += 1
result = re.search('dropping, not enough RBs', str(line))
if result is not None:
dropNotEnoughRBs += 1
logging.debug(' File analysis completed')
statMsg = 'eNB showed ' + str(uciStatMsgCount) + ' "uci->stat" message(s)'
logging.debug('\u001B[1;30;43m ' + statMsg + ' \u001B[0m')
self.htmleNBFailureMsg += statMsg + '\n'
if pdcpFailure > 0:
statMsg = 'eNB showed ' + str(pdcpFailure) + ' "PDCP Out of Resources" message(s)'
logging.debug('\u001B[1;30;43m ' + statMsg + ' \u001B[0m')
self.htmleNBFailureMsg += statMsg + '\n'
if ulschFailure > 0:
statMsg = 'eNB showed ' + str(ulschFailure) + ' "ULSCH in error in round" message(s)'
logging.debug('\u001B[1;30;43m ' + statMsg + ' \u001B[0m')
self.htmleNBFailureMsg += statMsg + '\n'
if dropNotEnoughRBs > 0:
statMsg = 'eNB showed ' + str(dropNotEnoughRBs) + ' "dropping, not enough RBs" message(s)'
logging.debug('\u001B[1;30;43m ' + statMsg + ' \u001B[0m')
self.htmleNBFailureMsg += statMsg + '\n'
if rrcSetupComplete > 0:
rrcMsg = 'eNB completed ' + str(rrcSetupComplete) + ' RRC Connection Setup(s)'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
if rrcReleaseRequest > 0:
rrcMsg = 'eNB requested ' + str(rrcReleaseRequest) + ' RRC Connection Release(s)'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
if rrcReconfigRequest > 0 or rrcReconfigComplete > 0:
rrcMsg = 'eNB requested ' + str(rrcReconfigRequest) + ' RRC Connection Reconfiguration(s)'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
rrcMsg = ' -- ' + str(rrcReconfigComplete) + ' were completed'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
if rrcReestablishRequest > 0 or rrcReestablishComplete > 0 or rrcReestablishReject > 0:
rrcMsg = 'eNB requested ' + str(rrcReestablishRequest) + ' RRC Connection Reestablishment(s)'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
rrcMsg = ' -- ' + str(rrcReestablishComplete) + ' were completed'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
rrcMsg = ' -- ' + str(rrcReestablishReject) + ' were rejected'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
if X2HO_inNbProcedures > 0:
rrcMsg = 'eNB completed ' + str(X2HO_inNbProcedures) + ' X2 Handover Connection procedure(s)'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
if X2HO_outNbProcedures > 0:
rrcMsg = 'eNB completed ' + str(X2HO_outNbProcedures) + ' X2 Handover Release procedure(s)'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
if self.eNBOptions[int(self.eNB_instance)] != '':
res1 = re.search('drx_Config_present prSetup', self.eNBOptions[int(self.eNB_instance)])
if res1 is not None:
if cdrxActivationMessageCount > 0:
rrcMsg = 'eNB activated the CDRX Configuration for ' + str(cdrxActivationMessageCount) + ' time(s)'
logging.debug('\u001B[1;30;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
else:
rrcMsg = 'eNB did NOT ACTIVATE the CDRX Configuration'
logging.debug('\u001B[1;37;43m ' + rrcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rrcMsg + '\n'
if rachCanceledProcedure > 0:
rachMsg = 'eNB cancelled ' + str(rachCanceledProcedure) + ' RA procedure(s)'
logging.debug('\u001B[1;30;43m ' + rachMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rachMsg + '\n'

Raphael Defosseux
committed
if isRRU:
if isSlave:
if slaveReceivesFrameResyncCmd:
rruMsg = 'Slave RRU received the RRU_frame_resynch command from RAU'
logging.debug('\u001B[1;30;43m ' + rruMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rruMsg + '\n'
else:
rruMsg = 'Slave RRU DID NOT receive the RRU_frame_resynch command from RAU'
logging.debug('\u001B[1;37;41m ' + rruMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rruMsg + '\n'
self.prematureExit = True

Raphael Defosseux
committed
return ENB_PROCESS_SLAVE_RRU_NOT_SYNCED
if foundSegFault:
logging.debug('\u001B[1;37;41m eNB ended with a Segmentation Fault! \u001B[0m')
return ENB_PROCESS_SEG_FAULT
if foundAssertion:
logging.debug('\u001B[1;37;41m eNB ended with an assertion! \u001B[0m')
return ENB_PROCESS_ASSERTION
if foundRealTimeIssue:
logging.debug('\u001B[1;37;41m eNB faced real time issues! \u001B[0m')
self.htmleNBFailureMsg += 'eNB faced real time issues!\n'
#return ENB_PROCESS_REALTIME_ISSUE
if rlcDiscardBuffer > 0:
rlcMsg = 'eNB RLC discarded ' + str(rlcDiscardBuffer) + ' buffer(s)'
logging.debug('\u001B[1;37;41m ' + rlcMsg + ' \u001B[0m')
self.htmleNBFailureMsg += rlcMsg + '\n'
return ENB_PROCESS_REALTIME_ISSUE
def AnalyzeLogFile_UE(self, UElogFile):
if (not os.path.isfile('./' + UElogFile)):
return -1
ue_log_file = open('./' + UElogFile, 'r')
exitSignalReceived = False
foundAssertion = False
msgAssertion = ''
msgLine = 0
foundSegFault = False
foundRealTimeIssue = False
uciStatMsgCount = 0
pdcpDataReqFailedCount = 0
badDciCount = 0
rrcConnectionRecfgComplete = 0
no_cell_sync_found = False
mib_found = False
frequency_found = False
plmn_found = False
result = re.search('Exiting OAI softmodem', str(line))
exitSignalReceived = True
result = re.search('System error|[Ss]egmentation [Ff]ault|======= Backtrace: =========|======= Memory map: ========', str(line))
if result is not None and not exitSignalReceived:
foundSegFault = True
result = re.search('[Cc]ore [dD]ump', str(line))
if result is not None and not exitSignalReceived:
foundSegFault = True
result = re.search('./lte-uesoftmodem', str(line))
if result is not None and not exitSignalReceived:
foundSegFault = True
result = re.search('[Aa]ssertion', str(line))
if result is not None and not exitSignalReceived:
foundAssertion = True
result = re.search('LLL', str(line))
if result is not None and not exitSignalReceived:
foundRealTimeIssue = True
if foundAssertion and (msgLine < 3):
msgLine += 1
msgAssertion += str(line)
result = re.search('uci->stat', str(line))
if result is not None and not exitSignalReceived:
result = re.search('PDCP data request failed', str(line))
if result is not None and not exitSignalReceived:
pdcpDataReqFailedCount += 1
result = re.search('bad DCI 1A', str(line))
if result is not None and not exitSignalReceived:
badDciCount += 1
result = re.search('Generating RRCConnectionReconfigurationComplete', str(line))
if result is not None:
rrcConnectionRecfgComplete += 1
result = re.search('No cell synchronization found, abandoning', str(line))
if result is not None:
no_cell_sync_found = True
result = re.search("MIB Information => ([a-zA-Z]{1,10}), ([a-zA-Z]{1,10}), NidCell (?P<nidcell>\d{1,3}), N_RB_DL (?P<n_rb_dl>\d{1,3}), PHICH DURATION (?P<phich_duration>\d), PHICH RESOURCE (?P<phich_resource>.{1,4}), TX_ANT (?P<tx_ant>\d)", str(line))
if result is not None and (not mib_found):
mibMsg = "MIB Information: " + result.group(1) + ', ' + result.group(2)
self.htmlUEFailureMsg += mibMsg + '\n'
logging.debug('\033[94m' + mibMsg + '\033[0m')
mibMsg = " nidcell = " + result.group('nidcell')
self.htmlUEFailureMsg += mibMsg
logging.debug('\033[94m' + mibMsg + '\033[0m')
mibMsg = " n_rb_dl = " + result.group('n_rb_dl')
self.htmlUEFailureMsg += mibMsg + '\n'
logging.debug('\033[94m' + mibMsg + '\033[0m')
mibMsg = " phich_duration = " + result.group('phich_duration')
self.htmlUEFailureMsg += mibMsg
logging.debug('\033[94m' + mibMsg + '\033[0m')
mibMsg = " phich_resource = " + result.group('phich_resource')
self.htmlUEFailureMsg += mibMsg + '\n'
logging.debug('\033[94m' + mibMsg + '\033[0m')
mibMsg = " tx_ant = " + result.group('tx_ant')
self.htmlUEFailureMsg += mibMsg + '\n'
logging.debug('\033[94m' + mibMsg + '\033[0m')
mib_found = True
logging.error('\033[91m' + "MIB marker was not found" + '\033[0m')
result = re.search("Measured Carrier Frequency (?P<measured_carrier_frequency>\d{1,15}) Hz", str(line))
if result is not None and (not frequency_found):
mibMsg = "Measured Carrier Frequency = " + result.group('measured_carrier_frequency') + ' Hz'
self.htmlUEFailureMsg += mibMsg + '\n'
logging.debug('\033[94m' + mibMsg + '\033[0m')
frequency_found = True
logging.error('\033[91m' + "Measured Carrier Frequency not found" + '\033[0m')
result = re.search("PLMN MCC (?P<mcc>\d{1,3}), MNC (?P<mnc>\d{1,3}), TAC", str(line))
if result is not None and (not plmn_found):
try:
mibMsg = 'PLMN MCC = ' + result.group('mcc') + ' MNC = ' + result.group('mnc')
self.htmlUEFailureMsg += mibMsg + '\n'
logging.debug('\033[94m' + mibMsg + '\033[0m')
plmn_found = True
except Exception as e:
logging.error('\033[91m' + "PLMN not found" + '\033[0m')
result = re.search("Found (?P<operator>[\w,\s]{1,15}) \(name from internal table\)", str(line))
if result is not None:
try:
mibMsg = "The operator is: " + result.group('operator')
self.htmlUEFailureMsg += mibMsg + '\n'
logging.debug('\033[94m' + mibMsg + '\033[0m')
logging.error('\033[91m' + "Operator name not found" + '\033[0m')
result = re.search("SIB5 InterFreqCarrierFreq element (.{1,4})/(.{1,4})", str(line))
if result is not None:
try:
mibMsg = "SIB5 InterFreqCarrierFreq element " + result.group(1) + '/' + result.group(2)
self.htmlUEFailureMsg += mibMsg + ' -> '
logging.debug('\033[94m' + mibMsg + '\033[0m')
logging.error('\033[91m' + "SIB5 InterFreqCarrierFreq element not found" + '\033[0m')
result = re.search("DL Carrier Frequency/ARFCN : (?P<carrier_frequency>\d{1,15}/\d{1,4})", str(line))
if result is not None:
try:
freq = result.group('carrier_frequency')
new_freq = re.sub('/[0-9]+','',freq)
float_freq = float(new_freq) / 1000000
self.htmlUEFailureMsg += 'DL Freq: ' + ('%.1f' % float_freq) + ' MHz'
logging.debug('\033[94m' + " DL Carrier Frequency is: " + freq + '\033[0m')
logging.error('\033[91m' + " DL Carrier Frequency not found" + '\033[0m')
result = re.search("AllowedMeasBandwidth : (?P<allowed_bandwidth>\d{1,7})", str(line))
if result is not None:
try:
prb = result.group('allowed_bandwidth')
self.htmlUEFailureMsg += ' -- PRB: ' + prb + '\n'
logging.debug('\033[94m' + " AllowedMeasBandwidth: " + prb + '\033[0m')
logging.error('\033[91m' + " AllowedMeasBandwidth not found" + '\033[0m')
if rrcConnectionRecfgComplete > 0:
statMsg = 'UE connected to eNB (' + str(rrcConnectionRecfgComplete) + ' RRCConnectionReconfigurationComplete message(s) generated)'
logging.debug('\033[94m' + statMsg + '\033[0m')
self.htmlUEFailureMsg += statMsg + '\n'
if uciStatMsgCount > 0:
statMsg = 'UE showed ' + str(uciStatMsgCount) + ' "uci->stat" message(s)'
logging.debug('\u001B[1;30;43m ' + statMsg + ' \u001B[0m')
self.htmlUEFailureMsg += statMsg + '\n'
if pdcpDataReqFailedCount > 0:
statMsg = 'UE showed ' + str(pdcpDataReqFailedCount) + ' "PDCP data request failed" message(s)'
logging.debug('\u001B[1;30;43m ' + statMsg + ' \u001B[0m')
self.htmlUEFailureMsg += statMsg + '\n'
if badDciCount > 0:
statMsg = 'UE showed ' + str(badDciCount) + ' "bad DCI 1A" message(s)'
logging.debug('\u001B[1;30;43m ' + statMsg + ' \u001B[0m')
self.htmlUEFailureMsg += statMsg + '\n'
if foundSegFault:
logging.debug('\u001B[1;37;41m UE ended with a Segmentation Fault! \u001B[0m')
return ENB_PROCESS_SEG_FAULT
if foundAssertion:
logging.debug('\u001B[1;30;43m UE showed an assertion! \u001B[0m')
self.htmlUEFailureMsg += 'UE showed an assertion!\n'
if not mib_found or not frequency_found:
return OAI_UE_PROCESS_ASSERTION
if foundRealTimeIssue:
logging.debug('\u001B[1;37;41m UE faced real time issues! \u001B[0m')
self.htmlUEFailureMsg += 'UE faced real time issues!\n'
#return ENB_PROCESS_REALTIME_ISSUE
if no_cell_sync_found and not mib_found:
logging.debug('\u001B[1;37;41m UE could not synchronize ! \u001B[0m')
self.htmlUEFailureMsg += 'UE could not synchronize!\n'
return OAI_UE_PROCESS_COULD_NOT_SYNC
def TerminateeNB(self):

Raphael Defosseux
committed
if self.eNB_serverId == '0':
lIpAddr = self.eNBIPAddress
lUserName = self.eNBUserName
lPassWord = self.eNBPassword
lSourcePath = self.eNBSourceCodePath
elif self.eNB_serverId == '1':
lIpAddr = self.eNB1IPAddress
lUserName = self.eNB1UserName
lPassWord = self.eNB1Password
lSourcePath = self.eNB1SourceCodePath
elif self.eNB_serverId == '2':
lIpAddr = self.eNB2IPAddress
lUserName = self.eNB2UserName
lPassWord = self.eNB2Password
lSourcePath = self.eNB2SourceCodePath
if lIpAddr == '' or lUserName == '' or lPassWord == '' or lSourcePath == '':
Usage()
sys.exit('Insufficient Parameter')
self.open(lIpAddr, lUserName, lPassWord)
self.command('cd ' + lSourcePath + '/cmake_targets', '\$', 5)
self.command('stdbuf -o0 ps -aux | grep --color=never softmodem | grep -v grep', '\$', 5)
result = re.search('lte-softmodem', str(self.ssh.before))
if result is not None:

Raphael Defosseux
committed
self.command('echo ' + lPassWord + ' | sudo -S daemon --name=enb' + str(self.eNB_instance) + '_daemon --stop', '\$', 5)
self.command('echo ' + lPassWord + ' | sudo -S killall --signal SIGINT lte-softmodem || true', '\$', 5)
self.command('stdbuf -o0 ps -aux | grep --color=never softmodem | grep -v grep', '\$', 5)
result = re.search('lte-softmodem', str(self.ssh.before))
if result is not None:

Raphael Defosseux
committed
self.command('echo ' + lPassWord + ' | sudo -S killall --signal SIGKILL lte-softmodem || true', '\$', 5)
self.command('rm -f my-lte-softmodem-run' + str(self.eNB_instance) + '.sh', '\$', 5)
# If tracer options is on, stopping tshark on EPC side
result = re.search('T_stdout', str(self.Initialize_eNB_args))
if result is not None:
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
logging.debug('\u001B[1m Stopping tshark \u001B[0m')
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGKILL tshark', '\$', 5)
time.sleep(1)
if self.EPC_PcapFileName != '':
self.command('echo ' + self.EPCPassword + ' | sudo -S chmod 666 /tmp/' + self.EPC_PcapFileName, '\$', 5)
self.copyin(self.EPCIPAddress, self.EPCUserName, self.EPCPassword, '/tmp/' + self.EPC_PcapFileName, '.')

Raphael Defosseux
committed
self.copyout(lIpAddr, lUserName, lPassWord, self.EPC_PcapFileName, lSourcePath + '/cmake_targets/.')
logging.debug('\u001B[1m Replaying RAW record file\u001B[0m')

Raphael Defosseux
committed
self.open(lIpAddr, lUserName, lPassWord)
self.command('cd ' + lSourcePath + '/common/utils/T/tracer/', '\$', 5)
enbLogFile = self.eNBLogFiles[int(self.eNB_instance)]
raw_record_file = enbLogFile.replace('.log', '_record.raw')
replay_log_file = enbLogFile.replace('.log', '_replay.log')
extracted_txt_file = enbLogFile.replace('.log', '_extracted_messages.txt')
extracted_log_file = enbLogFile.replace('.log', '_extracted_messages.log')
self.command('./extract_config -i ' + lSourcePath + '/cmake_targets/' + raw_record_file + ' > ' + lSourcePath + '/cmake_targets/' + extracted_txt_file, '\$', 5)
self.command('echo $USER; nohup ./replay -i ' + lSourcePath + '/cmake_targets/' + raw_record_file + ' > ' + lSourcePath + '/cmake_targets/' + replay_log_file + ' 2>&1 &', lUserName, 5)
self.command('./textlog -d ' + lSourcePath + '/cmake_targets/' + extracted_txt_file + ' -no-gui -ON -full > ' + lSourcePath + '/cmake_targets/' + extracted_log_file, '\$', 5)

Raphael Defosseux
committed
self.copyin(lIpAddr, lUserName, lPassWord, lSourcePath + '/cmake_targets/' + extracted_log_file, '.')
logging.debug('\u001B[1m Analyzing eNB replay logfile \u001B[0m')
logStatus = self.AnalyzeLogFile_eNB(extracted_log_file)
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)

Raphael Defosseux
committed
self.eNBLogFiles[int(self.eNB_instance)] = ''

Raphael Defosseux
committed
if self.eNBLogFiles[int(self.eNB_instance)] != '':

Raphael Defosseux
committed
fileToAnalyze = self.eNBLogFiles[int(self.eNB_instance)]
self.eNBLogFiles[int(self.eNB_instance)] = ''

Raphael Defosseux
committed
copyin_res = self.copyin(lIpAddr, lUserName, lPassWord, lSourcePath + '/cmake_targets/' + fileToAnalyze, '.')

Raphael Defosseux
committed
if (copyin_res == -1):
logging.debug('\u001B[1;37;41m Could not copy eNB logfile to analyze it! \u001B[0m')
self.htmleNBFailureMsg = 'Could not copy eNB logfile to analyze it!'
self.CreateHtmlTestRow('N/A', 'KO', ENB_PROCESS_NOLOGFILE_TO_ANALYZE)
return
if self.eNB_serverId != '0':
self.copyout(self.eNBIPAddress, self.eNBUserName, self.eNBPassword, './' + fileToAnalyze, self.eNBSourceCodePath + '/cmake_targets/')
logging.debug('\u001B[1m Analyzing eNB logfile \u001B[0m ' + fileToAnalyze)
logStatus = self.AnalyzeLogFile_eNB(fileToAnalyze)
if (logStatus < 0):
self.CreateHtmlTestRow('N/A', 'KO', logStatus)
self.preamtureExit = True
return
else:
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
else:
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
def TerminateHSS(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
if re.match('OAI-Rel14-CUPS', self.EPCType, re.IGNORECASE):
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGINT oai_hss || true', '\$', 5)
time.sleep(2)
self.command('stdbuf -o0 ps -aux | grep hss | grep -v grep', '\$', 5)
result = re.search('oai_hss -j', str(self.ssh.before))
if result is not None:
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGKILL oai_hss || true', '\$', 5)
self.command('rm -f ' + self.EPCSourceCodePath + '/scripts/my-hss.sh', '\$', 5)
elif re.match('OAI', self.EPCType, re.IGNORECASE):
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGINT run_hss oai_hss || true', '\$', 5)
time.sleep(2)
self.command('stdbuf -o0 ps -aux | grep hss | grep -v grep', '\$', 5)
result = re.search('\/bin\/bash .\/run_', str(self.ssh.before))
if result is not None:
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGKILL run_hss oai_hss || true', '\$', 5)
elif re.match('ltebox', self.EPCType, re.IGNORECASE):
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('cd scripts', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S daemon --name=simulated_hss --stop', '\$', 5)
time.sleep(1)
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGKILL hss_sim', '\$', 5)
else:
logging.error('This should not happen!')
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
def TerminateMME(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
if re.match('OAI', self.EPCType, re.IGNORECASE) or re.match('OAI-Rel14-CUPS', self.EPCType, re.IGNORECASE):
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGINT run_mme mme || true', '\$', 5)
time.sleep(2)
self.command('stdbuf -o0 ps -aux | grep mme | grep -v grep', '\$', 5)
result = re.search('mme -c', str(self.ssh.before))
if result is not None:
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGKILL run_mme mme || true', '\$', 5)
self.command('rm -f ' + self.EPCSourceCodePath + '/scripts/my-mme.sh', '\$', 5)
elif re.match('ltebox', self.EPCType, re.IGNORECASE):
self.command('cd /opt/ltebox/tools', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S ./stop_mme', '\$', 5)
else:
logging.error('This should not happen!')
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
def TerminateSPGW(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
if re.match('OAI-Rel14-CUPS', self.EPCType, re.IGNORECASE):
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGINT spgwc spgwu || true', '\$', 5)
time.sleep(2)
self.command('stdbuf -o0 ps -aux | grep spgw | grep -v grep', '\$', 5)
result = re.search('spgwc -c |spgwu -c ', str(self.ssh.before))
if result is not None:
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGKILL spgwc spgwu || true', '\$', 5)
self.command('rm -f ' + self.EPCSourceCodePath + '/scripts/my-spgw*.sh', '\$', 5)
self.command('stdbuf -o0 ps -aux | grep tshark | grep -v grep', '\$', 5)
result = re.search('-w ', str(self.ssh.before))
if result is not None:
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGINT tshark || true', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S chmod 666 ' + self.EPCSourceCodePath + '/scripts/*.pcap', '\$', 5)
elif re.match('OAI', self.EPCType, re.IGNORECASE):
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGINT run_spgw spgw || true', '\$', 5)
time.sleep(2)
self.command('stdbuf -o0 ps -aux | grep spgw | grep -v grep', '\$', 5)
result = re.search('\/bin\/bash .\/run_', str(self.ssh.before))
if result is not None:
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGKILL run_spgw spgw || true', '\$', 5)
elif re.match('ltebox', self.EPCType, re.IGNORECASE):
self.command('cd /opt/ltebox/tools', '\$', 5)
self.command('echo ' + self.EPCPassword + ' | sudo -S ./stop_xGw', '\$', 5)
else:
logging.error('This should not happen!')
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
def TerminateFlexranCtrl(self):
if self.flexranCtrlInstalled == False or self.flexranCtrlStarted == False:
return
if self.EPCIPAddress == '' or self.EPCUserName == '' or self.EPCPassword == '':
Usage()
sys.exit('Insufficient Parameter')
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('echo ' + self.EPCPassword + ' | sudo -S daemon --name=flexran_rtc_daemon --stop', '\$', 5)
time.sleep(1)
self.command('echo ' + self.EPCPassword + ' | sudo -S killall --signal SIGKILL rt_controller', '\$', 5)
time.sleep(1)
self.close()
self.flexranCtrlStarted = False
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
def TerminateUE_common(self, device_id, idx):
try:
self.open(self.ADBIPAddress, self.ADBUserName, self.ADBPassword)

Raphael Defosseux
committed
# back in airplane mode on (ie radio off)
if self.ADBCentralized:
if device_id == '84B7N16418004022':
self.command('stdbuf -o0 adb -s ' + device_id + ' shell "su - root -c /data/local/tmp/off"', '\$', 60)
else:
self.command('stdbuf -o0 adb -s ' + device_id + ' shell /data/local/tmp/off', '\$', 60)
else:
self.command('ssh ' + self.UEDevicesRemoteUser[idx] + '@' + self.UEDevicesRemoteServer[idx] + ' ' + self.UEDevicesOffCmd[idx], '\$', 60)
logging.debug('\u001B[1mUE (' + device_id + ') Detach Completed\u001B[0m')
if self.ADBCentralized:
self.command('stdbuf -o0 adb -s ' + device_id + ' shell "ps | grep --color=never iperf | grep -v grep"', '\$', 5)
else:
self.command('ssh ' + self.UEDevicesRemoteUser[idx] + '@' + self.UEDevicesRemoteServer[idx] + ' \'adb -s ' + device_id + ' shell "ps | grep --color=never iperf | grep -v grep"\'', '\$', 60)
result = re.search('shell +(?P<pid>\d+)', str(self.ssh.before))
if result is not None:
pid_iperf = result.group('pid')
if self.ADBCentralized:
self.command('stdbuf -o0 adb -s ' + device_id + ' shell "kill -KILL ' + pid_iperf + '"', '\$', 5)
else:
self.command('ssh ' + self.UEDevicesRemoteUser[idx] + '@' + self.UEDevicesRemoteServer[idx] + ' \'adb -s ' + device_id + ' shell "kill -KILL ' + pid_iperf + '"\'', '\$', 60)
self.close()
except:
os.kill(os.getppid(),signal.SIGUSR1)
def TerminateUE(self):
terminate_ue_flag = True
self.GetAllUEDevices(terminate_ue_flag)
for device_id in self.UEDevices:
p = Process(target= SSH.TerminateUE_common, args = (device_id,i,))
p.daemon = True
p.start()
multi_jobs.append(p)
for job in multi_jobs:
job.join()
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)

Boris Djalal
committed
def TerminateOAIUE(self):
self.open(self.UEIPAddress, self.UEUserName, self.UEPassword)
self.command('cd ' + self.UESourceCodePath + '/cmake_targets', '\$', 5)
self.command('ps -aux | grep --color=never softmodem | grep -v grep', '\$', 5)

Boris Djalal
committed
result = re.search('lte-uesoftmodem', str(self.ssh.before))
if result is not None:
self.command('echo ' + self.UEPassword + ' | sudo -S daemon --name=ue' + str(self.UE_instance) + '_daemon --stop', '\$', 5)
self.command('echo ' + self.UEPassword + ' | sudo -S killall --signal SIGINT lte-uesoftmodem || true', '\$', 5)
self.command('ps -aux | grep --color=never softmodem | grep -v grep', '\$', 5)
result = re.search('lte-uesoftmodem', str(self.ssh.before))
if result is not None:
self.command('echo ' + self.UEPassword + ' | sudo -S killall --signal SIGKILL lte-uesoftmodem || true', '\$', 5)
self.command('rm -f my-lte-uesoftmodem-run' + str(self.UE_instance) + '.sh', '\$', 5)

Boris Djalal
committed
self.close()
result = re.search('ue_', str(self.UELogFile))

Boris Djalal
committed
if result is not None:
copyin_res = self.copyin(self.UEIPAddress, self.UEUserName, self.UEPassword, self.UESourceCodePath + '/cmake_targets/' + self.UELogFile, '.')
if (copyin_res == -1):
logging.debug('\u001B[1;37;41m Could not copy UE logfile to analyze it! \u001B[0m')
self.htmlUEFailureMsg = 'Could not copy UE logfile to analyze it!'
self.CreateHtmlTestRow('N/A', 'KO', OAI_UE_PROCESS_NOLOGFILE_TO_ANALYZE, 'UE')

Boris Djalal
committed
self.UELogFile = ''
return
logging.debug('\u001B[1m Analyzing UE logfile \u001B[0m')
logStatus = self.AnalyzeLogFile_UE(self.UELogFile)
result = re.search('--no-L2-connect', str(self.Initialize_OAI_UE_args))
if result is not None:
ueAction = 'Sniffing'

Boris Djalal
committed
else:
ueAction = 'Connection'
if (logStatus < 0):
logging.debug('\u001B[1m' + ueAction + ' Failed \u001B[0m')
self.htmlUEFailureMsg = '<b>' + ueAction + ' Failed</b>\n' + self.htmlUEFailureMsg
self.CreateHtmlTestRow('N/A', 'KO', logStatus, 'UE')
# In case of sniffing on commercial eNBs we have random results
# Not an error then
if (logStatus != OAI_UE_PROCESS_COULD_NOT_SYNC) or (ueAction != 'Sniffing'):
self.Initialize_OAI_UE_args = ''
self.AutoTerminateUEandeNB()
else:
logging.debug('\u001B[1m' + ueAction + ' Completed \u001B[0m')
self.htmlUEFailureMsg = '<b>' + ueAction + ' Completed</b>\n' + self.htmlUEFailureMsg
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)
self.UELogFile = ''
else:
self.CreateHtmlTestRow('N/A', 'OK', ALL_PROCESSES_OK)

Boris Djalal
committed
if (self.ADBIPAddress != 'none'):
self.testCase_id = 'AUTO-KILL-UE'
self.desc = 'Automatic Termination of UE'
self.ShowTestID()
self.TerminateUE()
if (self.Initialize_OAI_UE_args != ''):
self.testCase_id = 'AUTO-KILL-UE'
self.desc = 'Automatic Termination of UE'
self.ShowTestID()
self.TerminateOAIUE()
if (self.Initialize_eNB_args != ''):
self.testCase_id = 'AUTO-KILL-eNB'
self.desc = 'Automatic Termination of eNB'
self.ShowTestID()
self.eNB_instance = '0'
self.TerminateeNB()
if self.flexranCtrlInstalled and self.flexranCtrlStarted:
self.testCase_id = 'AUTO-KILL-flexran-ctl'
self.desc = 'Automatic Termination of FlexRan CTL'
self.ShowTestID()
self.TerminateFlexranCtrl()
self.prematureExit = True
def IdleSleep(self):
time.sleep(self.idle_sleep_time)
self.CreateHtmlTestRow(str(self.idle_sleep_time) + ' sec', 'OK', ALL_PROCESSES_OK)
3727
3728
3729
3730
3731
3732
3733
3734
3735
3736
3737
3738
3739
3740
3741
3742
3743
3744
3745
3746
3747
3748
3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
3763
3764
3765
3766
3767
3768
3769
3770
3771
3772
3773
3774
3775
3776
3777
3778
3779
3780
3781
3782
3783
3784
3785
3786
3787
3788
3789
3790
3791
3792
3793
3794
3795
3796
3797
3798
3799
3800
3801
3802
3803
3804
3805
3806
3807
3808
3809
3810
3811
3812
3813
3814
3815
3816
3817
3818
3819
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
def X2_Status(self, idx, fileName):
cmd = "curl --silent http://" + self.EPCIPAddress + ":9999/stats | jq '.' > " + fileName
message = cmd + '\n'
logging.debug(cmd)
subprocess.run(cmd, shell=True)
if idx == 0:
cmd = "jq '.mac_stats | length' " + fileName
strNbEnbs = subprocess.check_output(cmd, shell=True, universal_newlines=True)
self.x2NbENBs = int(strNbEnbs.strip())
cnt = 0
while cnt < self.x2NbENBs:
cmd = "jq '.mac_stats[" + str(cnt) + "].bs_id' " + fileName
bs_id = subprocess.check_output(cmd, shell=True, universal_newlines=True)
self.x2ENBBsIds[idx].append(bs_id.strip())
cmd = "jq '.mac_stats[" + str(cnt) + "].ue_mac_stats | length' " + fileName
stNbUEs = subprocess.check_output(cmd, shell=True, universal_newlines=True)
nbUEs = int(stNbUEs.strip())
ueIdx = 0
self.x2ENBConnectedUEs[idx].append([])
while ueIdx < nbUEs:
cmd = "jq '.mac_stats[" + str(cnt) + "].ue_mac_stats[" + str(ueIdx) + "].rnti' " + fileName
rnti = subprocess.check_output(cmd, shell=True, universal_newlines=True)
self.x2ENBConnectedUEs[idx][cnt].append(rnti.strip())
ueIdx += 1
cnt += 1
msg = "FlexRan Controller is connected to " + str(self.x2NbENBs) + " eNB(s)"
logging.debug(msg)
message += msg + '\n'
cnt = 0
while cnt < self.x2NbENBs:
msg = " -- eNB: " + str(self.x2ENBBsIds[idx][cnt]) + " is connected to " + str(len(self.x2ENBConnectedUEs[idx][cnt])) + " UE(s)"
logging.debug(msg)
message += msg + '\n'
ueIdx = 0
while ueIdx < len(self.x2ENBConnectedUEs[idx][cnt]):
msg = " -- UE rnti: " + str(self.x2ENBConnectedUEs[idx][cnt][ueIdx])
logging.debug(msg)
message += msg + '\n'
ueIdx += 1
cnt += 1
return message
def Perform_X2_Handover(self):
html_queue = SimpleQueue()
fullMessage = '<pre style="background-color:white">'
msg = 'Doing X2 Handover w/ option ' + self.x2_ho_options
logging.debug(msg)
fullMessage += msg + '\n'
if self.x2_ho_options == 'network':
if self.flexranCtrlInstalled and self.flexranCtrlStarted:
self.x2ENBBsIds = []
self.x2ENBConnectedUEs = []
self.x2ENBBsIds.append([])
self.x2ENBBsIds.append([])
self.x2ENBConnectedUEs.append([])
self.x2ENBConnectedUEs.append([])
fullMessage += self.X2_Status(0, self.testCase_id + '_pre_ho.json')
msg = "Activating the X2 Net control on each eNB"
logging.debug(msg)
fullMessage += msg + '\n'
eNB_cnt = self.x2NbENBs
cnt = 0
while cnt < eNB_cnt:
cmd = "curl -XPOST http://" + self.EPCIPAddress + ":9999/rrc/x2_ho_net_control/enb/" + str(self.x2ENBBsIds[0][cnt]) + "/1"
logging.debug(cmd)
fullMessage += cmd + '\n'
subprocess.run(cmd, shell=True)
cnt += 1
# Waiting for the activation to be active
time.sleep(10)
msg = "Switching UE(s) from eNB to eNB"
logging.debug(msg)
fullMessage += msg + '\n'
cnt = 0
while cnt < eNB_cnt:
ueIdx = 0
while ueIdx < len(self.x2ENBConnectedUEs[0][cnt]):
cmd = "curl -XPOST http://" + self.EPCIPAddress + ":9999/rrc/ho/senb/" + str(self.x2ENBBsIds[0][cnt]) + "/ue/" + str(self.x2ENBConnectedUEs[0][cnt][ueIdx]) + "/tenb/" + str(self.x2ENBBsIds[0][eNB_cnt - cnt - 1])
logging.debug(cmd)
fullMessage += cmd + '\n'
subprocess.run(cmd, shell=True)
ueIdx += 1
cnt += 1
time.sleep(10)
# check
logging.debug("Checking the Status after X2 Handover")
fullMessage += self.X2_Status(1, self.testCase_id + '_post_ho.json')
cnt = 0
x2Status = True
while cnt < eNB_cnt:
if len(self.x2ENBConnectedUEs[0][cnt]) == len(self.x2ENBConnectedUEs[1][cnt]):
x2Status = False
cnt += 1
if x2Status:
msg = "X2 Handover was successful"
logging.debug(msg)
fullMessage += msg + '</pre>'
html_queue.put(fullMessage)
self.CreateHtmlTestRowQueue('N/A', 'OK', len(self.UEDevices), html_queue)
else:
msg = "X2 Handover FAILED"
logging.error(msg)
fullMessage += msg + '</pre>'
html_queue.put(fullMessage)
self.CreateHtmlTestRowQueue('N/A', 'OK', len(self.UEDevices), html_queue)
else:
self.CreateHtmlTestRow('Cannot perform requested X2 Handover', 'KO', ALL_PROCESSES_OK)
def LogCollectBuild(self):
if (self.eNBIPAddress != '' and self.eNBUserName != '' and self.eNBPassword != ''):
IPAddress = self.eNBIPAddress
UserName = self.eNBUserName
Password = self.eNBPassword
SourceCodePath = self.eNBSourceCodePath
elif (self.UEIPAddress != '' and self.UEUserName != '' and self.UEPassword != ''):
IPAddress = self.UEIPAddress
UserName = self.UEUserName
Password = self.UEPassword
SourceCodePath = self.UESourceCodePath
self.open(IPAddress, UserName, Password)
self.command('cd ' + SourceCodePath, '\$', 5)
self.command('cd cmake_targets', '\$', 5)
self.command('zip build.log.zip build_log_*/*', '\$', 60)
self.close()
def LogCollecteNB(self):
self.open(self.eNBIPAddress, self.eNBUserName, self.eNBPassword)
self.command('cd ' + self.eNBSourceCodePath, '\$', 5)
self.command('cd cmake_targets', '\$', 5)
self.command('echo ' + self.eNBPassword + ' | sudo -S rm -f enb.log.zip', '\$', 5)
self.command('echo ' + self.eNBPassword + ' | sudo -S zip enb.log.zip enb*.log core* enb_*record.raw enb_*.pcap enb_*txt', '\$', 60)
self.command('echo ' + self.eNBPassword + ' | sudo -S rm enb*.log core* enb_*record.raw enb_*.pcap enb_*txt', '\$', 5)
def LogCollectPing(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('cd scripts', '\$', 5)
self.command('zip ping.log.zip ping*.log', '\$', 60)
self.command('rm ping*.log', '\$', 5)
self.close()
def LogCollectIperf(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath, '\$', 5)
self.command('cd scripts', '\$', 5)
self.command('zip iperf.log.zip iperf*.log', '\$', 60)
self.command('rm iperf*.log', '\$', 5)
self.close()
def LogCollectHSS(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath + '/scripts', '\$', 5)
if re.match('OAI', self.EPCType, re.IGNORECASE) or re.match('OAI-Rel14-CUPS', self.EPCType, re.IGNORECASE):
self.command('zip hss.log.zip hss*.log', '\$', 60)
self.command('echo ' + self.EPCPassword + ' | sudo -S rm hss*.log', '\$', 5)
if re.match('OAI-Rel14-CUPS', self.EPCType, re.IGNORECASE):
self.command('zip hss.log.zip logs/hss*.* *.pcap', '\$', 60)
self.command('echo ' + self.EPCPassword + ' | sudo -S rm -f logs/hss*.* *.pcap', '\$', 5)
elif re.match('ltebox', self.EPCType, re.IGNORECASE):
self.command('cp /opt/hss_sim0609/hss.log .', '\$', 60)
self.command('zip hss.log.zip hss.log', '\$', 60)
else:
logging.error('This option should not occur!')
def LogCollectMME(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath + '/scripts', '\$', 5)
if re.match('OAI', self.EPCType, re.IGNORECASE) or re.match('OAI-Rel14-CUPS', self.EPCType, re.IGNORECASE):
self.command('zip mme.log.zip mme*.log', '\$', 60)
self.command('echo ' + self.EPCPassword + ' | sudo -S rm mme*.log', '\$', 5)
elif re.match('ltebox', self.EPCType, re.IGNORECASE):
self.command('cp /opt/ltebox/var/log/*Log.0 .', '\$', 5)
self.command('zip mme.log.zip mmeLog.0 s1apcLog.0 s1apsLog.0 s11cLog.0 libLog.0 s1apCodecLog.0', '\$', 60)
else:
logging.error('This option should not occur!')
def LogCollectSPGW(self):
self.open(self.EPCIPAddress, self.EPCUserName, self.EPCPassword)
self.command('cd ' + self.EPCSourceCodePath + '/scripts', '\$', 5)
if re.match('OAI', self.EPCType, re.IGNORECASE) or re.match('OAI-Rel14-CUPS', self.EPCType, re.IGNORECASE):
self.command('zip spgw.log.zip spgw*.log', '\$', 60)
self.command('echo ' + self.EPCPassword + ' | sudo -S rm spgw*.log', '\$', 5)
elif re.match('ltebox', self.EPCType, re.IGNORECASE):
self.command('cp /opt/ltebox/var/log/xGwLog.0 .', '\$', 5)
self.command('zip spgw.log.zip xGwLog.0', '\$', 60)
else:
logging.error('This option should not occur!')
def LogCollectOAIUE(self):
self.open(self.UEIPAddress, self.UEUserName, self.UEPassword)
self.command('cd ' + self.UESourceCodePath, '\$', 5)
self.command('cd cmake_targets', '\$', 5)
self.command('echo ' + self.UEPassword + ' | sudo -S rm -f ue.log.zip', '\$', 5)
self.command('echo ' + self.UEPassword + ' | sudo -S zip ue.log.zip ue*.log core* ue_*record.raw ue_*.pcap ue_*txt', '\$', 60)
self.command('echo ' + self.UEPassword + ' | sudo -S rm ue*.log core* ue_*record.raw ue_*.pcap ue_*txt', '\$', 5)
self.close()
def RetrieveSystemVersion(self, machine):
if self.eNBIPAddress == 'none' or self.UEIPAddress == 'none':
self.OsVersion = 'Ubuntu 16.04.5 LTS'
self.KernelVersion = '4.15.0-45-generic'
self.UhdVersion = '3.13.0.1-0'
self.UsrpBoard = 'B210'
self.CpuNb = '4'
self.CpuModel = 'Intel(R) Core(TM) i5-6200U'
self.CpuMHz = '2399.996 MHz'
return 0
if machine == 'eNB':
if self.eNBIPAddress != '' and self.eNBUserName != '' and self.eNBPassword != '':
IPAddress = self.eNBIPAddress
UserName = self.eNBUserName
Password = self.eNBPassword
else:
return -1
if machine == 'UE':
if self.UEIPAddress != '' and self.UEUserName != '' and self.UEPassword != '':
IPAddress = self.UEIPAddress
UserName = self.UEUserName
Password = self.UEPassword
else:
return -1
self.open(IPAddress, UserName, Password)
self.command('lsb_release -a', '\$', 5)
result = re.search('Description:\\\\t(?P<os_type>[a-zA-Z0-9\-\_\.\ ]+)', str(self.ssh.before))
if result is not None:
self.OsVersion = result.group('os_type')
logging.debug('OS is: ' + self.OsVersion)
self.command('uname -r', '\$', 5)
result = re.search('uname -r\\\\r\\\\n(?P<kernel_version>[a-zA-Z0-9\-\_\.]+)', str(self.ssh.before))
if result is not None:
self.KernelVersion = result.group('kernel_version')
logging.debug('Kernel Version is: ' + self.KernelVersion)

Raphael Defosseux
committed
self.command('dpkg --list | egrep --color=never libuhd003', '\$', 5)
result = re.search('libuhd003:amd64 *(?P<uhd_version>[0-9\.]+)', str(self.ssh.before))
if result is not None:
self.UhdVersion = result.group('uhd_version')
logging.debug('UHD Version is: ' + self.UhdVersion)
self.command('echo ' + Password + ' | sudo -S uhd_find_devices', '\$', 15)
result = re.search('product: (?P<usrp_board>[0-9A-Za-z]+)\\\\r\\\\n', str(self.ssh.before))
if result is not None:
self.UsrpBoard = result.group('usrp_board')
logging.debug('USRP Board is: ' + self.UsrpBoard)
self.command('lscpu', '\$', 5)
result = re.search('CPU\(s\): *(?P<nb_cpus>[0-9]+).*Model name: *(?P<model>[a-zA-Z0-9\-\_\.\ \(\)]+).*CPU MHz: *(?P<cpu_mhz>[0-9\.]+)', str(self.ssh.before))
if result is not None:
self.CpuNb = result.group('nb_cpus')
logging.debug('nb_cpus: ' + self.CpuNb)
self.CpuModel = result.group('model')
logging.debug('model: ' + self.CpuModel)
self.CpuMHz = result.group('cpu_mhz') + ' MHz'
logging.debug('cpu_mhz: ' + self.CpuMHz)
#-----------------------------------------------------------
# HTML Reporting....
#-----------------------------------------------------------
def CreateHtmlHeader(self):
if (not self.htmlHeaderCreated):
logging.debug('\u001B[1m----------------------------------------\u001B[0m')