Commit 26abf4b8 authored by Daniele Venzano

Pull images before adding the execution to the scheduler queue

parent dc955f8e
......@@ -161,16 +161,6 @@ class APIEndpoint:
if success:
return message
def retry_submit_error_executions(self):
    """Ask the master again to start an execution still in the submit state.

    Only the first execution returned by the state query is retried on each
    call. A failed attempt is logged as a warning and otherwise ignored.
    """
    pending = self.sql.execution_list(status=zoe_lib.state.sql_manager.Execution.SUBMIT_STATUS)
    if pending is not None and len(pending) != 0:
        candidate = pending[0]
        started, reason = self.master.execution_start(candidate.id)
        if not started:
            log.warning('Zoe Master unavailable ({}), execution {} still waiting'.format(reason, candidate.id))
def cleanup_dead_executions(self):
"""Terminates all executions with dead "monitor" services."""
log.debug('Starting dead execution cleanup task')
......
......@@ -77,8 +77,6 @@ def zoe_web_main() -> int:
http_server.bind(args.listen_port, args.listen_address)
http_server.start(num_processes=1)
retry_cb = PeriodicCallback(api_endpoint.retry_submit_error_executions, 30000)
retry_cb.start()
retry_cb = PeriodicCallback(api_endpoint.cleanup_dead_executions, 60000)
retry_cb.start()
......
......@@ -322,3 +322,4 @@ class DockerClient:
self.cli.images.pull(image_name)
except docker.errors.APIError as e:
log.error('Cannot download image {}: {}'.format(image_name, e))
raise ZoeException('Cannot download image {}: {}'.format(image_name, e))
......@@ -86,20 +86,20 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
engine = DockerClient(conf)
return engine.logs(service.backend_id, True, False)
def _real_preload(self, image_name, host_conf):
    """Pull ``image_name`` on one host and log how long the pull took.

    :param image_name: name of the image to pull
    :param host_conf: configuration of the target host; its ``name`` is used in log messages
    """
    log.debug('Preloading image {} on host {}'.format(image_name, host_conf.name))
    started_at = time.time()
    DockerClient(host_conf).pull_image(image_name)
    elapsed = time.time() - started_at
    log.debug('Image {} preloaded on host {} in {:.2f}s'.format(image_name, host_conf.name, elapsed))
def preload_image(self, image_name):
    """Pull an image from a Docker registry into each configured host.

    Hosts are tried one after another. A failure on one host is logged and
    does not prevent the remaining hosts from being tried.

    :param image_name: name of the image to pull
    :raises ZoeException: if the image could not be pulled on any host
    """
    # Each host is pulled exactly once, in the calling thread: an exception
    # raised inside a worker thread would never reach the caller, so the
    # outcome could not be tracked from there.
    one_success = False
    for host_conf in self.docker_config:
        log.debug('Pre-loading image {} on host {}'.format(image_name, host_conf.name))
        time_start = time.time()
        my_engine = DockerClient(host_conf)
        try:
            my_engine.pull_image(image_name)
        except ZoeException:
            log.error('Image {} pre-loading failed on host {}'.format(image_name, host_conf.name))
            continue
        one_success = True
        log.debug('Image {} pre-loaded on host {} in {:.2f}s'.format(image_name, host_conf.name, time.time() - time_start))
    # An empty host list also ends up here: nothing was pulled anywhere.
    if not one_success:
        raise ZoeException('Cannot pull image {}'.format(image_name))
......@@ -68,7 +68,6 @@ class APIManager:
if execution is None:
self._reply_error('Execution ID {} not found'.format(message['exec_id']))
else:
execution.set_scheduled()
self._reply_ok()
zoe_master.preprocessing.execution_submit(self.state, self.scheduler, execution)
elif message['command'] == 'execution_terminate':
......
......@@ -24,6 +24,7 @@ from zoe_lib.state import Execution, SQLManager
from zoe_lib.config import get_conf
from zoe_master.scheduler import ZoeBaseScheduler
from zoe_master.backends.interface import preload_image
from zoe_master.exceptions import ZoeException
log = logging.getLogger(__name__)
......@@ -59,13 +60,24 @@ def _digest_application_description(state: SQLManager, execution: Execution):
assert counter == total_count
if get_conf().backend_image_management:
threading.Thread(target=preload_image, args=(service_descr['image'],), name='image-downloader-{}'.format(service_descr['name']), daemon=True).start()
try:
preload_image(service_descr['image'])
except ZoeException as e:
execution.set_error_message('{}'.format(e))
execution.set_error()
return False
return True
def _do_execution_submit(state: SQLManager, scheduler: ZoeBaseScheduler, execution: Execution):
    """Digest the application description and, on success, queue the execution.

    When digestion reports failure the execution is neither marked as
    scheduled nor handed to the scheduler.

    :param state: the SQL state manager passed through to the digest step
    :param scheduler: the scheduler that receives the execution
    :param execution: the execution being submitted
    """
    digested = _digest_application_description(state, execution)
    if not digested:
        return
    execution.set_scheduled()
    scheduler.incoming(execution)
def execution_submit(state: SQLManager, scheduler: ZoeBaseScheduler, execution: Execution):
    """Submit a new execution to the scheduler.

    The submission runs in a daemon thread, so this call returns without
    waiting for it. The execution reaches the scheduler only through
    ``_do_execution_submit``, which queues it once and only when digesting
    the application description succeeds.

    :param state: the SQL state manager
    :param scheduler: the scheduler that will receive the execution
    :param execution: the execution to submit
    """
    # Do not also digest/enqueue here: that would submit the execution twice
    # and would enqueue it even when the digest step fails.
    threading.Thread(target=_do_execution_submit, args=(state, scheduler, execution), name='submission_{}'.format(execution.id), daemon=True).start()
def execution_terminate(scheduler: ZoeBaseScheduler, execution: Execution):
......@@ -75,6 +87,10 @@ def execution_terminate(scheduler: ZoeBaseScheduler, execution: Execution):
def restart_resubmit_scheduler(state: SQLManager, scheduler: ZoeBaseScheduler):
"""Restart work after a restart of the process."""
submitted_execs = state.execution_list(status=Execution.SUBMIT_STATUS)
for e in submitted_execs:
execution_submit(state, scheduler, e)
sched_execs = state.execution_list(status=Execution.SCHEDULED_STATUS)
for e in sched_execs:
scheduler.incoming(e)
......
......@@ -64,7 +64,7 @@ class ZoeElasticScheduler:
def incoming(self, execution: Execution):
"""
This method adds the execution to the end of the FIFO queue and triggers the scheduler.
This method adds the execution to the end of the queue and triggers the scheduler.
:param execution: The execution
:return:
"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment