Commit 7c1b6dae authored by Daniele Venzano

Fix pylint errors

parent b6a8cd82
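
The diff below wraps the scheduler loop in a keep-alive wrapper: the loop body no longer catches exceptions itself, and the new `_thread_wrapper` restarts it after any unmanaged exception, exiting only when the loop returns normally. A minimal, self-contained sketch of that pattern follows for reference; `scheduler_pass` and the module-level names here are illustrative stand-ins, not part of the repository.

```python
import logging
import threading

log = logging.getLogger(__name__)


def scheduler_pass():
    """Stand-in for loop_start_th(): the real scheduler loop body."""
    log.debug('scheduler pass ran')


def thread_wrapper():
    """Restart the loop after any unmanaged exception; stop when it returns."""
    while True:
        try:
            scheduler_pass()
        except BaseException:  # pylint: disable=broad-except
            log.exception('Unmanaged exception in scheduler loop')
        else:
            log.debug('Scheduler thread terminated')
            break


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    loop_th = threading.Thread(target=thread_wrapper, name='scheduler')
    loop_th.start()
    loop_th.join()
```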
@@ -17,7 +17,6 @@
from datetime import datetime, timedelta
import logging
import re
import os
import zoe_api.exceptions
......
@@ -46,7 +46,7 @@ class ZoeElasticScheduler:
        self.additional_exec_state = {}
        self.async_threads = []
        self.loop_quit = False
        self.loop_th = threading.Thread(target=self._thread_wrapper, name='scheduler')
        self.loop_th.start()
        self.state = state
@@ -131,114 +131,121 @@ class ZoeElasticScheduler:
        return out_list

    def _thread_wrapper(self):
        while True:
            try:
                self.loop_start_th()
            except BaseException:  # pylint: disable=broad-except
                log.exception('Unmanaged exception in scheduler loop')
            else:
                log.debug('Scheduler thread terminated')
                break
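
    # The loop below blocks on trigger_semaphore with a one-second timeout: on
    # timeout it cleans up finished async threads and, once a minute, triggers
    # a scheduling pass on its own.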
    def loop_start_th(self):
        """The Scheduler thread loop."""
        auto_trigger_base = 60  # seconds
        auto_trigger = auto_trigger_base
        while True:
            ret = self.trigger_semaphore.acquire(timeout=1)
            if not ret:  # Semaphore timeout, do some thread cleanup
                self._cleanup_async_threads()
                auto_trigger -= 1
                if auto_trigger == 0:
                    auto_trigger = auto_trigger_base
                    self.trigger()
                continue
            if self.loop_quit:
                break
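
            # A trigger arrived: keep running scheduling passes until the queue
            # is empty or nothing more can be started.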
            if len(self.queue) == 0:
                log.debug("Scheduler loop has been triggered, but the queue is empty")
                continue
            log.debug("Scheduler loop has been triggered")

            while True:  # Inner loop will run until no new executions can be started or the queue is empty
                self._refresh_execution_sizes()

                if self.policy == "SIZE":
                    self.queue.sort(key=lambda execution: execution.size)

                log.debug('--> Queue dump after sorting')
                for j in self.queue:
                    log.debug(str(j))
                log.debug('--> End dump')

                jobs_to_attempt_scheduling = self._pop_all_with_same_size()
                log.debug('Scheduler inner loop, jobs to attempt scheduling:')
                for job in jobs_to_attempt_scheduling:
                    log.debug("-> {}".format(job))

                platform_state = get_platform_state()
                cluster_status_snapshot = SimulatedPlatform(platform_state)
                log.debug(str(cluster_status_snapshot))

                jobs_to_launch = []
                free_resources = cluster_status_snapshot.aggregated_free_memory()

                # Try to find a placement solution using a snapshot of the platform status
                for job in jobs_to_attempt_scheduling:  # type: Execution
                    jobs_to_launch_copy = jobs_to_launch.copy()

                    # remove all elastic services from the previous simulation loop
                    for job_aux in jobs_to_launch:  # type: Execution
                        cluster_status_snapshot.deallocate_elastic(job_aux)

                    job_can_start = False
                    if not job.is_running:
                        job_can_start = cluster_status_snapshot.allocate_essential(job)

                    if job_can_start or job.is_running:
                        jobs_to_launch.append(job)

                    # Try to put back the elastic services
                    for job_aux in jobs_to_launch:
                        cluster_status_snapshot.allocate_elastic(job_aux)

                    current_free_resources = cluster_status_snapshot.aggregated_free_memory()
                    if current_free_resources >= free_resources:
                        jobs_to_launch = jobs_to_launch_copy
                        break
                    free_resources = current_free_resources

                log.debug('Allocation after simulation: {}'.format(cluster_status_snapshot.get_service_allocation()))
                # We port the results of the simulation into the real cluster
                for job in jobs_to_launch:  # type: Execution
                    if not job.essential_services_running:
                        ret = start_essential(job)
                        if ret == "fatal":
                            jobs_to_attempt_scheduling.remove(job)
                            continue  # throw away the execution
                        elif ret == "requeue":
                            self.queue.insert(0, job)
                            continue
                        elif ret == "ok":
                            job.set_running()
                        assert ret == "ok"

                    start_elastic(job)

                    if job.all_services_active:
                        log.debug('execution {}: all services are active'.format(job.id))
                        job.termination_lock.release()
                        jobs_to_attempt_scheduling.remove(job)
                        self.queue_running.append(job)
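
                # Executions that did not fully start go back to the front of the queue.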
                for job in jobs_to_attempt_scheduling:
                    job.termination_lock.release()
                    # self.queue.insert(0, job)
                self.queue = jobs_to_attempt_scheduling + self.queue

                if len(self.queue) == 0:
                    log.debug('empty queue, exiting inner loop')
                    break
                if len(jobs_to_launch) == 0:
                    log.debug('No executions could be started, exiting inner loop')
                    break

    def quit(self):
        """Stop the scheduler thread."""
......