Commit 87694fc8 authored by Daniele Venzano's avatar Daniele Venzano

Start generalization of stats management

parent 83f0552a
......@@ -50,7 +50,7 @@ confidence=
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
disable=line-too-long,invalid-sequence-index,print-statement,parameter-unpacking,unpacking-in-except,old-raise-syntax,backtick,long-suffix,old-ne-operator,old-octal-literal,import-star-module-level,raw-checker-failed,bad-inline-option,locally-disabled,locally-enabled,file-ignored,suppressed-message,useless-suppression,deprecated-pragma,no-self-use,too-many-instance-attributes,too-few-public-methods,too-many-branches,too-many-statements,unnecessary-lambda,arguments-differ,fixme,global-statement,logging-format-interpolation,apply-builtin,basestring-builtin,buffer-builtin,cmp-builtin,coerce-builtin,execfile-builtin,file-builtin,long-builtin,raw_input-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,no-absolute-import,old-division,dict-iter-method,dict-view-method,next-method-called,metaclass-assignment,indexing-exception,raising-string,reload-builtin,oct-method,hex-method,nonzero-method,cmp-method,input-builtin,round-builtin,intern-builtin,unichr-builtin,map-builtin-not-iterating,zip-builtin-not-iterating,range-builtin-not-iterating,filter-builtin-not-iterating,using-cmp-argument,eq-without-hash,div-method,idiv-method,rdiv-method,exception-message-attribute,invalid-str-codec,sys-max-int,bad-python3-import,deprecated-string-function,deprecated-str-translate-call,len-as-condition,no-else-return
disable=line-too-long,invalid-sequence-index,parameter-unpacking,unpacking-in-except,backtick,long-suffix,raw-checker-failed,bad-inline-option,locally-disabled,locally-enabled,file-ignored,suppressed-message,deprecated-pragma,no-self-use,too-many-instance-attributes,too-few-public-methods,too-many-public-methods,too-many-branches,too-many-statements,fixme,global-statement,logging-format-interpolation,apply-builtin,basestring-builtin,buffer-builtin,cmp-builtin,coerce-builtin,execfile-builtin,file-builtin,long-builtin,raw_input-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,no-absolute-import,old-division,dict-iter-method,dict-view-method,next-method-called,metaclass-assignment,indexing-exception,reload-builtin,oct-method,hex-method,nonzero-method,cmp-method,input-builtin,round-builtin,intern-builtin,unichr-builtin,map-builtin-not-iterating,zip-builtin-not-iterating,range-builtin-not-iterating,filter-builtin-not-iterating,using-cmp-argument,eq-without-hash,div-method,idiv-method,rdiv-method,exception-message-attribute,sys-max-int,bad-python3-import,deprecated-string-function,deprecated-str-translate-call,len-as-condition,no-else-return,arguments-differ
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
......
......@@ -18,6 +18,7 @@
import logging
import re
import time
import threading
from zoe_lib.state import Service
from zoe_lib.config import get_conf
......@@ -27,7 +28,7 @@ from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.docker.threads import DockerStateSynchronizer
from zoe_master.backends.docker.api_client import DockerClient
from zoe_master.backends.docker.config import DockerConfig, DockerHostConfig # pylint: disable=unused-import
from zoe_master.stats import ClusterStats # pylint: disable=unused-import
from zoe_master.stats import ClusterStats, NodeStats
log = logging.getLogger(__name__)
......@@ -40,6 +41,7 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
def __init__(self, opts):
    """Set up the Docker engine backend.

    Reads the per-host Docker configuration file named in the global
    configuration and keeps the parsed host list in self.docker_config.
    """
    super().__init__(opts)
    self.docker_config = DockerConfig(get_conf().backend_docker_config_file).read_config()
    # Slot for a cached platform-stats snapshot; None until first populated.
    # NOTE(review): nothing in this view writes it after __init__ — confirm usage.
    self.cached_stats = None
def _get_config(self, host) -> DockerHostConfig:
for conf in self.docker_config:
......@@ -82,7 +84,76 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
def platform_state(self) -> ClusterStats:
    """Get the platform state.

    Spawns one daemon thread per configured Docker host so the slow
    per-node stat collection runs in parallel, joins all threads, and
    aggregates the per-node results into a single ClusterStats.

    :return: a freshly-built ClusterStats with one NodeStats per host.
    """
    # Bug fix: a stale pre-refactor line (`return _checker.get_platform_stats()`)
    # was left at the top of this method, making the whole threaded
    # implementation below unreachable. It has been removed.
    time_start = time.time()
    platform_stats = ClusterStats()
    th_list = []
    for host_conf in self.docker_config:  # type: DockerHostConfig
        node_stats = NodeStats(host_conf.name)
        # Each thread fills its own NodeStats object in place.
        th = threading.Thread(target=self._update_node_stats, args=(host_conf, node_stats), name='stats_host_{}'.format(host_conf.name), daemon=True)
        th.start()
        th_list.append((th, node_stats))
    for th, node_stats in th_list:
        th.join()
        platform_stats.nodes.append(node_stats)
    log.debug('Time for platform stats: {:.2f}s'.format(time.time() - time_start))
    return platform_stats
def _update_node_stats(self, host_conf: DockerHostConfig, node_stats: NodeStats):
    """Fill node_stats in place with the current state of one Docker host.

    On connection failure the node is marked offline and the method returns
    early; otherwise it records container count, core/memory totals,
    reserved and in-use resources and, when image management is enabled,
    the host's image list.

    :param host_conf: connection parameters and labels for one Docker host
    :param node_stats: the NodeStats object to mutate (shared with the caller)
    """
    node_stats.labels = host_conf.labels
    try:
        my_engine = DockerClient(host_conf)
    except ZoeException as e:
        log.error(str(e))
        node_stats.status = 'offline'
        log.info('Node {} is offline'.format(host_conf.name))
        return
    else:
        node_stats.status = 'online'
    try:
        container_list = my_engine.list()
        info = my_engine.info()
    except ZoeException:
        # Engine became unreachable after connecting: give up, leaving
        # node_stats only partially filled (status stays 'online').
        return
    node_stats.container_count = info['Containers']
    node_stats.cores_total = info['NCPU']
    node_stats.memory_total = info['MemTotal']
    if info['Labels'] is not None:
        node_stats.labels += set(info['Labels'])
    # One stats() call per container; stream=False returns a single sample.
    stats = {}
    for cont in container_list:
        stats[cont['id']] = my_engine.stats(cont['id'], stream=False)
    # Containers whose soft limit equals the node's total memory are treated
    # as having no reservation at all.
    node_stats.memory_reserved = sum([cont['memory_soft_limit'] for cont in container_list if cont['memory_soft_limit'] != node_stats.memory_total])
    node_stats.memory_in_use = sum([stat['memory_stats']['usage'] for stat in stats.values() if 'usage' in stat['memory_stats']])
    # cpu_period == 0 means no CPU quota is set; quota/period is the core count.
    node_stats.cores_reserved = sum([cont['cpu_quota'] / cont['cpu_period'] for cont in container_list if cont['cpu_period'] != 0])
    node_stats.cores_in_use = sum([self._get_core_usage(stat) for stat in stats.values()])
    if get_conf().backend_image_management:
        node_stats.image_list = []
        for dk_image in my_engine.list_images():
            image = {
                'id': dk_image.attrs['Id'],
                'size': dk_image.attrs['Size'],
                'names': dk_image.tags
            }
            for name in image['names']:
                if name[-7:] == ':latest':  # add an image with the name without 'latest' to fake Docker image lookup algorithm
                    image['names'].append(name[:-7])
                    break
            node_stats.image_list.append(image)
def _get_core_usage(self, stat):
cpu_time_now = stat['cpu_stats']['cpu_usage']['total_usage']
cpu_time_pre = stat['precpu_stats']['cpu_usage']['total_usage']
return (cpu_time_now - cpu_time_pre) / 1000000000
def service_log(self, service: Service):
"""Get the log."""
......
......@@ -18,14 +18,12 @@
import logging
import threading
import time
from copy import deepcopy
from zoe_lib.config import get_conf
from zoe_lib.state import SQLManager, Service
from zoe_master.backends.docker.api_client import DockerClient
from zoe_master.backends.docker.config import DockerConfig, DockerHostConfig # pylint: disable=unused-import
from zoe_master.exceptions import ZoeException
from zoe_master.stats import ClusterStats, NodeStats
log = logging.getLogger(__name__)
......@@ -43,7 +41,6 @@ class DockerStateSynchronizer(threading.Thread):
self.my_stop = False
self.state = state
self.setDaemon(True)
self._platform_stats = ClusterStats()
self.host_checkers = []
for docker_host in DockerConfig(get_conf().backend_docker_config_file).read_config():
th = threading.Thread(target=self._host_subthread, args=(docker_host,), name='synchro_' + docker_host.name, daemon=True)
......@@ -54,28 +51,20 @@ class DockerStateSynchronizer(threading.Thread):
def _host_subthread(self, host_config: DockerHostConfig):
log.info("Checker thread started")
node_stats = None
for node in self._platform_stats.nodes:
if node.name == host_config.name:
node_stats = node
break
if node_stats is None:
node_stats = NodeStats(host_config.name)
self._platform_stats.nodes.append(node_stats)
node_status = 'offline'
while not self.stop:
try:
my_engine = DockerClient(host_config)
except ZoeException as e:
node_status = 'offline'
log.error(str(e))
node_stats.status = 'offline'
log.info('Node {} is offline'.format(host_config.name))
time.sleep(CHECK_INTERVAL)
continue
if node_stats.status == 'offline':
log.info('Node {} is back online'.format(host_config.name))
node_stats.status = 'online'
node_stats.labels = host_config.labels
if node_status == 'offline':
log.info('Node {} is now online'.format(host_config.name))
node_status = 'online'
service_list = self.state.service_list(backend_host=host_config.name, not_status=Service.INACTIVE_STATUS)
try:
......@@ -93,74 +82,21 @@ class DockerStateSynchronizer(threading.Thread):
for service in service_list:
assert isinstance(service, Service)
if service.backend_id in containers:
self._update_service_status(service, containers[service.backend_id], host_config)
self._update_service_status(service, containers[service.backend_id])
else:
if service.status == service.CREATED_STATUS or service.backend_status == service.BACKEND_DESTROY_STATUS:
continue
else:
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
try:
self._update_node_stats(my_engine, node_stats)
except ZoeException as e:
log.error(str(e))
time.sleep(CHECK_INTERVAL)
def _update_node_stats(self, my_engine, node_stats: NodeStats):
    """Refresh node_stats in place from an already-connected Docker engine.

    Records container count, core/memory totals, reserved and in-use
    resources and, when image management is enabled, the host's image list.
    Returns silently if the engine cannot be queried.

    :param my_engine: a connected DockerClient for this host
    :param node_stats: the NodeStats object to mutate
    """
    try:
        container_list = my_engine.list()
        info = my_engine.info()
    except ZoeException:
        # Engine unreachable: keep the previous values in node_stats.
        return
    node_stats.container_count = info['Containers']
    node_stats.cores_total = info['NCPU']
    node_stats.memory_total = info['MemTotal']
    if info['Labels'] is not None:
        node_stats.labels += set(info['Labels'])
    # One stats() call per container; stream=False returns a single sample.
    stats = {}
    for cont in container_list:
        stats[cont['id']] = my_engine.stats(cont['id'], stream=False)
    # Containers whose soft limit equals the node's total memory are treated
    # as having no reservation at all.
    node_stats.memory_reserved = sum([cont['memory_soft_limit'] for cont in container_list if cont['memory_soft_limit'] != node_stats.memory_total])
    node_stats.memory_in_use = sum([stat['memory_stats']['usage'] for stat in stats.values() if 'usage' in stat['memory_stats']])
    # cpu_period == 0 means no CPU quota is set; quota/period is the core count.
    node_stats.cores_reserved = sum([cont['cpu_quota'] / cont['cpu_period'] for cont in container_list if cont['cpu_period'] != 0])
    node_stats.cores_in_use = sum([self._get_core_usage(stat) for stat in stats.values()])
    if get_conf().backend_image_management:
        node_stats.image_list = []
        for dk_image in my_engine.list_images():
            image = {
                'id': dk_image.attrs['Id'],
                'size': dk_image.attrs['Size'],
                'names': dk_image.tags
            }
            for name in image['names']:
                if name[-7:] == ':latest':  # add an image with the name without 'latest' to fake Docker image lookup algorithm
                    image['names'].append(name[:-7])
                    break
            node_stats.image_list.append(image)
def _get_core_usage(self, stat):
cpu_time_now = stat['cpu_stats']['cpu_usage']['total_usage']
cpu_time_pre = stat['precpu_stats']['cpu_usage']['total_usage']
return (cpu_time_now - cpu_time_pre) / 1000000000
def _update_service_status(self, service: Service, container):
    """Update the service's backend status from the container state.

    Only writes (and logs) when the backend status actually changed.

    :param service: the Zoe service to update
    :param container: dict describing the backend container; must contain
        a 'state' key.
    """
    # Bug fix: diff residue left two stacked signatures (the old one took a
    # host_config argument) plus stale port-activation code referencing the
    # removed host_config parameter, which would raise NameError. The call
    # site passes exactly (service, container), so the 2-argument form is
    # kept and the stale lines are dropped.
    if service.backend_status != container['state']:
        old_status = service.backend_status
        service.set_backend_status(container['state'])
        log.debug('Updated service status, {} from {} to {}'.format(service.name, old_status, container['state']))
def run(self):
"""The thread loop."""
......@@ -187,7 +123,3 @@ class DockerStateSynchronizer(threading.Thread):
for th, conf_ in self.host_checkers:
th.join()
self.my_stop = True
def get_platform_stats(self):
"""Returns a copy of the platform stats."""
return deepcopy(self._platform_stats)
......@@ -174,14 +174,22 @@ def terminate_execution(execution: Execution) -> None:
execution.set_terminated()
# Module-level cache of the last platform snapshot; None until first filled.
CACHED_STATS = None  # type: ClusterStats


def get_platform_state(state: SQLManager, force_update=False) -> ClusterStats:
    """Retrieve the state of the platform by querying the container backend.

    Platform state includes information on free/reserved resources for each
    node; this information is used for advanced scheduling.

    Since retrieving statistics can be slow, results are cached and the
    force_update parameter is used when fresh (and slow) values are
    absolutely needed.

    :param state: SQL manager used to attach running services to each node
    :param force_update: when True, bypass the cache and query the backend
    :return: the (possibly cached) ClusterStats
    """
    # Bug fix: diff residue kept the old `with_images` signature above the
    # new one and a stale `if not with_images:` branch that would raise
    # NameError under the new signature; both have been removed.
    global CACHED_STATS
    if CACHED_STATS is not None and not force_update:
        return CACHED_STATS
    backend = _get_backend()
    platform_state = backend.platform_state()
    for node in platform_state.nodes:  # type: NodeStats
        node.services = state.service_list(backend_host=node.name, backend_status=Service.BACKEND_START_STATUS)
    CACHED_STATS = platform_state
    return platform_state
......
......@@ -47,11 +47,6 @@ class SwarmStateSynchronizer(threading.Thread):
old_status = service.backend_status
service.set_backend_status(container['state'])
log.debug('Updated service status, {} from {} to {}'.format(service.name, old_status, container['state']))
for port in service.ports:
if port.internal_name in container['ports'] and container['ports'][port.internal_name] is not None:
port.activate(container['ports'][port.internal_name][0], container['ports'][port.internal_name][1])
else:
port.reset()
def run(self):
"""The thread loop."""
......
......@@ -21,16 +21,15 @@ import logging
import os
import zoe_lib.config as config
from zoe_lib.metrics.influxdb import InfluxDBMetricSender
from zoe_lib.metrics.logging import LogMetricSender
from zoe_lib.state import SQLManager
import zoe_master.scheduler
import zoe_master.backends.interface
from zoe_master.preprocessing import restart_resubmit_scheduler
from zoe_master.master_api import APIManager
import zoe_master.scheduler
from zoe_master.exceptions import ZoeException
from zoe_master.gelf_listener import GELFListener
from zoe_master.master_api import APIManager
from zoe_master.metrics.influxdb import InfluxDBMetricSender
from zoe_master.metrics.logging import LogMetricSender
from zoe_master.preprocessing import restart_resubmit_scheduler
log = logging.getLogger("main")
LOG_FORMAT = '%(asctime)-15s %(levelname)s %(threadName)s->%(name)s: %(message)s'
......@@ -62,14 +61,14 @@ def main():
if ret != 0:
return ret
if config.get_conf().influxdb_enable:
metrics = InfluxDBMetricSender(config.get_conf().deployment_name, config.get_conf().influxdb_url, config.get_conf().influxdb_dbname)
else:
metrics = LogMetricSender(config.get_conf().deployment_name)
log.info("Initializing DB manager")
state = SQLManager(args)
if config.get_conf().influxdb_enable:
metrics = InfluxDBMetricSender(state)
else:
metrics = LogMetricSender(state)
try:
zoe_master.backends.interface.initialize_backend(state)
except ZoeException as e:
......
......@@ -21,11 +21,10 @@ import time
import zmq
import zoe_lib.config as config
from zoe_lib.metrics.base import BaseMetricSender
from zoe_lib.state import SQLManager
import zoe_master.preprocessing
from zoe_master.exceptions import ZoeException
from zoe_master.metrics.base import BaseMetricSender
from zoe_master.scheduler import ZoeBaseScheduler
log = logging.getLogger(__name__)
......
......@@ -20,8 +20,13 @@ import logging
import threading
import queue
from zoe_lib.config import get_conf
from zoe_master.backends.interface import get_platform_state
log = logging.getLogger(__name__)
METRIC_INTERVAL = 20
def time_diff_ms(start: float, end: float) -> int:
"""Return a time difference in milliseconds."""
......@@ -33,12 +38,13 @@ class BaseMetricSender:
BUFFER_MAX_SIZE = 6
def __init__(self, state):
    """Set up the metric sender (thread is created but not started here).

    :param state: SQLManager handle used by the metrics thread to refresh
        the platform state.
    """
    # Bug fix: diff residue kept both constructor versions stacked — a
    # duplicate `def __init__(self, deployment_name):` line, a duplicate
    # `self._th = None` assignment and a stale
    # `self.deployment_name = deployment_name` that references the removed
    # parameter (NameError). Only the new version is kept.
    self.state = state
    self._queue = queue.Queue()  # thread-safe hand-off of points to the sender thread
    self._buffer = []  # points accumulated between flushes
    self._th = threading.Thread(name='metrics', target=self._metrics_loop, daemon=True)
    self.deployment_name = get_conf().deployment_name
def _start(self):
self._th.start()
......@@ -64,19 +70,18 @@ class BaseMetricSender:
self._th.join()
def _metrics_loop(self):
    """Main loop of the metrics thread.

    Every METRIC_INTERVAL seconds: force a fresh platform-state query,
    drain queued metric points into the buffer, flush the buffer, then
    sleep for whatever remains of the interval. A 'quit' item in the
    queue terminates the loop (after a final flush).
    """
    # Bug fix: diff residue left the old event-driven `while True` loop
    # (1-second queue timeout, break on 'quit') concatenated before this
    # interval-driven loop, so the new code was unreachable. Only the new
    # loop is kept.
    stop = False
    while not stop:
        time_start = time.time()
        # Refresh the cached platform state so other consumers get fresh data.
        get_platform_state(self.state, force_update=True)
        while not self._queue.empty():
            data = self._queue.get(block=False)
            if data == 'quit':
                stop = True
            else:
                self._buffer.append(data)
        self._send_buffer()
        # Sleep only for the time left in the interval after the work above.
        sleep_time = METRIC_INTERVAL - (time.time() - time_start)
        if sleep_time > 0 and not stop:
            time.sleep(sleep_time)
......@@ -15,23 +15,26 @@
"""InfluxDB implementation of the metrics system."""
import time
import logging
import time
import requests
import zoe_lib.metrics.base
import zoe_master.metrics.base
from zoe_lib.config import get_conf
log = logging.getLogger(__name__)
class InfluxDBMetricSender(zoe_lib.metrics.base.BaseMetricSender):
class InfluxDBMetricSender(zoe_master.metrics.base.BaseMetricSender):
"""Sends metrics to InfluxDB."""
RETRIES = 5
def __init__(self, state):
    """Build the InfluxDB write endpoint from the global configuration and start the sender thread.

    :param state: SQLManager handle, passed through to the base sender.
    """
    # Bug fix: diff residue kept the old 3-argument constructor signature
    # and its super() call stacked above the new 1-argument version; only
    # the new version is kept.
    super().__init__(state)
    influxdb_url = get_conf().influxdb_url
    influxdb_dbname = get_conf().influxdb_dbname
    self._influxdb_endpoint = influxdb_url + '/write?precision=ms&db=' + influxdb_dbname
    self._retries = self.RETRIES
    self._start()
......@@ -61,7 +64,7 @@ class InfluxDBMetricSender(zoe_lib.metrics.base.BaseMetricSender):
def metric_api_call(self, time_start, action):
"""Compute and emit the run time of an API call."""
time_end = time.time()
diff = zoe_lib.metrics.base.time_diff_ms(time_start, time_end)
diff = zoe_master.metrics.base.time_diff_ms(time_start, time_end)
point_str = "api_latency"
point_str += ",api_call=" + action
......
......@@ -17,15 +17,15 @@
import logging
import zoe_lib.metrics.base
import zoe_master.metrics.base
log = logging.getLogger(__name__)
class LogMetricSender(zoe_lib.metrics.base.BaseMetricSender):
class LogMetricSender(zoe_master.metrics.base.BaseMetricSender):
"""Send metrics to the configured logging sink."""
def __init__(self, state):
    """Initialize the log-based metric sender and start its thread.

    :param state: SQLManager handle, passed through to the base sender.
    """
    # Bug fix: diff residue kept the old `deployment_name` constructor
    # stacked above the new `state` one; only the new version is kept.
    super().__init__(state)
    self._start()
def _send_buffer(self):
......
......@@ -115,7 +115,7 @@ class ZoeElasticScheduler:
th = self.async_threads.pop(0)
th.join(0.1)
if th.isAlive(): # join failed
log.debug('Thread {} join failed'.format(th.name))
# log.debug('Thread {} join failed'.format(th.name))
self.async_threads.append(th)
counter -= 1
......@@ -190,7 +190,7 @@ class ZoeElasticScheduler:
log.debug("-> {}".format(job))
try:
platform_state = get_platform_state(self.state, with_images=True)
platform_state = get_platform_state(self.state, force_update=True)
except ZoeException:
log.error('Cannot retrieve platform state, cannot schedule')
for job in jobs_to_attempt_scheduling:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment