Commit e1a9d264 authored by Daniele Venzano

Integrate the elastic scheduler

parent abb334e6
@@ -84,7 +84,8 @@ def load_configuration(test_conf=None):
argparser.add_argument('--ldap-user-gid', type=int, help='LDAP group ID for users', default=5001)
argparser.add_argument('--ldap-guest-gid', type=int, help='LDAP group ID for guests', default=5002)
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', default='ZoeSimpleScheduler')
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeSimpleScheduler', 'ZoeElasticScheduler'], default='ZoeSimpleScheduler')
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
argparser.add_argument('--backend', choices=['OldSwarm'], default='OldSwarm')
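
For reference, a minimal sketch of how the two new options end up selecting the scheduler (it mirrors the zoe-master entry point changed later in this commit; build_scheduler is a hypothetical helper, not part of the change):

import zoe_master.scheduler

def build_scheduler(state, conf):
    # conf.scheduler_class is 'ZoeSimpleScheduler' or 'ZoeElasticScheduler';
    # conf.scheduler_policy is 'FIFO' or 'SIZE'. Unsupported combinations make
    # the scheduler constructor raise UnsupportedSchedulerPolicyError.
    scheduler_cls = getattr(zoe_master.scheduler, conf.scheduler_class)
    return scheduler_cls(state, conf.scheduler_policy)
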
@@ -17,6 +17,7 @@
import datetime
import logging
import threading
log = logging.getLogger(__name__)
@@ -66,6 +67,8 @@ class Execution:
self.priority = self.description['priority']
self.termination_lock = threading.Lock()
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
return {
@@ -122,6 +125,7 @@ class Execution:
self.error_message = message
self.sql_manager.execution_update(self.id, error_message=self.error_message)
@property
def is_active(self):
"""
Returns True if the execution is in the scheduler
@@ -129,6 +133,7 @@
"""
return self._status == self.SCHEDULED_STATUS or self._status == self.RUNNING_STATUS or self._status == self.STARTING_STATUS or self._status == self.CLEANING_UP_STATUS
@property
def is_running(self):
"""Returns True is the execution has at least the essential services running."""
return self._status == self.RUNNING_STATUS
@@ -143,6 +148,16 @@ class Execution:
"""Getter for this execution service list."""
return self.sql_manager.service_list(execution_id=self.id)
@property
def essential_services(self):
"""Getter for this execution essential service list."""
return self.sql_manager.service_list(execution_id=self.id, essential=True)
@property
def elastic_services(self):
"""Getter for this execution elastic service list."""
return self.sql_manager.service_list(execution_id=self.id, essential=False)
@property
def essential_services_running(self) -> bool:
"""Returns True if all essential services of this execution have started."""
@@ -177,3 +192,6 @@ class Execution:
def total_reservations(self):
"""Return the union/sum of resources reserved by all services of this execution."""
return sum([s.resource_reservation for s in self.services])
def __repr__(self):
return str(self.id)
@@ -64,6 +64,7 @@ class Service:
ACTIVE_STATUS = "active"
STARTING_STATUS = "starting"
ERROR_STATUS = "error"
RUNNABLE_STATUS = "runnable"
BACKEND_UNDEFINED_STATUS = 'undefined'
BACKEND_CREATE_STATUS = 'created'
@@ -18,6 +18,7 @@
from typing import Dict
from zoe_lib.state import Execution, Service
from zoe_master.stats import ClusterStats
class BaseBackend:
@@ -38,3 +39,7 @@ class BaseBackend:
def terminate_service(self, service: Service) -> None:
raise NotImplementedError
def platform_state(self) -> ClusterStats:
"""Get the platform state."""
raise NotImplementedError
@@ -16,17 +16,17 @@
"""The high-level interface that Zoe uses to talk to the configured container backend."""
import logging
from typing import List
from zoe_lib.config import get_conf
from zoe_lib.state import Execution, Service
from zoe_master.backends.base import BaseBackend
from zoe_master.backends.old_swarm.backend import OldSwarmBackend
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException
log = logging.getLogger(__name__)
_backend_initialized = False
def _get_backend() -> BaseBackend:
backend_name = get_conf().backend
@@ -39,27 +39,21 @@ def _get_backend() -> BaseBackend:
def initialize_backend(state):
"""Initializes the configured backend."""
assert not _backend_initialized
backend = _get_backend()
backend.init(state)
def shutdown_backend():
"""Shuts down the configured backend."""
assert _backend_initialized
backend = _get_backend()
backend.shutdown()
def execution_to_containers(execution: Execution) -> None:
"""Translate an execution object into containers.
If an error occurs some containers may have been created and needs to be cleaned-up.
In case of error exceptions are raised.
"""
def service_list_to_containers(execution: Execution, service_list: List[Service]) -> str:
"""Given a subset of services from an execution, tries to start them, return one of 'ok', 'requeue' for temporary failures and 'fatal' for fatal failures."""
backend = _get_backend()
ordered_service_list = sorted(execution.services, key=lambda x: x.startup_order)
ordered_service_list = sorted(service_list, key=lambda x: x.startup_order)
env_subst_dict = {
'execution_id': execution.id,
@@ -74,7 +68,54 @@ def execution_to_containers(execution: Execution) -> None:
for service in ordered_service_list:
env_subst_dict['dns_name#self'] = service.dns_name
service.set_starting()
backend.spawn_service(execution, service, env_subst_dict)
try:
backend.spawn_service(execution, service, env_subst_dict)
except ZoeStartExecutionRetryException as ex:
log.warning('Temporary failure starting service {} of execution {}: {}'.format(service.id, execution.id, ex.message))
execution.set_error_message(ex.message)
terminate_execution(execution)
execution.set_scheduled()
return "requeue"
except ZoeStartExecutionFatalException as ex:
log.error('Fatal error trying to start service {} of execution {}: {}'.format(service.id, execution.id, ex.message))
execution.set_error_message(ex.message)
terminate_execution(execution)
execution.set_error()
return "fatal"
except Exception as ex:
log.error('Fatal error trying to start service {} of execution {}'.format(service.id, execution.id))
log.exception('BUG, this error should have been caught earlier')
execution.set_error_message(str(ex))
terminate_execution(execution)
execution.set_error()
return "fatal"
else:
execution.set_running()
return "ok"
def start_all(execution: Execution) -> str:
"""Translate an execution object into containers.
If an error occurs, some containers may have been created and need to be cleaned up.
"""
log.debug('starting all services for execution {}'.format(execution.id))
execution.set_starting()
return service_list_to_containers(execution, execution.services)
def start_essential(execution) -> str:
"""Start the essential services for this execution"""
log.debug('starting essential services for execution {}'.format(execution.id))
execution.set_starting()
return service_list_to_containers(execution, execution.essential_services)
def start_elastic(execution) -> str:
"""Start the runnable elastic services"""
elastic_to_start = [s for s in execution.elastic_services if s.status == Service.RUNNABLE_STATUS]
return service_list_to_containers(execution, elastic_to_start)
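
Taken together, a sketch of the intended caller-side pattern for an elastic execution (the real logic lives in the elastic scheduler further below; `execution` stands for an Execution object already in the queue):

ret = start_essential(execution)
if ret == 'requeue':
    pass  # temporary failure: put the execution back in the queue and retry later
elif ret == 'fatal':
    pass  # fatal failure: the execution has already been marked as failed
else:
    # 'ok': essential services are running, start the elastic services marked as runnable
    start_elastic(execution)
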
def terminate_execution(execution: Execution) -> None:
@@ -89,3 +130,9 @@ def terminate_execution(execution: Execution) -> None:
service.set_inactive()
log.debug('Service {} terminated'.format(service.name))
execution.set_terminated()
def get_platform_state():
"""Retrieves the state of the platform by querying the container backend. Platform state includes information on free/reserved resources for each node. This information is used for advanced scheduling."""
backend = _get_backend()
return backend.platform_state()
@@ -27,6 +27,7 @@ from zoe_master.workspace.filesystem import ZoeFSWorkspace
import zoe_master.backends.common
import zoe_master.backends.base
from zoe_master.backends.old_swarm.threads import SwarmMonitor, SwarmStateSynchronizer
from zoe_master.stats import NodeStats, ClusterStats
log = logging.getLogger(__name__)
@@ -124,3 +125,11 @@ class OldSwarmBackend(zoe_master.backends.base.BaseBackend):
def terminate_service(self, service: Service) -> None:
"""Terminate and delete a container."""
self.swarm.terminate_container(service.backend_id, delete=True)
def platform_state(self) -> ClusterStats:
"""Get the platform state."""
info = self.swarm.info()
for node in info.nodes: # type: NodeStats
node.memory_free = node.memory_total - node.memory_reserved
node.cores_free = node.cores_total - node.cores_reserved
return info
@@ -54,7 +54,7 @@ def main():
state = SQLManager(args)
log.info("Initializing scheduler")
scheduler = getattr(zoe_master.scheduler, config.get_conf().scheduler_class)(state)
scheduler = getattr(zoe_master.scheduler, config.get_conf().scheduler_class)(state, config.get_conf().scheduler_policy)
zoe_master.backends.interface.initialize_backend(state)
@@ -36,3 +36,8 @@ class ZoeStartExecutionRetryException(ZoeException):
class ZoeStartExecutionFatalException(ZoeException):
"""Execution emitted in case the Execution failed to start for a fatal error."""
pass
class UnsupportedSchedulerPolicyError(ZoeException):
"""The configuration asks for a combination of scheduler and policy that is unsupported."""
pass
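
As an illustration (a sketch; `state` stands for the SQLManager instance passed in by zoe-master): the simple scheduler only accepts the FIFO policy, so any other value is rejected:

from zoe_master.exceptions import UnsupportedSchedulerPolicyError
from zoe_master.scheduler import ZoeSimpleScheduler

try:
    ZoeSimpleScheduler(state, 'SIZE')  # the simple scheduler only supports 'FIFO'
except UnsupportedSchedulerPolicyError:
    pass  # choose a supported scheduler/policy combination instead
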
@@ -16,4 +16,5 @@
"""The Zoe schedulers"""
from .base_scheduler import ZoeBaseScheduler
from .scheduler import ZoeSimpleScheduler
from .simple_scheduler import ZoeSimpleScheduler
from .elastic_scheduler import ZoeElasticScheduler
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The Elastic scheduler is the implementation of the scheduling algorithm presented in this paper:
https://arxiv.org/abs/1611.09528
"""
from collections import namedtuple
import logging
import threading
import time
from zoe_lib.state import Execution, SQLManager
from zoe_master.backends.interface import terminate_execution, get_platform_state, start_elastic, start_essential
from zoe_master.scheduler.simulated_platform import SimulatedPlatform
from zoe_master.exceptions import UnsupportedSchedulerPolicyError
log = logging.getLogger(__name__)
ExecutionProgress = namedtuple('ExecutionProgress', ['last_time_scheduled', 'progress_sequence'])
class ZoeElasticScheduler:
"""The Scheduler class for size-based scheduling. Policy can be "FIFO" or "SIZE"."""
def __init__(self, state: SQLManager, policy):
if policy != 'FIFO' and policy != 'SIZE':
raise UnsupportedSchedulerPolicyError
self.trigger_semaphore = threading.Semaphore(0)
self.policy = policy
self.queue = []
self.additional_exec_state = {}
self.async_threads = []
self.loop_quit = False
self.loop_th = threading.Thread(target=self.loop_start_th, name='scheduler')
self.loop_th.start()
self.state = state
def trigger(self):
"""Trigger a scheduler run."""
self.trigger_semaphore.release()
def incoming(self, execution: Execution):
"""
This method adds the execution to the end of the FIFO queue and triggers the scheduler.
:param execution: The execution
:return:
"""
self.queue.append(execution)
exec_data = ExecutionProgress(0, [])
self.additional_exec_state[execution.id] = exec_data
self.trigger()
def terminate(self, execution: Execution) -> None:
"""
Inform the master that an execution has been terminated. This can be done asynchronously.
:param execution: the terminated execution
:return: None
"""
def async_termination(e):
"""Actual termination runs in a thread."""
with e.termination_lock:
terminate_execution(e)
self.trigger()
log.debug('Execution {} terminated successfully'.format(e.id))
try:
self.queue.remove(execution)
except ValueError:
log.error('Terminating execution {} that is not in scheduler queue'.format(execution.id))
try:
del self.additional_exec_state[execution.id]
except KeyError:
pass
th = threading.Thread(target=async_termination, name='termination_{}'.format(execution.id), args=(execution,))
th.start()
self.async_threads.append(th)
def _cleanup_async_threads(self):
counter = len(self.async_threads)
while counter > 0:
if len(self.async_threads) == 0:
break
th = self.async_threads.pop(0)
th.join(0.1)
if th.isAlive(): # join failed
log.debug('Thread {} join failed'.format(th.name))
self.async_threads.append(th)
counter -= 1
def _refresh_execution_sizes(self):
for execution in self.queue: # type: Execution
exec_data = self.additional_exec_state[execution.id]
if exec_data.last_time_scheduled == 0:
progress = 0
else:
last_progress = (time.time() - exec_data.last_time_scheduled) / ((execution.services_count / execution.running_services_count) * execution.priority)
exec_data.progress_sequence.append(last_progress)
progress = sum(exec_data.progress_sequence)
remaining_execution_time = (1 - progress) * execution.priority
execution.size = remaining_execution_time * execution.services_count
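
A worked example of the size update above, with made-up numbers (all values are hypothetical):

# priority=100, 4 services of which 2 are running, last scheduled 60 seconds ago,
# previous progress_sequence summing to 0.1:
last_progress = 60 / ((4 / 2) * 100)               # 0.3
progress = 0.1 + last_progress                     # 0.4
remaining_execution_time = (1 - progress) * 100    # 60.0
size = remaining_execution_time * 4                # 240.0: the value the SIZE policy sorts the queue by
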
def _pop_all_with_same_size(self):
out_list = []
while len(self.queue) > 0:
job = self.queue.pop(0) # type: Execution
ret = job.termination_lock.acquire(blocking=False)
if ret and job.status != Execution.TERMINATED_STATUS:
out_list.append(job)
else:
log.debug('While popping, throwing away execution {} that has the termination lock held'.format(job.id))
return out_list
def loop_start_th(self):
"""The Scheduler thread loop."""
auto_trigger_base = 60 # seconds
auto_trigger = auto_trigger_base
while True:
ret = self.trigger_semaphore.acquire(timeout=1)
if not ret: # Semaphore timeout, do some thread cleanup
self._cleanup_async_threads()
auto_trigger -= 1
if auto_trigger == 0:
auto_trigger = auto_trigger_base
self.trigger()
continue
if self.loop_quit:
break
if len(self.queue) == 0:
log.debug("Scheduler loop has been triggered, but the queue is empty")
continue
log.debug("Scheduler loop has been triggered")
while True: # Inner loop will run until no new executions can be started or the queue is empty
self._refresh_execution_sizes()
if self.policy == "SIZE":
self.queue.sort()
log.debug('--> Queue dump after sorting')
for j in self.queue:
log.debug(str(j))
log.debug('--> End dump')
jobs_to_attempt_scheduling = self._pop_all_with_same_size()
log.debug('Scheduler inner loop, jobs to attempt scheduling:')
for job in jobs_to_attempt_scheduling:
log.debug("-> {}".format(job))
platform_state = get_platform_state()
cluster_status_snapshot = SimulatedPlatform(platform_state)
log.debug(str(cluster_status_snapshot))
jobs_to_launch = []
free_resources = cluster_status_snapshot.aggregated_free_memory()
# Try to find a placement solution using a snapshot of the platform status
for job in jobs_to_attempt_scheduling: # type: Execution
jobs_to_launch_copy = jobs_to_launch.copy()
# remove all elastic services from the previous simulation loop
for job_aux in jobs_to_launch: # type: Execution
cluster_status_snapshot.deallocate_elastic(job_aux)
job_can_start = False
if not job.is_running:
job_can_start = cluster_status_snapshot.allocate_essential(job)
if job_can_start or job.is_running:
jobs_to_launch.append(job)
# Try to put back the elastic services
for job_aux in jobs_to_launch:
cluster_status_snapshot.allocate_elastic(job_aux)
current_free_resources = cluster_status_snapshot.aggregated_free_memory()
if current_free_resources >= free_resources:
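# Adding this job did not reduce the simulated free memory, meaning nothing new could
# actually be placed: revert to the previous job set and stop adding more executions.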
# job_aux = jobs_to_launch.pop()
# cluster_status_snapshot.deallocate_essential(job_aux)
# cluster_status_snapshot.deallocate_elastic(job_aux)
# for job_aux in jobs_to_launch:
# cluster_status_snapshot.allocate_elastic(job_aux)
jobs_to_launch = jobs_to_launch_copy
break
free_resources = current_free_resources
log.debug('Allocation after simulation: {}'.format(cluster_status_snapshot.get_service_allocation()))
# We port the results of the simulation into the real cluster
for job in jobs_to_launch: # type: Execution
if not job.essential_services_running:
ret = start_essential(job)
if ret == "fatal":
continue # throw away the execution
elif ret == "requeue":
self.queue.insert(0, job)
continue
assert ret == "ok"
start_elastic(job)
if job.all_services_running:
log.debug('execution {}: all services started'.format(job.id))
job.termination_lock.release()
jobs_to_attempt_scheduling.remove(job)
for job in jobs_to_attempt_scheduling:
job.termination_lock.release()
# self.queue.insert(0, job)
self.queue = jobs_to_attempt_scheduling + self.queue
if len(self.queue) == 0:
log.debug('empty queue, exiting inner loop')
break
if len(jobs_to_launch) == 0:
log.debug('No executions could be started, exiting inner loop')
break
def quit(self):
"""Stop the scheduler thread."""
self.loop_quit = True
self.trigger()
self.loop_th.join()
def stats(self):
"""Scheduler statistics."""
return {
'queue_length': len(self.queue),
'termination_threads_count': len(self.async_threads)
}
@@ -19,17 +19,19 @@ import logging
import threading
from zoe_lib.state import Execution
from zoe_master.backends.interface import execution_to_containers, terminate_execution
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException
from zoe_master.backends.interface import start_all, terminate_execution
from zoe_master.scheduler.base_scheduler import ZoeBaseScheduler
from zoe_master.exceptions import UnsupportedSchedulerPolicyError
log = logging.getLogger(__name__)
class ZoeSimpleScheduler(ZoeBaseScheduler):
"""The Scheduler class."""
def __init__(self, state):
def __init__(self, state, policy):
super().__init__(state)
if policy != 'FIFO':
raise UnsupportedSchedulerPolicyError
self.fifo_queue = []
self.trigger_semaphore = threading.Semaphore(0)
self.async_threads = []
@@ -103,26 +105,9 @@ class ZoeSimpleScheduler(ZoeBaseScheduler):
e.set_starting()
self.fifo_queue.pop(0) # remove the execution from the queue
try:
execution_to_containers(e)
except ZoeStartExecutionRetryException as ex:
log.warning('Temporary failure starting execution {}: {}'.format(e.id, ex.message))
e.set_error_message(ex.message)
terminate_execution(e)
e.set_scheduled()
ret = start_all(e)
if ret == 'requeue':
self.fifo_queue.append(e)
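# 'fatal' and 'ok' need no handling here: service_list_to_containers has already
# set the execution state to error or running.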
except ZoeStartExecutionFatalException as ex:
log.error('Fatal error trying to start execution {}: {}'.format(e.id, ex.message))
e.set_error_message(ex.message)
terminate_execution(e)
e.set_error()
except Exception as ex:
log.exception('BUG, this error should have been caught earlier')
e.set_error_message(str(ex))
terminate_execution(e)
e.set_error()
else:
e.set_running()
def quit(self):
"""Stop the scheduler thread."""
"""Classes to hold the system state and simulated container/service placements"""
from zoe_lib.state.sql_manager import Execution, Service
from zoe_master.stats import ClusterStats, NodeStats
class SimulatedNode:
"""A simulated node where containers can be run"""
def __init__(self, real_node: NodeStats):
self.real_reservations = {
"memory": real_node.memory_reserved
}
self.real_free_resources = {
"memory": real_node.memory_free
}
self.real_active_containers = real_node.container_count
self.services = []
self.name = real_node.name
def service_fits(self, service: Service) -> bool:
"""Checks whether a service can fit in this node"""
if service.resource_reservation.memory < self.node_free_memory():
return True
else:
return False
def service_add(self, service):
"""Add a service in this node."""
if self.service_fits(service):
self.services.append(service)
return True
else:
return False
def service_remove(self, service):
"""Add a service in this node."""
try:
self.services.remove(service)
except ValueError:
return False
else:
return True
@property
def container_count(self):
"""Return the number of containers on this node"""
return self.real_active_containers + len(self.services)
def node_free_memory(self):
"""Return the amount of free memory for this node"""
simulated_reservation = 0
for service in self.services: # type: Service
simulated_reservation += service.resource_reservation.memory
assert (self.real_free_resources['memory'] - simulated_reservation) >= 0
return self.real_free_resources['memory'] - simulated_reservation
def __repr__(self):
# services = ','.join([str(s.id) for s in self.services])
s = 'SN {} | f {}'.format(self.name, self.node_free_memory())
return s
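
A minimal usage sketch for SimulatedNode (the NodeStats and Service objects are faked here with SimpleNamespace; real ones come from zoe_master.stats and zoe_lib.state):

from types import SimpleNamespace

fake_node_stats = SimpleNamespace(name='node1', memory_reserved=2 * 1024**3,
                                  memory_free=6 * 1024**3, container_count=3)
fake_service = SimpleNamespace(resource_reservation=SimpleNamespace(memory=4 * 1024**3))

node = SimulatedNode(fake_node_stats)
assert node.service_fits(fake_service)           # 4 GiB < 6 GiB free
node.service_add(fake_service)
assert node.node_free_memory() == 2 * 1024**3    # 6 GiB real free - 4 GiB simulated reservation
assert node.container_count == 4                 # 3 real containers + 1 simulated service
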
class SimulatedPlatform:
"""A simulated cluster, composed by simulated nodes"""
def __init__(self, platform_status: ClusterStats):
self.nodes = {}
for node in platform_status.nodes:
self.nodes[node.name] = SimulatedNode(node)
def allocate_essential(self, execution: Execution) -> bool:
"""Try to find an allocation for essential services"""
for service in execution.essential_services:
candidate_nodes = []
for node_name, node in self.nodes.items():
if node.service_fits(service):
candidate_nodes.append(node)
if len(candidate_nodes) == 0: # this service does not fit anywhere
self.deallocate_essential(execution)
return False
candidate_nodes.sort(key=lambda n: n.container_count) # smallest first
candidate_nodes[0].service_add(service)
return True
def deallocate_essential(self, execution: Execution):
"""Remove all essential services from the simulated cluster"""
for service in execution.essential_services:
for node_name, node in self.nodes.items():
if node.service_remove(service):
break
def allocate_elastic(self, execution: Execution) -> bool:
"""Try to find an allocation for elastic services"""
at_least_one_allocated = False
for service in execution.elastic_services:
if service.status == service.ACTIVE_STATUS:
continue
candidate_nodes = []
for node_name, node in self.nodes.items():
if node.service_fits(service):
candidate_nodes.append(node)
if len(candidate_nodes) == 0: # this service does not fit anywhere
continue
candidate_nodes.sort(key=lambda n: n.container_count) # smallest first
candidate_nodes[0].service_add(service)
service.set_runnable()
at_least_one_allocated = True
return at_least_one_allocated
def deallocate_elastic(self, execution: Execution):
"""Remove all elastic services from the simulated cluster"""
for service in execution.elastic_services:
for node_name, node in self.nodes.items():
if node.service_remove(service):
service.set_inactive()
break
def aggregated_free_memory(self):
"""Return the amount of free memory across all nodes"""
total = 0
for n_id, n in self.nodes.items():
total += n.node_free_memory()
return total
def get_service_allocation(self):
"""Return a map of service IDs to nodes where they have been allocated."""
placements = {}
for node_id, node in self.nodes.items():
for service in node.services:
placements[service.id] = node_id
return placements
def __repr__(self):
s = ''
for node_name, node in self.nodes.items():
s += str(node) + " # "
return s
@@ -35,8 +35,10 @@ class NodeStats(Stats):
self.container_count = 0
self.cores_total = 0
self.cores_reserved = 0
self.cores_free = 0
self.memory_total = 0
self.memory_reserved = 0
self.memory_free = 0
self.labels = {}
self.status = None
self.error = ''