simple_scheduler.py 4.42 KB
Newer Older
1
# Copyright (c) 2016, Daniele Venzano
2 3 4 5 6 7 8 9 10 11 12 13 14 15
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16 17
"""The Scheduler."""

18
import logging
Daniele Venzano's avatar
Daniele Venzano committed
19
import threading
20

Daniele Venzano's avatar
Daniele Venzano committed
21
from zoe_lib.state import Execution
22
from zoe_master.backends.interface import start_all, terminate_execution
23
from zoe_master.scheduler.base_scheduler import ZoeBaseScheduler
Daniele Venzano's avatar
Daniele Venzano committed
24
from zoe_master.exceptions import UnsupportedSchedulerPolicyError
25 26 27 28

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


29
class ZoeSimpleScheduler(ZoeBaseScheduler):
    """The Scheduler class.

    A FIFO scheduler: executions are kept in a plain list used as a queue and
    are started one at a time by a dedicated thread (``loop_start_th``).
    The thread is woken up through a semaphore released by ``trigger()``.
    """

    def __init__(self, state, policy):
        """
        Set up the queue and start the scheduler thread.

        :param state: passed unchanged to the base scheduler constructor
        :param policy: the scheduling policy name, only 'FIFO' is accepted
        :raises UnsupportedSchedulerPolicyError: if policy is not 'FIFO'
        """
        super().__init__(state)
        if policy != 'FIFO':
            raise UnsupportedSchedulerPolicyError
        self.fifo_queue = []
        self.trigger_semaphore = threading.Semaphore(0)
        self.async_threads = []
        self.loop_quit = False
        self.loop_th = threading.Thread(target=self.loop_start_th, name='scheduler')
        self.loop_th.start()

    def trigger(self):
        """Trigger a scheduler run."""
        self.trigger_semaphore.release()

    def incoming(self, execution: Execution):
        """
        This method adds the execution to the end of the FIFO queue and triggers the scheduler.

        :param execution: The execution
        :return:
        """
        self.fifo_queue.append(execution)
        self.trigger()

    def terminate(self, execution: Execution) -> None:
        """
        Inform the master that an execution has been terminated. This can be done asynchronously.

        The execution is removed from the queue if it is still waiting there,
        then the actual termination is run in a separate thread that is
        recorded in ``async_threads`` so the scheduler loop can join it later.

        :param execution: the terminated execution
        :return: None
        """
        def async_termination():
            """Actual termination run in a thread."""
            terminate_execution(execution)
            self.trigger()

        try:
            self.fifo_queue.remove(execution)
        except ValueError:
            # The execution was not queued (any more): nothing to remove.
            pass
        th = threading.Thread(target=async_termination, name='termination_{}'.format(execution.id))
        th.start()
        self.async_threads.append(th)

    def _cleanup_async_threads(self):
        """Try to join each termination thread once, keeping the ones still running."""
        # Threads are popped from the front and, if still alive, re-appended at
        # the back. The list is never rebuilt, so threads appended concurrently
        # by terminate() are not lost. The pass is bounded by the initial length.
        for _ in range(len(self.async_threads)):
            if not self.async_threads:
                break
            th = self.async_threads.pop(0)
            th.join(0.1)
            # Thread.isAlive() was removed in Python 3.9, is_alive() is the
            # supported spelling.
            if th.is_alive():  # join timed out
                log.debug('Thread %s join failed', th.name)
                self.async_threads.append(th)

    def loop_start_th(self):
        """The Scheduler thread loop.

        Waits on the trigger semaphore with a one second timeout. On timeout it
        joins finished termination threads and, every ``auto_trigger_base``
        timeouts, triggers itself. When triggered, it starts the execution at
        the head of the queue, unless ``loop_quit`` is set, in which case the
        loop ends.
        """
        auto_trigger_base = 60  # number of semaphore timeouts, roughly seconds
        auto_trigger = auto_trigger_base
        while True:
            ret = self.trigger_semaphore.acquire(timeout=1)
            if not ret:  # Semaphore timeout, do some thread cleanup
                self._cleanup_async_threads()
                auto_trigger -= 1
                if auto_trigger == 0:
                    auto_trigger = auto_trigger_base
                    self.trigger()
                continue
            if self.loop_quit:
                break

            log.debug("Scheduler start loop has been triggered")
            if not self.fifo_queue:
                continue

            e = self.fifo_queue[0]
            assert isinstance(e, Execution)
            e.set_starting()
            self.fifo_queue.pop(0)  # remove the execution from the queue

            ret = start_all(e)
            if ret == 'requeue':
                self.fifo_queue.append(e)
            elif ret == 'fatal':
                continue  # throw away the execution
            else:
                e.set_running()

    def quit(self):
        """Stop the scheduler thread."""
        self.loop_quit = True
        self.trigger()  # wake the loop up so it notices loop_quit
        self.loop_th.join()

    def stats(self):
        """Scheduler statistics.

        :return: a dictionary with the queue length, the IDs of the queued
                 executions and the number of tracked termination threads; the
                 running-queue and platform entries are always empty here
        """
        return {
            'queue_length': len(self.fifo_queue),
            'running_length': 0,
            'termination_threads_count': len(self.async_threads),
            'queue': [s.id for s in self.fifo_queue],
            'running_queue': [],
            'platform_stats': {}
        }