Commit 722a5e2d authored by Simone Rossi's avatar Simone Rossi

[add] Add submission to zoe

parent a3d39e28
Pipeline #11162 canceled with stages
......@@ -35,6 +35,10 @@ import json
from zoe_cmd.utils import read_auth
from zoe_cmd.api_lib import ZoeAPI
import zoe_lib.exceptions
from zoe_lib.state.execution import Execution
from zoe_cmd.api_lib.api_base import retry
LOG_FORMAT = '[%(asctime)-15s %(levelname)s %(name)s] %(message)s'
logger = logging.getLogger()
......@@ -150,7 +154,7 @@ class ParameterGrid(object):
# Reverse so most frequent cycling parameter comes first
keys, values_lists = zip(*sorted(sub_grid.items())[::-1])
sizes = [len(v_list) for v_list in values_lists]
total = np.product(sizes)
total = product(sizes)
if ind >= total:
# Try the next grid
......@@ -176,7 +180,14 @@ def get_all_jobs(bin, executable, param_grid) -> list:
def get_current_usage(api):
running_executions = list(filter(lambda e: (e['status'] == 'running' or e['status'] == 'queued'),
Get all running or ready to run executions
:param api:
:return: list
running_executions = list(filter(lambda e: (e['status'] == Execution.RUNNING_STATUS or
e['status'] == Execution.QUEUED_STATUS or
e['status'] == Execution.SUBMIT_STATUS),
used_jobs = len(running_executions)
......@@ -197,73 +208,93 @@ def get_user_quota(api, user_id):
def get_zapp_resources(zapp):
return zapp['services'][0]['resources']['cores']['min'], zapp['services'][0]['resources']['memory']['min'],
def submit_zapp(zapp, command, name, api):
"""Submits one ZApp for execution."""
zapp['services'][0]['command'] = command
ret = api.executions.start(name, zapp)
except zoe_lib.exceptions.ZoeAPIException as e:
logger.error('Error starting ZApp: {}'.format(str(e)))
ret = None
return ret
def main():
# Definition of command line arg
Args = namedtuple('Args', ['auth_file'])
fauth = os.path.join(os.getenv('HOME'), '.zoerc')
args = Args(fauth)
auth = read_auth(args)
parser = argparse.ArgumentParser()
parser.add_argument('name', type=str, help='Base application name')
parser.add_argument('appfile', type=str, help='Base application description')
parser.add_argument('gridfile', type=str, help="Json file with parameter configurations", default='experiment.json')
args = parser.parse_args()
# Connect to zoe API
api = ZoeAPI(auth['url'], auth['user'], auth['pass'])
# Parse the application description (json file)
with open(args.appfile) as data_file:
zapp = json.load(data_file)
# Test if zapp file is valid
if not api.validation.validate(zapp):
logger.error('Zapp file not valid! Please provide a valid application description')
# Parse experiment setup file and prepare all combination of hyperparameters
with open(args.gridfile) as data_file:
grid = json.load(data_file)
list_of_jobs = get_all_jobs(grid['bin'], grid['executable'], grid['hyperparameters'])
api = ZoeAPI(auth['url'], auth['user'], auth['pass'])
user_id = 13 # TODO: get this from somewhere
user_id = 13 # TODO: get this from somewhere
# Get user quota (this is supposed to be fixed during an experiment - if not: move this in the next loop)
quota_jobs, quota_cores, quota_memory = get_user_quota(api, user_id)'User %s (id: %d) can run %d concurrent jobs with up to %d cores and %.0f GB of memory' %
(auth['user'], user_id, quota_jobs, quota_cores, quota_memory / (1 << 30)))
# Get requested resources for one job
zapp_cores, zapp_memory = get_zapp_resources(zapp)
list_of_jobs = get_all_jobs(grid['bin'], grid['executable'], grid['hyperparameters'])
job_count = 0
while len(list_of_jobs) != 0:
number_new_jobs = len(list_of_jobs)
used_jobs, used_cores, used_memory = get_current_usage(api)'User %s (id: %d) has currently %d running jobs (%d cores and %.0f GB of memory)' %
logger.debug('User %s (id: %d) has currently %d running jobs (%d cores and %.0f GB of memory)' %
(auth['user'], user_id, used_jobs, used_cores, used_memory / (1 << 30)))
zapp_cores, zapp_memory = get_zapp_resources(zapp)'User %s (id: %d) has %d pending jobs (%d cores and %.0f GB of memory)' %
logger.debug('User %s (id: %d) has %d pending jobs (%d cores and %.0f GB of memory)' %
(auth['user'], user_id, number_new_jobs, number_new_jobs * zapp_cores,
number_new_jobs * zapp_memory / (1 << 30)))
constrains = min([
# Number of new jobs to schedule need to satisfy all three constrains (concurrent ex, cores and memory)
count_next_jobs_to_schedule = min([
quota_jobs - used_jobs,
int((quota_cores - used_cores) / zapp_cores),
int((quota_memory - used_memory) / zapp_memory)
if constrains > 0:'Scheduling %d new jobs' % constrains)
if count_next_jobs_to_schedule > 0:'Scheduling %d new jobs' % count_next_jobs_to_schedule)
for i in range(count_next_jobs_to_schedule):
name = + '-%02d' % job_count # name-00; name-01; name-02
next_job = list_of_jobs.pop()
job_count += 1
exec_id = submit_zapp(zapp, next_job, name, api)
if exec_id is None:
logger.error('Continuous failing while submitting new execution. Skipping this job.')
continue'Job submitted with ID %d' % exec_id)
for i in range(constrains):
new_command = list_of_jobs.pop()
# TODO: submit to zoe
logger.warning('Quota exceeded. None jobs can be scheduled now! Sleeping')
logger.warning('Quota exceeded. No jobs can be scheduled now! Sleeping')
# user_endpoint = APIEndpoint(api, )
if __name__ == '__main__':
"bin": "python3",
"executable": "",
"hyperparameters": {
"dataset": ["mnist", "cifar10"],
"method": ["1", "2"],
"parameter1": [2, 6, 1]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment