# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The Elastic scheduler implements the size-based scheduling algorithm presented in this paper:
https://arxiv.org/abs/1611.09528
"""

import logging
import threading
import time

from zoe_lib.state import Execution, SQLManager, Service  # pylint: disable=unused-import
from zoe_master.exceptions import ZoeException

from zoe_master.backends.interface import terminate_execution, terminate_service, start_elastic, start_essential, update_service_resource_limits
from zoe_master.scheduler.simulated_platform import SimulatedPlatform
from zoe_master.exceptions import UnsupportedSchedulerPolicyError
from zoe_master.stats import NodeStats  # pylint: disable=unused-import
from zoe_master.metrics.base import StatsManager  # pylint: disable=unused-import

log = logging.getLogger(__name__)

SELF_TRIGGER_TIMEOUT = 60  # the scheduler will trigger itself periodically in case platform resources have changed outside its control


def catch_exceptions_and_retry(func):
    """Decorator to catch exceptions in threaded functions."""
    def wrapper(self):
        """The wrapper."""
        while True:
            try:
                func(self)
            except BaseException:  # pylint: disable=broad-except
                log.exception('Unmanaged exception in thread loop')
            else:
                log.debug('Thread terminated')
                break
    return wrapper


class ExecutionProgress:
    """Additional data for tracking execution sizes while in the queue."""
    def __init__(self):
        self.last_time_scheduled = 0
        self.progress_sequence = []


class ZoeElasticScheduler:
    """The Scheduler class for size-based scheduling. Policy can be "FIFO" or "SIZE"."""
    def __init__(self, state: SQLManager, policy, metrics: StatsManager):
        if policy not in ('FIFO', 'SIZE', 'DYNSIZE'):
            raise UnsupportedSchedulerPolicyError
        self.metrics = metrics
        self.trigger_semaphore = threading.Semaphore(0)
        self.policy = policy
        self.queue = []
        self.queue_running = []
        self.additional_exec_state = {}
        self.async_threads = []
        self.loop_quit = False
        self.loop_th = threading.Thread(target=self.loop_start_th, name='scheduler')
        self.core_limit_recalc_trigger = threading.Event()
        self.core_limit_th = threading.Thread(target=self._adjust_core_limits, name='adjust_core_limits')
        self.state = state
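        # Recover state after a master restart: executions marked as running go back into the
        # running queue if all their services are up, otherwise they are re-queued for scheduling.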
        for execution in self.state.executions.select(status='running'):
            if execution.all_services_running:
                self.queue_running.append(execution)
            else:
                self.queue.append(execution)
                self.additional_exec_state[execution.id] = ExecutionProgress()
        self.loop_th.start()
        self.core_limit_th.start()

    def trigger(self):
        """Trigger a scheduler run."""
        self.trigger_semaphore.release()

    def incoming(self, execution: Execution):
        """
        This method adds the execution to the end of the queue and triggers the scheduler.
        :param execution: The execution
        :return:
        """
        exec_data = ExecutionProgress()
        self.additional_exec_state[execution.id] = exec_data
        self.queue.append(execution)
        self.trigger()

    def terminate(self, execution: Execution) -> None:
        """
        Inform the master that an execution has been terminated. This can be done asynchronously.
        :param execution: the terminated execution
        :return: None
        """
        def async_termination(e):
            """Actual termination runs in a thread."""
            with e.termination_lock:
                try:
                    terminate_execution(e)
                except ZoeException as ex:
                    log.error('Error in termination thread: {}'.format(ex))
                    return
                self.trigger()
            log.debug('Execution {} terminated successfully'.format(e.id))

        try:
            self.queue.remove(execution)
        except ValueError:
            try:
                self.queue_running.remove(execution)
            except ValueError:
                log.error('Cannot terminate execution {}, it is not in any queue'.format(execution.id))
                return

        try:
            del self.additional_exec_state[execution.id]
        except KeyError:
            pass
        self.core_limit_recalc_trigger.set()

        th = threading.Thread(target=async_termination, name='termination_{}'.format(execution.id), args=(execution,))
        th.start()
        self.async_threads.append(th)

    def _cleanup_async_threads(self):
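        """Join finished termination threads; threads that are still alive are kept for the next pass."""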
        counter = len(self.async_threads)
        while counter > 0:
            if len(self.async_threads) == 0:
                break
            th = self.async_threads.pop(0)
            th.join(0.1)
            if th.is_alive():  # join failed
                # log.debug('Thread {} join failed'.format(th.name))
                self.async_threads.append(th)
            counter -= 1

    def _refresh_execution_sizes(self):
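        """Update execution sizes while they wait in the queue; only the DYNSIZE policy ages sizes over time."""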
        if self.policy == "FIFO":
            return
        elif self.policy == "SIZE":
            return
        elif self.policy == "DYNSIZE":
            for execution in self.queue:  # type: Execution
                exec_data = self.additional_exec_state[execution.id]
                if exec_data.last_time_scheduled == 0:
                    continue
                elif execution.size <= 0:
                    execution.set_size(execution.total_reservations.cores.min * execution.total_reservations.memory.min)
                    continue
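                # Shrink the size in proportion to the waiting time; smaller sizes sort earlier
                # in the SIZE ordering, so long-waiting executions gradually move up the queue.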
                new_size = execution.size - (time.time() - exec_data.last_time_scheduled) * (256 * 1024 ** 2)  # to be tuned
                execution.set_size(new_size)

    def _pop_all(self):
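        """Return the queued executions whose termination lock could be acquired and that are not already terminated."""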
        out_list = []
        for execution in self.queue:  # type: Execution
            ret = execution.termination_lock.acquire(blocking=False)
            if ret and execution.status != Execution.TERMINATED_STATUS:
                out_list.append(execution)
            else:
                log.debug('While popping, throwing away execution {} that has the termination lock held'.format(execution.id))

        return out_list

    def _requeue(self, execution: Execution):
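        """Release the termination lock and record when this execution was last considered for scheduling."""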
        execution.termination_lock.release()
        self.additional_exec_state[execution.id].last_time_scheduled = time.time()
        if execution not in self.queue:  # sanity check: the execution should be in the queue
            log.warning("Execution {} wants to be re-queued, but it is not in the queue".format(execution.id))

    @catch_exceptions_and_retry
    def loop_start_th(self):  # pylint: disable=too-many-locals
        """The Scheduler thread loop."""
        auto_trigger = SELF_TRIGGER_TIMEOUT
        while True:
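            # Wait for a trigger for up to one second; on timeout, reap finished termination threads
            # and periodically self-trigger to pick up resource changes made outside the scheduler.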
            ret = self.trigger_semaphore.acquire(timeout=1)
            if not ret:  # Semaphore timeout, do some thread cleanup
                self._cleanup_async_threads()
                auto_trigger -= 1
                if auto_trigger == 0:
                    auto_trigger = SELF_TRIGGER_TIMEOUT
                    self.trigger()
                continue
            if self.loop_quit:
                break

            self._check_dead_services()
            if len(self.queue) == 0:
                log.debug("Scheduler loop has been triggered, but the queue is empty")
                self.core_limit_recalc_trigger.set()
                continue
            log.debug("Scheduler loop has been triggered")

            while True:  # Inner loop will run until no new executions can be started or the queue is empty
                self._refresh_execution_sizes()

                if self.policy == "SIZE" or self.policy == "DYNSIZE":
                    self.queue.sort(key=lambda execution: execution.size)

                jobs_to_attempt_scheduling = self._pop_all()
                log.debug('Scheduler inner loop, jobs to attempt scheduling:')
                for job in jobs_to_attempt_scheduling:
                    log.debug("-> {} ({})".format(job, job.size))

                try:
                    platform_state = self.metrics.current_stats
                except ZoeException:
                    log.error('Cannot retrieve platform state, cannot schedule')
                    for job in jobs_to_attempt_scheduling:
                        self._requeue(job)
                    break

                cluster_status_snapshot = SimulatedPlatform(platform_state)

                jobs_to_launch = []
                free_resources = cluster_status_snapshot.aggregated_free_memory()

                # Try to find a placement solution using a snapshot of the platform status
                for job in jobs_to_attempt_scheduling:  # type: Execution
                    jobs_to_launch_copy = jobs_to_launch.copy()

                    # remove all elastic services from the previous simulation loop
                    for job_aux in jobs_to_launch:  # type: Execution
                        cluster_status_snapshot.deallocate_elastic(job_aux)

                    job_can_start = False
                    if not job.is_running:
                        job_can_start = cluster_status_snapshot.allocate_essential(job)

                    if job_can_start or job.is_running:
                        jobs_to_launch.append(job)

                    # Try to put back the elastic services
                    for job_aux in jobs_to_launch:
                        cluster_status_snapshot.allocate_elastic(job_aux)

                    current_free_resources = cluster_status_snapshot.aggregated_free_memory()
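                    # If free memory did not decrease, adding this job gained nothing in the
                    # simulation: revert to the previous placement and stop adding further jobs.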
                    if current_free_resources >= free_resources:
                        jobs_to_launch = jobs_to_launch_copy
                        break
                    free_resources = current_free_resources

                placements = cluster_status_snapshot.get_service_allocation()
                log.debug('Allocation after simulation: {}'.format(placements))

                # We port the results of the simulation into the real cluster
                for job in jobs_to_launch:  # type: Execution
                    if not job.essential_services_running:
                        ret = start_essential(job, placements)
                        if ret == "fatal":
                            jobs_to_attempt_scheduling.remove(job)
                            self.queue.remove(job)
                            job.termination_lock.release()
                            continue  # throw away the execution
                        elif ret == "requeue":
                            self._requeue(job)
                            continue
                        elif ret == "ok":
                            job.set_running()

                        assert ret == "ok"

                    start_elastic(job, placements)

                    if job.all_services_active:
                        log.debug('execution {}: all services are active'.format(job.id))
                        job.termination_lock.release()
                        jobs_to_attempt_scheduling.remove(job)
                        self.queue.remove(job)
                        self.queue_running.append(job)

                self.core_limit_recalc_trigger.set()

                for job in jobs_to_attempt_scheduling:
                    self._requeue(job)

                if len(self.queue) == 0:
                    log.debug('empty queue, exiting inner loop')
                    break
                if len(jobs_to_launch) == 0:
                    log.debug('No executions could be started, exiting inner loop')
                    break

    def quit(self):
        """Stop the scheduler thread."""
        self.loop_quit = True
        self.trigger()
        self.core_limit_recalc_trigger.set()
        self.loop_th.join()
        self.core_limit_th.join()

    def stats(self):
        """Scheduler statistics."""
        if self.policy == "SIZE":
            queue = sorted(self.queue, key=lambda execution: execution.size)
        else:
            queue = self.queue

        return {
            'queue_length': len(self.queue),
            'running_length': len(self.queue_running),
            'termination_threads_count': len(self.async_threads),
            'queue': [s.id for s in queue],
            'running_queue': [s.id for s in self.queue_running]
        }

    @catch_exceptions_and_retry
    def _adjust_core_limits(self):
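        """Each time the recalculation trigger fires, spread the cores left unreserved on each node
        evenly across the services running there, on top of each service's minimum reservation."""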
        self.core_limit_recalc_trigger.clear()
        while not self.loop_quit:
            self.core_limit_recalc_trigger.wait()
            if self.loop_quit:
                break
            stats = self.metrics.current_stats
            for node in stats.nodes:  # type: NodeStats
                new_core_allocations = {}
                node_services = self.state.services.select(backend_host=node.name, backend_status=Service.BACKEND_START_STATUS)
                if len(node_services) == 0:
                    continue

                for service in node_services:
                    new_core_allocations[service.id] = service.resource_reservation.cores.min

                if node.cores_reserved < node.cores_total:
                    cores_free = node.cores_total - node.cores_reserved
                    cores_to_add = cores_free / len(node_services)
                else:
                    cores_to_add = 0

                for service in node_services:
                    update_service_resource_limits(service, cores=new_core_allocations[service.id] + cores_to_add)

            self.core_limit_recalc_trigger.clear()

    def _check_dead_services(self):
        # Check for executions that are no longer viable since an essential service died
        for execution in self.queue_running:
            for service in execution.services:
                if service.essential and service.backend_status == service.BACKEND_DIE_STATUS:
                    log.info("Essential service {} ({}) of execution {} died, terminating execution".format(service.id, service.name, execution.id))
                    service.restarted()
                    execution.set_cleaning_up()
                    self.terminate(execution)
                    break
        # Check for executions that need to be re-queued because one of the elastic components died
        # Do it in two loops to prevent rescheduling executions that need to be terminated
        for execution in self.queue_running:
            for service in execution.services:
                if not service.essential and service.backend_status == service.BACKEND_DIE_STATUS:
                    log.info("Elastic service {} ({}) of execution {} died, rescheduling".format(service.id, service.name, execution.id))
                    terminate_service(service)
                    service.restarted()
                    self.queue_running.remove(execution)
                    self.queue.append(execution)
                    break