Commit 49c376fb authored by Simone Rossi's avatar Simone Rossi

[fix] Fix bug and better debuging

parent e37d9c7b
Pipeline #11164 canceled with stages
......@@ -36,15 +36,23 @@ import json
from zoe_cmd.utils import read_auth
from zoe_cmd.api_lib import ZoeAPI
import zoe_lib.exceptions
from zoe_lib.state.execution import Execution
from zoe_api.entrypoint import LOG_FORMAT
from zoe_cmd.api_lib.api_base import retry
LOG_FORMAT = '[%(asctime)-15s %(levelname)s %(name)s] %(message)s'
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logging.getLogger("requests").setLevel(logging.WARNING)
SUBMIT_STATUS = "submitted"
QUEUED_STATUS = "queued"
STARTING_STATUS = "starting"
ERROR_STATUS = "error"
RUNNING_STATUS = "running"
CLEANING_UP_STATUS = "cleaning up"
TERMINATED_STATUS = "terminated"
class ParameterGrid(object):
"""
Grid of parameters with a discrete number of values for each.
......@@ -185,9 +193,9 @@ def get_current_usage(api):
:param api:
:return: list
"""
running_executions = list(filter(lambda e: (e['status'] == Execution.RUNNING_STATUS or
e['status'] == Execution.QUEUED_STATUS or
e['status'] == Execution.SUBMIT_STATUS),
running_executions = list(filter(lambda e: (e['status'] == RUNNING_STATUS or
e['status'] == QUEUED_STATUS or
e['status'] == SUBMIT_STATUS),
api.executions.list()))
used_jobs = len(running_executions)
......@@ -261,41 +269,51 @@ def main():
job_count = 0
job_ids = []
while len(list_of_jobs) != 0:
number_new_jobs = len(list_of_jobs)
used_jobs, used_cores, used_memory = get_current_usage(api)
logger.debug('User %s (id: %d) has currently %d running jobs (%d cores and %.0f GB of memory)' %
(auth['user'], user_id, used_jobs, used_cores, used_memory / (1 << 30)))
logger.debug('User %s (id: %d) has %d pending jobs (%d cores and %.0f GB of memory)' %
(auth['user'], user_id, number_new_jobs, number_new_jobs * zapp_cores,
number_new_jobs * zapp_memory / (1 << 30)))
# Number of new jobs to schedule need to satisfy all three constrains (concurrent ex, cores and memory)
count_next_jobs_to_schedule = min([
quota_jobs - used_jobs,
int((quota_cores - used_cores) / zapp_cores),
int((quota_memory - used_memory) / zapp_memory)
])
if count_next_jobs_to_schedule > 0:
logger.info('Scheduling %d new jobs' % count_next_jobs_to_schedule)
for i in range(count_next_jobs_to_schedule):
name = args.name + '-%02d' % job_count # name-00; name-01; name-02
next_job = list_of_jobs.pop()
job_count += 1
exec_id = submit_zapp(zapp, next_job, name, api)
if exec_id is None:
logger.error('Continuous failing while submitting new execution. Skipping this job.')
continue
logger.info('Job submitted with ID %d' % exec_id)
job_ids.append(exec_id)
else:
logger.warning('Quota exceeded. No jobs can be scheduled now! Sleeping')
time.sleep(15)
try:
while len(list_of_jobs) != 0:
number_new_jobs = len(list_of_jobs)
used_jobs, used_cores, used_memory = get_current_usage(api)
logger.debug('User %s (id: %d) has currently %d running jobs (%d cores and %.0f GB of memory)' %
(auth['user'], user_id, used_jobs, used_cores, used_memory / (1 << 30)))
logger.debug('User %s (id: %d) has %d pending jobs (%d cores and %.0f GB of memory)' %
(auth['user'], user_id, number_new_jobs, number_new_jobs * zapp_cores,
number_new_jobs * zapp_memory / (1 << 30)))
# Number of new jobs to schedule need to satisfy all three constrains (concurrent ex, cores and memory)
count_next_jobs_to_schedule = min([
quota_jobs - used_jobs,
int((quota_cores - used_cores) / zapp_cores),
int((quota_memory - used_memory) / zapp_memory)
])
if count_next_jobs_to_schedule > 0:
logger.info('Scheduling %d new jobs' % count_next_jobs_to_schedule)
for i in range(min(count_next_jobs_to_schedule, len(list_of_jobs))): #
name = args.name + '-%02d' % job_count # name-00; name-01; name-02
next_job = list_of_jobs.pop()
logger.info('Starting job with command \'%s\'' %
' '.join([os.path.split(s)[1] if i == 1 else s for i, s in enumerate(next_job.split(' '))]))
job_count += 1
exec_id = submit_zapp(zapp, next_job, name, api)
if exec_id is None:
logger.error('Continuous failing while submitting new execution. Skipping this job.')
continue
logger.info('Job submitted with ID %d' % exec_id)
job_ids.append(exec_id)
else:
logger.warning('Quota exceeded. No jobs can be scheduled now! Sleeping')
time.sleep(15)
logger.info('All jobs have been submitted successfully! Exiting')
except KeyboardInterrupt:
logger.warning('User interruption. Only %d jobs have been submitted. Jobs in queue: %d' %
(job_count, len(list_of_jobs)))
if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment