Commit 5401eeeb authored by Daniele Venzano

Add option to retrieve statistics from KairosDB instead of Docker

parent b5429cc8
......@@ -62,6 +62,10 @@ def load_configuration(test_conf=None):
argparser.add_argument('--influxdb-dbname', help='Name of the InfluxDB database to use for storing metrics', default='zoe')
argparser.add_argument('--influxdb-url', help='URL of the InfluxDB service (ex. http://localhost:8086)', default='http://localhost:8086')
argparser.add_argument('--influxdb-enable', action="store_true", help='Enable metric output toward influxDB')
argparser.add_argument('--kairosdb-enable', action="store_true", help='Enable metric input from KairosDB')
argparser.add_argument('--kairosdb-url', help='URL of the KairosDB service (ex. http://localhost:8086)', default='http://localhost:8090')
argparser.add_argument('--workspace-base-path', help='Base directory where user workspaces will be created. Must be visible at this path on all hosts.', default='/mnt/zoe-workspaces')
argparser.add_argument('--workspace-deployment-path', help='Path appended to the workspace path to distinguish this deployment. If unspecified is equal to the deployment name.', default='--default--')
argparser.add_argument('--overlay-network-name', help='Name of the Swarm overlay network Zoe should use', default='zoe')
......
......@@ -50,7 +50,7 @@ class BaseBackend:
"""Terminate the container corresponding to a service."""
raise NotImplementedError
def platform_state(self) -> ClusterStats:
def platform_state(self, state=None) -> ClusterStats:
"""Get the platform state. This method should fill-in a new ClusterStats object at each call, with fresh statistics on the available nodes and resource availability. This information will be used for taking scheduling decisions."""
raise NotImplementedError
......
......@@ -29,6 +29,7 @@ from zoe_master.backends.docker.threads import DockerStateSynchronizer
from zoe_master.backends.docker.api_client import DockerClient
from zoe_master.backends.docker.config import DockerConfig, DockerHostConfig # pylint: disable=unused-import
from zoe_master.stats import ClusterStats, NodeStats
from zoe_master.metrics.incoming.kairosdb import KairosDBInMetrics
log = logging.getLogger(__name__)
......@@ -85,7 +86,7 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
log.error('Cannot terminate service {}, since it has not backend ID'.format(service.name))
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
def platform_state(self) -> ClusterStats:
def platform_state(self, state=None) -> ClusterStats:
"""Get the platform state."""
time_start = time.time()
platform_stats = ClusterStats()
......@@ -93,7 +94,7 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
for host_conf in self.docker_config: # type: DockerHostConfig
node_stats = NodeStats(host_conf.name)
th = threading.Thread(target=self._update_node_stats, args=(host_conf, node_stats), name='stats_host_{}'.format(host_conf.name), daemon=True)
th = threading.Thread(target=self._update_node_stats, args=(host_conf, node_stats, state), name='stats_host_{}'.format(host_conf.name), daemon=True)
th.start()
th_list.append((th, node_stats))
......@@ -104,7 +105,7 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
log.debug('Time for platform stats: {:.2f}s'.format(time.time() - time_start))
return platform_stats
def _update_node_stats(self, host_conf: DockerHostConfig, node_stats: NodeStats):
def _update_node_stats(self, host_conf: DockerHostConfig, node_stats: NodeStats, state):
node_stats.labels = host_conf.labels
try:
my_engine = DockerClient(host_conf)
......@@ -128,19 +129,26 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
if info['Labels'] is not None:
node_stats.labels += set(info['Labels'])
stats = {}
for cont in container_list:
try:
stats[cont['id']] = my_engine.stats(cont['id'], stream=False)
except ZoeException:
continue
node_stats.memory_reserved = sum([cont['memory_soft_limit'] for cont in container_list if cont['memory_soft_limit'] != node_stats.memory_total])
node_stats.memory_in_use = sum([stat['memory_stats']['usage'] for stat in stats.values() if 'usage' in stat['memory_stats']])
node_stats.cores_reserved = sum([cont['cpu_quota'] / cont['cpu_period'] for cont in container_list if cont['cpu_period'] != 0])
node_stats.cores_in_use = sum([self._get_core_usage(stat) for stat in stats.values()])
stats = {}
if get_conf().kairosdb_enable:
kdb = KairosDBInMetrics()
for cont in container_list:
stats[cont['id']] = kdb.get_service_usage(cont['name'])
node_stats.memory_in_use = sum([stat['mem_usage'] for stat in stats.values()])
node_stats.cores_in_use = sum([stat['cpu_usage'] for stat in stats.values()])
else:
for cont in container_list:
try:
stats[cont['id']] = my_engine.stats(cont['id'], stream=False)
except ZoeException:
continue
node_stats.memory_in_use = sum([stat['memory_stats']['usage'] for stat in stats.values() if 'usage' in stat['memory_stats']])
node_stats.cores_in_use = sum([self._get_core_usage(stat) for stat in stats.values()])
if get_conf().backend_image_management:
node_stats.image_list = []
......
......@@ -191,7 +191,7 @@ def get_platform_state(state: SQLManager, force_update=False) -> ClusterStats:
return CACHED_STATS
backend = _get_backend()
platform_state = backend.platform_state()
platform_state = backend.platform_state(state)
for node in platform_state.nodes: # type: NodeStats
node.services = state.service_list(backend_host=node.name, backend_status=Service.BACKEND_START_STATUS)
CACHED_STATS = platform_state
......
......@@ -67,7 +67,7 @@ class KubernetesBackend(zoe_master.backends.base.BaseBackend):
"""Terminate and delete a container."""
self.kube.terminate(service.dns_name)
def platform_state(self) -> ClusterStats:
def platform_state(self, state=None) -> ClusterStats:
"""Get the platform state."""
info = self.kube.info()
for node in info.nodes: # type: NodeStats
......
......@@ -70,7 +70,7 @@ class SwarmBackend(zoe_master.backends.base.BaseBackend):
"""Terminate and delete a container."""
self.swarm.terminate_container(service.backend_id, delete=True)
def platform_state(self) -> ClusterStats:
def platform_state(self, state=None) -> ClusterStats:
"""Get the platform state."""
info = self.swarm.info()
for node in info.nodes: # type: NodeStats
......
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Retrieves metrics about services from KairosDB."""
from datetime import datetime, timedelta, timezone
import logging

import requests

from zoe_lib.config import get_conf
log = logging.getLogger(__name__)
class KairosDBInMetrics:
    """Client that reads per-container usage metrics from KairosDB.

    The service location is taken from the ``kairosdb_url`` configuration
    option. Queries are sent to the ``/api/v1/datapoints/query`` endpoint.
    """

    # Seconds to wait for KairosDB to answer. Without an explicit timeout
    # ``requests`` waits forever, which would block the calling thread.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.base_url = get_conf().kairosdb_url
        self.tags_url = self.base_url + '/api/v1/datapoints/query/tags'
        self.metrics_url = self.base_url + '/api/v1/datapoints/query'
        self.list_metrics_url = self.base_url + '/api/v1/metricnames'

    def _prepare_query(self):
        """Return a new query body with no metrics and the default time range."""
        query = {
            'time_zone': 'UTC',
            'metrics': [],
        }
        self._add_time_range(query)
        return query

    def _add_time_range(self, query, minutes_from_now=10):
        """Set ``start_absolute``/``end_absolute`` (epoch milliseconds) on *query*.

        The range covers the last *minutes_from_now* minutes, ending now.
        """
        # Use an aware datetime: calling .timestamp() on the naive value
        # returned by utcnow() interprets it as local time, which shifts the
        # whole range by the UTC offset on hosts not running in UTC.
        end = datetime.now(timezone.utc)
        start = end - timedelta(minutes=minutes_from_now)
        query['start_absolute'] = int(start.timestamp() * 1000)
        query['end_absolute'] = int(end.timestamp() * 1000)

    def _add_metric(self, query, metric_name: str, tags, aggregators, limit: int):
        """Append a metric clause to *query*.

        ``tags`` and ``aggregators`` are included only when not ``None``;
        ``limit`` is included only when greater than zero.
        """
        metric = {
            'name': metric_name,
        }
        if tags is not None:
            metric['tags'] = tags
        if aggregators is not None:
            metric['aggregators'] = aggregators
        if limit > 0:
            metric['limit'] = limit
        query['metrics'].append(metric)

    def get_service_usage(self, container_name):
        """Query the DB for the current usage metrics.

        Returns a dict with the keys ``cpu_usage`` and ``mem_usage``, or
        ``None`` when KairosDB answers with a non-200 status. Network errors
        and timeouts raised by ``requests`` propagate to the caller.
        """
        query = self._prepare_query()

        # The CPU metric is scaled by 0.01 before aggregation
        # (presumably percent -> fraction of a core; confirm with the data source).
        tags_cpu = {
            "field": ["usage_percent"],
            "container_name": container_name
        }
        aggregators_cpu = [
            {"name": "scale", "factor": "0.01"},
            {"name": "sum", "sampling": {"value": "1", "unit": "minutes"}, "align_sampling": False}
        ]
        self._add_metric(query, "docker_container_cpu", tags_cpu, aggregators_cpu, limit=0)

        tags_memory = {
            "field": ["usage"],
            "container_name": container_name
        }
        aggregators_memory = [
            {"name": "sum", "sampling": {"value": "1", "unit": "minutes"}, "align_sampling": False}
        ]
        self._add_metric(query, "docker_container_mem", tags_memory, aggregators_memory, limit=0)

        r = requests.post(self.metrics_url, json=query, timeout=self.REQUEST_TIMEOUT)
        return self._extract_data(r)

    def _extract_data(self, response):
        """Turn a KairosDB query response into a usage dict.

        The first query result is read as CPU and the second as memory,
        matching the order in which ``get_service_usage`` adds the metrics.
        Returns ``None`` (after logging) when the status code is not 200.
        """
        if response.status_code != 200:
            try:
                errors = response.json()['errors']
            except (ValueError, KeyError, TypeError):
                # The body is not the expected JSON error document (for
                # example an HTML page from a proxy): log the raw text.
                errors = [response.text]
            error_msg = ''.join(' {}'.format(error) for error in errors)
            log.error('kairosdb query error: {}'.format(error_msg))
            return None

        data = response.json()
        cpu_results = data['queries'][0]
        mem_results = data['queries'][1]
        return {
            'cpu_usage': self._latest_value(cpu_results),
            'mem_usage': self._latest_value(mem_results),
        }

    @staticmethod
    def _latest_value(query_result):
        """Return the value of the last datapoint of a single query result, or 0 if there is none."""
        if query_result['sample_size'] <= 0:
            return 0
        assert len(query_result['results']) == 1
        values = query_result['results'][0]['values']
        # Each datapoint is indexed as [timestamp, value]; take the most recent value.
        return values[-1][1] if values else 0
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment