Commit b6a8cd82 authored by Daniele Venzano

Fix a couple of bugs and catch exceptions from the scheduler thread to prevent it from crashing

parent 6afc9c11
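The main change wraps the body of the scheduler loop in a try/except so that an unhandled error in one iteration is logged instead of killing the scheduler thread. A minimal sketch of the pattern, with illustrative names only (not the actual Zoe API):

import logging
import threading

log = logging.getLogger(__name__)

def scheduler_loop(trigger: threading.Semaphore, stop: threading.Event):
    # Long-running thread body: a failed iteration is logged and the loop continues.
    while not stop.is_set():
        try:
            if not trigger.acquire(timeout=1):
                continue  # periodic wake-up with nothing to schedule
            run_one_scheduling_pass()  # hypothetical helper that may raise
        except Exception:
            # Same idea as the commit: log the traceback and keep the thread alive.
            log.exception('Unmanaged exception in scheduler loop')

def run_one_scheduling_pass():
    raise RuntimeError('simulated failure inside the scheduler')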
......
@@ -79,7 +79,7 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
         'deployment_name': get_conf().deployment_name,
     }
-    for service in ordered_service_list:
+    for service in execution.services:
         env_subst_dict['dns_name#' + service.name] = service.dns_name
     for service in ordered_service_list:
......
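The hunk above fixes the DNS substitution dictionary: its entries are now built from every service in the execution, not only the ordered list, so a template in any container can reference the DNS name of any other service. A small sketch of the effect, using simplified stand-ins for the Zoe Execution/Service objects:

from types import SimpleNamespace

execution = SimpleNamespace(services=[
    SimpleNamespace(name='master', dns_name='master-exec42'),
    SimpleNamespace(name='worker', dns_name='worker-exec42'),
])
ordered_service_list = [execution.services[0]]  # e.g. only the services started in this pass

env_subst_dict = {}
# Iterating over all services guarantees that every dns_name#<name> key exists,
# even for services that are not part of ordered_service_list.
for service in execution.services:
    env_subst_dict['dns_name#' + service.name] = service.dns_name

print(env_subst_dict)
# {'dns_name#master': 'master-exec42', 'dns_name#worker': 'worker-exec42'}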
......
@@ -49,7 +49,7 @@ class GELFUDPHandler(socketserver.DatagramRequestHandler):
         log_file_path = os.path.join(get_conf().service_logs_base_path, get_conf().deployment_name, str(execution_id), service_name + '.txt')
         if not os.path.exists(log_file_path):
-            os.makedirs(os.path.join(get_conf().service_logs_base_path, get_conf().deployment_name, str(execution_id)))
+            os.makedirs(os.path.join(get_conf().service_logs_base_path, get_conf().deployment_name, str(execution_id)), exist_ok=True)
             open(log_file_path, 'w').write('ZOE HEADER: log file for service {} running on host {}\n'.format(service_name, host))
         with open(log_file_path, 'a') as logfile:
......
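The second hunk passes exist_ok=True so the log handler no longer raises FileExistsError when the per-execution log directory already exists, for example because an earlier message for the same execution created it. A minimal illustration of the difference:

import os
import tempfile

log_dir = os.path.join(tempfile.mkdtemp(), 'deployment', '42')

os.makedirs(log_dir, exist_ok=True)  # creates the whole tree
os.makedirs(log_dir, exist_ok=True)  # no-op; without exist_ok=True this second call would raise FileExistsError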
......
@@ -136,106 +136,109 @@ class ZoeElasticScheduler:
         auto_trigger_base = 60  # seconds
         auto_trigger = auto_trigger_base
         while True:
-            ret = self.trigger_semaphore.acquire(timeout=1)
-            if not ret:  # Semaphore timeout, do some thread cleanup
-                self._cleanup_async_threads()
-                auto_trigger -= 1
-                if auto_trigger == 0:
-                    auto_trigger = auto_trigger_base
-                    self.trigger()
-                continue
-            if self.loop_quit:
-                break
-            if len(self.queue) == 0:
-                log.debug("Scheduler loop has been triggered, but the queue is empty")
-                continue
-            log.debug("Scheduler loop has been triggered")
-            while True:  # Inner loop will run until no new executions can be started or the queue is empty
-                self._refresh_execution_sizes()
-                if self.policy == "SIZE":
-                    self.queue.sort(key=lambda execution: execution.size)
-                log.debug('--> Queue dump after sorting')
-                for j in self.queue:
-                    log.debug(str(j))
-                log.debug('--> End dump')
-                jobs_to_attempt_scheduling = self._pop_all_with_same_size()
-                log.debug('Scheduler inner loop, jobs to attempt scheduling:')
-                for job in jobs_to_attempt_scheduling:
-                    log.debug("-> {}".format(job))
-                platform_state = get_platform_state()
-                cluster_status_snapshot = SimulatedPlatform(platform_state)
-                log.debug(str(cluster_status_snapshot))
-                jobs_to_launch = []
-                free_resources = cluster_status_snapshot.aggregated_free_memory()
-                # Try to find a placement solution using a snapshot of the platform status
-                for job in jobs_to_attempt_scheduling:  # type: Execution
-                    jobs_to_launch_copy = jobs_to_launch.copy()
-                    # remove all elastic services from the previous simulation loop
-                    for job_aux in jobs_to_launch:  # type: Execution
-                        cluster_status_snapshot.deallocate_elastic(job_aux)
-                    job_can_start = False
-                    if not job.is_running:
-                        job_can_start = cluster_status_snapshot.allocate_essential(job)
-                    if job_can_start or job.is_running:
-                        jobs_to_launch.append(job)
-                    # Try to put back the elastic services
-                    for job_aux in jobs_to_launch:
-                        cluster_status_snapshot.allocate_elastic(job_aux)
-                    current_free_resources = cluster_status_snapshot.aggregated_free_memory()
-                    if current_free_resources >= free_resources:
-                        jobs_to_launch = jobs_to_launch_copy
-                        break
-                    free_resources = current_free_resources
-                log.debug('Allocation after simulation: {}'.format(cluster_status_snapshot.get_service_allocation()))
-                # We port the results of the simulation into the real cluster
-                for job in jobs_to_launch:  # type: Execution
-                    if not job.essential_services_running:
-                        ret = start_essential(job)
-                        if ret == "fatal":
-                            jobs_to_attempt_scheduling.remove(job)
-                            continue  # trow away the execution
-                        elif ret == "requeue":
-                            self.queue.insert(0, job)
-                            continue
-                        elif ret == "ok":
-                            job.set_running()
-                        assert ret == "ok"
-                    start_elastic(job)
-                    if job.all_services_active:
-                        log.debug('execution {}: all services are active'.format(job.id))
-                        job.termination_lock.release()
-                        jobs_to_attempt_scheduling.remove(job)
-                        self.queue_running.append(job)
-                for job in jobs_to_attempt_scheduling:
-                    job.termination_lock.release()
-                    # self.queue.insert(0, job)
-                self.queue = jobs_to_attempt_scheduling + self.queue
-                if len(self.queue) == 0:
-                    log.debug('empty queue, exiting inner loop')
-                    break
-                if len(jobs_to_launch) == 0:
-                    log.debug('No executions could be started, exiting inner loop')
-                    break
+            try:
+                ret = self.trigger_semaphore.acquire(timeout=1)
+                if not ret:  # Semaphore timeout, do some thread cleanup
+                    self._cleanup_async_threads()
+                    auto_trigger -= 1
+                    if auto_trigger == 0:
+                        auto_trigger = auto_trigger_base
+                        self.trigger()
+                    continue
+                if self.loop_quit:
+                    break
+                if len(self.queue) == 0:
+                    log.debug("Scheduler loop has been triggered, but the queue is empty")
+                    continue
+                log.debug("Scheduler loop has been triggered")
+                while True:  # Inner loop will run until no new executions can be started or the queue is empty
+                    self._refresh_execution_sizes()
+                    if self.policy == "SIZE":
+                        self.queue.sort(key=lambda execution: execution.size)
+                    log.debug('--> Queue dump after sorting')
+                    for j in self.queue:
+                        log.debug(str(j))
+                    log.debug('--> End dump')
+                    jobs_to_attempt_scheduling = self._pop_all_with_same_size()
+                    log.debug('Scheduler inner loop, jobs to attempt scheduling:')
+                    for job in jobs_to_attempt_scheduling:
+                        log.debug("-> {}".format(job))
+                    platform_state = get_platform_state()
+                    cluster_status_snapshot = SimulatedPlatform(platform_state)
+                    log.debug(str(cluster_status_snapshot))
+                    jobs_to_launch = []
+                    free_resources = cluster_status_snapshot.aggregated_free_memory()
+                    # Try to find a placement solution using a snapshot of the platform status
+                    for job in jobs_to_attempt_scheduling:  # type: Execution
+                        jobs_to_launch_copy = jobs_to_launch.copy()
+                        # remove all elastic services from the previous simulation loop
+                        for job_aux in jobs_to_launch:  # type: Execution
+                            cluster_status_snapshot.deallocate_elastic(job_aux)
+                        job_can_start = False
+                        if not job.is_running:
+                            job_can_start = cluster_status_snapshot.allocate_essential(job)
+                        if job_can_start or job.is_running:
+                            jobs_to_launch.append(job)
+                        # Try to put back the elastic services
+                        for job_aux in jobs_to_launch:
+                            cluster_status_snapshot.allocate_elastic(job_aux)
+                        current_free_resources = cluster_status_snapshot.aggregated_free_memory()
+                        if current_free_resources >= free_resources:
+                            jobs_to_launch = jobs_to_launch_copy
+                            break
+                        free_resources = current_free_resources
+                    log.debug('Allocation after simulation: {}'.format(cluster_status_snapshot.get_service_allocation()))
+                    # We port the results of the simulation into the real cluster
+                    for job in jobs_to_launch:  # type: Execution
+                        if not job.essential_services_running:
+                            ret = start_essential(job)
+                            if ret == "fatal":
+                                jobs_to_attempt_scheduling.remove(job)
+                                continue  # trow away the execution
+                            elif ret == "requeue":
+                                self.queue.insert(0, job)
+                                continue
+                            elif ret == "ok":
+                                job.set_running()
+                            assert ret == "ok"
+                        start_elastic(job)
+                        if job.all_services_active:
+                            log.debug('execution {}: all services are active'.format(job.id))
+                            job.termination_lock.release()
+                            jobs_to_attempt_scheduling.remove(job)
+                            self.queue_running.append(job)
+                    for job in jobs_to_attempt_scheduling:
+                        job.termination_lock.release()
+                        # self.queue.insert(0, job)
+                    self.queue = jobs_to_attempt_scheduling + self.queue
+                    if len(self.queue) == 0:
+                        log.debug('empty queue, exiting inner loop')
+                        break
+                    if len(jobs_to_launch) == 0:
+                        log.debug('No executions could be started, exiting inner loop')
+                        break
+            except Exception:
+                log.exception('Unmanaged exception in scheduler loop')

     def quit(self):
         """Stop the scheduler thread."""
......
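For context, the inner loop in the hunk above admits executions greedily against a simulated snapshot of the platform: essential services must fit, elastic services are re-added best-effort, and the loop stops as soon as adding another execution no longer reduces the aggregated free memory. A heavily simplified, self-contained sketch of that idea, using plain numbers instead of Zoe's SimulatedPlatform API (all names here are illustrative):

def simulate_placement(jobs, total_memory):
    # jobs: list of (name, essential_mem, elastic_mem) tuples, in queue order.
    to_launch = []
    free = total_memory
    for name, essential, elastic in jobs:
        # Essential services of the selected jobs plus the candidate must fit.
        essential_used = sum(e for _, e, _ in to_launch) + essential
        if essential_used > total_memory:
            break
        to_launch.append((name, essential, elastic))
        # Re-add elastic allocations best-effort while memory remains.
        remaining = total_memory - essential_used
        granted = 0
        for _, _, x in to_launch:
            take = min(x, remaining)
            granted += take
            remaining -= take
        current_free = total_memory - essential_used - granted
        if current_free >= free:
            # No progress: drop the candidate and stop, as the real loop does.
            to_launch.pop()
            break
        free = current_free
    return [name for name, _, _ in to_launch]

print(simulate_placement([('a', 4, 2), ('b', 8, 4), ('c', 16, 8)], total_memory=20))
# ['a', 'b']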