Newer
Older
IPAddress = self.UEIPAddress
UserName = self.UEUserName
Password = self.UEPassword
Gabriele Perrone
committed
SSH.open(IPAddress, UserName, Password)
SSH.command('lsb_release -a', '\$', 5)
result = re.search('Description:\\\\t(?P<os_type>[a-zA-Z0-9\-\_\.\ ]+)', SSH.getBefore())
if result is not None:
OsVersion = result.group('os_type')
logging.debug('OS is: ' + OsVersion)
hardy
committed
HTML.OsVersion[idx]=OsVersion
Gabriele Perrone
committed
SSH.command('hostnamectl', '\$', 5)
result = re.search('Operating System: (?P<os_type>[a-zA-Z0-9\-\_\.\ ]+)', SSH.getBefore())
OsVersion = result.group('os_type')
if OsVersion == 'CentOS Linux 7 ':
Gabriele Perrone
committed
SSH.command('cat /etc/redhat-release', '\$', 5)
result = re.search('CentOS Linux release (?P<os_version>[0-9\.]+)', SSH.getBefore())
OsVersion = OsVersion.replace('7 ', result.group('os_version'))
logging.debug('OS is: ' + OsVersion)
hardy
committed
HTML.OsVersion[idx]=OsVersion
Gabriele Perrone
committed
SSH.command('uname -r', '\$', 5)
result = re.search('uname -r\\\\r\\\\n(?P<kernel_version>[a-zA-Z0-9\-\_\.]+)', SSH.getBefore())
if result is not None:
KernelVersion = result.group('kernel_version')
logging.debug('Kernel Version is: ' + KernelVersion)
hardy
committed
HTML.KernelVersion[idx]=KernelVersion
Gabriele Perrone
committed
SSH.command('dpkg --list | egrep --color=never libuhd003', '\$', 5)
result = re.search('libuhd003:amd64 *(?P<uhd_version>[0-9\.]+)', SSH.getBefore())
if result is not None:
UhdVersion = result.group('uhd_version')
logging.debug('UHD Version is: ' + UhdVersion)
hardy
committed
HTML.UhdVersion[idx]=UhdVersion
Gabriele Perrone
committed
SSH.command('uhd_config_info --version', '\$', 5)
result = re.search('UHD (?P<uhd_version>[a-zA-Z0-9\.\-]+)', SSH.getBefore())
UhdVersion = result.group('uhd_version')
logging.debug('UHD Version is: ' + UhdVersion)
hardy
committed
HTML.UhdVersion[idx]=UhdVersion
Gabriele Perrone
committed
SSH.command('echo ' + Password + ' | sudo -S uhd_find_devices', '\$', 60)
Gabriele Perrone
committed
usrp_boards = re.findall('product: ([0-9A-Za-z]+)\\\\r\\\\n', SSH.getBefore())
count = 0
for board in usrp_boards:
if count == 0:
logging.debug('USRP Board(s) : ' + UsrpBoard)
hardy
committed
HTML.UsrpBoard[idx]=UsrpBoard
Gabriele Perrone
committed
SSH.command('lscpu', '\$', 5)
result = re.search('CPU\(s\): *(?P<nb_cpus>[0-9]+).*Model name: *(?P<model>[a-zA-Z0-9\-\_\.\ \(\)]+).*CPU MHz: *(?P<cpu_mhz>[0-9\.]+)', SSH.getBefore())
if result is not None:
CpuNb = result.group('nb_cpus')
logging.debug('nb_cpus: ' + CpuNb)
hardy
committed
HTML.CpuNb[idx]=CpuNb
CpuModel = result.group('model')
logging.debug('model: ' + CpuModel)
hardy
committed
HTML.CpuModel[idx]=CpuModel
CpuMHz = result.group('cpu_mhz') + ' MHz'
logging.debug('cpu_mhz: ' + CpuMHz)
hardy
committed
HTML.CpuMHz[idx]=CpuMHz
Gabriele Perrone
committed
SSH.close()
def ShowTestID(self):
    """Log a banner announcing the test case that is about to run.

    Emits four DEBUG records through the module-level ``logging`` functions:
    a separator line, the test identifier (``self.testCase_id``), the test
    description (``self.desc``) and a closing separator.  Every record is
    wrapped in the ANSI "bold" / "reset" escape sequences.

    The identifier and description are passed as lazy ``%s`` arguments
    instead of being concatenated, so a test case without a description
    (``self.desc`` is ``None``) is shown as ``None`` rather than raising
    ``TypeError``.  For string values the emitted text is unchanged.
    """
    separator = '\u001B[1m----------------------------------------\u001B[0m'
    logging.debug(separator)
    logging.debug('\u001B[1mTest ID:%s\u001B[0m', self.testCase_id)
    logging.debug('\u001B[1m%s\u001B[0m', self.desc)
    logging.debug(separator)
#-----------------------------------------------------------
# General Functions
#-----------------------------------------------------------
def CheckClassValidity(xml_class_list,action,id):
    """Tell whether a test case's class (action) is a known one.

    Parameters:
        xml_class_list: container of accepted class names; only membership
            (``in``) is used on it.
        action: class name read from the test case; may be ``None`` when the
            test case carries no class.
        id: identifier of the test case, used only in the log message.

    Returns:
        ``True`` when ``action`` is in ``xml_class_list``, ``False`` otherwise.
        A rejected class is reported with a DEBUG record.
    """
    if action in xml_class_list:
        return True
    # Lazy %s arguments (not '+') so that a missing class or id (None) is
    # reported in the message instead of raising TypeError.
    logging.debug('ERROR: test-case %s has unlisted class %s ##CHECK xml_class_list.yml', id, action)
    return False
#assign the parameters as attributes of the object instances (attributes that do not exist yet are created)
def AssignParams(params_dict):
    """Copy every entry of ``params_dict`` onto the shared manager objects.

    For each ``key: value`` pair the attribute ``key`` is set to ``value`` on
    the module-level objects ``CiTestObj``, ``RAN``, ``HTML`` and ``ldpc``, in
    that order.  ``setattr`` creates the attribute when the object does not
    have it yet.  Nothing is returned.
    """
    for attr_name, attr_value in params_dict.items():
        for target in (CiTestObj, RAN, HTML, ldpc):
            setattr(target, attr_name, attr_value)
def GetParametersFromXML(action):
# Copy the XML parameters that belong to `action` into the shared objects
# (RAN, CiTestObj, ldpc, COTS_UE) as attributes. The values are read with
# findtext() from `test`, a name this function does not define itself.
# Nothing is returned.
# NOTE(review): several statements below appear without an enclosing
# `if action == ...:` / `else:` line -- confirm the branch structure against
# the repository copy before relying on it.
# Build_eNB: build arguments, optional forced cleanup, eNB instance / server id.
if action == 'Build_eNB':
RAN.Build_eNB_args=test.findtext('Build_eNB_args')
forced_workspace_cleanup = test.findtext('forced_workspace_cleanup')
# The tag is optional; only a value that starts with "true" (any case) enables the cleanup.
if (forced_workspace_cleanup is None):
RAN.Build_eNB_forced_workspace_cleanup=False
else:
if re.match('true', forced_workspace_cleanup, re.IGNORECASE):
RAN.Build_eNB_forced_workspace_cleanup=True
Gabriele Perrone
committed
else:
RAN.Build_eNB_forced_workspace_cleanup=False
# Instance and server id fall back to '0' when their tag is absent.
RAN.eNB_instance=test.findtext('eNB_instance')
if (RAN.eNB_instance is None):
RAN.eNB_instance='0'
RAN.eNB_serverId=test.findtext('eNB_serverId')
if (RAN.eNB_serverId is None):
RAN.eNB_serverId='0'
xmlBgBuildField = test.findtext('backgroundBuild')
# NOTE(review): the bodies of the backgroundBuild branches are not visible here.
if (xmlBgBuildField is None):
else:
if re.match('true', xmlBgBuildField, re.IGNORECASE):
# WaitEndBuild_eNB: same build arguments and eNB instance / server id defaults.
if action == 'WaitEndBuild_eNB':
RAN.Build_eNB_args=test.findtext('Build_eNB_args')
RAN.eNB_instance=test.findtext('eNB_instance')
if (RAN.eNB_instance is None):
RAN.eNB_instance='0'
RAN.eNB_serverId=test.findtext('eNB_serverId')
if (RAN.eNB_serverId is None):
RAN.eNB_serverId='0'
# Initialize_eNB: start-up arguments, instance / server id and air interface.
if action == 'Initialize_eNB':
RAN.Initialize_eNB_args=test.findtext('Initialize_eNB_args')
RAN.eNB_instance=test.findtext('eNB_instance')
if (RAN.eNB_instance is None):
RAN.eNB_instance='0'
RAN.eNB_serverId=test.findtext('eNB_serverId')
if (RAN.eNB_serverId is None):
RAN.eNB_serverId='0'
#local variable air_interface
# Mapping: missing or unknown value -> 'lte-softmodem'; 'nr' / 'lte' (any case)
# -> '<value>-softmodem'; 'ocp' -> 'ocp-enb'.
air_interface = test.findtext('air_interface')
if (air_interface is None) or (air_interface.lower() not in ['nr','lte','ocp']):
CiTestObj.air_interface = 'lte-softmodem'
elif (air_interface.lower() in ['nr','lte']):
CiTestObj.air_interface = air_interface.lower() +'-softmodem'
else :
CiTestObj.air_interface = 'ocp-enb'
RAN.air_interface(CiTestObj.air_interface)
# NOTE(review): the statements from here to the 'Attach_UE' test have no visible
# `if action == ...:` header -- presumably they belong to another eNB action; verify.
RAN.eNB_instance=test.findtext('eNB_instance')
if (RAN.eNB_instance is None):
RAN.eNB_instance='0'
RAN.eNB_serverId=test.findtext('eNB_serverId')
if (RAN.eNB_serverId is None):
RAN.eNB_serverId='0'
#local variable air_interface
air_interface = test.findtext('air_interface')
if (air_interface is None) or (air_interface.lower() not in ['nr','lte','ocp']):
CiTestObj.air_interface = 'lte-softmodem'
elif (air_interface.lower() in ['nr','lte']):
CiTestObj.air_interface = air_interface.lower() +'-softmodem'
else :
CiTestObj.air_interface = 'ocp-enb'
# Attach_UE: maximum number of UEs to attach, -1 when the tag is absent.
# NOTE(review): as shown, int() also runs when the tag is absent (int(None)
# raises TypeError) -- an `else:` is probably missing from this view; confirm.
if action == 'Attach_UE':
nbMaxUEtoAttach = test.findtext('nbMaxUEtoAttach')
if (nbMaxUEtoAttach is None):
CiTestObj.nbMaxUEtoAttach = -1
CiTestObj.nbMaxUEtoAttach = int(nbMaxUEtoAttach)
# CheckStatusUE: expected number of connected UEs, -1 when the tag is absent
# (same caveat about the int() conversion as for Attach_UE).
if action == 'CheckStatusUE':
expectedNBUE = test.findtext('expectedNbOfConnectedUEs')
if (expectedNBUE is None):
CiTestObj.expectedNbOfConnectedUEs = -1
CiTestObj.expectedNbOfConnectedUEs = int(expectedNBUE)
# NOTE(review): the OAI UE parameters below have no visible `if action == ...:`
# header, and no visible `else:` between the two clean_repository assignments.
CiTestObj.Build_OAI_UE_args = test.findtext('Build_OAI_UE_args')
Gabriele Perrone
committed
CiTestObj.clean_repository = test.findtext('clean_repository')
if (CiTestObj.clean_repository == 'false'):
CiTestObj.clean_repository = False
Gabriele Perrone
committed
CiTestObj.clean_repository = True
CiTestObj.Initialize_OAI_UE_args = test.findtext('Initialize_OAI_UE_args')
CiTestObj.UE_instance = test.findtext('UE_instance')
if (CiTestObj.UE_instance is None):
CiTestObj.UE_instance = '0'
#local variable air_interface
air_interface = test.findtext('air_interface')
if (air_interface is None) or (air_interface.lower() not in ['nr','lte','ocp']):
CiTestObj.air_interface = 'lte-softmodem'
elif (air_interface.lower() in ['nr','lte']):
CiTestObj.air_interface = air_interface.lower() +'-softmodem'
else :
CiTestObj.air_interface = 'ocp-enb'
# NOTE(review): the next line stores the 'UE_instance' tag in RAN.eNB_instance,
# while the check after it tests CiTestObj.UE_instance -- confirm this is intended.
RAN.eNB_instance=test.findtext('UE_instance')
if (CiTestObj.UE_instance is None):
CiTestObj.UE_instance = '0'
# Ping / Ping_CatM_module: arguments and packet-loss threshold, stored as read.
if action == 'Ping' or action == 'Ping_CatM_module':
CiTestObj.ping_args = test.findtext('ping_args')
CiTestObj.ping_packetloss_threshold = test.findtext('ping_packetloss_threshold')
# Iperf: arguments, packet-loss threshold, profile and options.
if action == 'Iperf':
CiTestObj.iperf_args = test.findtext('iperf_args')
CiTestObj.iperf_packetloss_threshold = test.findtext('iperf_packetloss_threshold')
# Profile defaults to 'balanced'; accepted values are 'balanced', 'unbalanced', 'single-ue'.
CiTestObj.iperf_profile = test.findtext('iperf_profile')
if (CiTestObj.iperf_profile is None):
CiTestObj.iperf_profile = 'balanced'
if CiTestObj.iperf_profile != 'balanced' and CiTestObj.iperf_profile != 'unbalanced' and CiTestObj.iperf_profile != 'single-ue':
logging.debug('ERROR: test-case has wrong profile ' + CiTestObj.iperf_profile)
CiTestObj.iperf_profile = 'balanced'
Gabriele Perrone
committed
# Options default to 'check'; accepted values are 'check' and 'sink'.
CiTestObj.iperf_options = test.findtext('iperf_options')
if (CiTestObj.iperf_options is None):
CiTestObj.iperf_options = 'check'
Gabriele Perrone
committed
if CiTestObj.iperf_options != 'check' and CiTestObj.iperf_options != 'sink':
logging.debug('ERROR: test-case has wrong option ' + CiTestObj.iperf_options)
CiTestObj.iperf_options = 'check'
# IdleSleep: sleep duration in seconds, 5 when the tag is absent
# (same caveat about the int() conversion as for Attach_UE).
if action == 'IdleSleep':
string_field = test.findtext('idle_sleep_time_in_sec')
if (string_field is None):
CiTestObj.idle_sleep_time = 5
CiTestObj.idle_sleep_time = int(string_field)
# Perform_X2_Handover: handover option, 'network' when the tag is absent; any
# other value than 'network' is logged as an error.
# NOTE(review): the final assignment presumably sits in an `else:` not visible here.
if action == 'Perform_X2_Handover':
string_field = test.findtext('x2_ho_options')
if (string_field is None):
CiTestObj.x2_ho_options = 'network'
else:
if string_field != 'network':
logging.error('ERROR: test-case has wrong option ' + string_field)
CiTestObj.x2_ho_options = 'network'
CiTestObj.x2_ho_options = string_field
# Build_PhySim: build arguments and optional forced workspace cleanup
# (same "starts with true" rule as for Build_eNB).
if action == 'Build_PhySim':
ldpc.buildargs = test.findtext('physim_build_args')
forced_workspace_cleanup = test.findtext('forced_workspace_cleanup')
if (forced_workspace_cleanup is None):
ldpc.forced_workspace_cleanup=False
else:
if re.match('true', forced_workspace_cleanup, re.IGNORECASE):
ldpc.forced_workspace_cleanup=True
else:
ldpc.forced_workspace_cleanup=False
# Run_PhySim: run arguments, stored as read.
if action == 'Run_PhySim':
ldpc.runargs = test.findtext('physim_run_args')
# COTS_UE_Airplane: arguments, stored as read on the COTS_UE object.
if action == 'COTS_UE_Airplane':
COTS_UE.runargs = test.findtext('cots_ue_airplane_args')
#check if given test is in list
#it is in list if one of the strings in 'list' is at the beginning of 'test'
def test_in_list(test, list):
for check in list:
check=check.replace('+','')
if (test.startswith(check)):
return True
return False
def receive_signal(signum, frame):
    """Signal handler that ends the program with exit status 1.

    Both arguments follow the handler signature expected by
    ``signal.signal`` and are not used.
    """
    raise SystemExit(1)
#-----------------------------------------------------------
# MAIN PART
#-----------------------------------------------------------
#loading xml action list from yaml
import yaml
with open('xml_class_list.yml','r') as file:
# The FullLoader parameter handles the conversion from YAML
# scalar values to the Python dictionary format
xml_class_list = yaml.load(file,Loader=yaml.FullLoader)
CiTestObj = OaiCiTest()
Gabriele Perrone
committed
SSH = sshconnection.SSHConnection()
EPC = epc.EPCManagement()
RAN = ran.RANManagement()
HTML = html.HTMLManagement()
EPC.htmlObj=HTML
RAN.htmlObj=HTML
RAN.epcObj=EPC
ldpc=cls_physim.PhySim() #create an instance for LDPC test using GPU or CPU build
#-----------------------------------------------------------
# Parsing Command Line Arguments
#-----------------------------------------------------------
import args_parse
py_param_file_present, py_params, mode = args_parse.ArgsParse(sys.argv,CiTestObj,RAN,HTML,EPC,ldpc,HELP)
Gabriele Perrone
committed
3337
3338
3339
3340
3341
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357
3358
3359
3360
3361
3362
3363
3364
3365
3366
3367
3368
3369
3370
3371
3372
3373
3374
3375
3376
3377
3378
3379
3380
3381
3382
3383
3384
3385
3386
3387
3388
3389
3390
3391
3392
3393
3394
3395
3396
3397
3398
3399
3400
3401
3402
3403
3404
3405
3406
3407
3408
3409
3410
3411
3412
3413
3414
3415
3416
3417
3418
3419
3420
3421
3422
3423
3424
3425
3426
3427
3428
3429
3430
3431
3432
3433
3434
3435
3436
3437
3438
3439
3440
3441
3442
3443
3444
3445
3446
3447
3448
3449
3450
3451
3452
3453
3454
3455
3456
3457
3458
3459
3460
3461
3462
3463
3464
3465
3466
3467
3468
3469
3470
3471
3472
3473
3474
3475
3476
3477
3478
3479
3480
3481
3482
3483
3484
3485
3486
3487
3488
3489
3490
3491
3492
3493
3494
3495
3496
3497
3498
3499
3500
3501
3502
3503
3504
3505
3506
3507
3508
3509
3510
3511
3512
3513
3514
3515
3516
3517
#argvs = sys.argv
#py_param_file_present = False
#
#while len(argvs) > 1:
# myArgv = argvs.pop(1) # 0th is this file's name
#
# #--help
# if re.match('^\-\-help$', myArgv, re.IGNORECASE):
# HELP.GenericHelp(CONST.Version)
# sys.exit(0)
#
# #--apply=<filename> as parameters file, to replace inline parameters
# elif re.match('^\-\-Apply=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-Apply=(.+)$', myArgv, re.IGNORECASE)
# py_params_file = matchReg.group(1)
# with open(py_params_file,'r') as file:
# # The FullLoader parameter handles the conversion from YAML
# # scalar values to Python dictionary format
# py_params = yaml.load(file,Loader=yaml.FullLoader)
# py_param_file_present = True #to be removed once validated
# #AssignParams(py_params) #to be uncommented once validated
#
# #consider inline parameters
# elif re.match('^\-\-mode=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-mode=(.+)$', myArgv, re.IGNORECASE)
# mode = matchReg.group(1)
# elif re.match('^\-\-eNBRepository=(.+)$|^\-\-ranRepository(.+)$', myArgv, re.IGNORECASE):
# if re.match('^\-\-eNBRepository=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNBRepository=(.+)$', myArgv, re.IGNORECASE)
# else:
# matchReg = re.match('^\-\-ranRepository=(.+)$', myArgv, re.IGNORECASE)
# CiTestObj.ranRepository = matchReg.group(1)
# RAN.ranRepository=matchReg.group(1)
# HTML.ranRepository=matchReg.group(1)
# ldpc.ranRepository=matchReg.group(1)
# elif re.match('^\-\-eNB_AllowMerge=(.+)$|^\-\-ranAllowMerge=(.+)$', myArgv, re.IGNORECASE):
# if re.match('^\-\-eNB_AllowMerge=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNB_AllowMerge=(.+)$', myArgv, re.IGNORECASE)
# else:
# matchReg = re.match('^\-\-ranAllowMerge=(.+)$', myArgv, re.IGNORECASE)
# doMerge = matchReg.group(1)
# ldpc.ranAllowMerge=matchReg.group(1)
# if ((doMerge == 'true') or (doMerge == 'True')):
# CiTestObj.ranAllowMerge = True
# RAN.ranAllowMerge=True
# HTML.ranAllowMerge=True
# elif re.match('^\-\-eNBBranch=(.+)$|^\-\-ranBranch=(.+)$', myArgv, re.IGNORECASE):
# if re.match('^\-\-eNBBranch=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNBBranch=(.+)$', myArgv, re.IGNORECASE)
# else:
# matchReg = re.match('^\-\-ranBranch=(.+)$', myArgv, re.IGNORECASE)
# CiTestObj.ranBranch = matchReg.group(1)
# RAN.ranBranch=matchReg.group(1)
# HTML.ranBranch=matchReg.group(1)
# ldpc.ranBranch=matchReg.group(1)
# elif re.match('^\-\-eNBCommitID=(.*)$|^\-\-ranCommitID=(.*)$', myArgv, re.IGNORECASE):
# if re.match('^\-\-eNBCommitID=(.*)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNBCommitID=(.*)$', myArgv, re.IGNORECASE)
# else:
# matchReg = re.match('^\-\-ranCommitID=(.*)$', myArgv, re.IGNORECASE)
# CiTestObj.ranCommitID = matchReg.group(1)
# RAN.ranCommitID=matchReg.group(1)
# HTML.ranCommitID=matchReg.group(1)
# ldpc.ranCommitID=matchReg.group(1)
# elif re.match('^\-\-eNBTargetBranch=(.*)$|^\-\-ranTargetBranch=(.*)$', myArgv, re.IGNORECASE):
# if re.match('^\-\-eNBTargetBranch=(.*)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNBTargetBranch=(.*)$', myArgv, re.IGNORECASE)
# else:
# matchReg = re.match('^\-\-ranTargetBranch=(.*)$', myArgv, re.IGNORECASE)
# CiTestObj.ranTargetBranch = matchReg.group(1)
# RAN.ranTargetBranch=matchReg.group(1)
# HTML.ranTargetBranch=matchReg.group(1)
# ldpc.ranTargetBranch=matchReg.group(1)
# elif re.match('^\-\-eNBIPAddress=(.+)$|^\-\-eNB[1-2]IPAddress=(.+)$', myArgv, re.IGNORECASE):
# if re.match('^\-\-eNBIPAddress=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNBIPAddress=(.+)$', myArgv, re.IGNORECASE)
# RAN.eNBIPAddress=matchReg.group(1)
# ldpc.eNBIpAddr=matchReg.group(1)
# elif re.match('^\-\-eNB1IPAddress=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNB1IPAddress=(.+)$', myArgv, re.IGNORECASE)
# RAN.eNB1IPAddress=matchReg.group(1)
# elif re.match('^\-\-eNB2IPAddress=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNB2IPAddress=(.+)$', myArgv, re.IGNORECASE)
# RAN.eNB2IPAddress=matchReg.group(1)
# elif re.match('^\-\-eNBUserName=(.+)$|^\-\-eNB[1-2]UserName=(.+)$', myArgv, re.IGNORECASE):
# if re.match('^\-\-eNBUserName=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNBUserName=(.+)$', myArgv, re.IGNORECASE)
# RAN.eNBUserName=matchReg.group(1)
# ldpc.eNBUserName=matchReg.group(1)
# elif re.match('^\-\-eNB1UserName=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNB1UserName=(.+)$', myArgv, re.IGNORECASE)
# RAN.eNB1UserName=matchReg.group(1)
# elif re.match('^\-\-eNB2UserName=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNB2UserName=(.+)$', myArgv, re.IGNORECASE)
# RAN.eNB2UserName=matchReg.group(1)
# elif re.match('^\-\-eNBPassword=(.+)$|^\-\-eNB[1-2]Password=(.+)$', myArgv, re.IGNORECASE):
# if re.match('^\-\-eNBPassword=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNBPassword=(.+)$', myArgv, re.IGNORECASE)
# RAN.eNBPassword=matchReg.group(1)
# ldpc.eNBPassWord=matchReg.group(1)
# elif re.match('^\-\-eNB1Password=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNB1Password=(.+)$', myArgv, re.IGNORECASE)
# RAN.eNB1Password=matchReg.group(1)
# elif re.match('^\-\-eNB2Password=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNB2Password=(.+)$', myArgv, re.IGNORECASE)
# RAN.eNB2Password=matchReg.group(1)
# elif re.match('^\-\-eNBSourceCodePath=(.+)$|^\-\-eNB[1-2]SourceCodePath=(.+)$', myArgv, re.IGNORECASE):
# if re.match('^\-\-eNBSourceCodePath=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNBSourceCodePath=(.+)$', myArgv, re.IGNORECASE)
# RAN.eNBSourceCodePath=matchReg.group(1)
# ldpc.eNBSourceCodePath=matchReg.group(1)
# elif re.match('^\-\-eNB1SourceCodePath=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNB1SourceCodePath=(.+)$', myArgv, re.IGNORECASE)
# RAN.eNB1SourceCodePath=matchReg.group(1)
# elif re.match('^\-\-eNB2SourceCodePath=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-eNB2SourceCodePath=(.+)$', myArgv, re.IGNORECASE)
# RAN.eNB2SourceCodePath=matchReg.group(1)
# elif re.match('^\-\-EPCIPAddress=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-EPCIPAddress=(.+)$', myArgv, re.IGNORECASE)
# EPC.IPAddress=matchReg.group(1)
# elif re.match('^\-\-EPCUserName=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-EPCUserName=(.+)$', myArgv, re.IGNORECASE)
# EPC.UserName=matchReg.group(1)
# elif re.match('^\-\-EPCPassword=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-EPCPassword=(.+)$', myArgv, re.IGNORECASE)
# EPC.Password=matchReg.group(1)
# elif re.match('^\-\-EPCSourceCodePath=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-EPCSourceCodePath=(.+)$', myArgv, re.IGNORECASE)
# EPC.SourceCodePath=matchReg.group(1)
# elif re.match('^\-\-EPCType=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-EPCType=(.+)$', myArgv, re.IGNORECASE)
# if re.match('OAI', matchReg.group(1), re.IGNORECASE) or re.match('ltebox', matchReg.group(1), re.IGNORECASE) or re.match('OAI-Rel14-CUPS', matchReg.group(1), re.IGNORECASE) or re.match('OAI-Rel14-Docker', matchReg.group(1), re.IGNORECASE):
# EPC.Type=matchReg.group(1)
# else:
# sys.exit('Invalid EPC Type: ' + matchReg.group(1) + ' -- (should be OAI or ltebox or OAI-Rel14-CUPS or OAI-Rel14-Docker)')
# elif re.match('^\-\-EPCContainerPrefix=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-EPCContainerPrefix=(.+)$', myArgv, re.IGNORECASE)
# EPC.ContainerPrefix=matchReg.group(1)
# elif re.match('^\-\-ADBIPAddress=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-ADBIPAddress=(.+)$', myArgv, re.IGNORECASE)
# CiTestObj.ADBIPAddress = matchReg.group(1)
# elif re.match('^\-\-ADBUserName=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-ADBUserName=(.+)$', myArgv, re.IGNORECASE)
# CiTestObj.ADBUserName = matchReg.group(1)
# elif re.match('^\-\-ADBType=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-ADBType=(.+)$', myArgv, re.IGNORECASE)
# if re.match('centralized', matchReg.group(1), re.IGNORECASE) or re.match('distributed', matchReg.group(1), re.IGNORECASE):
# if re.match('distributed', matchReg.group(1), re.IGNORECASE):
# CiTestObj.ADBCentralized = False
# else:
# CiTestObj.ADBCentralized = True
# else:
# sys.exit('Invalid ADB Type: ' + matchReg.group(1) + ' -- (should be centralized or distributed)')
# elif re.match('^\-\-ADBPassword=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-ADBPassword=(.+)$', myArgv, re.IGNORECASE)
# CiTestObj.ADBPassword = matchReg.group(1)
# elif re.match('^\-\-XMLTestFile=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-XMLTestFile=(.+)$', myArgv, re.IGNORECASE)
# CiTestObj.testXMLfiles.append(matchReg.group(1))
# HTML.testXMLfiles=matchReg.group(1)
# HTML.nbTestXMLfiles=HTML.nbTestXMLfiles+1
# elif re.match('^\-\-UEIPAddress=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-UEIPAddress=(.+)$', myArgv, re.IGNORECASE)
# CiTestObj.UEIPAddress = matchReg.group(1)
# elif re.match('^\-\-UEUserName=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-UEUserName=(.+)$', myArgv, re.IGNORECASE)
# CiTestObj.UEUserName = matchReg.group(1)
# elif re.match('^\-\-UEPassword=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-UEPassword=(.+)$', myArgv, re.IGNORECASE)
# CiTestObj.UEPassword = matchReg.group(1)
# elif re.match('^\-\-UESourceCodePath=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-UESourceCodePath=(.+)$', myArgv, re.IGNORECASE)
# CiTestObj.UESourceCodePath = matchReg.group(1)
# elif re.match('^\-\-finalStatus=(.+)$', myArgv, re.IGNORECASE):
# matchReg = re.match('^\-\-finalStatus=(.+)$', myArgv, re.IGNORECASE)
# finalStatus = matchReg.group(1)
# if ((finalStatus == 'true') or (finalStatus == 'True')):
# CiTestObj.finalStatus = True
# else:
# HELP.GenericHelp(CONST.Version)
# sys.exit('Invalid Parameter: ' + myArgv)
#-----------------------------------------------------------
# TEMPORARY params management
#-----------------------------------------------------------
#temporary solution for testing:
if py_param_file_present == True:
AssignParams(py_params)
#for debug
print(RAN.__dict__)
print(CiTestObj.__dict__)
print(HTML.__dict__)
print(ldpc.__dict__)
#for debug
#-----------------------------------------------------------
# COTS UE instantiation
#-----------------------------------------------------------
#COTS_UE instantiation can only be done here for the moment, due to code architecture
COTS_UE=cls_cots_ue.CotsUe('oppo', CiTestObj.UEIPAddress, CiTestObj.UEUserName,CiTestObj.UEPassword)
#-----------------------------------------------------------
# XML class (action) analysis
#-----------------------------------------------------------
cwd = os.getcwd()
if re.match('^TerminateeNB$', mode, re.IGNORECASE):
if RAN.eNBIPAddress == '' or RAN.eNBUserName == '' or RAN.eNBPassword == '':
sys.exit('Insufficient Parameter')
RAN.eNB_serverId='0'
RAN.eNB_instance='0'
RAN.eNBSourceCodePath='/tmp/'
elif re.match('^TerminateUE$', mode, re.IGNORECASE):
if (CiTestObj.ADBIPAddress == '' or CiTestObj.ADBUserName == '' or CiTestObj.ADBPassword == ''):
sys.exit('Insufficient Parameter')
signal.signal(signal.SIGUSR1, receive_signal)
elif re.match('^TerminateOAIUE$', mode, re.IGNORECASE):
if CiTestObj.UEIPAddress == '' or CiTestObj.UEUserName == '' or CiTestObj.UEPassword == '':
sys.exit('Insufficient Parameter')
signal.signal(signal.SIGUSR1, receive_signal)
elif re.match('^TerminateHSS$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^TerminateMME$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^TerminateSPGW$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath== '':
sys.exit('Insufficient Parameter')
elif re.match('^LogCollectBuild$', mode, re.IGNORECASE):
if (RAN.eNBIPAddress == '' or RAN.eNBUserName == '' or RAN.eNBPassword == '' or RAN.eNBSourceCodePath == '') and (CiTestObj.UEIPAddress == '' or CiTestObj.UEUserName == '' or CiTestObj.UEPassword == '' or CiTestObj.UESourceCodePath == ''):
sys.exit('Insufficient Parameter')
CiTestObj.LogCollectBuild()
elif re.match('^LogCollecteNB$', mode, re.IGNORECASE):
if RAN.eNBIPAddress == '' or RAN.eNBUserName == '' or RAN.eNBPassword == '' or RAN.eNBSourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^LogCollectHSS$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^LogCollectMME$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^LogCollectSPGW$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^LogCollectPing$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
elif re.match('^LogCollectIperf$', mode, re.IGNORECASE):
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.SourceCodePath == '':
sys.exit('Insufficient Parameter')
CiTestObj.LogCollectIperf()
elif re.match('^LogCollectOAIUE$', mode, re.IGNORECASE):
if CiTestObj.UEIPAddress == '' or CiTestObj.UEUserName == '' or CiTestObj.UEPassword == '' or CiTestObj.UESourceCodePath == '':
sys.exit('Insufficient Parameter')
CiTestObj.LogCollectOAIUE()
elif re.match('^InitiateHtml$', mode, re.IGNORECASE):
if (CiTestObj.ADBIPAddress == '' or CiTestObj.ADBUserName == '' or CiTestObj.ADBPassword == ''):
sys.exit('Insufficient Parameter')
count = 0
while (count < HTML.nbTestXMLfiles):
Gabriele Perrone
committed
#xml_test_file = cwd + "/" + CiTestObj.testXMLfiles[count]
xml_test_file = sys.path[0] + "/" + CiTestObj.testXMLfiles[count]
if (os.path.isfile(xml_test_file)):
try:
xmlTree = ET.parse(xml_test_file)
except:
print("Error while parsing file: " + xml_test_file)
xmlRoot = xmlTree.getroot()
hardy
committed
HTML.htmlTabRefs.append(xmlRoot.findtext('htmlTabRef',default='test-tab-' + str(count)))
HTML.htmlTabNames.append(xmlRoot.findtext('htmlTabName',default='test-tab-' + str(count)))
HTML.htmlTabIcons.append(xmlRoot.findtext('htmlTabIcon',default='info-sign'))
hardy
committed
if foundCount != HTML.nbTestXMLfiles:
HTML.nbTestXMLfiles=foundCount
Gabriele Perrone
committed
if (CiTestObj.ADBIPAddress != 'none'):
terminate_ue_flag = False
CiTestObj.GetAllUEDevices(terminate_ue_flag)
CiTestObj.GetAllCatMDevices(terminate_ue_flag)
HTML.SethtmlUEConnected(len(CiTestObj.UEDevices) + len(CiTestObj.CatMDevices))
hardy
committed
HTML.htmlNb_Smartphones=len(CiTestObj.UEDevices)
HTML.htmlNb_CATM_Modules=len(CiTestObj.CatMDevices)
Gabriele Perrone
committed
HTML.CreateHtmlHeader(CiTestObj.ADBIPAddress)
elif re.match('^FinalizeHtml$', mode, re.IGNORECASE):
logging.debug('\u001B[1m----------------------------------------\u001B[0m')
logging.debug('\u001B[1m Creating HTML footer \u001B[0m')
logging.debug('\u001B[1m----------------------------------------\u001B[0m')
CiTestObj.RetrieveSystemVersion('eNB')
CiTestObj.RetrieveSystemVersion('UE')
Gabriele Perrone
committed
HTML.CreateHtmlFooter(CiTestObj.finalStatus)

Boris Djalal
committed
elif re.match('^TesteNB$', mode, re.IGNORECASE) or re.match('^TestUE$', mode, re.IGNORECASE):
if re.match('^TesteNB$', mode, re.IGNORECASE):
if RAN.eNBIPAddress == '' or RAN.ranRepository == '' or RAN.ranBranch == '' or RAN.eNBUserName == '' or RAN.eNBPassword == '' or RAN.eNBSourceCodePath == '' or EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.Type == '' or EPC.SourceCodePath == '' or CiTestObj.ADBIPAddress == '' or CiTestObj.ADBUserName == '' or CiTestObj.ADBPassword == '':
hardy
committed
if EPC.IPAddress == '' or EPC.UserName == '' or EPC.Password == '' or EPC.SourceCodePath == '' or EPC.Type == '':
HELP.EPCSrvHelp(EPC.IPAddress, EPC.UserName, EPC.Password, EPC.SourceCodePath, EPC.Type)
if RAN.ranRepository == '':
HELP.GitSrvHelp(RAN.ranRepository, RAN.ranBranch, RAN.ranCommitID, RAN.ranAllowMerge, RAN.ranTargetBranch)
if RAN.eNBIPAddress == '' or RAN.eNBUserName == '' or RAN.eNBPassword == '' or RAN.eNBSourceCodePath == '':
HELP.eNBSrvHelp(RAN.eNBIPAddress, RAN.eNBUserName, RAN.eNBPassword, RAN.eNBSourceCodePath)

Boris Djalal
committed
sys.exit('Insufficient Parameter')
hardy
committed
if (EPC.IPAddress!= '') and (EPC.IPAddress != 'none'):
SSH.copyout(EPC.IPAddress, EPC.UserName, EPC.Password, cwd + "/tcp_iperf_stats.awk", "/tmp")
SSH.copyout(EPC.IPAddress, EPC.UserName, EPC.Password, cwd + "/active_net_interfaces.awk", "/tmp")

Boris Djalal
committed
else:
if CiTestObj.UEIPAddress == '' or CiTestObj.ranRepository == '' or CiTestObj.ranBranch == '' or CiTestObj.UEUserName == '' or CiTestObj.UEPassword == '' or CiTestObj.UESourceCodePath == '':
HELP.GenericHelp(CONST.Version)

Boris Djalal
committed
sys.exit('UE: Insufficient Parameter')
#read test_case_list.xml file
# if no parameters for XML file, use default value
hardy
committed
if (HTML.nbTestXMLfiles != 1):
xml_test_file = cwd + "/test_case_list.xml"
xml_test_file = cwd + "/" + CiTestObj.testXMLfiles[0]
xmlTree = ET.parse(xml_test_file)
xmlRoot = xmlTree.getroot()
exclusion_tests=xmlRoot.findtext('TestCaseExclusionList',default='')
requested_tests=xmlRoot.findtext('TestCaseRequestedList',default='')
hardy
committed
if (HTML.nbTestXMLfiles == 1):
HTML.htmlTabRefs.append(xmlRoot.findtext('htmlTabRef',default='test-tab-0'))
HTML.htmlTabNames.append(xmlRoot.findtext('htmlTabName',default='Test-0'))
repeatCount = xmlRoot.findtext('repeatCount',default='1')
CiTestObj.repeatCounts.append(int(repeatCount))
3698
3699
3700
3701
3702
3703
3704
3705
3706
3707
3708
3709
3710
3711
3712
3713
3714
3715
3716
3717
3718
3719
3720
3721
3722
all_tests=xmlRoot.findall('testCase')
exclusion_tests=exclusion_tests.split()
requested_tests=requested_tests.split()
#check that exclusion tests are well formatted
#(6 digits or less than 6 digits followed by +)
for test in exclusion_tests:
if (not re.match('^[0-9]{6}$', test) and
not re.match('^[0-9]{1,5}\+$', test)):
logging.debug('ERROR: exclusion test is invalidly formatted: ' + test)
sys.exit(1)
else:
logging.debug(test)
#check that requested tests are well formatted
#(6 digits or less than 6 digits followed by +)
#be verbose
for test in requested_tests:
if (re.match('^[0-9]{6}$', test) or
re.match('^[0-9]{1,5}\+$', test)):
logging.debug('INFO: test group/case requested: ' + test)
else:
logging.debug('ERROR: requested test is invalidly formatted: ' + test)
sys.exit(1)
hardy
committed
if (EPC.IPAddress != '') and (EPC.IPAddress != 'none'):
CiTestObj.CheckFlexranCtrlInstallation()
EPC.SetMmeIPAddress()
# Build the list of tests to run: requested entries for which
# test_in_list(test, exclusion_tests) is true are logged and left out;
# all others are kept in their requested order.
todo_tests = []
for test in requested_tests:
    if not test_in_list(test, exclusion_tests):
        todo_tests.append(test)
        continue
    logging.debug('INFO: test will be skipped: ' + test)
signal.signal(signal.SIGUSR1, receive_signal)
Gabriele Perrone
committed
if (CiTestObj.ADBIPAddress != 'none'):
terminate_ue_flag = False
CiTestObj.GetAllUEDevices(terminate_ue_flag)
CiTestObj.GetAllCatMDevices(terminate_ue_flag)
else:
CiTestObj.UEDevices.append('OAI-UE')
HTML.SethtmlUEConnected(len(CiTestObj.UEDevices) + len(CiTestObj.CatMDevices))
HTML.CreateHtmlTabHeader()
CiTestObj.FailReportCnt = 0
hardy
committed
HTML.startTime=int(round(time.time() * 1000))
while CiTestObj.FailReportCnt < CiTestObj.repeatCounts[0] and RAN.prematureExit:
RAN.prematureExit=False
Gabriele Perrone
committed
# At every iteration of the retry loop, a separator will be added
Gabriele Perrone
committed
# pass CiTestObj.FailReportCnt as parameter of HTML.CreateHtmlRetrySeparator
HTML.CreateHtmlRetrySeparator(CiTestObj.FailReportCnt)
for test_case_id in todo_tests:
for test in all_tests:
id = test.get('id')
if test_case_id != id:
continue
hardy
committed
HTML.testCase_id=CiTestObj.testCase_id
EPC.testCase_id=CiTestObj.testCase_id
CiTestObj.desc = test.findtext('desc')
hardy
committed
HTML.desc=CiTestObj.desc
action = test.findtext('class')
if (CheckClassValidity(xml_class_list, action, id) == False):
GetParametersFromXML(action)
if action == 'Initialize_UE' or action == 'Attach_UE' or action == 'Detach_UE' or action == 'Ping' or action == 'Iperf' or action == 'Reboot_UE' or action == 'DataDisable_UE' or action == 'DataEnable_UE' or action == 'CheckStatusUE':
if (CiTestObj.ADBIPAddress != 'none'):
terminate_ue_flag = False
CiTestObj.GetAllUEDevices(terminate_ue_flag)
if action == 'Build_eNB':
Gabriele Perrone
committed
RAN.BuildeNB()
elif action == 'WaitEndBuild_eNB':
Gabriele Perrone
committed
RAN.WaitBuildeNBisFinished()
elif action == 'Initialize_eNB':
Gabriele Perrone
committed
check_eNB = False
check_OAI_UE = False
RAN.pStatus=CiTestObj.CheckProcessExist(check_eNB, check_OAI_UE)
Gabriele Perrone
committed
RAN.InitializeeNB()
elif action == 'Terminate_eNB':
Gabriele Perrone
committed
RAN.TerminateeNB()
elif action == 'Initialize_UE':
elif action == 'Terminate_UE':
elif action == 'Attach_UE':
elif action == 'Detach_UE':
elif action == 'DataDisable_UE':
elif action == 'DataEnable_UE':
elif action == 'CheckStatusUE':
elif action == 'Build_OAI_UE':
elif action == 'Initialize_OAI_UE':
CiTestObj.InitializeOAIUE()
elif action == 'Terminate_OAI_UE':
elif action == 'Initialize_CatM_module':
elif action == 'Terminate_CatM_module':
elif action == 'Attach_CatM_module':
elif action == 'Detach_CatM_module':
elif action == 'Ping_CatM_module':
elif action == 'Ping':
elif action == 'Iperf':
elif action == 'Reboot_UE':
elif action == 'Initialize_HSS':
elif action == 'Terminate_HSS':
elif action == 'Initialize_MME':
elif action == 'Terminate_MME':
elif action == 'Initialize_SPGW':
elif action == 'Terminate_SPGW':
elif action == 'Initialize_FlexranCtrl':
CiTestObj.InitializeFlexranCtrl()
elif action == 'Terminate_FlexranCtrl':
CiTestObj.TerminateFlexranCtrl()
elif action == 'IdleSleep':
elif action == 'Perform_X2_Handover':
CiTestObj.Perform_X2_Handover()
elif action == 'Build_PhySim':
HTML=ldpc.Build_PhySim(HTML,CONST)
if ldpc.exitStatus==1:sys.exit()
elif action == 'Run_PhySim':
elif action == 'COTS_UE_Airplane':
COTS_UE.Set_Airplane(COTS_UE.runargs)
sys.exit('Invalid class (action) from xml')
CiTestObj.FailReportCnt += 1
# Final verdict. The scenario is reported as failed only when the attempt
# counter has reached repeatCounts[0] and RAN.prematureExit is still set;
# in every other case the HTML footer is written as a pass.
if CiTestObj.FailReportCnt != CiTestObj.repeatCounts[0] or not RAN.prematureExit:
    logging.info('Testsuite passed after ' + str(CiTestObj.FailReportCnt) + ' time(s)')
    HTML.CreateHtmlTabFooter(True)
else:
    logging.debug('Testsuite failed ' + str(CiTestObj.FailReportCnt) + ' time(s)')
    HTML.CreateHtmlTabFooter(False)
    # Exit with a message so the calling CI job sees a non-zero status.
    sys.exit('Failed Scenario')
elif re.match('^LoadParams$', mode, re.IGNORECASE):
pass
sys.exit('Invalid mode')