Commit 274c94e4 authored by Daniele Venzano's avatar Daniele Venzano

Fix small issues found by SonarQube

parent bc709616
...@@ -20,7 +20,6 @@ TARGET_QUEUE_LENGTH = 10 ...@@ -20,7 +20,6 @@ TARGET_QUEUE_LENGTH = 10
NUMBER_OF_HOSTS = 10 NUMBER_OF_HOSTS = 10
ZAPP_PER_HOST = 8 ZAPP_PER_HOST = 8
TOTAL_JOBS = NUMBER_OF_HOSTS * ZAPP_PER_HOST TOTAL_JOBS = NUMBER_OF_HOSTS * ZAPP_PER_HOST
# TOTAL_JOBS = 1000
MAX_TO_START_PER_LOOP = 10 MAX_TO_START_PER_LOOP = 10
...@@ -66,9 +65,7 @@ def count_jobs(all=False): ...@@ -66,9 +65,7 @@ def count_jobs(all=False):
count = 0 count = 0
for e_id in execs: for e_id in execs:
e = exec_api.get(e_id) e = exec_api.get(e_id)
if e is None: if e is None or (not all and e['name'] != 'boinc-loader'):
continue
elif not all and e['name'] != 'boinc-loader':
continue continue
if e['status'] != 'terminated': if e['status'] != 'terminated':
count += 1 count += 1
...@@ -89,6 +86,7 @@ def delete_finished(): ...@@ -89,6 +86,7 @@ def delete_finished():
def start_batches(zapp, log): def start_batches(zapp, log):
"""Start zapps in batches."""
zapps_to_start = TOTAL_JOBS - count_jobs() zapps_to_start = TOTAL_JOBS - count_jobs()
print('I need to start {} zapps'.format(zapps_to_start)) print('I need to start {} zapps'.format(zapps_to_start))
while zapps_to_start > 0: while zapps_to_start > 0:
...@@ -100,7 +98,7 @@ def start_batches(zapp, log): ...@@ -100,7 +98,7 @@ def start_batches(zapp, log):
continue continue
to_start_now = random.randint(1, MAX_TO_START_PER_LOOP) to_start_now = random.randint(1, MAX_TO_START_PER_LOOP)
print('Will submit {} new zapps'.format(to_start_now)) print('Will submit {} new zapps'.format(to_start_now))
for i in range(to_start_now): for i_ in range(to_start_now):
zapp_id = submit_zapp(zapp) zapp_id = submit_zapp(zapp)
zapps_to_start -= 1 zapps_to_start -= 1
print("ZApp submitted with ID {}, queue length {}, {} zapps to go".format(zapp_id, queue_length, zapps_to_start)) print("ZApp submitted with ID {}, queue length {}, {} zapps to go".format(zapp_id, queue_length, zapps_to_start))
...@@ -108,6 +106,7 @@ def start_batches(zapp, log): ...@@ -108,6 +106,7 @@ def start_batches(zapp, log):
def start_continuous(zapp, log): def start_continuous(zapp, log):
"""Start zapps with a random distribution of interval times."""
zapps_to_start = TOTAL_JOBS - count_jobs() zapps_to_start = TOTAL_JOBS - count_jobs()
print('I need to start {} zapps'.format(zapps_to_start)) print('I need to start {} zapps'.format(zapps_to_start))
while zapps_to_start > 0: while zapps_to_start > 0:
...@@ -121,12 +120,13 @@ def start_continuous(zapp, log): ...@@ -121,12 +120,13 @@ def start_continuous(zapp, log):
def keep_some_running(zapp): def keep_some_running(zapp):
"""Always keep a certain number of zapps running."""
while True: while True:
running_zapps = count_jobs(all=True) running_zapps = count_jobs(all=True)
zapps_to_start = TOTAL_JOBS - running_zapps zapps_to_start = TOTAL_JOBS - running_zapps
print('I need to start {} zapps ({} running)'.format(zapps_to_start, running_zapps)) print('I need to start {} zapps ({} running)'.format(zapps_to_start, running_zapps))
if zapps_to_start > 0: if zapps_to_start > 0:
for i in range(zapps_to_start): for i_ in range(zapps_to_start):
queue_length = check_queue_length() queue_length = check_queue_length()
zapp_id = submit_zapp(zapp) zapp_id = submit_zapp(zapp)
print("ZApp submitted with ID {}, queue length {}, {} zapps to go".format(zapp_id, queue_length, zapps_to_start)) print("ZApp submitted with ID {}, queue length {}, {} zapps to go".format(zapp_id, queue_length, zapps_to_start))
...@@ -145,8 +145,6 @@ def main(): ...@@ -145,8 +145,6 @@ def main():
log = csv.DictWriter(open('run.csv', 'w'), fieldnames=log_fieldnames) log = csv.DictWriter(open('run.csv', 'w'), fieldnames=log_fieldnames)
log.writeheader() log.writeheader()
zapp = load_zapp(sys.argv[1]) zapp = load_zapp(sys.argv[1])
# start_batches(zapp, log)
# start_continuous(zapp, log)
keep_some_running(zapp) keep_some_running(zapp)
print('All Zapps submitted, my work is done.') print('All Zapps submitted, my work is done.')
......
...@@ -7,6 +7,7 @@ Usage: boinc_trace.py <out_file> ...@@ -7,6 +7,7 @@ Usage: boinc_trace.py <out_file>
""" """
import os import os
import sys
import requests import requests
...@@ -14,6 +15,8 @@ from zoe_lib.statistics import ZoeStatisticsAPI ...@@ -14,6 +15,8 @@ from zoe_lib.statistics import ZoeStatisticsAPI
from zoe_lib.executions import ZoeExecutionsAPI from zoe_lib.executions import ZoeExecutionsAPI
TARGET_QUEUE_LENGTH = 5 TARGET_QUEUE_LENGTH = 5
INFLUXDB_PORT = '8086'
INFLUXDB_ADDRESS = ''
def zoe_url(): def zoe_url():
...@@ -47,11 +50,12 @@ def submit_zapp(zapp): ...@@ -47,11 +50,12 @@ def submit_zapp(zapp):
def get_influx_cpu_data(exec_id): def get_influx_cpu_data(exec_id):
"""Get CPU data."""
query_params = { query_params = {
'db': 'telegraf', 'db': 'telegraf',
'q': 'SELECT LAST(usage_total) FROM docker_container_cpu WHERE "zoe.execution.id" = \'{}\' AND cpu = \'cpu-total\''.format(exec_id) 'q': 'SELECT LAST(usage_total) FROM docker_container_cpu WHERE "zoe.execution.id" = \'{}\' AND cpu = \'cpu-total\''.format(exec_id)
} }
resp = requests.get("http://192.168.45.2:8086/query", params=query_params) resp = requests.get("http://" + INFLUXDB_ADDRESS + "/query", params=query_params)
resp = resp.json() resp = resp.json()
try: try:
cpu_usage = resp['results'][0]['series'][0]['values'][0][1] cpu_usage = resp['results'][0]['series'][0]['values'][0][1]
...@@ -61,11 +65,12 @@ def get_influx_cpu_data(exec_id): ...@@ -61,11 +65,12 @@ def get_influx_cpu_data(exec_id):
def get_influx_mem_data(exec_id): def get_influx_mem_data(exec_id):
"""Get memory data."""
query_params = { query_params = {
'db': 'telegraf', 'db': 'telegraf',
'q': 'SELECT MAX(max_usage) FROM docker_container_mem WHERE "zoe.execution.id" = \'{}\''.format(exec_id) 'q': 'SELECT MAX(max_usage) FROM docker_container_mem WHERE "zoe.execution.id" = \'{}\''.format(exec_id)
} }
resp = requests.get("http://192.168.45.2:8086/query", params=query_params) resp = requests.get("http://" + INFLUXDB_ADDRESS + "/query", params=query_params)
resp = resp.json() resp = resp.json()
try: try:
mem_usage = resp['results'][0]['series'][0]['values'][0][1] mem_usage = resp['results'][0]['series'][0]['values'][0][1]
...@@ -75,11 +80,12 @@ def get_influx_mem_data(exec_id): ...@@ -75,11 +80,12 @@ def get_influx_mem_data(exec_id):
def get_influx_net_rx_data(exec_id): def get_influx_net_rx_data(exec_id):
"""Get network RX data."""
query_params = { query_params = {
'db': 'telegraf', 'db': 'telegraf',
'q': 'SELECT LAST(rx_bytes) FROM docker_container_net WHERE "zoe.execution.id" = \'{}\' AND network = \'eth1\''.format(exec_id) 'q': 'SELECT LAST(rx_bytes) FROM docker_container_net WHERE "zoe.execution.id" = \'{}\' AND network = \'eth1\''.format(exec_id)
} }
resp = requests.get("http://192.168.45.2:8086/query", params=query_params) resp = requests.get("http://" + INFLUXDB_ADDRESS + "/query", params=query_params)
resp = resp.json() resp = resp.json()
try: try:
net_rx_usage = resp['results'][0]['series'][0]['values'][0][1] net_rx_usage = resp['results'][0]['series'][0]['values'][0][1]
...@@ -89,11 +95,12 @@ def get_influx_net_rx_data(exec_id): ...@@ -89,11 +95,12 @@ def get_influx_net_rx_data(exec_id):
def get_influx_net_tx_data(exec_id): def get_influx_net_tx_data(exec_id):
"""Get network TX data."""
query_params = { query_params = {
'db': 'telegraf', 'db': 'telegraf',
'q': 'SELECT LAST(tx_bytes) FROM docker_container_net WHERE "zoe.execution.id" = \'{}\' AND network = \'eth1\''.format(exec_id) 'q': 'SELECT LAST(tx_bytes) FROM docker_container_net WHERE "zoe.execution.id" = \'{}\' AND network = \'eth1\''.format(exec_id)
} }
resp = requests.get("http://192.168.45.2:8086/query", params=query_params) resp = requests.get("http://" + INFLUXDB_ADDRESS + "/query", params=query_params)
resp = resp.json() resp = resp.json()
try: try:
net_tx_usage = resp['results'][0]['series'][0]['values'][0][1] net_tx_usage = resp['results'][0]['series'][0]['values'][0][1]
...@@ -103,11 +110,12 @@ def get_influx_net_tx_data(exec_id): ...@@ -103,11 +110,12 @@ def get_influx_net_tx_data(exec_id):
def get_influx_blkio_data(exec_id): def get_influx_blkio_data(exec_id):
"""Get disk data."""
query_params = { query_params = {
'db': 'telegraf', 'db': 'telegraf',
'q': 'SELECT LAST(io_serviced_recursive_total) FROM docker_container_blkio WHERE "zoe.execution.id" = \'{}\''.format(exec_id) 'q': 'SELECT LAST(io_serviced_recursive_total) FROM docker_container_blkio WHERE "zoe.execution.id" = \'{}\''.format(exec_id)
} }
resp = requests.get("http://192.168.45.2:8086/query", params=query_params) resp = requests.get("http://" + INFLUXDB_ADDRESS + "/query", params=query_params)
resp = resp.json() resp = resp.json()
try: try:
blkio_usage = resp['results'][0]['series'][0]['values'][0][1] blkio_usage = resp['results'][0]['series'][0]['values'][0][1]
...@@ -118,7 +126,9 @@ def get_influx_blkio_data(exec_id): ...@@ -118,7 +126,9 @@ def get_influx_blkio_data(exec_id):
def main(): def main():
"""Main.""" """Main."""
global INFLUXDB_ADDRESS
exec_api = ZoeExecutionsAPI(zoe_url(), zoe_user(), zoe_pass()) exec_api = ZoeExecutionsAPI(zoe_url(), zoe_user(), zoe_pass())
INFLUXDB_ADDRESS = sys.argv[1] + ':' + INFLUXDB_PORT
execs = exec_api.list() execs = exec_api.list()
print('id,time_submit,time_start,time_end,cpu_usage,mem_usage,net_rx_usage,net_tx_usage,blkio_usage') print('id,time_submit,time_start,time_end,cpu_usage,mem_usage,net_rx_usage,net_tx_usage,blkio_usage')
for e_id in execs: for e_id in execs:
......
#!/usr/bin/python3 #!/usr/bin/python3
"""
Find the Swarm manager by querying ZooKeeper.
"""
import sys import sys
from kazoo.client import KazooClient from kazoo.client import KazooClient
def zookeeper_swarm(zk_server_list, path='/swarm'): def zookeeper_swarm(zk_server_list, path='/swarm'):
path = path + '/docker/swarm/leader' """Query ZooKeeper."""
path += '/docker/swarm/leader'
zk = KazooClient(hosts=zk_server_list) zk = KazooClient(hosts=zk_server_list)
zk.start() zk.start()
master, stat = zk.get(path) master, stat_ = zk.get(path)
zk.stop() zk.stop()
return master.decode('utf-8') return master.decode('utf-8')
......
...@@ -48,7 +48,7 @@ def delete_tag_cmd(args): ...@@ -48,7 +48,7 @@ def delete_tag_cmd(args):
def main(): def main():
"""Main entrypoint.""" """Main entrypoint."""
parser = argparse.ArgumentParser(description="Docker Registry client") parser = argparse.ArgumentParser(description="Docker Registry client")
parser.add_argument("-r", "--registry", help="Registry URL", default="http:127.0.0.1:5000") parser.add_argument("-r", "--registry", help="Registry URL", default="http://127.0.0.1:5000")
subparser = parser.add_subparsers() subparser = parser.add_subparsers()
......
#!/usr/bin/python3 #!/usr/bin/python3
"""
Set the command to execute inside a ZApp.
"""
import json import json
import sys import sys
......
...@@ -612,7 +612,7 @@ class ArgumentParser(argparse.ArgumentParser): ...@@ -612,7 +612,7 @@ class ArgumentParser(argparse.ArgumentParser):
value = str(value).lower() value = str(value).lower()
config_file_items[config_file_keys[0]] = value config_file_items[config_file_keys[0]] = value
elif source == _ENV_VAR_SOURCE_KEY: elif source == _ENV_VAR_SOURCE_KEY or source == _DEFAULTS_SOURCE_KEY:
for key, (action, value) in settings.items(): for key, (action, value) in settings.items():
config_file_keys = self.get_possible_config_keys(action) config_file_keys = self.get_possible_config_keys(action)
if config_file_keys: if config_file_keys:
...@@ -622,13 +622,6 @@ class ArgumentParser(argparse.ArgumentParser): ...@@ -622,13 +622,6 @@ class ArgumentParser(argparse.ArgumentParser):
elif source.startswith(_CONFIG_FILE_SOURCE_KEY): elif source.startswith(_CONFIG_FILE_SOURCE_KEY):
for key, (action, value) in settings.items(): for key, (action, value) in settings.items():
config_file_items[key] = value config_file_items[key] = value
elif source == _DEFAULTS_SOURCE_KEY:
for key, (action, value) in settings.items():
config_file_keys = self.get_possible_config_keys(action)
if config_file_keys:
value = getattr(parsed_namespace, action.dest, None)
if value is not None:
config_file_items[config_file_keys[0]] = value
return config_file_items return config_file_items
def convert_item_to_command_line_arg(self, action, key, value): def convert_item_to_command_line_arg(self, action, key, value):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment