# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The Elastic scheduler is the implementation of the scheduling algorithm presented in this paper:
https://arxiv.org/abs/1611.09528
"""

import logging
import threading
import time

from zoe_lib.state import Execution, SQLManager, Service  # pylint: disable=unused-import
from zoe_master.exceptions import ZoeException

from zoe_master.backends.interface import terminate_execution, terminate_service, start_elastic, start_essential, update_service_resource_limits
from zoe_master.scheduler.simulated_platform import SimulatedPlatform
from zoe_master.exceptions import UnsupportedSchedulerPolicyError
from zoe_master.stats import NodeStats  # pylint: disable=unused-import
from zoe_master.metrics.base import StatsManager  # pylint: disable=unused-import

log = logging.getLogger(__name__)

SELF_TRIGGER_TIMEOUT = 60  # the scheduler will trigger itself periodically in case platform resources have changed outside its control


def catch_exceptions_and_retry(func):
    """Decorator to catch exceptions in threaded functions."""
    def wrapper(self):
        """The wrapper."""
        while True:
            try:
                func(self)
            except BaseException:  # pylint: disable=broad-except
                log.exception('Unmanaged exception in thread loop')
            else:
                log.debug('Thread terminated')
                break
    return wrapper


class ExecutionProgress:
    """Additional data for tracking execution sizes while in the queue."""
    def __init__(self):
        self.last_time_scheduled = 0
        self.progress_sequence = []


class ZoeElasticScheduler:
    """The Scheduler class for size-based scheduling. Policy can be "FIFO" or "SIZE"."""
    def __init__(self, state: SQLManager, policy, metrics: StatsManager):
        if policy not in ('FIFO', 'SIZE', 'DYNSIZE'):
            raise UnsupportedSchedulerPolicyError
        self.metrics = metrics
        self.trigger_semaphore = threading.Semaphore(0)
        self.policy = policy
        self.queue = []
        self.queue_running = []
        self.queue_termination = []
        self.additional_exec_state = {}
        self.loop_quit = False
        self.loop_th = threading.Thread(target=self.loop_start_th, name='scheduler')
        self.core_limit_recalc_trigger = threading.Event()
        self.core_limit_th = threading.Thread(target=self._adjust_core_limits, name='adjust_core_limits')
        self.state = state
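        # Recover executions already marked as running in the state store: those with all
        # services up go straight to the running queue, the others are re-queued for scheduling.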
        for execution in self.state.executions.select(status='running'):
            if execution.all_services_running:
                self.queue_running.append(execution)
            else:
                self.queue.append(execution)
                self.additional_exec_state[execution.id] = ExecutionProgress()
        self.loop_th.start()
        self.core_limit_th.start()

    def trigger(self):
        """Trigger a scheduler run."""
        self.trigger_semaphore.release()

    def incoming(self, execution: Execution):
        """
        This method adds the execution to the end of the queue and triggers the scheduler.
        :param execution: The execution
        :return:
        """
        exec_data = ExecutionProgress()
        self.additional_exec_state[execution.id] = exec_data
        self.queue.append(execution)
        self.trigger()

    def terminate(self, execution: Execution) -> None:
        """
        Inform the master that an execution has been terminated. This can be done asynchronously.
        :param execution: the terminated execution
        :return: None
        """
        execution.set_cleaning_up()
        self.queue_termination.append(execution)

    def _terminate_executions(self):
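        """Drain the termination queue, removing each execution from the scheduler queues before terminating it."""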
        while len(self.queue_termination) > 0:
            execution = self.queue_termination.pop(0)
            try:
                self.queue.remove(execution)
            except ValueError:
                try:
                    self.queue_running.remove(execution)
                except ValueError:
                    log.warning('Execution {} is not in any queue, attempting termination anyway'.format(execution.id))

            try:
                del self.additional_exec_state[execution.id]
            except KeyError:
                pass

            terminate_execution(execution)
            log.info('Execution {} terminated successfully'.format(execution.id))

    def _refresh_execution_sizes(self):
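        """Update the size of queued executions; only the DYNSIZE policy actually changes them."""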
        if self.policy == "FIFO":
            return
        elif self.policy == "SIZE":
            return
        elif self.policy == "DYNSIZE":
            for execution in self.queue:  # type: Execution
                try:
                    exec_data = self.additional_exec_state[execution.id]
                except KeyError:
                    continue
                if exec_data.last_time_scheduled == 0:
                    continue
                elif execution.size <= 0:
                    execution.set_size(execution.total_reservations.cores.min * execution.total_reservations.memory.min)
                    continue
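                # The size decays by roughly 256 MiB per second of waiting since the last
                # scheduling attempt, so long-waiting executions sort earlier under the SIZE ordering.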
                new_size = execution.size - (time.time() - exec_data.last_time_scheduled) * (256 * 1024 ** 2)  # to be tuned
                execution.set_size(new_size)

    def _pop_all(self):
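        """Return the queued executions to attempt scheduling, skipping any that are terminated or being cleaned up."""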
        out_list = []
        for execution in self.queue:  # type: Execution
            if execution.status not in (Execution.TERMINATED_STATUS, Execution.CLEANING_UP_STATUS):
                out_list.append(execution)
            else:
                log.debug('While popping, throwing away execution {} that is in status {}'.format(execution.id, execution.status))

        return out_list

    def _requeue(self, execution: Execution):
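        """Record a scheduling attempt for an execution that stays in the queue."""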
        self.additional_exec_state[execution.id].last_time_scheduled = time.time()
        if execution not in self.queue:  # sanity check: the execution should be in the queue
            log.warning("Execution {} wants to be re-queued, but it is not in the queue".format(execution.id))

    @catch_exceptions_and_retry
    def loop_start_th(self):  # pylint: disable=too-many-locals
        """The Scheduler thread loop."""
        auto_trigger = SELF_TRIGGER_TIMEOUT
        while True:
            ret = self.trigger_semaphore.acquire(timeout=1)
            if not ret:  # Semaphore timeout, do some cleanup
                auto_trigger -= 1
                if auto_trigger == 0:
                    auto_trigger = SELF_TRIGGER_TIMEOUT
                    self.trigger()
                continue

            if self.loop_quit:
                break

            self._check_dead_services()
            self._terminate_executions()

            if len(self.queue) == 0:
                log.debug("Scheduler loop has been triggered, but the queue is empty")
                self.core_limit_recalc_trigger.set()
                continue
            log.debug("Scheduler loop has been triggered")

            while True:  # Inner loop will run until no new executions can be started or the queue is empty
                self._refresh_execution_sizes()

                if self.policy == "SIZE" or self.policy == "DYNSIZE":
                    self.queue.sort(key=lambda execution: execution.size)

                jobs_to_attempt_scheduling = self._pop_all()
                log.debug('Scheduler inner loop, jobs to attempt scheduling:')
                for job in jobs_to_attempt_scheduling:
                    log.debug("-> {} ({})".format(job, job.size))

                try:
                    platform_state = self.metrics.current_stats
                except ZoeException:
                    log.error('Cannot retrieve platform state, cannot schedule')
                    for job in jobs_to_attempt_scheduling:
                        self._requeue(job)
                    break

                cluster_status_snapshot = SimulatedPlatform(platform_state)

                jobs_to_launch = []
                free_resources = cluster_status_snapshot.aggregated_free_memory()

                # Try to find a placement solution using a snapshot of the platform status
                for job in jobs_to_attempt_scheduling:  # type: Execution
                    jobs_to_launch_copy = jobs_to_launch.copy()

                    # remove all elastic services from the previous simulation loop
                    for job_aux in jobs_to_launch:  # type: Execution
                        cluster_status_snapshot.deallocate_elastic(job_aux)

                    job_can_start = False
                    if not job.is_running:
                        job_can_start = cluster_status_snapshot.allocate_essential(job)

                    if job_can_start or job.is_running:
                        jobs_to_launch.append(job)

                    # Try to put back the elastic services
                    for job_aux in jobs_to_launch:
                        cluster_status_snapshot.allocate_elastic(job_aux)

                    current_free_resources = cluster_status_snapshot.aggregated_free_memory()
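                    # If free memory did not decrease, nothing from this job could actually be
                    # placed: roll back to the previous allocation and stop scanning the queue.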
                    if current_free_resources >= free_resources:
                        jobs_to_launch = jobs_to_launch_copy
                        break
                    free_resources = current_free_resources

                placements = cluster_status_snapshot.get_service_allocation()
                log.info('Allocation after simulation: {}'.format(placements))

                # We port the results of the simulation into the real cluster
                for job in jobs_to_launch:  # type: Execution
                    if not job.essential_services_running:
                        ret = start_essential(job, placements)
                        if ret == "fatal":
                            jobs_to_attempt_scheduling.remove(job)
                            self.queue.remove(job)
                            continue  # throw away the execution
                        elif ret == "requeue":
                            self._requeue(job)
                            continue
                        elif ret == "ok":
                            job.set_running()

                        assert ret == "ok"

                    start_elastic(job, placements)

                    if job.all_services_active:
                        log.info('execution {}: all services are active'.format(job.id))
                        jobs_to_attempt_scheduling.remove(job)
                        self.queue.remove(job)
                        self.queue_running.append(job)

                self.core_limit_recalc_trigger.set()

                for job in jobs_to_attempt_scheduling:
                    self._requeue(job)

                if len(self.queue) == 0:
                    log.debug('empty queue, exiting inner loop')
                    break
                if len(jobs_to_launch) == 0:
                    log.debug('No executions could be started, exiting inner loop')
                    break

    def quit(self):
        """Stop the scheduler thread."""
        self.loop_quit = True
        self.trigger()
        self.core_limit_recalc_trigger.set()
        self.loop_th.join()
        self.core_limit_th.join()

    def stats(self):
        """Scheduler statistics."""
        if self.policy == "SIZE":
            queue = sorted(self.queue, key=lambda execution: execution.size)
        else:
            queue = self.queue

        return {
            'queue_length': len(self.queue),
            'running_length': len(self.queue_running),
            'termination_queue_length': len(self.queue_termination),
            'queue': [s.id for s in queue],
            'running_queue': [s.id for s in self.queue_running],
            'termination_queue': [s.id for s in self.queue_termination]
        }

    @catch_exceptions_and_retry
    def _adjust_core_limits(self):
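        """Background loop that redistributes unreserved cores among the services running on each node."""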
        self.core_limit_recalc_trigger.clear()
        while not self.loop_quit:
            self.core_limit_recalc_trigger.wait()
            if self.loop_quit:
                break
            stats = self.metrics.current_stats
            for node in stats.nodes:  # type: NodeStats
                new_core_allocations = {}
                node_services = self.state.services.select(backend_host=node.name, backend_status=Service.BACKEND_START_STATUS)
                if len(node_services) == 0:
                    continue

                for service in node_services:
                    new_core_allocations[service.id] = service.resource_reservation.cores.min

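                # Split any cores left unreserved on this node evenly among its running services,
                # on top of each service's guaranteed minimum.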
                if node.cores_reserved < node.cores_total:
                    cores_free = node.cores_total - node.cores_reserved
                    cores_to_add = cores_free / len(node_services)
                else:
                    cores_to_add = 0

                for service in node_services:
                    update_service_resource_limits(service, cores=new_core_allocations[service.id] + cores_to_add)

            self.core_limit_recalc_trigger.clear()

    def _check_dead_services(self):
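        """React to services that died in the backend: terminate or reschedule the affected executions."""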
        # Check for executions that are no longer viable since an essential service died
        for execution in self.queue_running:
            for service in execution.services:
                if service.essential and service.backend_status == service.BACKEND_DIE_STATUS:
                    log.info("Essential service {} ({}) of execution {} died, terminating execution".format(service.id, service.name, execution.id))
                    service.restarted()
                    execution.set_error_message("Essential service {} died".format(service.name))
                    self.terminate(execution)
                    break
        # Check for executions that need to be re-queued because one of the elastic components died
        # Do it in two loops to prevent rescheduling executions that need to be terminated
        for execution in self.queue_running:
            for service in execution.services:
                if not service.essential and service.backend_status == service.BACKEND_DIE_STATUS:
                    log.info("Elastic service {} ({}) of execution {} died, rescheduling".format(service.id, service.name, execution.id))
                    terminate_service(service)
                    service.restarted()
                    self.queue_running.remove(execution)
                    self.queue.append(execution)
                    break