Commit 077926e7 authored by Daniele Venzano's avatar Daniele Venzano

Fix init race condition

parent 9b1dcd74
......@@ -40,7 +40,8 @@ def main():
tm = PeriodicTaskManager()
zoe_sched.init_tasks(tm)
barrier = zoe_sched.init_tasks(tm)
barrier.wait() # wait for all tasks to be ready and running
ipc_server.start_thread()
......
......@@ -13,18 +13,22 @@ class PeriodicTaskManager:
self.terminate = Event()
self.terminate.clear()
def add_task(self, name, func, delay):
th = Thread(name=name, target=self._generic_task, args=(name, delay, func))
def add_task(self, name, func, delay, ready_barrier):
th = Thread(name=name, target=self._generic_task, args=(name, delay, func, ready_barrier))
th.daemon = True
th.start()
def _generic_task(self, name, delay, func):
def _generic_task(self, name, delay, func, ready_barrier):
log.info("Task {} started".format(name))
init_done = False
while True:
try:
func()
except:
log.exception("Task {} raised an exception".format(name))
if not init_done:
init_done = True
ready_barrier.wait()
stop = self.terminate.wait(delay)
if stop:
break
......
import logging
from threading import Barrier
import time
from zoe_scheduler.platform import PlatformManager
......@@ -73,10 +74,12 @@ class ZoeScheduler:
self.platform_status = PlatformStatus(self)
self.scheduler_policy = SimpleSchedulerPolicy(self.platform_status)
def init_tasks(self, tm: PeriodicTaskManager):
tm.add_task("platform status updater", self.platform_status.update, zoeconf.interval_status_refresh)
tm.add_task("proxy access timestamp updater", pm.update_proxy_access_timestamps, zoeconf.interval_proxy_update_accesses)
tm.add_task("execution health checker", self.platform.check_executions_health, zoeconf.interval_check_health)
def init_tasks(self, tm: PeriodicTaskManager) -> Barrier:
barrier = Barrier(4) # number of tasks + main thread
tm.add_task("platform status updater", self.platform_status.update, zoeconf.interval_status_refresh, barrier)
tm.add_task("proxy access timestamp updater", pm.update_proxy_access_timestamps, zoeconf.interval_proxy_update_accesses, barrier)
tm.add_task("execution health checker", self.platform.check_executions_health, zoeconf.interval_check_health, barrier)
return barrier
def incoming(self, execution: ExecutionState) -> bool:
if not self.scheduler_policy.admission_control(execution.application.required_resources):
......
......@@ -5,7 +5,6 @@ from flask import Blueprint, jsonify, request, session, abort, send_file
from zoe_client import ZoeClient
from common.configuration import ipcconf
from common.exceptions import ApplicationStillRunning
api_bp = Blueprint('api', __name__)
......@@ -70,9 +69,7 @@ def application_delete(app_id):
client = ZoeClient(ipcconf['server'], ipcconf['port'])
_api_check_user(client)
try:
client.application_remove(app_id)
except ApplicationStillRunning:
if client.application_remove(app_id, False):
return jsonify(status="error", msg="The application has active executions and cannot be deleted")
else:
return jsonify(status="ok")
......
......@@ -4,7 +4,7 @@ from zoe_client import ZoeClient
from common.configuration import ipcconf
from zoe_web.web import web_bp
import zoe_web.utils as web_utils
from common.state.execution import Execution
from zoe_client.entities import Execution
@web_bp.route('/')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment