Commit 0e899f5e authored by Daniele Venzano
Browse files

Update test scripts with more options

parent ce2c5f8d
......@@ -6,6 +6,7 @@ Keep the Zoe scheduler loaded with BOINC clients.
Usage: boinc_load.py <zapp_json_file>
"""
import csv
import json
import os
import sys
......@@ -16,8 +17,11 @@ from zoe_lib.statistics import ZoeStatisticsAPI
from zoe_lib.executions import ZoeExecutionsAPI
# Tuning knobs for the BOINC load generator.
TARGET_QUEUE_LENGTH = 10  # stop submitting when the scheduler queue reaches this length
NUMBER_OF_HOSTS = 10
ZAPP_PER_HOST = 4
TOTAL_JOBS = NUMBER_OF_HOSTS * ZAPP_PER_HOST  # keep this many ZApps in flight overall
# TOTAL_JOBS = 1000
MAX_TO_START_PER_LOOP = 10  # upper bound on one submission burst in start_batches()
def zoe_url():
......@@ -39,7 +43,7 @@ def check_queue_length():
"""Checks how many zapps are in the scheduler queue."""
stats_api = ZoeStatisticsAPI(zoe_url(), zoe_user(), zoe_pass())
sched = stats_api.scheduler()
print('Scheduler queue length: {}'.format(sched['queue_length']))
# print('Scheduler queue length: {}'.format(sched['queue_length']))
return sched['queue_length']
......@@ -52,7 +56,7 @@ def submit_zapp(zapp):
"""Submits one ZApp for execution."""
exec_api = ZoeExecutionsAPI(zoe_url(), zoe_user(), zoe_pass())
ret = exec_api.start('boinc-loader', zapp)
print("Application scheduled successfully with ID {}".format(ret))
return ret
def count_jobs():
......@@ -62,30 +66,85 @@ def count_jobs():
count = 0
for e_id in execs:
e = exec_api.get(e_id)
if e['name'] != 'boinc-loader' or e['status'] != 'terminated':
if e['name'] != 'boinc-loader':
continue
else:
if e['status'] != 'terminated':
count += 1
return count
def main():
"""Main."""
def delete_finished():
    """Delete finished boinc-loader executions from Zoe.

    Iterates over all known executions and removes the ones that are both
    named 'boinc-loader' and in the 'terminated' state.
    """
    exec_api = ZoeExecutionsAPI(zoe_url(), zoe_user(), zoe_pass())
    execs = exec_api.list()
    for e_id in execs:
        e = exec_api.get(e_id)
        if e['name'] == 'boinc-loader' and e['status'] == 'terminated':
            # Bug fix: the original print was missing the .format() call,
            # so a literal '{}' was shown instead of the execution ID.
            print('Execution {} has finished, deleting...'.format(e['id']))
            exec_api.delete(e['id'])
def start_batches(zapp, log):
    """Submit ZApps in random-sized batches until TOTAL_JOBS are in flight.

    Sleeps a random interval whenever the scheduler queue is at or above
    TARGET_QUEUE_LENGTH, then submits up to MAX_TO_START_PER_LOOP ZApps.

    :param zapp: parsed ZApp description to submit
    :param log: csv.DictWriter accepting 'zapp_id' and 'queue_length' rows
    """
    zapps_to_start = TOTAL_JOBS - count_jobs()
    print('I need to start {} zapps'.format(zapps_to_start))
    while zapps_to_start > 0:
        queue_length = check_queue_length()
        if queue_length >= TARGET_QUEUE_LENGTH:
            # Back off while the scheduler works through its queue.
            to_sleep = random.randint(1, 60)
            print("Target scheduler queue length reached, sleeping for {} seconds".format(to_sleep))
            time.sleep(to_sleep)
            continue
        to_start_now = random.randint(1, MAX_TO_START_PER_LOOP)
        print('Will submit {} new zapps'.format(to_start_now))
        for _ in range(to_start_now):
            zapp_id = submit_zapp(zapp)
            zapps_to_start -= 1
            print("ZApp submitted with ID {}, queue length {}, {} zapps to go".format(zapp_id, queue_length, zapps_to_start))
            log.writerow({'zapp_id': zapp_id, 'queue_length': queue_length})
def start_continuous(zapp, log):
    """Submit ZApps one at a time, pausing a random sub-second interval
    after each submission, until TOTAL_JOBS are in flight.

    :param zapp: parsed ZApp description to submit
    :param log: csv.DictWriter accepting 'zapp_id' and 'queue_length' rows
    """
    remaining = TOTAL_JOBS - count_jobs()
    print('I need to start {} zapps'.format(remaining))
    while remaining > 0:
        new_id = submit_zapp(zapp)
        remaining -= 1
        pause = random.uniform(0, 1)
        q_len = check_queue_length()
        print("ZApp submitted with ID {}, queue length {}, will sleep for {}, {} zapps to go".format(new_id, q_len, pause, remaining))
        log.writerow({'zapp_id': new_id, 'queue_length': q_len})
        time.sleep(pause)
def keep_some_running(zapp):
    """Loop forever, topping the cluster up to TOTAL_JOBS active executions.

    Every 60 seconds it counts the non-terminated boinc-loader executions,
    "submits" enough ZApps to reach TOTAL_JOBS and deletes finished ones.
    Exits on Ctrl-C (KeyboardInterrupt) during the sleep.

    NOTE(review): submission is currently stubbed out (zapp_id = 'fake',
    real submit_zapp call commented) — this is a dry-run mode; nothing is
    actually started.
    """
    while True:
        zapps_to_start = TOTAL_JOBS - count_jobs()
        print('I need to start {} zapps'.format(zapps_to_start))
        if zapps_to_start > 0:
            for i in range(zapps_to_start):
                queue_length = check_queue_length()
                # zapp_id = submit_zapp(zapp)
                zapp_id = 'fake'  # dry-run placeholder instead of a real execution ID
                # NOTE(review): 'zapps to go' prints the initial count, not the
                # remaining count — zapps_to_start is not decremented here.
                print("ZApp submitted with ID {}, queue length {}, {} zapps to go".format(zapp_id, queue_length, zapps_to_start))
        try:
            print('Sleeping')
            time.sleep(60)
        except KeyboardInterrupt:
            # Ctrl-C during the sleep is the intended way to stop this loop.
            print('Exiting infinite loop')
            break
        delete_finished()
def main():
    """Main entry point: open the CSV log and run the selected load mode.

    Reads the ZApp JSON description from sys.argv[1]; writes per-submission
    statistics to run.csv.
    """
    log_fieldnames = ['zapp_id', 'queue_length']
    # Use a context manager so run.csv is flushed and closed on exit
    # (the original leaked the file handle returned by open()).
    with open('run.csv', 'w') as log_file:
        log = csv.DictWriter(log_file, fieldnames=log_fieldnames)
        log.writeheader()
        zapp = load_zapp(sys.argv[1])
        # Alternative load-generation strategies, kept for manual switching:
        # start_batches(zapp, log)
        # start_continuous(zapp, log)
        keep_some_running(zapp)
    print('All Zapps submitted, my work is done.')
if __name__ == "__main__":
......
#!/usr/bin/env python3
"""
Retrieve a workload trace from boinc jobs.
Usage: boinc_trace.py <out_file>
"""
import os
import requests
from zoe_lib.statistics import ZoeStatisticsAPI
from zoe_lib.executions import ZoeExecutionsAPI
TARGET_QUEUE_LENGTH = 5
def zoe_url():
    """Return the Zoe API URL taken from the ZOE_URL environment variable."""
    url = os.environ['ZOE_URL']
    return url
def zoe_user():
    """Return the Zoe API user name taken from the ZOE_USER environment variable."""
    user = os.environ['ZOE_USER']
    return user
def zoe_pass():
    """Return the Zoe API password taken from the ZOE_PASS environment variable."""
    password = os.environ['ZOE_PASS']
    return password
def check_queue_length():
    """Return the number of ZApps currently waiting in the Zoe scheduler queue."""
    api = ZoeStatisticsAPI(zoe_url(), zoe_user(), zoe_pass())
    stats = api.scheduler()
    queue_length = stats['queue_length']
    print('Scheduler queue length: {}'.format(queue_length))
    return queue_length
def submit_zapp(zapp):
    """Submit one 'boinc-loader' ZApp execution to Zoe and print its ID."""
    api = ZoeExecutionsAPI(zoe_url(), zoe_user(), zoe_pass())
    exec_id = api.start('boinc-loader', zapp)
    print("Application scheduled successfully with ID {}, use the exec-get command to check its status".format(exec_id))
def get_influx_cpu_data(exec_id):
    """Return the last total CPU usage sample recorded for an execution.

    Queries the 'telegraf' InfluxDB database; returns 0 when no data point
    exists for the given execution ID.
    """
    query_params = {
        'db': 'telegraf',
        'q': 'SELECT LAST(usage_total) FROM docker_container_cpu WHERE "zoe.execution.id" = \'{}\' AND cpu = \'cpu-total\''.format(exec_id)
    }
    resp = requests.get("http://192.168.45.2:8086/query", params=query_params)
    resp = resp.json()
    try:
        cpu_usage = resp['results'][0]['series'][0]['values'][0][1]
    except (KeyError, IndexError, TypeError):
        # InfluxDB omits 'series' when there is no data (KeyError); empty
        # result lists raise IndexError and null entries raise TypeError —
        # the original caught only KeyError and could crash on the others.
        cpu_usage = 0
    return cpu_usage
def get_influx_mem_data(exec_id):
    """Return the peak memory usage recorded for an execution.

    Queries the 'telegraf' InfluxDB database; returns 0 when no data point
    exists for the given execution ID.
    """
    query_params = {
        'db': 'telegraf',
        'q': 'SELECT MAX(max_usage) FROM docker_container_mem WHERE "zoe.execution.id" = \'{}\''.format(exec_id)
    }
    resp = requests.get("http://192.168.45.2:8086/query", params=query_params)
    resp = resp.json()
    try:
        mem_usage = resp['results'][0]['series'][0]['values'][0][1]
    except (KeyError, IndexError, TypeError):
        # Empty/absent result sets raise more than KeyError; treat them
        # all as "no data" and report 0.
        mem_usage = 0
    return mem_usage
def get_influx_net_rx_data(exec_id):
    """Return the last received-bytes counter (eth1) recorded for an execution.

    Queries the 'telegraf' InfluxDB database; returns 0 when no data point
    exists for the given execution ID.
    """
    query_params = {
        'db': 'telegraf',
        'q': 'SELECT LAST(rx_bytes) FROM docker_container_net WHERE "zoe.execution.id" = \'{}\' AND network = \'eth1\''.format(exec_id)
    }
    resp = requests.get("http://192.168.45.2:8086/query", params=query_params)
    resp = resp.json()
    try:
        net_rx_usage = resp['results'][0]['series'][0]['values'][0][1]
    except (KeyError, IndexError, TypeError):
        # Empty/absent result sets raise more than KeyError; treat them
        # all as "no data" and report 0.
        net_rx_usage = 0
    return net_rx_usage
def get_influx_net_tx_data(exec_id):
    """Return the last transmitted-bytes counter (eth1) recorded for an execution.

    Queries the 'telegraf' InfluxDB database; returns 0 when no data point
    exists for the given execution ID.
    """
    query_params = {
        'db': 'telegraf',
        'q': 'SELECT LAST(tx_bytes) FROM docker_container_net WHERE "zoe.execution.id" = \'{}\' AND network = \'eth1\''.format(exec_id)
    }
    resp = requests.get("http://192.168.45.2:8086/query", params=query_params)
    resp = resp.json()
    try:
        net_tx_usage = resp['results'][0]['series'][0]['values'][0][1]
    except (KeyError, IndexError, TypeError):
        # Empty/absent result sets raise more than KeyError; treat them
        # all as "no data" and report 0.
        net_tx_usage = 0
    return net_tx_usage
def get_influx_blkio_data(exec_id):
    """Return the last total block-I/O operation count recorded for an execution.

    Queries the 'telegraf' InfluxDB database; returns 0 when no data point
    exists for the given execution ID.
    """
    query_params = {
        'db': 'telegraf',
        'q': 'SELECT LAST(io_serviced_recursive_total) FROM docker_container_blkio WHERE "zoe.execution.id" = \'{}\''.format(exec_id)
    }
    resp = requests.get("http://192.168.45.2:8086/query", params=query_params)
    resp = resp.json()
    try:
        blkio_usage = resp['results'][0]['series'][0]['values'][0][1]
    except (KeyError, IndexError, TypeError):
        # Empty/absent result sets raise more than KeyError; treat them
        # all as "no data" and report 0.
        blkio_usage = 0
    return blkio_usage
def main():
    """Print a CSV trace of all terminated boinc-loader executions to stdout."""
    api = ZoeExecutionsAPI(zoe_url(), zoe_user(), zoe_pass())
    print('id,time_submit,time_start,time_end,cpu_usage,mem_usage,net_rx_usage,net_tx_usage,blkio_usage')
    for exec_id in api.list():
        info = api.get(exec_id)
        # Only completed BOINC loader runs belong in the trace.
        if info['name'] != 'boinc-loader':
            continue
        if info['status'] != 'terminated':
            continue
        row = {
            'id': info['id'],
            'time_submit': info['time_submit'],
            'time_start': info['time_start'],
            'time_end': info['time_end'],
            'cpu_usage': get_influx_cpu_data(exec_id),
            'mem_usage': get_influx_mem_data(exec_id),
            'net_rx_usage': get_influx_net_rx_data(exec_id),
            'net_tx_usage': get_influx_net_tx_data(exec_id),
            'blkio_usage': get_influx_blkio_data(exec_id)
        }
        print('{id},{time_submit},{time_start},{time_end},{cpu_usage},{mem_usage},{net_rx_usage},{net_tx_usage},{blkio_usage}'.format(**row))


if __name__ == "__main__":
    main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment