# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The Elastic scheduler is the implementation of the scheduling algorithm presented in this paper:
https://arxiv.org/abs/1611.09528
"""

import logging
import threading
import time

from zoe_lib.state import Execution, SQLManager, Service  # pylint: disable=unused-import
from zoe_master.exceptions import ZoeException

from zoe_master.backends.interface import terminate_execution, terminate_service, start_elastic, start_essential, update_service_resource_limits

from zoe_master.scheduler.simulated_platform import SimulatedPlatform
from zoe_master.exceptions import UnsupportedSchedulerPolicyError
from zoe_master.stats import NodeStats  # pylint: disable=unused-import
from zoe_master.metrics.base import StatsManager  # pylint: disable=unused-import

log = logging.getLogger(__name__)

SELF_TRIGGER_TIMEOUT = 60  # the scheduler will trigger itself periodically in case platform resources have changed outside its control


def catch_exceptions_and_retry(func):
    """Decorator to catch exceptions in threaded functions."""
    def wrapper(self):
        """The wrapper."""
        while True:
            try:
                func(self)
            except BaseException:  # pylint: disable=broad-except
                log.exception('Unmanaged exception in thread loop')
            else:
                log.debug('Thread terminated')
                break
    return wrapper


class ExecutionProgress:
    """Additional data for tracking execution sizes while in the queue."""
    def __init__(self):
        self.last_time_scheduled = 0
        self.progress_sequence = []


class ZoeElasticScheduler:
    """The Scheduler class for size-based scheduling. Policy can be "FIFO", "SIZE" or "DYNSIZE"."""
    def __init__(self, state: SQLManager, policy, metrics: StatsManager):
        if policy not in ('FIFO', 'SIZE', 'DYNSIZE'):
            raise UnsupportedSchedulerPolicyError
        self.metrics = metrics
        self.trigger_semaphore = threading.Semaphore(0)
        self.policy = policy
        self.queue = []
        self.queue_running = []
        self.queue_termination = []
        self.additional_exec_state = {}
        self.loop_quit = False
        self.loop_th = threading.Thread(target=self.loop_start_th, name='scheduler')
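        # Separate event/thread pair used to recompute per-service CPU core limits outside the main scheduling loop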
        self.core_limit_recalc_trigger = threading.Event()
        self.core_limit_th = threading.Thread(target=self._adjust_core_limits, name='adjust_core_limits')
        self.state = state
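        # Re-populate the queues with executions already marked as running in the state store (e.g. after a master restart)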
        for execution in self.state.executions.select(status='running'):
            if execution.all_services_running:
                self.queue_running.append(execution)
            else:
                self.queue.append(execution)
                self.additional_exec_state[execution.id] = ExecutionProgress()

        self.loop_th.start()
        self.core_limit_th.start()

    def trigger(self):
        """Trigger a scheduler run."""
        self.trigger_semaphore.release()

    def incoming(self, execution: Execution):
        """
        This method adds the execution to the end of the queue and triggers the scheduler.
        :param execution: The execution
        :return:
        """
        exec_data = ExecutionProgress()
        self.additional_exec_state[execution.id] = exec_data
        self.queue.append(execution)
        self.trigger()

    def terminate(self, execution: Execution) -> None:
        """
        Inform the master that an execution has been terminated. This can be done asynchronously.
        :param execution: the terminated execution
        :return: None
        """
        execution.set_cleaning_up()
        self.queue_termination.append(execution)

    def _terminate_executions(self):
        while len(self.queue_termination) > 0:
            execution = self.queue_termination.pop(0)
            try:
                self.queue.remove(execution)
            except ValueError:
                try:
                    self.queue_running.remove(execution)
                except ValueError:
                    log.warning('Execution {} is not in any queue, attempting termination anyway'.format(execution.id))

            try:
                del self.additional_exec_state[execution.id]
            except KeyError:
                pass

            terminate_execution(execution)
            log.debug('Execution {} terminated successfully'.format(execution.id))

    def _refresh_execution_sizes(self):
        if self.policy == "FIFO":
            return
        elif self.policy == "SIZE":
            return
        elif self.policy == "DYNSIZE":
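            # Age queued executions: the size shrinks with the time elapsed since the last scheduling
            # attempt, so long-waiting executions move toward the head of the size-sorted queue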
            for execution in self.queue:  # type: Execution
                exec_data = self.additional_exec_state[execution.id]
                if exec_data.last_time_scheduled == 0:
                    continue
                elif execution.size <= 0:
                    execution.set_size(execution.total_reservations.cores.min * execution.total_reservations.memory.min)
                    continue
                new_size = execution.size - (time.time() - exec_data.last_time_scheduled) * (256 * 1024 ** 2)  # to be tuned
                execution.set_size(new_size)

    def _pop_all(self):
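        """Return the queued executions that are not terminated or being cleaned up; the queue itself is not modified here."""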
        out_list = []
        for execution in self.queue:  # type: Execution
            if execution.status != Execution.TERMINATED_STATUS and execution.status != Execution.CLEANING_UP_STATUS:
                out_list.append(execution)
            else:
                log.debug('While popping, throwing away execution {} that is in status {}'.format(execution.id, execution.status))

        return out_list

    def _requeue(self, execution: Execution):
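        """Mark the time of the latest scheduling attempt for an execution that remains in the queue."""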
        self.additional_exec_state[execution.id].last_time_scheduled = time.time()
        if execution not in self.queue:  # sanity check: the execution should be in the queue
            log.warning("Execution {} wants to be re-queued, but it is not in the queue".format(execution.id))

    @catch_exceptions_and_retry
    def loop_start_th(self):  # pylint: disable=too-many-locals
        """The Scheduler thread loop."""
        auto_trigger = SELF_TRIGGER_TIMEOUT
        while True:
            ret = self.trigger_semaphore.acquire(timeout=1)
            if not ret:  # Semaphore timeout, do some cleanup
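                # Nothing triggered the scheduler in the last second: count down and self-trigger
                # after SELF_TRIGGER_TIMEOUT seconds of inactivity to pick up platform changes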
                auto_trigger -= 1
                if auto_trigger == 0:
                    auto_trigger = SELF_TRIGGER_TIMEOUT
                    self.trigger()
                continue

            if self.loop_quit:
                break

            self._check_dead_services()
            self._terminate_executions()

            if len(self.queue) == 0:
                log.debug("Scheduler loop has been triggered, but the queue is empty")
                self.core_limit_recalc_trigger.set()
                continue
            log.debug("Scheduler loop has been triggered")

            while True:  # Inner loop will run until no new executions can be started or the queue is empty
                self._refresh_execution_sizes()

                if self.policy == "SIZE" or self.policy == "DYNSIZE":
                    self.queue.sort(key=lambda execution: execution.size)

                jobs_to_attempt_scheduling = self._pop_all()
                log.debug('Scheduler inner loop, jobs to attempt scheduling:')
                for job in jobs_to_attempt_scheduling:
                    log.debug("-> {} ({})".format(job, job.size))

                try:
                    platform_state = self.metrics.current_stats
                except ZoeException:
                    log.error('Cannot retrieve platform state, cannot schedule')
                    for job in jobs_to_attempt_scheduling:
                        self._requeue(job)
                    break

                cluster_status_snapshot = SimulatedPlatform(platform_state)
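                # All placement attempts below only modify this in-memory snapshot; the real
                # back-end is contacted only once a placement solution has been found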

                jobs_to_launch = []
                free_resources = cluster_status_snapshot.aggregated_free_memory()

                # Try to find a placement solution using a snapshot of the platform status
                for job in jobs_to_attempt_scheduling:  # type: Execution
                    jobs_to_launch_copy = jobs_to_launch.copy()

                    # remove all elastic services from the previous simulation loop
                    for job_aux in jobs_to_launch:  # type: Execution
                        cluster_status_snapshot.deallocate_elastic(job_aux)

                    job_can_start = False
                    if not job.is_running:
                        job_can_start = cluster_status_snapshot.allocate_essential(job)

                    if job_can_start or job.is_running:
                        jobs_to_launch.append(job)

                    # Try to put back the elastic services
                    for job_aux in jobs_to_launch:
                        cluster_status_snapshot.allocate_elastic(job_aux)

                    current_free_resources = cluster_status_snapshot.aggregated_free_memory()
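                    # If free memory did not decrease, the last execution could not actually be placed:
                    # roll back to the previous solution and stop trying to add more jobs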
                    if current_free_resources >= free_resources:
                        jobs_to_launch = jobs_to_launch_copy
                        break
                    free_resources = current_free_resources

                placements = cluster_status_snapshot.get_service_allocation()
                log.debug('Allocation after simulation: {}'.format(placements))

                # We port the results of the simulation into the real cluster
                for job in jobs_to_launch:  # type: Execution
                    if not job.essential_services_running:
                        ret = start_essential(job, placements)
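                        # start_essential reports "fatal" (drop the execution), "requeue" (retry later) or "ok" (essential services started)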
                        if ret == "fatal":
                            jobs_to_attempt_scheduling.remove(job)
                            self.queue.remove(job)
                            continue  # throw away the execution
                        elif ret == "requeue":
                            self._requeue(job)
                            continue
                        elif ret == "ok":
                            job.set_running()

                        assert ret == "ok"

                    start_elastic(job, placements)

                    if job.all_services_active:
                        log.debug('execution {}: all services are active'.format(job.id))
                        jobs_to_attempt_scheduling.remove(job)
                        self.queue.remove(job)
                        self.queue_running.append(job)

                self.core_limit_recalc_trigger.set()

                for job in jobs_to_attempt_scheduling:
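                    # Executions that could not be fully started stay in the queue with a refreshed scheduling timestamp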
                    self._requeue(job)

                if len(self.queue) == 0:
                    log.debug('empty queue, exiting inner loop')
                    break
                if len(jobs_to_launch) == 0:
                    log.debug('No executions could be started, exiting inner loop')
                    break

    def quit(self):
        """Stop the scheduler thread."""
        self.loop_quit = True
        self.trigger()
        self.core_limit_recalc_trigger.set()
        self.loop_th.join()
        self.core_limit_th.join()

    def stats(self):
        """Scheduler statistics."""
        if self.policy == "SIZE":
            queue = sorted(self.queue, key=lambda execution: execution.size)
        else:
            queue = self.queue

        return {
            'queue_length': len(self.queue),
            'running_length': len(self.queue_running),
            'termination_queue_length': len(self.queue_termination),
            'queue': [s.id for s in queue],
            'running_queue': [s.id for s in self.queue_running],
            'termination_queue': [s.id for s in self.queue_termination]
        }

    @catch_exceptions_and_retry
    def _adjust_core_limits(self):
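        """Rebalance the CPU core limits of running services: each one gets its minimum reservation plus an equal share of the unreserved cores on its node."""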
        self.core_limit_recalc_trigger.clear()
        while not self.loop_quit:
            self.core_limit_recalc_trigger.wait()
            if self.loop_quit:
                break
            stats = self.metrics.current_stats
            for node in stats.nodes:  # type: NodeStats
                new_core_allocations = {}
                node_services = self.state.services.select(backend_host=node.name, backend_status=Service.BACKEND_START_STATUS)
                if len(node_services) == 0:
                    continue

                for service in node_services:
                    new_core_allocations[service.id] = service.resource_reservation.cores.min

                if node.cores_reserved < node.cores_total:
                    cores_free = node.cores_total - node.cores_reserved
                    cores_to_add = cores_free / len(node_services)
                else:
                    cores_to_add = 0

                for service in node_services:
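                    # Raise each service's limit to its minimum reservation plus the equal share computed above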
                    update_service_resource_limits(service, cores=new_core_allocations[service.id] + cores_to_add)

            self.core_limit_recalc_trigger.clear()

    def _check_dead_services(self):
        # Check for executions that are no longer viable since an essential service died
        for execution in self.queue_running:
            for service in execution.services:
                if service.essential and service.backend_status == service.BACKEND_DIE_STATUS:
                    log.info("Essential service {} ({}) of execution {} died, terminating execution".format(service.id, service.name, execution.id))
                    service.restarted()
                    execution.set_error_message("Essential service {} died".format(service.name))
                    self.terminate(execution)
                    break
        # Check for executions that need to be re-queued because one of the elastic components died
        # Do it in two loops to prevent rescheduling executions that need to be terminated
        for execution in self.queue_running:
            for service in execution.services:
                if not service.essential and service.backend_status == service.BACKEND_DIE_STATUS:
                    log.info("Elastic service {} ({}) of execution {} died, rescheduling".format(service.id, service.name, execution.id))
                    terminate_service(service)
                    service.restarted()
                    self.queue_running.remove(execution)
                    self.queue.append(execution)
                    break