Commit 34da7121 authored by Daniele Venzano's avatar Daniele Venzano

Split statistics retrieval, cache stats in the right place and improve performance

parent 5401eeeb
......@@ -17,6 +17,7 @@
import logging
import os
from time import time
import zoe_api.exceptions
import zoe_api.master_api
......@@ -40,6 +41,8 @@ class APIEndpoint:
def __init__(self, master_api, sql_manager):
self.master = master_api
self.sql = sql_manager
self.cached_statistics = None
self.cached_statistics_time = time()
def execution_by_id(self, uid, role, execution_id) -> zoe_lib.state.sql_manager.Execution:
"""Lookup an execution by its ID."""
......@@ -157,9 +160,12 @@ class APIEndpoint:
def statistics_scheduler(self, uid_, role_):
"""Retrieve statistics about the scheduler."""
success, message = self.master.scheduler_statistics()
if success:
return message
if self.cached_statistics is None or time() - self.cached_statistics_time > 60:
success, message = self.master.scheduler_statistics()
if success:
self.cached_statistics_time = time()
self.cached_statistics = message
return self.cached_statistics
def cleanup_dead_executions(self):
"""Terminates all executions with dead "monitor" services."""
......
......@@ -281,3 +281,8 @@ class Service:
def unique_name(self):
"""Returns a name for this service that is unique across multiple Zoe instances running on the same backend."""
return self.name + '-' + str(self.execution_id) + '-' + get_conf().deployment_name
@property
def execution(self):
"""Return the parent execution."""
return self.sql_manager.execution_list(only_one=True, id=self.execution_id)
......@@ -15,6 +15,8 @@
"""The base class that all backends should implement."""
from typing import List
from zoe_lib.state import Service
from zoe_master.stats import ClusterStats
from zoe_master.backends.service_instance import ServiceInstance
......@@ -50,7 +52,7 @@ class BaseBackend:
"""Terminate the container corresponding to a service."""
raise NotImplementedError
def platform_state(self, state=None) -> ClusterStats:
def platform_state(self, usage_stats=False) -> ClusterStats:
"""Get the platform state. This method should fill-in a new ClusterStats object at each call, with fresh statistics on the available nodes and resource availability. This information will be used for taking scheduling decisions."""
raise NotImplementedError
......@@ -61,3 +63,11 @@ class BaseBackend:
def update_service(self, service, cores=None, memory=None):
"""Update a service reservation."""
raise NotImplementedError
def node_list(self) -> List[str]:
"""List node names configured in the back-end."""
raise NotImplementedError
def list_available_images(self, node_name):
"""List the images available on the specified node."""
raise NotImplementedError
......@@ -16,7 +16,7 @@
"""Interface to the low-level Docker API."""
import logging
from typing import Iterable, Callable, Dict, Any
from typing import List, Callable, Dict, Any
import docker
import docker.tls
......@@ -110,7 +110,7 @@ class DockerClient:
run_args['mem_reservation'] -= 1
if service_instance.core_limit is not None:
run_args['cpu_quota'] = int(100000 * service_instance.core_limit.max)
run_args['cpu_quota'] = int(100000 * service_instance.core_limit.min)
if get_conf().gelf_address != '':
run_args['log_config'] = {
......@@ -237,7 +237,7 @@ class DockerClient:
if not res:
break
def list(self, only_label=None) -> Iterable[dict]:
def list(self, only_label=None) -> List[dict]:
"""
List running or defined containers.
......
......@@ -17,19 +17,19 @@
import logging
import re
import time
import threading
import time
from zoe_lib.state import Service
from zoe_lib.config import get_conf
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException, ZoeNotEnoughResourcesException
import zoe_master.backends.base
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.docker.threads import DockerStateSynchronizer
from zoe_lib.config import get_conf
from zoe_lib.state import Service
from zoe_master.backends.docker.api_client import DockerClient
from zoe_master.backends.docker.config import DockerConfig, DockerHostConfig # pylint: disable=unused-import
from zoe_master.backends.docker.threads import DockerStateSynchronizer
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException, ZoeNotEnoughResourcesException
from zoe_master.metrics.kairosdb import KairosDBInMetrics
from zoe_master.stats import ClusterStats, NodeStats
from zoe_master.metrics.incoming.kairosdb import KairosDBInMetrics
log = logging.getLogger(__name__)
......@@ -86,7 +86,7 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
log.error('Cannot terminate service {}, since it has not backend ID'.format(service.name))
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
def platform_state(self, state=None) -> ClusterStats:
def platform_state(self, usage_stats=False) -> ClusterStats:
"""Get the platform state."""
time_start = time.time()
platform_stats = ClusterStats()
......@@ -94,7 +94,7 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
for host_conf in self.docker_config: # type: DockerHostConfig
node_stats = NodeStats(host_conf.name)
th = threading.Thread(target=self._update_node_stats, args=(host_conf, node_stats, state), name='stats_host_{}'.format(host_conf.name), daemon=True)
th = threading.Thread(target=self._update_node_stats, args=(host_conf, node_stats, usage_stats), name='stats_host_{}'.format(host_conf.name), daemon=True)
th.start()
th_list.append((th, node_stats))
......@@ -105,7 +105,7 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
log.debug('Time for platform stats: {:.2f}s'.format(time.time() - time_start))
return platform_stats
def _update_node_stats(self, host_conf: DockerHostConfig, node_stats: NodeStats, state):
def _update_node_stats(self, host_conf: DockerHostConfig, node_stats: NodeStats, get_usage_stats: bool):
node_stats.labels = host_conf.labels
try:
my_engine = DockerClient(host_conf)
......@@ -118,12 +118,12 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
node_stats.status = 'online'
try:
container_list = my_engine.list()
container_list = my_engine.list(only_label={'zoe_deployment_name': get_conf().deployment_name})
info = my_engine.info()
except ZoeException:
return
node_stats.container_count = info['Containers']
node_stats.container_count = len(container_list)
node_stats.cores_total = info['NCPU']
node_stats.memory_total = info['MemTotal']
if info['Labels'] is not None:
......@@ -133,42 +133,46 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
node_stats.cores_reserved = sum([cont['cpu_quota'] / cont['cpu_period'] for cont in container_list if cont['cpu_period'] != 0])
stats = {}
if get_conf().kairosdb_enable:
kdb = KairosDBInMetrics()
for cont in container_list:
stats[cont['id']] = kdb.get_service_usage(cont['name'])
node_stats.memory_in_use = sum([stat['mem_usage'] for stat in stats.values()])
node_stats.cores_in_use = sum([stat['cpu_usage'] for stat in stats.values()])
for cont in container_list:
stats[cont['id']] = {}
stats[cont['id']]['core_limit'] = cont['cpu_quota'] / cont['cpu_period']
stats[cont['id']]['mem_limit'] = cont['memory_soft_limit']
if get_usage_stats:
if get_conf().kairosdb_enable:
kdb = KairosDBInMetrics()
for cont in container_list:
stats[cont['id']].update(kdb.get_service_usage(cont['name']))
node_stats.memory_in_use = sum([stat['mem_usage'] for stat in stats.values()])
node_stats.cores_in_use = sum([stat['cpu_usage'] for stat in stats.values()])
else:
for cont in container_list:
try:
aux = my_engine.stats(cont['id'], stream=False) # this call is very slow (>~1sec)
if 'usage' in aux['memory_stats']:
stats[cont['id']]['mem_usage'] = aux['memory_stats']['usage']
else:
stats[cont['id']]['mem_usage'] = 0
stats[cont['id']]['cpu_usage'] = self._get_core_usage(aux)
except ZoeException:
continue
node_stats.memory_in_use = sum([stat['mem_usage'] for stat in stats.values()])
node_stats.cores_in_use = sum([stat['cpu_usage'] for stat in stats.values()])
else:
for cont in container_list:
try:
stats[cont['id']] = my_engine.stats(cont['id'], stream=False)
except ZoeException:
continue
node_stats.memory_in_use = sum([stat['memory_stats']['usage'] for stat in stats.values() if 'usage' in stat['memory_stats']])
node_stats.cores_in_use = sum([self._get_core_usage(stat) for stat in stats.values()])
if get_conf().backend_image_management:
node_stats.image_list = []
for dk_image in my_engine.list_images():
image = {
'id': dk_image.attrs['Id'],
'size': dk_image.attrs['Size'],
'names': dk_image.tags
}
for name in image['names']:
if name[-7:] == ':latest': # add an image with the name without 'latest' to fake Docker image lookup algorithm
image['names'].append(name[:-7])
break
node_stats.image_list.append(image)
node_stats.memory_in_use = 0
node_stats.cores_in_use = 0
def _get_core_usage(self, stat):
cpu_time_now = stat['cpu_stats']['cpu_usage']['total_usage']
cpu_time_pre = stat['precpu_stats']['cpu_usage']['total_usage']
return (cpu_time_now - cpu_time_pre) / 1000000000
def node_list(self):
"""Return a list of node names."""
return [node.name for node in self.docker_config]
def service_log(self, service: Service):
"""Get the log."""
conf = self._get_config(service.backend_host)
......@@ -196,6 +200,36 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
if not one_success:
raise ZoeException('Cannot pull image {}'.format(image_name))
def list_available_images(self, node_name):
"""List the images available on the specified node."""
if not get_conf().backend_image_management:
return []
host_conf = None
for conf in self.docker_config:
if conf.name == node_name:
host_conf = conf
break
if host_conf is None:
log.error('Unknown node {}, returning empty image list'.format(node_name))
return []
my_engine = DockerClient(host_conf)
image_list = []
for dk_image in my_engine.list_images():
image = {
'id': dk_image.attrs['Id'],
'size': dk_image.attrs['Size'],
'names': dk_image.tags
}
for name in image['names']:
if name[-7:] == ':latest': # add an image with the name without 'latest' to fake Docker image lookup algorithm
image['names'].append(name[:-7])
break
image_list.append(image)
return image_list
def update_service(self, service, cores=None, memory=None):
"""Update a service reservation."""
conf = self._get_config(service.backend_host)
......@@ -206,7 +240,7 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
cores = info['NCPU']
if memory is not None and memory > info['MemTotal']:
memory = info['MemTotal']
cpu_quota = cores * 1000000
cpu_quota = int(cores * 100000)
engine.update(service.backend_id, cpu_quota=cpu_quota, mem_reservation=memory)
else:
log.error('Cannot terminate service {}, since it has not backend ID'.format(service.name))
log.error('Cannot update service {}, since it has no backend ID'.format(service.name))
......@@ -153,48 +153,43 @@ def start_elastic(execution: Execution, placement) -> str:
return service_list_to_containers(execution, elastic_to_start, placement)
def terminate_execution(execution: Execution) -> None:
"""Terminate an execution."""
def terminate_service(service: Service) -> None:
"""Terminate a single service."""
backend = _get_backend()
for service in execution.services: # type: Service
if service.status != Service.INACTIVE_STATUS:
if service.status == Service.ERROR_STATUS:
continue
elif service.status == Service.ACTIVE_STATUS or service.status == Service.TERMINATING_STATUS or service.status == Service.STARTING_STATUS:
service.set_terminating()
backend.terminate_service(service)
service.set_inactive()
log.debug('Service {} terminated'.format(service.name))
elif service.status == Service.CREATED_STATUS or service.status == Service.RUNNABLE_STATUS:
service.set_inactive()
else:
log.error('BUG: don\'t know how to terminate a service in status {}'.format(service.status))
elif not service.is_dead():
log.warning('Service {} is inactive for Zoe, but running for the back-end, terminating and resetting state'.format(service.name))
if service.status != Service.INACTIVE_STATUS:
if service.status == Service.ERROR_STATUS:
return
elif service.status == Service.ACTIVE_STATUS or service.status == Service.TERMINATING_STATUS or service.status == Service.STARTING_STATUS:
service.set_terminating()
backend.terminate_service(service)
service.set_inactive()
log.debug('Service {} terminated'.format(service.name))
execution.set_terminated()
elif service.status == Service.CREATED_STATUS or service.status == Service.RUNNABLE_STATUS:
service.set_inactive()
else:
log.error('BUG: don\'t know how to terminate a service in status {}'.format(service.status))
elif not service.is_dead():
log.warning('Service {} is inactive for Zoe, but running for the back-end, terminating and resetting state'.format(service.name))
service.set_terminating()
backend.terminate_service(service)
service.set_inactive()
log.debug('Service {} terminated'.format(service.name))
CACHED_STATS = None # type: ClusterStats
def terminate_execution(execution: Execution) -> None:
"""Terminate an execution."""
for service in execution.services: # type: Service
terminate_service(service)
execution.set_terminated()
def get_platform_state(state: SQLManager, force_update=False) -> ClusterStats:
def get_platform_state(state: SQLManager, with_usage_stats=False) -> ClusterStats:
"""Retrieves the state of the platform by querying the container backend. Platform state includes information on free/reserved resources for each node. This information is used for advanced scheduling.
Since retrieving statistics can be slow, the we cache results and use the force_update parameter to know when fresh (and slow) values are absolutely needed."""
global CACHED_STATS
if CACHED_STATS is not None and not force_update:
return CACHED_STATS
Since retrieving usage statistics is slow, you need to ask for them explicitly."""
backend = _get_backend()
platform_state = backend.platform_state(state)
platform_state = backend.platform_state(with_usage_stats)
for node in platform_state.nodes: # type: NodeStats
node.services = state.service_list(backend_host=node.name, backend_status=Service.BACKEND_START_STATUS)
CACHED_STATS = platform_state
return platform_state
......@@ -208,3 +203,25 @@ def preload_image(image_name):
log.info('Image {} preloaded in {:.2f}s'.format(image_name, time.time() - time_start))
except NotImplementedError:
log.warning('Backend {} does not support image preloading'.format(get_conf().backend))
def update_service_resource_limits(service, cores=None, memory=None):
"""Update a service reservation."""
backend = _get_backend()
if cores is not None:
log.debug('Setting core limit to {} for service {}'.format(cores, service.id))
if memory is not None:
log.debug('Setting memory limit to {} for service {}'.format(memory, service.id))
backend.update_service(service, cores, memory)
def node_list():
"""List node names configured in the back-end."""
backend = _get_backend()
return backend.node_list()
def list_available_images(node_name):
"""List the images available on the specified node."""
backend = _get_backend()
return backend.list_available_images(node_name)
......@@ -67,7 +67,7 @@ class KubernetesBackend(zoe_master.backends.base.BaseBackend):
"""Terminate and delete a container."""
self.kube.terminate(service.dns_name)
def platform_state(self, state=None) -> ClusterStats:
def platform_state(self, usage_stats=False) -> ClusterStats:
"""Get the platform state."""
info = self.kube.info()
for node in info.nodes: # type: NodeStats
......
......@@ -70,7 +70,7 @@ class SwarmBackend(zoe_master.backends.base.BaseBackend):
"""Terminate and delete a container."""
self.swarm.terminate_container(service.backend_id, delete=True)
def platform_state(self, state=None) -> ClusterStats:
def platform_state(self, usage_stats=False) -> ClusterStats:
"""Get the platform state."""
info = self.swarm.info()
for node in info.nodes: # type: NodeStats
......
......@@ -21,7 +21,6 @@ import threading
import queue
from zoe_lib.config import get_conf
from zoe_master.backends.interface import get_platform_state
log = logging.getLogger(__name__)
......@@ -73,7 +72,6 @@ class BaseMetricSender:
stop = False
while not stop:
time_start = time.time()
get_platform_state(self.state, force_update=True)
while not self._queue.empty():
data = self._queue.get(block=False)
if data == 'quit':
......
......@@ -23,12 +23,13 @@ import logging
import threading
import time
from zoe_lib.state import Execution, SQLManager
from zoe_lib.state import Execution, SQLManager, Service
from zoe_master.exceptions import ZoeException
from zoe_master.backends.interface import terminate_execution, get_platform_state, start_elastic, start_essential
from zoe_master.backends.interface import terminate_execution, get_platform_state, start_elastic, start_essential, update_service_resource_limits
from zoe_master.scheduler.simulated_platform import SimulatedPlatform
from zoe_master.exceptions import UnsupportedSchedulerPolicyError
from zoe_master.stats import NodeStats
log = logging.getLogger(__name__)
......@@ -88,6 +89,7 @@ class ZoeElasticScheduler:
log.error('Error in termination thread: {}'.format(ex))
return
self.trigger()
self._adjust_core_limits()
log.debug('Execution {} terminated successfully'.format(e.id))
try:
......@@ -190,7 +192,7 @@ class ZoeElasticScheduler:
log.debug("-> {}".format(job))
try:
platform_state = get_platform_state(self.state, force_update=True)
platform_state = get_platform_state(self.state)
except ZoeException:
log.error('Cannot retrieve platform state, cannot schedule')
for job in jobs_to_attempt_scheduling:
......@@ -244,6 +246,7 @@ class ZoeElasticScheduler:
continue
elif ret == "ok":
job.set_running()
assert ret == "ok"
start_elastic(job, placements)
......@@ -254,6 +257,8 @@ class ZoeElasticScheduler:
jobs_to_attempt_scheduling.remove(job)
self.queue_running.append(job)
self._adjust_core_limits()
for job in jobs_to_attempt_scheduling:
job.termination_lock.release()
# self.queue.insert(0, job)
......@@ -286,5 +291,23 @@ class ZoeElasticScheduler:
'termination_threads_count': len(self.async_threads),
'queue': [s.id for s in queue],
'running_queue': [s.id for s in self.queue_running],
'platform_stats': get_platform_state(self.state).serialize()
'platform_stats': get_platform_state(self.state, with_usage_stats=True).serialize()
}
def _adjust_core_limits(self):
stats = get_platform_state(self.state)
for node in stats.nodes: # type: NodeStats
new_core_allocations = {}
core_sum = 0
for service in node.services: # type: Service
new_core_allocations[service.id] = service.resource_reservation.cores.min
core_sum += service.resource_reservation.cores.min
if core_sum < node.cores_total:
cores_free = node.cores_total - core_sum
cores_to_add = cores_free / len(node.services)
else:
cores_to_add = 0
for service in node.services: # type: Service
update_service_resource_limits(service, cores=new_core_allocations[service.id] + cores_to_add)
......@@ -5,6 +5,8 @@ import logging
from zoe_lib.state.sql_manager import Execution, Service
from zoe_lib.config import get_conf
from zoe_master.stats import ClusterStats, NodeStats
from zoe_master.backends.interface import list_available_images
log = logging.getLogger(__name__)
......@@ -12,19 +14,20 @@ log = logging.getLogger(__name__)
class SimulatedNode:
"""A simulated node where containers can be run"""
def __init__(self, real_node: NodeStats):
min_cores_reserved = sum([s.resource_reservation.cores.min for s in real_node.services if s.resource_reservation.cores is not None])
self.real_reservations = {
"memory": real_node.memory_reserved,
"cores": real_node.cores_reserved
"cores": min_cores_reserved
}
self.real_free_resources = {
"memory": real_node.memory_total - real_node.memory_reserved,
"cores": real_node.cores_total - real_node.cores_reserved
"cores": real_node.cores_total - min_cores_reserved
}
self.real_active_containers = real_node.container_count
self.services = []
self.name = real_node.name
self.labels = real_node.labels
self.images = real_node.image_list
self.images = list_available_images(self.name)
def service_fits(self, service: Service) -> bool:
"""Checks whether a service can fit in this node"""
......
......@@ -27,6 +27,27 @@ class Stats:
self.timestamp = time.time()
class ServiceStats(Stats):
"""Stats related to a service."""
def __init__(self, service):
super().__init__()
self.service = service
self.cores_in_use = 0
self.cores_limit = 0
self.memory_in_use = 0
self.memory_limit = 0
def serialize(self):
"""Convert the object into a dict."""
return {
'service': self.service.serialize(),
'cores_in_use': self.cores_in_use,
'cores_limit': self.cores_limit,
'memory_in_use': self.memory_in_use,
'memory_limit': self.memory_limit
}
class NodeStats(Stats):
"""Stats related to a single node."""
def __init__(self, name):
......@@ -43,7 +64,6 @@ class NodeStats(Stats):
self.status = None
self.error = ''
self.services = []
self.image_list = []
def serialize(self):
"""Convert the object into a dict."""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment