Newer
Older
Gabriele Perrone
committed
result = re.search('Operating System: (?P<os_type>[a-zA-Z0-9\-\_\.\ ]+)', SSH.getBefore())
OsVersion = result.group('os_type')
if OsVersion == 'CentOS Linux 7 ':
Gabriele Perrone
committed
SSH.command('cat /etc/redhat-release', '\$', 5)
result = re.search('CentOS Linux release (?P<os_version>[0-9\.]+)', SSH.getBefore())
OsVersion = OsVersion.replace('7 ', result.group('os_version'))
logging.debug('OS is: ' + OsVersion)
hardy
committed
HTML.OsVersion[idx]=OsVersion
Gabriele Perrone
committed
SSH.command('uname -r', '\$', 5)
result = re.search('uname -r\\\\r\\\\n(?P<kernel_version>[a-zA-Z0-9\-\_\.]+)', SSH.getBefore())
if result is not None:
KernelVersion = result.group('kernel_version')
logging.debug('Kernel Version is: ' + KernelVersion)
hardy
committed
HTML.KernelVersion[idx]=KernelVersion
Gabriele Perrone
committed
SSH.command('dpkg --list | egrep --color=never libuhd003', '\$', 5)
result = re.search('libuhd003:amd64 *(?P<uhd_version>[0-9\.]+)', SSH.getBefore())
if result is not None:
UhdVersion = result.group('uhd_version')
logging.debug('UHD Version is: ' + UhdVersion)
hardy
committed
HTML.UhdVersion[idx]=UhdVersion
Gabriele Perrone
committed
SSH.command('uhd_config_info --version', '\$', 5)
result = re.search('UHD (?P<uhd_version>[a-zA-Z0-9\.\-]+)', SSH.getBefore())
UhdVersion = result.group('uhd_version')
logging.debug('UHD Version is: ' + UhdVersion)
hardy
committed
HTML.UhdVersion[idx]=UhdVersion
Gabriele Perrone
committed
SSH.command('echo ' + Password + ' | sudo -S uhd_find_devices', '\$', 60)
Gabriele Perrone
committed
usrp_boards = re.findall('product: ([0-9A-Za-z]+)\\\\r\\\\n', SSH.getBefore())
count = 0
for board in usrp_boards:
if count == 0:
logging.debug('USRP Board(s) : ' + UsrpBoard)
hardy
committed
HTML.UsrpBoard[idx]=UsrpBoard
Gabriele Perrone
committed
SSH.command('lscpu', '\$', 5)
result = re.search('CPU\(s\): *(?P<nb_cpus>[0-9]+).*Model name: *(?P<model>[a-zA-Z0-9\-\_\.\ \(\)]+).*CPU MHz: *(?P<cpu_mhz>[0-9\.]+)', SSH.getBefore())
if result is not None:
CpuNb = result.group('nb_cpus')
logging.debug('nb_cpus: ' + CpuNb)
hardy
committed
HTML.CpuNb[idx]=CpuNb
CpuModel = result.group('model')
logging.debug('model: ' + CpuModel)
hardy
committed
HTML.CpuModel[idx]=CpuModel
CpuMHz = result.group('cpu_mhz') + ' MHz'
logging.debug('cpu_mhz: ' + CpuMHz)
hardy
committed
HTML.CpuMHz[idx]=CpuMHz
Gabriele Perrone
committed
SSH.close()
#-----------------------------------------------------------
# ShowTestID()
#-----------------------------------------------------------
def ShowTestID(self):
logging.debug('\u001B[1m----------------------------------------\u001B[0m')
logging.debug('\u001B[1mTest ID:' + self.testCase_id + '\u001B[0m')
logging.debug('\u001B[1m' + self.desc + '\u001B[0m')
logging.debug('\u001B[1m----------------------------------------\u001B[0m')
def CheckClassValidity(action,id):
if action!='Build_PhySim' and action!='Run_PhySim' and action != 'Build_eNB' and action != 'WaitEndBuild_eNB' and action != 'Initialize_eNB' and action != 'Terminate_eNB' and action != 'Initialize_UE' and action != 'Terminate_UE' and action != 'Attach_UE' and action != 'Detach_UE' and action != 'Build_OAI_UE' and action != 'Initialize_OAI_UE' and action != 'Terminate_OAI_UE' and action != 'DataDisable_UE' and action != 'DataEnable_UE' and action != 'CheckStatusUE' and action != 'Ping' and action != 'Iperf' and action != 'Reboot_UE' and action != 'Initialize_FlexranCtrl' and action != 'Terminate_FlexranCtrl' and action != 'Initialize_HSS' and action != 'Terminate_HSS' and action != 'Initialize_MME' and action != 'Terminate_MME' and action != 'Initialize_SPGW' and action != 'Terminate_SPGW' and action != 'Initialize_CatM_module' and action != 'Terminate_CatM_module' and action != 'Attach_CatM_module' and action != 'Detach_CatM_module' and action != 'Ping_CatM_module' and action != 'IdleSleep' and action != 'Perform_X2_Handover':
logging.debug('ERROR: test-case ' + id + ' has wrong class ' + action)
return False
return True
def GetParametersFromXML(action):
if action == 'Build_eNB':
RAN.Build_eNB_args=test.findtext('Build_eNB_args')
forced_workspace_cleanup = test.findtext('forced_workspace_cleanup')
if (forced_workspace_cleanup is None):
RAN.Build_eNB_forced_workspace_cleanup=False
else:
if re.match('true', forced_workspace_cleanup, re.IGNORECASE):
RAN.Build_eNB_forced_workspace_cleanup=True
Gabriele Perrone
committed
else:
RAN.Build_eNB_forced_workspace_cleanup=False
RAN.eNB_instance=test.findtext('eNB_instance')
if (RAN.eNB_instance is None):
RAN.eNB_instance='0'
RAN.eNB_serverId=test.findtext('eNB_serverId')
if (RAN.eNB_serverId is None):
RAN.eNB_serverId='0'
xmlBgBuildField = test.findtext('backgroundBuild')
if (xmlBgBuildField is None):
else:
if re.match('true', xmlBgBuildField, re.IGNORECASE):
if action == 'WaitEndBuild_eNB':
RAN.Build_eNB_args=test.findtext('Build_eNB_args')
RAN.eNB_instance=test.findtext('eNB_instance')
if (RAN.eNB_instance is None):
RAN.eNB_instance='0'
RAN.eNB_serverId=test.findtext('eNB_serverId')
if (RAN.eNB_serverId is None):
RAN.eNB_serverId='0'
if action == 'Initialize_eNB':
RAN.Initialize_eNB_args=test.findtext('Initialize_eNB_args')
RAN.eNB_instance=test.findtext('eNB_instance')
if (RAN.eNB_instance is None):
RAN.eNB_instance='0'
RAN.eNB_serverId=test.findtext('eNB_serverId')
if (RAN.eNB_serverId is None):
RAN.eNB_serverId='0'
#local variable air_interface
air_interface = test.findtext('air_interface')
if (air_interface is None) or (air_interface.lower() not in ['nr','lte','ocp']):
CiTestObj.air_interface = 'lte-softmodem'
elif (air_interface.lower() in ['nr','lte']):
CiTestObj.air_interface = air_interface.lower() +'-softmodem'
else :
CiTestObj.air_interface = air_interface.lower() +'ocp-enb'
RAN.air_interface(CiTestObj.air_interface)
RAN.eNB_instance=test.findtext('eNB_instance')
if (RAN.eNB_instance is None):
RAN.eNB_instance='0'
RAN.eNB_serverId=test.findtext('eNB_serverId')
if (RAN.eNB_serverId is None):
RAN.eNB_serverId='0'
#local variable air_interface
air_interface = test.findtext('air_interface')
if (air_interface is None) or (air_interface.lower() not in ['nr','lte','ocp']):
CiTestObj.air_interface = 'lte-softmodem'
elif (air_interface.lower() in ['nr','lte']):
CiTestObj.air_interface = air_interface.lower() +'-softmodem'
else :
CiTestObj.air_interface = air_interface.lower() +'ocp-enb'
if action == 'Attach_UE':
nbMaxUEtoAttach = test.findtext('nbMaxUEtoAttach')
if (nbMaxUEtoAttach is None):
CiTestObj.nbMaxUEtoAttach = -1
CiTestObj.nbMaxUEtoAttach = int(nbMaxUEtoAttach)
if action == 'CheckStatusUE':
expectedNBUE = test.findtext('expectedNbOfConnectedUEs')
if (expectedNBUE is None):
CiTestObj.expectedNbOfConnectedUEs = -1
CiTestObj.expectedNbOfConnectedUEs = int(expectedNBUE)
CiTestObj.Build_OAI_UE_args = test.findtext('Build_OAI_UE_args')
Gabriele Perrone
committed
CiTestObj.clean_repository = test.findtext('clean_repository')
if (CiTestObj.clean_repository == 'false'):
CiTestObj.clean_repository = False
Gabriele Perrone
committed
CiTestObj.clean_repository = True
CiTestObj.Initialize_OAI_UE_args = test.findtext('Initialize_OAI_UE_args')
CiTestObj.UE_instance = test.findtext('UE_instance')
if (CiTestObj.UE_instance is None):
CiTestObj.UE_instance = '0'
#local variable air_interface
air_interface = test.findtext('air_interface')
if (air_interface is None) or (air_interface.lower() not in ['nr','lte','ocp']):
CiTestObj.air_interface = 'lte-softmodem'
elif (air_interface.lower() in ['nr','lte']):
CiTestObj.air_interface = air_interface.lower() +'-softmodem'
else :
CiTestObj.air_interface = air_interface.lower() +'ocp-enb'
RAN.eNB_instance=test.findtext('UE_instance')
if (CiTestObj.UE_instance is None):
CiTestObj.UE_instance = '0'
if action == 'Ping' or action == 'Ping_CatM_module':
CiTestObj.ping_args = test.findtext('ping_args')
CiTestObj.ping_packetloss_threshold = test.findtext('ping_packetloss_threshold')
if action == 'Iperf':
CiTestObj.iperf_args = test.findtext('iperf_args')
CiTestObj.iperf_packetloss_threshold = test.findtext('iperf_packetloss_threshold')
CiTestObj.iperf_profile = test.findtext('iperf_profile')
if (CiTestObj.iperf_profile is None):
CiTestObj.iperf_profile = 'balanced'
if CiTestObj.iperf_profile != 'balanced' and CiTestObj.iperf_profile != 'unbalanced' and CiTestObj.iperf_profile != 'single-ue':
logging.debug('ERROR: test-case has wrong profile ' + CiTestObj.iperf_profile)
CiTestObj.iperf_profile = 'balanced'
Gabriele Perrone
committed
CiTestObj.iperf_options = test.findtext('iperf_options')
if (CiTestObj.iperf_options is None):
CiTestObj.iperf_options = 'check'
Gabriele Perrone
committed
if CiTestObj.iperf_options != 'check' and CiTestObj.iperf_options != 'sink':
logging.debug('ERROR: test-case has wrong option ' + CiTestObj.iperf_options)
CiTestObj.iperf_options = 'check'
if action == 'IdleSleep':
string_field = test.findtext('idle_sleep_time_in_sec')
if (string_field is None):
CiTestObj.idle_sleep_time = 5
CiTestObj.idle_sleep_time = int(string_field)
if action == 'Perform_X2_Handover':
string_field = test.findtext('x2_ho_options')
if (string_field is None):
CiTestObj.x2_ho_options = 'network'
else:
if string_field != 'network':
logging.error('ERROR: test-case has wrong option ' + string_field)
CiTestObj.x2_ho_options = 'network'
CiTestObj.x2_ho_options = string_field
if action == 'Build_PhySim':
ldpc.buildargs = test.findtext('physim_build_args')
forced_workspace_cleanup = test.findtext('forced_workspace_cleanup')
if (forced_workspace_cleanup is None):
ldpc.forced_workspace_cleanup=False
else:
if re.match('true', forced_workspace_cleanup, re.IGNORECASE):
ldpc.forced_workspace_cleanup=True
else:
ldpc.forced_workspace_cleanup=False
if action == 'Run_PhySim':
ldpc.runargs = test.findtext('physim_run_args')
if action == 'COTS_UE_Airplane':
COTS_UE=CotsUe('oppo', self.UEIPAddress, self.UEUserName,UEPassword)
COTS_UE.runargs = test.findtext('cots_ue_airplane_args')
#check if given test is in list
#it is in list if one of the strings in 'list' is at the beginning of 'test'
def test_in_list(test, list):
for check in list:
check=check.replace('+','')
if (test.startswith(check)):
return True
return False
def receive_signal(signum, frame):
sys.exit(1)
#-----------------------------------------------------------
# Parameter Check
#-----------------------------------------------------------
mode = ''
Gabriele Perrone
committed
import sshconnection
import epc
Gabriele Perrone
committed
import ran
import html
import constants
SSH = sshconnection.SSHConnection()
EPC = epc.EPCManagement()
RAN = ran.RANManagement()
HTML = html.HTMLManagement()
hardy
committed
EPC.HtmlObj=HTML
import cls_physim #class PhySim for physical simulators build and test
ldpc=cls_physim.PhySim() #create an instance for LDPC test using GPU or CPU build
argvs = sys.argv
argc = len(argvs)
cwd = os.getcwd()
while len(argvs) > 1:
myArgv = argvs.pop(1) # 0th is this file's name
Gabriele Perrone
committed
if re.match('^\-\-help$', myArgv, re.IGNORECASE):
sys.exit(0)
elif re.match('^\-\-mode=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-mode=(.+)$', myArgv, re.IGNORECASE)
mode = matchReg.group(1)
elif re.match('^\-\-eNBRepository=(.+)$|^\-\-ranRepository(.+)$', myArgv, re.IGNORECASE):
if re.match('^\-\-eNBRepository=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNBRepository=(.+)$', myArgv, re.IGNORECASE)
else:
matchReg = re.match('^\-\-ranRepository=(.+)$', myArgv, re.IGNORECASE)
CiTestObj.ranRepository = matchReg.group(1)
hardy
committed
HTML.ranRepository=matchReg.group(1)
ldpc.ranRepository=matchReg.group(1)
elif re.match('^\-\-eNB_AllowMerge=(.+)$|^\-\-ranAllowMerge=(.+)$', myArgv, re.IGNORECASE):
if re.match('^\-\-eNB_AllowMerge=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNB_AllowMerge=(.+)$', myArgv, re.IGNORECASE)
else:
matchReg = re.match('^\-\-ranAllowMerge=(.+)$', myArgv, re.IGNORECASE)
ldpc.ranAllowMerge=matchReg.group(1)
hardy
committed
HTML.ranAllowMerge=True
elif re.match('^\-\-eNBBranch=(.+)$|^\-\-ranBranch=(.+)$', myArgv, re.IGNORECASE):
if re.match('^\-\-eNBBranch=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNBBranch=(.+)$', myArgv, re.IGNORECASE)
else:
matchReg = re.match('^\-\-ranBranch=(.+)$', myArgv, re.IGNORECASE)
CiTestObj.ranBranch = matchReg.group(1)
hardy
committed
HTML.ranBranch=matchReg.group(1)
ldpc.ranBranch=matchReg.group(1)
elif re.match('^\-\-eNBCommitID=(.*)$|^\-\-ranCommitID=(.*)$', myArgv, re.IGNORECASE):
if re.match('^\-\-eNBCommitID=(.*)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNBCommitID=(.*)$', myArgv, re.IGNORECASE)
else:
matchReg = re.match('^\-\-ranCommitID=(.*)$', myArgv, re.IGNORECASE)
CiTestObj.ranCommitID = matchReg.group(1)
hardy
committed
HTML.ranCommitID=matchReg.group(1)
ldpc.ranCommitID=matchReg.group(1)
elif re.match('^\-\-eNBTargetBranch=(.*)$|^\-\-ranTargetBranch=(.*)$', myArgv, re.IGNORECASE):
if re.match('^\-\-eNBTargetBranch=(.*)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNBTargetBranch=(.*)$', myArgv, re.IGNORECASE)
else:
matchReg = re.match('^\-\-ranTargetBranch=(.*)$', myArgv, re.IGNORECASE)
CiTestObj.ranTargetBranch = matchReg.group(1)
hardy
committed
HTML.ranTargetBranch=matchReg.group(1)
ldpc.ranTargetBranch=matchReg.group(1)
elif re.match('^\-\-eNBIPAddress=(.+)$|^\-\-eNB[1-2]IPAddress=(.+)$', myArgv, re.IGNORECASE):
if re.match('^\-\-eNBIPAddress=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNBIPAddress=(.+)$', myArgv, re.IGNORECASE)
ldpc.eNBIpAddr=matchReg.group(1)
elif re.match('^\-\-eNB1IPAddress=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNB1IPAddress=(.+)$', myArgv, re.IGNORECASE)
elif re.match('^\-\-eNB2IPAddress=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNB2IPAddress=(.+)$', myArgv, re.IGNORECASE)
elif re.match('^\-\-eNBUserName=(.+)$|^\-\-eNB[1-2]UserName=(.+)$', myArgv, re.IGNORECASE):
if re.match('^\-\-eNBUserName=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNBUserName=(.+)$', myArgv, re.IGNORECASE)
ldpc.eNBUserName=matchReg.group(1)
elif re.match('^\-\-eNB1UserName=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNB1UserName=(.+)$', myArgv, re.IGNORECASE)
elif re.match('^\-\-eNB2UserName=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNB2UserName=(.+)$', myArgv, re.IGNORECASE)
elif re.match('^\-\-eNBPassword=(.+)$|^\-\-eNB[1-2]Password=(.+)$', myArgv, re.IGNORECASE):
if re.match('^\-\-eNBPassword=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNBPassword=(.+)$', myArgv, re.IGNORECASE)
ldpc.eNBPassWord=matchReg.group(1)
elif re.match('^\-\-eNB1Password=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNB1Password=(.+)$', myArgv, re.IGNORECASE)
elif re.match('^\-\-eNB2Password=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNB2Password=(.+)$', myArgv, re.IGNORECASE)
elif re.match('^\-\-eNBSourceCodePath=(.+)$|^\-\-eNB[1-2]SourceCodePath=(.+)$', myArgv, re.IGNORECASE):
if re.match('^\-\-eNBSourceCodePath=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNBSourceCodePath=(.+)$', myArgv, re.IGNORECASE)
ldpc.eNBSourceCodePath=matchReg.group(1)
elif re.match('^\-\-eNB1SourceCodePath=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNB1SourceCodePath=(.+)$', myArgv, re.IGNORECASE)
elif re.match('^\-\-eNB2SourceCodePath=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-eNB2SourceCodePath=(.+)$', myArgv, re.IGNORECASE)
elif re.match('^\-\-EPCIPAddress=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-EPCIPAddress=(.+)$', myArgv, re.IGNORECASE)
hardy
committed
EPC.IPAddress=matchReg.group(1)
elif re.match('^\-\-EPCUserName=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-EPCUserName=(.+)$', myArgv, re.IGNORECASE)
hardy
committed
EPC.UserName=matchReg.group(1)
elif re.match('^\-\-EPCPassword=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-EPCPassword=(.+)$', myArgv, re.IGNORECASE)
hardy
committed
EPC.Password=matchReg.group(1)
elif re.match('^\-\-EPCSourceCodePath=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-EPCSourceCodePath=(.+)$', myArgv, re.IGNORECASE)
hardy
committed
EPC.SourceCodePath=matchReg.group(1)
elif re.match('^\-\-EPCType=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-EPCType=(.+)$', myArgv, re.IGNORECASE)
if re.match('OAI', matchReg.group(1), re.IGNORECASE) or re.match('ltebox', matchReg.group(1), re.IGNORECASE) or re.match('OAI-Rel14-CUPS', matchReg.group(1), re.IGNORECASE) or re.match('OAI-Rel14-Docker', matchReg.group(1), re.IGNORECASE):
hardy
committed
EPC.Type=matchReg.group(1)
sys.exit('Invalid EPC Type: ' + matchReg.group(1) + ' -- (should be OAI or ltebox or OAI-Rel14-CUPS or OAI-Rel14-Docker)')
elif re.match('^\-\-EPCContainerPrefix=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-EPCContainerPrefix=(.+)$', myArgv, re.IGNORECASE)
hardy
committed
EPC.ContainerPrefix=matchReg.group(1)
elif re.match('^\-\-ADBIPAddress=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-ADBIPAddress=(.+)$', myArgv, re.IGNORECASE)
CiTestObj.ADBIPAddress = matchReg.group(1)
elif re.match('^\-\-ADBUserName=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-ADBUserName=(.+)$', myArgv, re.IGNORECASE)
CiTestObj.ADBUserName = matchReg.group(1)
elif re.match('^\-\-ADBType=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-ADBType=(.+)$', myArgv, re.IGNORECASE)
if re.match('centralized', matchReg.group(1), re.IGNORECASE) or re.match('distributed', matchReg.group(1), re.IGNORECASE):
if re.match('distributed', matchReg.group(1), re.IGNORECASE):
CiTestObj.ADBCentralized = False
CiTestObj.ADBCentralized = True
else:
sys.exit('Invalid ADB Type: ' + matchReg.group(1) + ' -- (should be centralized or distributed)')
elif re.match('^\-\-ADBPassword=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-ADBPassword=(.+)$', myArgv, re.IGNORECASE)
CiTestObj.ADBPassword = matchReg.group(1)
elif re.match('^\-\-XMLTestFile=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-XMLTestFile=(.+)$', myArgv, re.IGNORECASE)
CiTestObj.testXMLfiles.append(matchReg.group(1))
hardy
committed
HTML.testXMLfiles=matchReg.group(1)
HTML.nbTestXMLfiles=HTML.nbTestXMLfiles+1
elif re.match('^\-\-UEIPAddress=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-UEIPAddress=(.+)$', myArgv, re.IGNORECASE)
CiTestObj.UEIPAddress = matchReg.group(1)
elif re.match('^\-\-UEUserName=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-UEUserName=(.+)$', myArgv, re.IGNORECASE)
CiTestObj.UEUserName = matchReg.group(1)
elif re.match('^\-\-UEPassword=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-UEPassword=(.+)$', myArgv, re.IGNORECASE)
CiTestObj.UEPassword = matchReg.group(1)

Boris Djalal
committed
elif re.match('^\-\-UESourceCodePath=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-UESourceCodePath=(.+)$', myArgv, re.IGNORECASE)
CiTestObj.UESourceCodePath = matchReg.group(1)
elif re.match('^\-\-finalStatus=(.+)$', myArgv, re.IGNORECASE):
matchReg = re.match('^\-\-finalStatus=(.+)$', myArgv, re.IGNORECASE)
finalStatus = matchReg.group(1)
if ((finalStatus == 'true') or (finalStatus == 'True')):
CiTestObj.finalStatus = True
sys.exit('Invalid Parameter: ' + myArgv)
if re.match('^TerminateeNB$', mode, re.IGNORECASE):
if RAN.eNBIPAddress == '' or RAN.eNBUserName == '' or RAN.eNBPassword == '':
sys.exit('Insufficient Parameter')
RAN.eNB_serverId='0'
RAN.eNB_instance='0'
RAN.eNBSourceCodePath='/tmp/'
elif re.match('^TerminateUE$', mode, re.IGNORECASE):
if (CiTestObj.ADBIPAddress == '' or CiTestObj.ADBUserName == '' or CiTestObj.ADBPassword == ''):
sys.exit('Insufficient Parameter')
signal.signal(signal.SIGUSR1, receive_signal)
elif re.match('^TerminateOAIUE$', mode, re.IGNORECASE):
if CiTestObj.UEIPAddress == '' or CiTestObj.UEUserName == '' or CiTestObj.UEPassword == '':
sys.exit('Insufficient Parameter')
signal.signal(signal.SIGUSR1, receive_signal)
elif re.match('^TerminateHSS$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^TerminateMME$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^TerminateSPGW$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath== '':
sys.exit('Insufficient Parameter')
elif re.match('^LogCollectBuild$', mode, re.IGNORECASE):
if (RAN.eNBIPAddress == '' or RAN.eNBUserName == '' or RAN.eNBPassword == '' or RAN.eNBSourceCodePath == '') and (CiTestObj.UEIPAddress == '' or CiTestObj.UEUserName == '' or CiTestObj.UEPassword == '' or CiTestObj.UESourceCodePath == ''):
sys.exit('Insufficient Parameter')
CiTestObj.LogCollectBuild()
elif re.match('^LogCollecteNB$', mode, re.IGNORECASE):
if RAN.eNBIPAddress == '' or RAN.eNBUserName == '' or RAN.eNBPassword == '' or RAN.eNBSourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^LogCollectHSS$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^LogCollectMME$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^LogCollectSPGW$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^LogCollectPing$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^LogCollectIperf$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
CiTestObj.LogCollectIperf()
elif re.match('^LogCollectOAIUE$', mode, re.IGNORECASE):
if CiTestObj.UEIPAddress == '' or CiTestObj.UEUserName == '' or CiTestObj.UEPassword == '' or CiTestObj.UESourceCodePath == '':
sys.exit('Insufficient Parameter')
CiTestObj.LogCollectOAIUE()
elif re.match('^InitiateHtml$', mode, re.IGNORECASE):
if (CiTestObj.ADBIPAddress == '' or CiTestObj.ADBUserName == '' or CiTestObj.ADBPassword == ''):
sys.exit('Insufficient Parameter')
count = 0
while (count < HTML.nbTestXMLfiles):
Gabriele Perrone
committed
#xml_test_file = cwd + "/" + CiTestObj.testXMLfiles[count]
xml_test_file = sys.path[0] + "/" + CiTestObj.testXMLfiles[count]
if (os.path.isfile(xml_test_file)):
try:
xmlTree = ET.parse(xml_test_file)
except:
print("Error while parsing file: " + xml_test_file)
xmlRoot = xmlTree.getroot()
hardy
committed
HTML.htmlTabRefs.append(xmlRoot.findtext('htmlTabRef',default='test-tab-' + str(count)))
HTML.htmlTabNames.append(xmlRoot.findtext('htmlTabName',default='test-tab-' + str(count)))
HTML.htmlTabIcons.append(xmlRoot.findtext('htmlTabIcon',default='info-sign'))
hardy
committed
if foundCount != HTML.nbTestXMLfiles:
HTML.nbTestXMLfiles=foundCount
Gabriele Perrone
committed
if (CiTestObj.ADBIPAddress != 'none'):
terminate_ue_flag = False
CiTestObj.GetAllUEDevices(terminate_ue_flag)
CiTestObj.GetAllCatMDevices(terminate_ue_flag)
HTML.SethtmlUEConnected(len(CiTestObj.UEDevices) + len(CiTestObj.CatMDevices))
hardy
committed
HTML.htmlNb_Smartphones=len(CiTestObj.UEDevices)
HTML.htmlNb_CATM_Modules=len(CiTestObj.CatMDevices)
Gabriele Perrone
committed
HTML.CreateHtmlHeader(CiTestObj.ADBIPAddress)
elif re.match('^FinalizeHtml$', mode, re.IGNORECASE):
logging.debug('\u001B[1m----------------------------------------\u001B[0m')
logging.debug('\u001B[1m Creating HTML footer \u001B[0m')
logging.debug('\u001B[1m----------------------------------------\u001B[0m')
CiTestObj.RetrieveSystemVersion('eNB')
CiTestObj.RetrieveSystemVersion('UE')
Gabriele Perrone
committed
HTML.CreateHtmlFooter(CiTestObj.finalStatus)

Boris Djalal
committed
elif re.match('^TesteNB$', mode, re.IGNORECASE) or re.match('^TestUE$', mode, re.IGNORECASE):
if re.match('^TesteNB$', mode, re.IGNORECASE):
if RAN.eNBIPAddress == '' or RAN.ranRepository == '' or RAN.ranBranch == '' or RAN.eNBUserName == '' or RAN.eNBPassword == '' or RAN.eNBSourceCodePath == '' or EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath == '' or CiTestObj.ADBIPAddress == '' or CiTestObj.ADBUserName == '' or CiTestObj.ADBPassword == '':
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.SourceCodePath == '' or EPC.Type == '':
HELP.EPCSrvHelp(EPC.IPAddress, EPC.UserName, EPC.Password, EPC.SourceCodePath, EPC.Type)
if RAN.ranRepository == '':
HELP.GitSrvHelp(RAN.ranRepository, RAN.ranBranch, RAN.ranCommitID, RAN.ranAllowMerge, RAN.ranTargetBranch)
if RAN.eNBIPAddress == '' or RAN.eNBUserName == '' or RAN.eNBPassword == '' or RAN.eNBSourceCodePath == '':
HELP.eNBSrvHelp(RAN.eNBIPAddress, RAN.eNBUserName, RAN.eNBPassword, RAN.eNBSourceCodePath)

Boris Djalal
committed
sys.exit('Insufficient Parameter')
hardy
committed
if (EPC.IPAddress!= '') and (EPC.IPAddress != 'none'):
SSH.copyout(EPC.IPAddress, EPC.UserName, EPC.Password, cwd + "/tcp_iperf_stats.awk", "/tmp")
SSH.copyout(EPC.IPAddress, EPC.UserName, EPC.Password, cwd + "/active_net_interfaces.awk", "/tmp")

Boris Djalal
committed
else:
if CiTestObj.UEIPAddress == '' or CiTestObj.ranRepository == '' or CiTestObj.ranBranch == '' or CiTestObj.UEUserName == '' or CiTestObj.UEPassword == '' or CiTestObj.UESourceCodePath == '':
HELP.GenericHelp(CONST.Version)

Boris Djalal
committed
sys.exit('UE: Insufficient Parameter')
#read test_case_list.xml file
# if no parameters for XML file, use default value
hardy
committed
if (HTML.nbTestXMLfiles != 1):
xml_test_file = cwd + "/test_case_list.xml"
xml_test_file = cwd + "/" + CiTestObj.testXMLfiles[0]
xmlTree = ET.parse(xml_test_file)
xmlRoot = xmlTree.getroot()
exclusion_tests=xmlRoot.findtext('TestCaseExclusionList',default='')
requested_tests=xmlRoot.findtext('TestCaseRequestedList',default='')
hardy
committed
if (HTML.nbTestXMLfiles == 1):
HTML.htmlTabRefs.append(xmlRoot.findtext('htmlTabRef',default='test-tab-0'))
HTML.htmlTabNames.append(xmlRoot.findtext('htmlTabName',default='Test-0'))
repeatCount = xmlRoot.findtext('repeatCount',default='1')
CiTestObj.repeatCounts.append(int(repeatCount))
3604
3605
3606
3607
3608
3609
3610
3611
3612
3613
3614
3615
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
all_tests=xmlRoot.findall('testCase')
exclusion_tests=exclusion_tests.split()
requested_tests=requested_tests.split()
#check that exclusion tests are well formatted
#(6 digits or less than 6 digits followed by +)
for test in exclusion_tests:
if (not re.match('^[0-9]{6}$', test) and
not re.match('^[0-9]{1,5}\+$', test)):
logging.debug('ERROR: exclusion test is invalidly formatted: ' + test)
sys.exit(1)
else:
logging.debug(test)
#check that requested tests are well formatted
#(6 digits or less than 6 digits followed by +)
#be verbose
for test in requested_tests:
if (re.match('^[0-9]{6}$', test) or
re.match('^[0-9]{1,5}\+$', test)):
logging.debug('INFO: test group/case requested: ' + test)
else:
logging.debug('ERROR: requested test is invalidly formatted: ' + test)
sys.exit(1)
hardy
committed
if (EPC.IPAddress != '') and (EPC.IPAddress != 'none'):
CiTestObj.CheckFlexranCtrlInstallation()
EPC.SetMmeIPAddress()
#get the list of tests to be done
todo_tests=[]
for test in requested_tests:
if (test_in_list(test, exclusion_tests)):
logging.debug('INFO: test will be skipped: ' + test)
else:
#logging.debug('INFO: test will be run: ' + test)
todo_tests.append(test)
signal.signal(signal.SIGUSR1, receive_signal)
Gabriele Perrone
committed
if (CiTestObj.ADBIPAddress != 'none'):
terminate_ue_flag = False
CiTestObj.GetAllUEDevices(terminate_ue_flag)
CiTestObj.GetAllCatMDevices(terminate_ue_flag)
else:
CiTestObj.UEDevices.append('OAI-UE')
HTML.SethtmlUEConnected(len(CiTestObj.UEDevices) + len(CiTestObj.CatMDevices))
HTML.CreateHtmlTabHeader()
CiTestObj.FailReportCnt = 0
hardy
committed
HTML.startTime=int(round(time.time() * 1000))
while CiTestObj.FailReportCnt < CiTestObj.repeatCounts[0] and RAN.prematureExit:
RAN.prematureExit=False
Gabriele Perrone
committed
# At every iteratin of the retry loop, a separator will be added
Gabriele Perrone
committed
# pass CiTestObj.FailReportCnt as parameter of HTML.CreateHtmlRetrySeparator
HTML.CreateHtmlRetrySeparator(CiTestObj.FailReportCnt)
for test_case_id in todo_tests:
for test in all_tests:
id = test.get('id')
if test_case_id != id:
continue
hardy
committed
HTML.testCase_id=CiTestObj.testCase_id
EPC.testCase_id=CiTestObj.testCase_id
CiTestObj.desc = test.findtext('desc')
hardy
committed
HTML.desc=CiTestObj.desc
action = test.findtext('class')
if (CheckClassValidity(action, id) == False):
continue
GetParametersFromXML(action)
if action == 'Initialize_UE' or action == 'Attach_UE' or action == 'Detach_UE' or action == 'Ping' or action == 'Iperf' or action == 'Reboot_UE' or action == 'DataDisable_UE' or action == 'DataEnable_UE' or action == 'CheckStatusUE':
if (CiTestObj.ADBIPAddress != 'none'):
terminate_ue_flag = False
CiTestObj.GetAllUEDevices(terminate_ue_flag)
if action == 'Build_eNB':
Gabriele Perrone
committed
RAN.BuildeNB()
elif action == 'WaitEndBuild_eNB':
Gabriele Perrone
committed
RAN.WaitBuildeNBisFinished()
elif action == 'Initialize_eNB':
Gabriele Perrone
committed
check_eNB = False
check_OAI_UE = False
RAN.pStatus=CiTestObj.CheckProcessExist(check_eNB, check_OAI_UE)
Gabriele Perrone
committed
RAN.InitializeeNB()
elif action == 'Terminate_eNB':
Gabriele Perrone
committed
RAN.TerminateeNB()
elif action == 'Initialize_UE':
elif action == 'Terminate_UE':
elif action == 'Attach_UE':
elif action == 'Detach_UE':
elif action == 'DataDisable_UE':
elif action == 'DataEnable_UE':
elif action == 'CheckStatusUE':
elif action == 'Build_OAI_UE':
elif action == 'Initialize_OAI_UE':
CiTestObj.InitializeOAIUE()
elif action == 'Terminate_OAI_UE':
elif action == 'Initialize_CatM_module':
elif action == 'Terminate_CatM_module':
elif action == 'Attach_CatM_module':
elif action == 'Detach_CatM_module':
elif action == 'Ping_CatM_module':
elif action == 'Ping':
elif action == 'Iperf':
elif action == 'Reboot_UE':
elif action == 'Initialize_HSS':
elif action == 'Terminate_HSS':
elif action == 'Initialize_MME':
elif action == 'Terminate_MME':
elif action == 'Initialize_SPGW':
elif action == 'Terminate_SPGW':
elif action == 'Initialize_FlexranCtrl':
CiTestObj.InitializeFlexranCtrl()
elif action == 'Terminate_FlexranCtrl':
CiTestObj.TerminateFlexranCtrl()
elif action == 'IdleSleep':
elif action == 'Perform_X2_Handover':
CiTestObj.Perform_X2_Handover()
elif action == 'Build_PhySim':
HTML=ldpc.Build_PhySim(HTML,CONST)
if ldpc.exitStatus==1:sys.exit()
elif action == 'Run_PhySim':
else:
sys.exit('Invalid action')
CiTestObj.FailReportCnt += 1
if CiTestObj.FailReportCnt == CiTestObj.repeatCounts[0] and RAN.prematureExit:
logging.debug('Testsuite failed ' + str(CiTestObj.FailReportCnt) + ' time(s)')
Gabriele Perrone
committed
HTML.CreateHtmlTabFooter(False)
sys.exit('Failed Scenario')
else:
logging.info('Testsuite passed after ' + str(CiTestObj.FailReportCnt) + ' time(s)')
Gabriele Perrone
committed
HTML.CreateHtmlTabFooter(True)
sys.exit('Invalid mode')