Commit c033c1c5 authored by Daniele Venzano's avatar Daniele Venzano

Properly terminate all async tasks in zoe-scheduler

parent 586c031d
......@@ -12,6 +12,12 @@ from zoe_scheduler.scheduler import zoe_sched
def sigint_handler():
log.warning('CTRL-C detected, terminating event loop...')
loop.stop()
zoe_sched.stop_tasks()
rpyc_server.stop()
try:
loop.run_forever()
except RuntimeError:
pass
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
......@@ -27,10 +33,6 @@ if __name__ == "__main__":
zoe_sched.init_tasks()
try:
loop.run_forever()
except KeyboardInterrupt:
log.warning('CTRL-C detected, terminating event loop...')
loop.run_forever()
loop.run_until_complete(rpyc_server.server.wait_closed())
loop.close()
......@@ -3,7 +3,7 @@ import logging
log = logging.getLogger(__name__)
class PeriodicTask(object):
class PeriodicTask:
def __init__(self, func, interval):
self.func = func
self.interval = interval
......@@ -19,3 +19,6 @@ class PeriodicTask(object):
except:
log.exception("Exception in periodic task")
self._set()
def stop(self):
self._handler.cancel()
......@@ -140,4 +140,8 @@ class RPyCAsyncIOServer:
self.logger.info("server started on [%s]:%s", self.hostname, self.port)
if self.auto_register:
self._bg_register()
PeriodicTask(self._bg_register, self.registrar.REREGISTER_INTERVAL)
self.register_task = PeriodicTask(self._bg_register, self.registrar.REREGISTER_INTERVAL)
def stop(self):
self.register_task.stop()
self.server.close()
......@@ -64,13 +64,22 @@ class ZoeScheduler:
self.platform = PlatformManager()
self.platform_status = PlatformStatus()
self.scheduler_policy = SimpleSchedulerPolicy(self.platform_status)
self.tasks = []
def init_tasks(self):
self.platform_status.update()
PeriodicTask(self.platform_status.update, conf["status_refresh_interval"])
PeriodicTask(self.schedule, conf['scheduler_task_interval'])
PeriodicTask(pm.update_proxy_access_timestamps, conf['proxy_update_accesses'])
PeriodicTask(self.platform.check_executions_health, conf["check_health"])
tsk = PeriodicTask(self.platform_status.update, conf["status_refresh_interval"])
self.tasks.append(tsk)
tsk = PeriodicTask(self.schedule, conf['scheduler_task_interval'])
self.tasks.append(tsk)
tsk = PeriodicTask(pm.update_proxy_access_timestamps, conf['proxy_update_accesses'])
self.tasks.append(tsk)
tsk = PeriodicTask(self.platform.check_executions_health, conf["check_health"])
self.tasks.append(tsk)
def stop_tasks(self):
for tsk in self.tasks:
tsk.stop()
def incoming(self, execution: Execution) -> bool:
if not self.scheduler_policy.admission_control(execution.application.required_resources):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment