...
 
@@ -281,3 +281,8 @@ class Service:
     def unique_name(self):
         """Returns a name for this service that is unique across multiple Zoe instances running on the same backend."""
         return self.name + '-' + str(self.execution_id) + '-' + get_conf().deployment_name
+
+    @property
+    def execution(self):
+        """Return the parent execution."""
+        return self.sql_manager.execution_list(only_one=True, id=self.execution_id)
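For context, a sketch of how the new property composes with unique_name (identifiers below are invented; the service lookup call is assumed to mirror execution_list):

# Hypothetical usage of the new Service.execution property:
svc = sql_manager.service_list(only_one=True, id=42)  # assumed SQLManager lookup
parent = svc.execution  # issues one execution_list() query per access
print(svc.unique_name)  # e.g. 'spark-worker-42-prod' (name-executionid-deploymentname)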
@@ -137,18 +137,20 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
             kdb = KairosDBInMetrics()
             for cont in container_list:
                 stats[cont['id']] = kdb.get_service_usage(cont['name'])
-            node_stats.memory_in_use = sum([stat['mem_usage'] for stat in stats.values()])
-            node_stats.cores_in_use = sum([stat['cpu_usage'] for stat in stats.values()])
+                stats[cont['id']]['mem_limit'] = cont['memory_soft_limit']
         else:
             for cont in container_list:
                 try:
-                    stats[cont['id']] = my_engine.stats(cont['id'], stream=False)
-                except ZoeException:
+                    aux = my_engine.stats(cont['id'], stream=False)
+                    stats[cont['id']] = {}
+                    stats[cont['id']]['mem_usage'] = aux['memory_stats']['usage']
+                    stats[cont['id']]['cpu_usage'] = self._get_core_usage(aux)
+                    stats[cont['id']]['mem_limit'] = cont['memory_soft_limit']
+                except (ZoeException, KeyError):
                     continue
-            node_stats.memory_in_use = sum([stat['memory_stats']['usage'] for stat in stats.values() if 'usage' in stat['memory_stats']])
-            node_stats.cores_in_use = sum([self._get_core_usage(stat) for stat in stats.values()])
+        node_stats.memory_in_use = sum([stat['mem_usage'] for stat in stats.values()])
+        node_stats.cores_in_use = sum([stat['cpu_usage'] for stat in stats.values()])
+        node_stats.cont_stats = stats

         if get_conf().backend_image_management:
             node_stats.image_list = []
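Both metric sources now yield per-container dicts with the same keys, which is what lets the memory_in_use/cores_in_use totals move out of the if/else. The resulting cont_stats shape, with illustrative values:

# Illustrative only; real keys come from KairosDB or the Docker stats API above.
cont_stats = {
    'd34db33fc4f3': {              # container backend ID
        'mem_usage': 536870912,    # bytes currently in use
        'cpu_usage': 1.7,          # cores in use, as computed by _get_core_usage()
        'mem_limit': 2147483648,   # 'memory_soft_limit' copied from the container
    },
}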
@@ -206,7 +208,10 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
                 cores = info['NCPU']
             if memory is not None and memory > info['MemTotal']:
                 memory = info['MemTotal']
-            cpu_quota = cores * 1000000
+            if cores is not None:
+                cpu_quota = cores * 1000000
+            else:
+                cpu_quota = None
             engine.update(service.backend_id, cpu_quota=cpu_quota, mem_reservation=memory)
         else:
             log.error('Cannot update service {}, since it has no backend ID'.format(service.name))
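One caveat worth noting: Docker's CFS limit is a quota of µs of CPU time per scheduling period, so `cores * 1000000` equals whole cores only if the period is 1000000 µs (an assumption here; the Docker default period is 100000 µs):

# Sketch of the quota/period arithmetic, under the assumed cpu_period of 1000000 µs:
cpu_period = 1000000                 # µs per scheduling window (assumption)
cores = 2.5
cpu_quota = int(cores * cpu_period)  # 2500000 µs per window -> at most 2.5 CPUs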
@@ -136,46 +136,67 @@ def start_all(execution: Execution) -> str:
     """
     log.debug('starting all services for execution {}'.format(execution.id))
     execution.set_starting()
-    return service_list_to_containers(execution, execution.services)
+    try:
+        ret = service_list_to_containers(execution, execution.services)
+    except BaseException:
+        log.exception('Unexpected exception while starting execution {}'.format(execution.id))
+        return 'requeue'
+    else:
+        return ret


 def start_essential(execution: Execution, placement) -> str:
     """Start the essential services for this execution"""
     log.debug('starting essential services for execution {}'.format(execution.id))
     execution.set_starting()
-    return service_list_to_containers(execution, execution.essential_services, placement)
+    try:
+        ret = service_list_to_containers(execution, execution.essential_services, placement)
+    except BaseException:
+        log.exception('Unexpected exception while starting execution {}'.format(execution.id))
+        return 'requeue'
+    else:
+        return ret


 def start_elastic(execution: Execution, placement) -> str:
     """Start the runnable elastic services"""
     elastic_to_start = [s for s in execution.elastic_services if s.status == Service.RUNNABLE_STATUS]
-    return service_list_to_containers(execution, elastic_to_start, placement)
+    try:
+        ret = service_list_to_containers(execution, elastic_to_start, placement)
+    except BaseException:
+        log.exception('Unexpected exception while starting execution {}'.format(execution.id))
+        return 'requeue'
+    else:
+        return ret


-def terminate_execution(execution: Execution) -> None:
-    """Terminate an execution."""
+def terminate_service(service: Service) -> None:
+    """Terminate a single service."""
     backend = _get_backend()
-    for service in execution.services:  # type: Service
-        if service.status != Service.INACTIVE_STATUS:
-            if service.status == Service.ERROR_STATUS:
-                continue
-            elif service.status == Service.ACTIVE_STATUS or service.status == Service.TERMINATING_STATUS or service.status == Service.STARTING_STATUS:
-                service.set_terminating()
-                backend.terminate_service(service)
-                service.set_inactive()
-                log.debug('Service {} terminated'.format(service.name))
-            elif service.status == Service.CREATED_STATUS or service.status == Service.RUNNABLE_STATUS:
-                service.set_inactive()
-            else:
-                log.error('BUG: don\'t know how to terminate a service in status {}'.format(service.status))
-        elif not service.is_dead():
-            log.warning('Service {} is inactive for Zoe, but running for the back-end, terminating and resetting state'.format(service.name))
+    if service.status != Service.INACTIVE_STATUS:
+        if service.status == Service.ERROR_STATUS:
+            return
+        elif service.status == Service.ACTIVE_STATUS or service.status == Service.TERMINATING_STATUS or service.status == Service.STARTING_STATUS:
+            service.set_terminating()
+            backend.terminate_service(service)
+            service.set_inactive()
+            log.debug('Service {} terminated'.format(service.name))
+        elif service.status == Service.CREATED_STATUS or service.status == Service.RUNNABLE_STATUS:
+            service.set_inactive()
+        else:
+            log.error('BUG: don\'t know how to terminate a service in status {}'.format(service.status))
+    elif not service.is_dead():
+        log.warning('Service {} is inactive for Zoe, but running for the back-end, terminating and resetting state'.format(service.name))
+        service.set_terminating()
+        backend.terminate_service(service)
+        service.set_inactive()
+        log.debug('Service {} terminated'.format(service.name))


+def terminate_execution(execution: Execution) -> None:
+    """Terminate an execution."""
+    for service in execution.services:  # type: Service
+        terminate_service(service)
     execution.set_terminated()
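The three start_* wrappers now repeat the same try/requeue pattern; a shared helper along these lines (hypothetical, not in the diff, and assuming service_list_to_containers treats placement as optional) would remove the duplication:

def _start_with_requeue(execution, services, placement=None):
    """Hypothetical helper: start services, asking for a requeue on any error."""
    try:
        return service_list_to_containers(execution, services, placement)
    except BaseException:
        log.exception('Unexpected exception while starting execution {}'.format(execution.id))
        return 'requeue'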
@@ -208,3 +229,10 @@ def preload_image(image_name):
         log.info('Image {} preloaded in {:.2f}s'.format(image_name, time.time() - time_start))
     except NotImplementedError:
         log.warning('Backend {} does not support image preloading'.format(get_conf().backend))
+
+
+def update_service_resource_limits(service, cores=None, memory=None):
+    """Update a service reservation."""
+    backend = _get_backend()
+    log.debug('Updating resources for service {}'.format(service.id))
+    backend.update_service(service, cores, memory)
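This is the entry point the dynamic reallocator (below) uses to resize a running service, for example:

# Illustrative call: adjust only the memory reservation, leaving cores unchanged
update_service_resource_limits(service, memory=predicted_bytes)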
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Dynamic allocation experimental code
"""
import logging

import GPy
import numpy as np

from zoe_lib.state import Execution
from zoe_master.backends.interface import get_platform_state, update_service_resource_limits, terminate_execution, terminate_service

log = logging.getLogger(__name__)

MEMORY_HISTORY_POINT_COUNT_MAX = 400
PREDICTION_MIN_POINTS = 10
BUFFER_SIZE = 0.20

class DynamicReallocator:
    """The Dynamic reallocator with the predictor."""
    def __init__(self, scheduler):
        self.memory_history = {}
        self.state = scheduler.state
        self.scheduler = scheduler

    def do_dynamic_step(self):
        """Drive the entire dynamic allocation algorithm."""
        running_components = self.state.service_list(backend_status="started")
        platform_stats = get_platform_state(self.state, force_update=True)
        for rc in running_components:
            if rc.id in self.memory_history:
                self.memory_history[rc.id].append(self._get_component_memory_usage(rc, platform_stats))
            else:
                self.memory_history[rc.id] = [self._get_component_memory_usage(rc, platform_stats)]

        predictions = {}
        for rc in running_components:
            while len(self.memory_history[rc.id]) > MEMORY_HISTORY_POINT_COUNT_MAX:
                self.memory_history[rc.id].pop(0)
            if len(self.memory_history[rc.id]) < PREDICTION_MIN_POINTS:
                predictions[rc.id] = self._get_component_memory_limit(rc, platform_stats)
                continue
            predicted_allocation, variance = self.gp_predict(self.memory_history[rc.id], restarts=5)
            # Next we add the buffer to compensate for the prediction error.
            # For now we use a static value and we do not change it depending on the variance.
            predictions[rc.id] = predicted_allocation * (1 + BUFFER_SIZE)  # + normalized_variance)

        # Here starts Algorithm 1 from the paper.
        # Before performing the simulation we have to sort the components per host.
        # The data structure is as follows:
        # hosts = [
        #     {
        #         'execution_id': {
        #             'core': [list of running core components on the host bf_X],
        #             'elastic': [list of running elastic components on the host bf_X]
        #         }
        #     },
        #     [...]
        # ]
        # In addition the list of running components must be sorted in ascending order by starting time.
        # We use the ID for sorting since we do not have the starting time for each service/component.
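        # A concrete (made-up) instance: one node running execution 7 (two core
        # components, one elastic) next to execution 9 (one core component):
        # hosts = [{7: {'core': [svc_a, svc_b], 'elastic': [svc_c]},
        #           9: {'core': [svc_d], 'elastic': []}}]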
        hosts = []
        executions_by_id = {}
        for node in platform_stats.nodes:
            node_executions = sorted([s.execution for s in node.services], key=lambda x: x.size)
            host = {}
            for execution in node_executions:  # type: Execution
                host[execution.id] = {
                    'core': sorted([s for s in execution.essential_services if s in node.services], key=lambda s: s.id),
                    'elastic': sorted([s for s in execution.elastic_services if s in node.services], key=lambda s: s.id)
                }
                executions_by_id[execution.id] = execution
            hosts.append(host)

        for host in hosts:
            executions_to_kill = []
            components_to_kill = []
            components_to_resize = []
            mem_free = platform_stats.memory_total - sum([node.memory_reserved for node in platform_stats.nodes])
            for execution_id in host.keys():
                execution = host[execution_id]
                tmp_mem_free = mem_free
                tmp_components = []
                for core in execution['core']:
                    tmp_mem_free -= predictions[core.id]
                    tmp_components.append(core)
                if tmp_mem_free < 0:
                    executions_to_kill.append(execution_id)
                else:
                    components_to_resize.extend(tmp_components)
                    mem_free = tmp_mem_free
                for elastic in execution['elastic']:
                    tmp_mem_free = mem_free - predictions[elastic.id]
                    if tmp_mem_free < 0:
                        components_to_kill.append(elastic)
                    else:
                        components_to_resize.append(elastic)
                        mem_free = tmp_mem_free

            for execution_id in executions_to_kill:
                execution = executions_by_id[execution_id]
                terminate_execution(execution)
                try:
                    self.scheduler.queue.remove(execution)
                except ValueError:
                    try:
                        self.scheduler.queue_running.remove(execution)
                    except ValueError:
                        log.error('Terminating execution {} that is not in any queue'.format(execution.id))
                self.scheduler.incoming(execution)
            for component in components_to_kill:
                terminate_service(component)
                if component.execution not in self.scheduler.queue:
                    assert component.execution in self.scheduler.queue_running
                    self.scheduler.queue_running.remove(component.execution)
                    self.scheduler.queue.append(component.execution)
            for component in components_to_resize:
                update_service_resource_limits(component, memory=predictions[component.id])
    def _get_component_memory_usage(self, component, platform_stats):
        for node in platform_stats.nodes:
            for cont_id, cont_stat in node.cont_stats.items():
                if cont_id == component.backend_id:
                    return cont_stat['mem_usage']

    def _get_component_memory_limit(self, component, platform_stats):
        for node in platform_stats.nodes:
            for cont_id, cont_stat in node.cont_stats.items():
                if cont_id == component.backend_id:
                    return cont_stat['mem_limit']
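    # NOTE: both lookups above fall through and return None when the component's
    # backend_id does not appear in any node's cont_stats, so memory_history can
    # end up holding None samples.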
    def _generate_kernel(self, kernel_name, input_dim):
        if kernel_name == "RBF":
            return GPy.kern.RBF(input_dim=input_dim)
        elif kernel_name == "Matern52":
            return GPy.kern.Matern52(input_dim=input_dim)
        elif kernel_name == "Bias":
            return GPy.kern.Bias(input_dim=input_dim)
        elif kernel_name == "Exponential":
            return GPy.kern.Exponential(input_dim=input_dim)
        elif kernel_name == "Poly-5":
            return GPy.kern.Poly(input_dim=input_dim, order=5)
        elif kernel_name == "Poly-7":
            return GPy.kern.Poly(input_dim=input_dim, order=7)
        else:
            log.error('Kernel {} not recognized'.format(kernel_name))
            return None
    def _pad_after(self, data, size, padding):
        return data + [padding] * abs(len(data) - size)

    def _pad_before(self, data, size, padding):
        return [padding] * abs(len(data) - size) + data
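    # e.g. _pad_before([3, 4], 5, 0) -> [0, 0, 0, 3, 4]
    #      _pad_after([3, 4], 5, 0)  -> [3, 4, 0, 0, 0]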
    def gp_predict(self, timeseries, kernel_name="Exponential", hist_window_size=10, pred_window_size=1, restarts=0):
        """The predictor."""
        kernel = self._generate_kernel(kernel_name, hist_window_size + 1)
        N = len(timeseries)
        X = np.zeros((N, hist_window_size + 1))
        Y = np.array([[v] for v in timeseries])
        for i in range(N):
            X[i, 0] = i
            window_start = max(i - hist_window_size, 0)
            hist_window = timeseries[window_start:i]
            hist_window = self._pad_before(hist_window, hist_window_size, 0)
            X[i, 1:] = np.array([[v] for v in hist_window]).T
        # NOTE: the most recent history goes at the last columns of X

        input_mean = np.mean(X, axis=0)
        input_stdv = np.std(X, axis=0)
        training_mean = np.mean(Y)
        training_stdv = np.std(Y)
        X = (X - input_mean) / input_stdv
        Y = (Y - training_mean) / training_stdv

        m = GPy.models.GPRegression(X, Y, kernel, mean_function=None, normalizer=False)
        m.Gaussian_noise.constrain_fixed()
        m.Gaussian_noise = 0.0001
        if restarts == 0:
            m.optimize()
        else:
            m.optimize_restarts(num_restarts=restarts, verbose=False)

        Xtest = np.zeros((pred_window_size, hist_window_size + 1))
        for i in range(N, N + pred_window_size):
            Xtest[i - N, 0] = i
            window_start = max(i - hist_window_size, 0)
            hist_window = timeseries[window_start:i]
            hist_window = self._pad_after(hist_window, hist_window_size, 0)
            Xtest[i - N, 1:] = np.array([[v] for v in hist_window]).T
        # NOTE: the most recent history goes at the last columns of Xtest

        (pred, var) = m.predict((Xtest - input_mean) / input_stdv)
        pred = pred * training_stdv + training_mean
        var = var * np.power(training_stdv, 2)
        return pred, var
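A minimal sketch of how the predictor can be exercised on its own, with a synthetic series and invented values (assumes GPy and numpy are installed; not part of the diff):

if __name__ == '__main__':
    # Illustrative smoke test: a linearly growing "memory usage" series should
    # yield a one-step-ahead prediction slightly above the last observed value.
    class _StubScheduler:  # minimal stand-in so DynamicReallocator.__init__ succeeds
        state = None

    demo_series = [100.0 + 2.0 * t for t in range(30)]
    reallocator = DynamicReallocator(_StubScheduler())
    pred, var = reallocator.gp_predict(demo_series, restarts=2)
    print('prediction: {:.1f}, variance: {:.3f}'.format(pred[0, 0], var[0, 0]))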
@@ -25,7 +25,7 @@ import time
 from zoe_lib.state import Execution, SQLManager
 from zoe_master.exceptions import ZoeException
+from zoe_master.scheduler.dynamic_allocation import DynamicReallocator
 from zoe_master.backends.interface import terminate_execution, get_platform_state, start_elastic, start_essential
 from zoe_master.scheduler.simulated_platform import SimulatedPlatform
 from zoe_master.exceptions import UnsupportedSchedulerPolicyError
@@ -33,7 +33,7 @@ from zoe_master.exceptions import UnsupportedSchedulerPolicyError
 log = logging.getLogger(__name__)

 ExecutionProgress = namedtuple('ExecutionProgress', ['last_time_scheduled', 'progress_sequence'])

-SELF_TRIGGER_TIMEOUT = 60  # the scheduler will trigger itself periodically in case platform resources have changed outside its control
+SELF_TRIGGER_TIMEOUT = 30  # the scheduler will trigger itself periodically in case platform resources have changed outside its control


 class ZoeElasticScheduler:
@@ -49,7 +49,6 @@ class ZoeElasticScheduler:
         self.async_threads = []
         self.loop_quit = False
         self.loop_th = threading.Thread(target=self._thread_wrapper, name='scheduler')
-        self.loop_th.start()
         self.state = state
         for execution in self.state.execution_list(status='running'):
             if execution.all_services_running:
@@ -58,6 +57,9 @@ class ZoeElasticScheduler:
                 self.queue.append(execution)
             self.additional_exec_state[execution.id] = ExecutionProgress(0, [])
+        self.dynamic_reallocator = DynamicReallocator(self)
+
+        self.loop_th.start()

     def trigger(self):
         """Trigger a scheduler run."""
         self.trigger_semaphore.release()
@@ -168,6 +170,8 @@ class ZoeElasticScheduler:
             if self.loop_quit:
                 break

+            self.dynamic_reallocator.do_dynamic_step()
+
             if len(self.queue) == 0:
                 log.debug("Scheduler loop has been triggered, but the queue is empty")
                 continue
......
@@ -44,6 +44,7 @@ class NodeStats(Stats):
         self.error = ''
         self.services = []
         self.image_list = []
+        self.cont_stats = {}

     def serialize(self):
         """Convert the object into a dict."""
......