Commit 31a4d207 authored by Daniele Venzano's avatar Daniele Venzano

Create a scheduler interface and move the current scheduler into the scheduler package

parent 016d9d21
...@@ -59,8 +59,7 @@ def restart_resubmit_scheduler(state: SQLManager, scheduler: ZoeScheduler): ...@@ -59,8 +59,7 @@ def restart_resubmit_scheduler(state: SQLManager, scheduler: ZoeScheduler):
scheduler.incoming(e) scheduler.incoming(e)
def execution_delete(scheduler: ZoeScheduler, execution: Execution): def execution_delete(execution: Execution):
"""Remove an execution from the scheduler, must only be called if the execution is NOT running.""" """Remove an execution, must only be called if the execution is NOT running."""
assert not execution.is_active() assert not execution.is_active()
exec_logs.delete(execution) exec_logs.delete(execution)
scheduler.remove_execution(execution)
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The base class for Zoe schedulers"""
import zoe_lib.sql_manager
class ZoeBaseScheduler:
"""The base class for Zoe schedulers"""
def __init__(self, state: zoe_lib.sql_manager.SQLManager):
self.state = state
def trigger(self):
"""Trigger a scheduler run."""
raise NotImplementedError
def incoming(self, execution: zoe_lib.sql_manager.Execution):
"""
This method adds the execution to the end of the FIFO queue and triggers the scheduler.
:param execution: The execution
:return:
"""
raise NotImplementedError
def terminate(self, execution: zoe_lib.sql_manager.Execution) -> None:
"""
Inform the master that an execution has been terminated. This can be done asynchronously.
:param execution: the terminated execution
:return: None
"""
raise NotImplementedError
def quit(self):
"""Stop the scheduler."""
raise NotImplementedError
def stats(self):
"""Scheduler statistics."""
raise NotImplementedError
...@@ -22,13 +22,15 @@ from zoe_lib.sql_manager import Execution ...@@ -22,13 +22,15 @@ from zoe_lib.sql_manager import Execution
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException
from zoe_master.zapp_to_docker import execution_to_containers, terminate_execution from zoe_master.zapp_to_docker import execution_to_containers, terminate_execution
from zoe_master.scheduler.base_scheduler import ZoeBaseScheduler
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class ZoeScheduler: class ZoeSimpleScheduler(ZoeBaseScheduler):
"""The Scheduler class.""" """The Scheduler class."""
def __init__(self): def __init__(self, state):
super().__init__(state)
self.fifo_queue = [] self.fifo_queue = []
self.trigger_semaphore = threading.Semaphore(0) self.trigger_semaphore = threading.Semaphore(0)
self.async_threads = [] self.async_threads = []
...@@ -68,13 +70,6 @@ class ZoeScheduler: ...@@ -68,13 +70,6 @@ class ZoeScheduler:
th.start() th.start()
self.async_threads.append(th) self.async_threads.append(th)
def remove_execution(self, execution: Execution):
"""Removes the execution form the queue."""
try:
self.fifo_queue.remove(execution)
except ValueError:
pass
def loop_start_th(self): def loop_start_th(self):
"""The Scheduler thread loop.""" """The Scheduler thread loop."""
auto_trigger_base = 60 # seconds auto_trigger_base = 60 # seconds
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment