From 01fc385a68669722ddead4b67ee685e41f79ef88 Mon Sep 17 00:00:00 2001 From: Daniele Venzano Date: Mon, 17 Sep 2018 12:01:35 +0200 Subject: [PATCH] Make terminations synchronous with the scheduler loop to remove several race conditions. --- zoe_lib/state/execution.py | 3 - zoe_master/scheduler/elastic_scheduler.py | 79 +++++++++-------------- 2 files changed, 31 insertions(+), 51 deletions(-) diff --git a/zoe_lib/state/execution.py b/zoe_lib/state/execution.py index ad29627..6f3d78a 100644 --- a/zoe_lib/state/execution.py +++ b/zoe_lib/state/execution.py @@ -17,7 +17,6 @@ import datetime import logging -import threading import functools import psycopg2 @@ -77,8 +76,6 @@ class Execution(BaseRecord): except KeyError: self.size = self.description['priority'] # zapp format v2 - self.termination_lock = threading.Lock() - def serialize(self): """Generates a dictionary that can be serialized in JSON.""" return { diff --git a/zoe_master/scheduler/elastic_scheduler.py b/zoe_master/scheduler/elastic_scheduler.py index fa9d4cb..6cb8c2f 100644 --- a/zoe_master/scheduler/elastic_scheduler.py +++ b/zoe_master/scheduler/elastic_scheduler.py @@ -68,8 +68,8 @@ class ZoeElasticScheduler: self.policy = policy self.queue = [] self.queue_running = [] + self.queue_termination = [] self.additional_exec_state = {} - self.async_threads = [] self.loop_quit = False self.loop_th = threading.Thread(target=self.loop_start_th, name='scheduler') self.core_limit_recalc_trigger = threading.Event() @@ -105,47 +105,32 @@ class ZoeElasticScheduler: :param execution: the terminated execution :return: None """ - def async_termination(e): - """Actual termination runs in a thread.""" - with e.termination_lock: - try: - terminate_execution(e) - except ZoeException as ex: - log.error('Error in termination thread: {}'.format(ex)) - return - self.trigger() - log.debug('Execution {} terminated successfully'.format(e.id)) - - try: - self.queue.remove(execution) - except ValueError: + execution.set_cleaning_up() + self.queue_termination.append(execution) + + def _terminate_executions(self): + count = 0 + while len(self.queue_termination) > 0: + execution = self.queue_termination.pop(0) + execution.set_cleaning_up() try: - self.queue_running.remove(execution) + self.queue.remove(execution) except ValueError: - log.error('Cannot terminate execution {}, it is not in any queue'.format(execution.id)) - return - - try: - del self.additional_exec_state[execution.id] - except KeyError: - pass - self.core_limit_recalc_trigger.set() + try: + self.queue_running.remove(execution) + except ValueError: + log.warning('Execution {} is not in any queue, attempting termination anyway'.format(execution.id)) - th = threading.Thread(target=async_termination, name='termination_{}'.format(execution.id), args=(execution,)) - th.start() - self.async_threads.append(th) + try: + del self.additional_exec_state[execution.id] + except KeyError: + pass - def _cleanup_async_threads(self): - counter = len(self.async_threads) - while counter > 0: - if len(self.async_threads) == 0: - break - th = self.async_threads.pop(0) - th.join(0.1) - if th.isAlive(): # join failed - # log.debug('Thread {} join failed'.format(th.name)) - self.async_threads.append(th) - counter -= 1 + terminate_execution(execution) + log.debug('Execution {} terminated successfully'.format(execution.id)) + count += 1 + self.core_limit_recalc_trigger.set() + return count != 0 def _refresh_execution_sizes(self): if self.policy == "FIFO": @@ -166,16 +151,14 @@ class ZoeElasticScheduler: def _pop_all(self): out_list = [] for execution in self.queue: # type: Execution - ret = execution.termination_lock.acquire(blocking=False) - if ret and execution.status != Execution.TERMINATED_STATUS: + if execution.status != Execution.TERMINATED_STATUS or execution.status != Execution.CLEANING_UP_STATUS: out_list.append(execution) else: - log.debug('While popping, throwing away execution {} that has the termination lock held'.format(execution.id)) + log.debug('While popping, throwing away execution {} that is in status {}'.format(execution.id, execution.status)) return out_list def _requeue(self, execution: Execution): - execution.termination_lock.release() self.additional_exec_state[execution.id].last_time_scheduled = time.time() if execution not in self.queue: # sanity check: the execution should be in the queue log.warning("Execution {} wants to be re-queued, but it is not in the queue".format(execution.id)) @@ -186,8 +169,9 @@ class ZoeElasticScheduler: auto_trigger = SELF_TRIGGER_TIMEOUT while True: ret = self.trigger_semaphore.acquire(timeout=1) - if not ret: # Semaphore timeout, do some thread cleanup - self._cleanup_async_threads() + if not ret: # Semaphore timeout, do some cleanup + if self._terminate_executions(): + auto_trigger = 1 auto_trigger -= 1 if auto_trigger == 0: auto_trigger = SELF_TRIGGER_TIMEOUT @@ -262,7 +246,6 @@ class ZoeElasticScheduler: if ret == "fatal": jobs_to_attempt_scheduling.remove(job) self.queue.remove(job) - job.termination_lock.release() continue # trow away the execution elif ret == "requeue": self._requeue(job) @@ -276,7 +259,6 @@ class ZoeElasticScheduler: if job.all_services_active: log.debug('execution {}: all services are active'.format(job.id)) - job.termination_lock.release() jobs_to_attempt_scheduling.remove(job) self.queue.remove(job) self.queue_running.append(job) @@ -311,9 +293,10 @@ class ZoeElasticScheduler: return { 'queue_length': len(self.queue), 'running_length': len(self.queue_running), - 'termination_threads_count': len(self.async_threads), + 'termination_queue_length': len(self.queue_termination), 'queue': [s.id for s in queue], - 'running_queue': [s.id for s in self.queue_running] + 'running_queue': [s.id for s in self.queue_running], + 'termination_queue': [s.id for s in self.queue_termination] } @catch_exceptions_and_retry -- GitLab