Commit 70fa1dac authored by Daniele Venzano

Do not empty the queue during a scheduler run and enable dynamic job size calculation

parent 2c849b65
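The sketch below (not part of this commit) illustrates the queue-handling pattern the change moves to: the scheduler scans the queue in place with a non-blocking try-lock instead of emptying it, and a failed scheduling attempt only releases the lock, leaving the execution queued. ToyExecution, try_schedule and scheduling_pass are hypothetical stand-ins for Zoe's Execution objects and placement logic.

import threading


class ToyExecution:
    """Hypothetical stand-in for zoe_lib.state.Execution."""
    def __init__(self, exec_id, size):
        self.id = exec_id
        self.size = size
        self.terminated = False
        self.termination_lock = threading.Lock()


def scheduling_pass(queue, try_schedule):
    """One pass: pick schedulable executions without popping them from the queue."""
    candidates = []
    for execution in queue:
        # Non-blocking acquire, as in _pop_all(): skip executions that are being
        # terminated concurrently instead of waiting for them.
        if not execution.termination_lock.acquire(blocking=False):
            continue
        if execution.terminated:
            execution.termination_lock.release()
            continue
        candidates.append(execution)

    for execution in candidates:
        if try_schedule(execution):
            queue.remove(execution)  # only a started execution leaves the queue
        # As in _requeue(): releasing the lock is enough; the execution never
        # left the queue, so nothing has to be re-inserted.
        execution.termination_lock.release()


queue = [ToyExecution(1, size=10), ToyExecution(2, size=5)]
scheduling_pass(queue, try_schedule=lambda e: e.size <= 5)  # pretend only small jobs fit
print([e.id for e in queue])  # -> [1]: the big job simply stays queued for the next pass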
@@ -18,7 +18,6 @@ The Elastic scheduler is the implementation of the scheduling algorithm presented
https://arxiv.org/abs/1611.09528
"""
from collections import namedtuple
import logging
import threading
import time
@@ -26,7 +25,7 @@ import time
from zoe_lib.state import Execution, SQLManager, Service # pylint: disable=unused-import
from zoe_master.exceptions import ZoeException
from zoe_master.backends.interface import terminate_execution, start_elastic, start_essential, update_service_resource_limits
from zoe_master.backends.interface import terminate_execution, terminate_service, start_elastic, start_essential, update_service_resource_limits
from zoe_master.scheduler.simulated_platform import SimulatedPlatform
from zoe_master.exceptions import UnsupportedSchedulerPolicyError
from zoe_master.stats import NodeStats # pylint: disable=unused-import
@@ -34,7 +33,6 @@ from zoe_master.metrics.base import StatsManager # pylint: disable=unused-import
log = logging.getLogger(__name__)
ExecutionProgress = namedtuple('ExecutionProgress', ['last_time_scheduled', 'progress_sequence'])
SELF_TRIGGER_TIMEOUT = 60 # the scheduler will trigger itself periodically in case platform resources have changed outside its control
@@ -53,6 +51,13 @@ def catch_exceptions_and_retry(func):
    return wrapper


class ExecutionProgress:
    """Additional data for tracking execution sizes while in the queue."""
    def __init__(self):
        self.last_time_scheduled = 0
        self.progress_sequence = []
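A likely reason for replacing the namedtuple with a plain class (illustration only, not from the repository): the scheduler now updates the progress data in place, for example `additional_exec_state[job.id].last_time_scheduled = time.time()` further down in this diff, and namedtuple fields cannot be reassigned.

from collections import namedtuple

OldProgress = namedtuple('ExecutionProgress', ['last_time_scheduled', 'progress_sequence'])
old = OldProgress(0, [])
try:
    old.last_time_scheduled = 42.0  # namedtuples are immutable
except AttributeError as err:
    print(err)


class NewProgress:
    """Same shape as the new ExecutionProgress class: plain mutable attributes."""
    def __init__(self):
        self.last_time_scheduled = 0
        self.progress_sequence = []


new = NewProgress()
new.last_time_scheduled = 42.0       # fine
new.progress_sequence.append(0.25)   # fine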
class ZoeElasticScheduler:
    """The Scheduler class for size-based scheduling. Policy can be "FIFO" or "SIZE"."""
    def __init__(self, state: SQLManager, policy, metrics: StatsManager):
@@ -75,7 +80,7 @@ class ZoeElasticScheduler:
                self.queue_running.append(execution)
            else:
                self.queue.append(execution)
                self.additional_exec_state[execution.id] = ExecutionProgress(0, [])
                self.additional_exec_state[execution.id] = ExecutionProgress()
        self.loop_th.start()
        self.core_limit_th.start()
@@ -89,7 +94,7 @@ class ZoeElasticScheduler:
        :param execution: The execution
        :return:
        """
        exec_data = ExecutionProgress(0, [])
        exec_data = ExecutionProgress()
        self.additional_exec_state[execution.id] = exec_data
        self.queue.append(execution)
        self.trigger()
@@ -154,10 +159,9 @@ class ZoeElasticScheduler:
            remaining_execution_time = (1 - progress) * execution.size
            execution.size = remaining_execution_time * execution.services_count
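A worked example of the two size-update lines above, which implement the dynamic job size calculation mentioned in the commit message (the values are made up and the computation of `progress` lies outside this hunk):

progress = 0.25        # assumed: fraction of the execution already done
size = 100             # assumed: current execution.size
services_count = 4     # assumed: execution.services_count

remaining_execution_time = (1 - progress) * size        # 75.0
size = remaining_execution_time * services_count        # 300.0 becomes the new size
print(remaining_execution_time, size)

The updated size then feeds the SIZE-policy sort shown later in this diff (`self.queue.sort(key=lambda execution: execution.size)`).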
    def _pop_all_with_same_size(self):
    def _pop_all(self):
        out_list = []
        while len(self.queue) > 0:
            execution = self.queue.pop(0)  # type: Execution
        for execution in self.queue:  # type: Execution
            ret = execution.termination_lock.acquire(blocking=False)
            if ret and execution.status != Execution.TERMINATED_STATUS:
                out_list.append(execution)
@@ -166,6 +170,12 @@ class ZoeElasticScheduler:
        return out_list

    def _requeue(self, execution: Execution):
        execution.termination_lock.release()
        if execution not in self.queue:  # make sure the execution is in the queue
            log.warning("Execution {} re-queued, but it was not in the queue".format(execution.id))
            self.queue.append(execution)

    @catch_exceptions_and_retry
    def loop_start_th(self):  # pylint: disable=too-many-locals
        """The Scheduler thread loop."""
@@ -195,23 +205,17 @@ class ZoeElasticScheduler:
                if self.policy == "SIZE":
                    self.queue.sort(key=lambda execution: execution.size)

                log.debug('--> Queue dump after sorting')
                for j in self.queue:
                    log.debug(str(j))
                log.debug('--> End dump')

                jobs_to_attempt_scheduling = self._pop_all_with_same_size()
                jobs_to_attempt_scheduling = self._pop_all()
                log.debug('Scheduler inner loop, jobs to attempt scheduling:')
                for job in jobs_to_attempt_scheduling:
                    log.debug("-> {}".format(job))
                    log.debug("-> {} ({})".format(job, job.size))

                try:
                    platform_state = self.metrics.current_stats
                except ZoeException:
                    log.error('Cannot retrieve platform state, cannot schedule')
                    for job in jobs_to_attempt_scheduling:
                        job.termination_lock.release()
                    self.queue = jobs_to_attempt_scheduling + self.queue
                        self._requeue(job)
                    break

                cluster_status_snapshot = SimulatedPlatform(platform_state)
@@ -256,7 +260,7 @@ class ZoeElasticScheduler:
                        jobs_to_attempt_scheduling.remove(job)
                        continue  # throw away the execution
                    elif ret == "requeue":
                        self.queue.insert(0, job)
                        self._requeue(job)
                        continue
                    elif ret == "ok":
                        job.set_running()
@@ -269,15 +273,14 @@ class ZoeElasticScheduler:
                        log.debug('execution {}: all services are active'.format(job.id))
                        job.termination_lock.release()
                        jobs_to_attempt_scheduling.remove(job)
                        self.queue.remove(job)
                        self.queue_running.append(job)
                        self.additional_exec_state[job.id].last_time_scheduled = time.time()

                self.core_limit_recalc_trigger.set()

                for job in jobs_to_attempt_scheduling:
                    job.termination_lock.release()
                    # self.queue.insert(0, job)
                self.queue = jobs_to_attempt_scheduling + self.queue
                    self._requeue(job)

                if len(self.queue) == 0:
                    log.debug('empty queue, exiting inner loop')
@@ -353,6 +356,7 @@ class ZoeElasticScheduler:
            for service in execution.services:
                if not service.essential and service.backend_status == service.BACKEND_DIE_STATUS:
                    log.info("Elastic service {} ({}) of execution {} died, rescheduling".format(service.id, service.name, execution.id))
                    terminate_service(service)
                    service.restarted()
                    self.queue_running.remove(execution)
                    self.queue.append(execution)