Commit dca221e6 authored by Daniele Venzano's avatar Daniele Venzano

Remove deprecated simple scheduler

parent 70fa1dac
......@@ -83,11 +83,9 @@ Authentication:
Scheduler options:
* ``scheduler-class = <ZoeSimpleScheduler | ZoeElasticScheduler>`` : Scheduler class to use for scheduling ZApps (default: elastic scheduler)
* ``scheduler-class = <ZoeElasticScheduler>`` : Scheduler class to use for scheduling ZApps (default: elastic scheduler)
* ``scheduler-policy = <FIFO | SIZE>`` : Scheduler policy to use for scheduling ZApps (default: FIFO)
Default options for the scheduler enable the traditional Zoe scheduler that was already available in the previous releases.
ZApp shop:
* ``zapp-shop-path = /var/lib/zoe-apps`` : Path where ZApp folders are stored
......
sonar.projectKey=zoe
sonar.projectName=zoe
sonar.projectVersion=1.0
sonar.host.url=http://your-sonarqube-server-address
sonar.sources=.
sonar.modules=zoe_api,zoe_master,zoe_lib
sonar.exclusions=zoe_api/web/static/**
sonar.sourceEncoding=UTF-8
......@@ -98,7 +98,7 @@ def load_configuration(test_conf=None):
argparser.add_argument('--proxy-path', help='Proxy base path', default='127.0.0.1')
# Scheduler
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeSimpleScheduler', 'ZoeElasticScheduler'], default='ZoeElasticScheduler')
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeElasticScheduler'], default='ZoeElasticScheduler')
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
argparser.add_argument('--backend', choices=['Swarm', 'Kubernetes', 'DockerEngine'], default='DockerEngine', help='Which backend to enable')
......
......@@ -16,5 +16,4 @@
"""The Zoe schedulers"""
from .base_scheduler import ZoeBaseScheduler
from .simple_scheduler import ZoeSimpleScheduler
from .elastic_scheduler import ZoeElasticScheduler
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Scheduler."""
import logging
import threading
from zoe_lib.state import Execution
from zoe_master.backends.interface import start_all, terminate_execution
from zoe_master.scheduler.base_scheduler import ZoeBaseScheduler
from zoe_master.exceptions import UnsupportedSchedulerPolicyError
log = logging.getLogger(__name__)
class ZoeSimpleScheduler(ZoeBaseScheduler):
"""The Scheduler class."""
def __init__(self, state, policy, metrics):
super().__init__(state)
self.metrics = metrics
if policy != 'FIFO':
raise UnsupportedSchedulerPolicyError
self.fifo_queue = []
self.trigger_semaphore = threading.Semaphore(0)
self.async_threads = []
self.loop_quit = False
self.loop_th = threading.Thread(target=self.loop_start_th, name='scheduler')
self.loop_th.start()
log.warning('This scheduler class is deprecated and will be removed in version 2018.03, please switch to the elastic scheduler.')
def trigger(self):
"""Trigger a scheduler run."""
self.trigger_semaphore.release()
def incoming(self, execution: Execution):
"""
This method adds the execution to the end of the FIFO queue and triggers the scheduler.
:param execution: The execution
:return:
"""
self.fifo_queue.append(execution)
self.trigger()
def terminate(self, execution: Execution) -> None:
"""
Inform the master that an execution has been terminated. This can be done asynchronously.
:param execution: the terminated execution
:return: None
"""
def async_termination():
"""Actual termination run in a thread."""
terminate_execution(execution)
self.trigger()
try:
self.fifo_queue.remove(execution)
except ValueError:
pass
th = threading.Thread(target=async_termination, name='termination_{}'.format(execution.id))
th.start()
self.async_threads.append(th)
def loop_start_th(self):
"""The Scheduler thread loop."""
auto_trigger_base = 60 # seconds
auto_trigger = auto_trigger_base
while True:
ret = self.trigger_semaphore.acquire(timeout=1)
if not ret: # Semaphore timeout, do some thread cleanup
counter = len(self.async_threads)
while counter > 0:
if len(self.async_threads) == 0:
break
th = self.async_threads.pop(0)
th.join(0.1)
if th.isAlive(): # join failed
log.debug('Thread {} join failed'.format(th.name))
self.async_threads.append(th)
counter -= 1
auto_trigger -= 1
if auto_trigger == 0:
auto_trigger = auto_trigger_base
self.trigger()
continue
if self.loop_quit:
break
log.debug("Scheduler start loop has been triggered")
if len(self.fifo_queue) == 0:
continue
e = self.fifo_queue[0]
assert isinstance(e, Execution)
e.set_starting()
self.fifo_queue.pop(0) # remove the execution form the queue
ret = start_all(e)
if ret == 'requeue':
self.fifo_queue.append(e)
elif ret == 'fatal':
continue # throw away the execution
else:
e.set_running()
def quit(self):
"""Stop the scheduler thread."""
self.loop_quit = True
self.trigger()
self.loop_th.join()
def stats(self):
"""Scheduler statistics."""
return {
'queue_length': len(self.fifo_queue),
'running_length': 0,
'termination_threads_count': len(self.async_threads),
'queue': [s.id for s in self.fifo_queue],
'running_queue': [],
'platform_stats': {}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment