Commit c9cec6d0 authored by Daniele Venzano

Fix elastic scheduler race condition

The scheduler now checks only that services are active, not that they are running.
Add a running queue that holds executions whose services are all active.
parent 01d60c18
......@@ -211,6 +211,7 @@ def stats_cmd(auth, args_):
stats_api = ZoeStatisticsAPI(auth['url'], auth['user'], auth['pass'])
sched = stats_api.scheduler()
print('Scheduler queue length: {}'.format(sched['queue_length']))
print('Scheduler running queue length: {}'.format(sched['running_length']))
print('Termination threads count: {}'.format(sched['termination_threads_count']))
ENV_HELP_TEXT = '''To authenticate with Zoe you need to define three environment variables:
......
......@@ -171,12 +171,20 @@ class Execution:
@property
def all_services_running(self) -> bool:
    """Return True if no service of this execution is dead.

    Each entry of ``self.services`` is asked ``is_dead()``; the first one
    that answers true makes the result False. An execution with no services
    yields True.
    """
    # any() stops at the first dead service, like the original early return.
    return not any(service.is_dead() for service in self.services)
@property
def all_services_active(self) -> bool:
    """Return True if every service of this execution has the active status.

    A service counts as active when its ``status`` equals its own
    ``ACTIVE_STATUS``. An execution with no services yields True.
    """
    # all() stops at the first non-active service, like the original loop.
    return all(
        service.status == service.ACTIVE_STATUS for service in self.services
    )
@property
def running_services_count(self) -> int:
"""Returns the number of services of this execution that are running."""
......
......@@ -42,6 +42,7 @@ class ZoeElasticScheduler:
self.trigger_semaphore = threading.Semaphore(0)
self.policy = policy
self.queue = []
self.queue_running = []
self.additional_exec_state = {}
self.async_threads = []
self.loop_quit = False
......@@ -80,7 +81,10 @@ class ZoeElasticScheduler:
try:
self.queue.remove(execution)
except ValueError:
log.error('Terminating execution {} that is not in scheduler queue'.format(execution.id))
try:
self.queue_running.remove(execution)
except ValueError:
log.error('Terminating execution {} that is not in any queue'.format(execution.id))
try:
del self.additional_exec_state[execution.id]
......@@ -213,10 +217,11 @@ class ZoeElasticScheduler:
start_elastic(job)
if job.all_services_running:
log.debug('execution {}: all services started'.format(job.id))
if job.all_services_active:
log.debug('execution {}: all services are active'.format(job.id))
job.termination_lock.release()
jobs_to_attempt_scheduling.remove(job)
self.queue_running.append(job)
for job in jobs_to_attempt_scheduling:
job.termination_lock.release()
......@@ -241,5 +246,6 @@ class ZoeElasticScheduler:
"""Scheduler statistics."""
return {
'queue_length': len(self.queue),
'running_length': len(self.queue_running),
'termination_threads_count': len(self.async_threads)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment