Commit 01fc385a authored by Daniele Venzano's avatar Daniele Venzano

Make terminations synchronous with the scheduler loop to remove several race conditions.

parent 629f2f13
......@@ -17,7 +17,6 @@
import datetime
import logging
import threading
import functools
import psycopg2
......@@ -77,8 +76,6 @@ class Execution(BaseRecord):
except KeyError:
self.size = self.description['priority'] # zapp format v2
self.termination_lock = threading.Lock()
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
return {
......
......@@ -68,8 +68,8 @@ class ZoeElasticScheduler:
self.policy = policy
self.queue = []
self.queue_running = []
self.queue_termination = []
self.additional_exec_state = {}
self.async_threads = []
self.loop_quit = False
self.loop_th = threading.Thread(target=self.loop_start_th, name='scheduler')
self.core_limit_recalc_trigger = threading.Event()
......@@ -105,47 +105,32 @@ class ZoeElasticScheduler:
:param execution: the terminated execution
:return: None
"""
def async_termination(e):
"""Actual termination runs in a thread."""
with e.termination_lock:
try:
terminate_execution(e)
except ZoeException as ex:
log.error('Error in termination thread: {}'.format(ex))
return
self.trigger()
log.debug('Execution {} terminated successfully'.format(e.id))
execution.set_cleaning_up()
self.queue_termination.append(execution)
def _terminate_executions(self):
count = 0
while len(self.queue_termination) > 0:
execution = self.queue_termination.pop(0)
execution.set_cleaning_up()
try:
self.queue.remove(execution)
except ValueError:
try:
self.queue_running.remove(execution)
except ValueError:
log.error('Cannot terminate execution {}, it is not in any queue'.format(execution.id))
return
log.warning('Execution {} is not in any queue, attempting termination anyway'.format(execution.id))
try:
del self.additional_exec_state[execution.id]
except KeyError:
pass
self.core_limit_recalc_trigger.set()
th = threading.Thread(target=async_termination, name='termination_{}'.format(execution.id), args=(execution,))
th.start()
self.async_threads.append(th)
def _cleanup_async_threads(self):
counter = len(self.async_threads)
while counter > 0:
if len(self.async_threads) == 0:
break
th = self.async_threads.pop(0)
th.join(0.1)
if th.isAlive(): # join failed
# log.debug('Thread {} join failed'.format(th.name))
self.async_threads.append(th)
counter -= 1
terminate_execution(execution)
log.debug('Execution {} terminated successfully'.format(execution.id))
count += 1
self.core_limit_recalc_trigger.set()
return count != 0
def _refresh_execution_sizes(self):
if self.policy == "FIFO":
......@@ -166,16 +151,14 @@ class ZoeElasticScheduler:
def _pop_all(self):
out_list = []
for execution in self.queue: # type: Execution
ret = execution.termination_lock.acquire(blocking=False)
if ret and execution.status != Execution.TERMINATED_STATUS:
if execution.status != Execution.TERMINATED_STATUS or execution.status != Execution.CLEANING_UP_STATUS:
out_list.append(execution)
else:
log.debug('While popping, throwing away execution {} that has the termination lock held'.format(execution.id))
log.debug('While popping, throwing away execution {} that is in status {}'.format(execution.id, execution.status))
return out_list
def _requeue(self, execution: Execution):
execution.termination_lock.release()
self.additional_exec_state[execution.id].last_time_scheduled = time.time()
if execution not in self.queue: # sanity check: the execution should be in the queue
log.warning("Execution {} wants to be re-queued, but it is not in the queue".format(execution.id))
......@@ -186,8 +169,9 @@ class ZoeElasticScheduler:
auto_trigger = SELF_TRIGGER_TIMEOUT
while True:
ret = self.trigger_semaphore.acquire(timeout=1)
if not ret: # Semaphore timeout, do some thread cleanup
self._cleanup_async_threads()
if not ret: # Semaphore timeout, do some cleanup
if self._terminate_executions():
auto_trigger = 1
auto_trigger -= 1
if auto_trigger == 0:
auto_trigger = SELF_TRIGGER_TIMEOUT
......@@ -262,7 +246,6 @@ class ZoeElasticScheduler:
if ret == "fatal":
jobs_to_attempt_scheduling.remove(job)
self.queue.remove(job)
job.termination_lock.release()
continue # trow away the execution
elif ret == "requeue":
self._requeue(job)
......@@ -276,7 +259,6 @@ class ZoeElasticScheduler:
if job.all_services_active:
log.debug('execution {}: all services are active'.format(job.id))
job.termination_lock.release()
jobs_to_attempt_scheduling.remove(job)
self.queue.remove(job)
self.queue_running.append(job)
......@@ -311,9 +293,10 @@ class ZoeElasticScheduler:
return {
'queue_length': len(self.queue),
'running_length': len(self.queue_running),
'termination_threads_count': len(self.async_threads),
'termination_queue_length': len(self.queue_termination),
'queue': [s.id for s in queue],
'running_queue': [s.id for s in self.queue_running]
'running_queue': [s.id for s in self.queue_running],
'termination_queue': [s.id for s in self.queue_termination]
}
@catch_exceptions_and_retry
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment