Commit 524ef4d9 authored by Daniele Venzano

Use the metrics thread to gather statistics, so the API does not time out

parent 34da7121
......@@ -17,7 +17,6 @@
import logging
import os
from time import time
import zoe_api.exceptions
import zoe_api.master_api
......@@ -41,8 +40,6 @@ class APIEndpoint:
def __init__(self, master_api, sql_manager):
self.master = master_api
self.sql = sql_manager
self.cached_statistics = None
self.cached_statistics_time = time()
def execution_by_id(self, uid, role, execution_id) -> zoe_lib.state.sql_manager.Execution:
"""Lookup an execution by its ID."""
......@@ -160,12 +157,9 @@ class APIEndpoint:
def statistics_scheduler(self, uid_, role_):
"""Retrieve statistics about the scheduler."""
if self.cached_statistics is None or time() - self.cached_statistics_time > 60:
success, message = self.master.scheduler_statistics()
if success:
self.cached_statistics_time = time()
self.cached_statistics = message
return self.cached_statistics
success, message = self.master.scheduler_statistics()
if success:
return message
def cleanup_dead_executions(self):
"""Terminates all executions with dead "monitor" services."""
......
......@@ -27,8 +27,7 @@ import zoe_master.scheduler
from zoe_master.exceptions import ZoeException
from zoe_master.gelf_listener import GELFListener
from zoe_master.master_api import APIManager
from zoe_master.metrics.influxdb import InfluxDBMetricSender
from zoe_master.metrics.logging import LogMetricSender
from zoe_master.metrics.base import StatsManager
from zoe_master.preprocessing import restart_resubmit_scheduler
log = logging.getLogger("main")
......@@ -64,10 +63,8 @@ def main():
log.info("Initializing DB manager")
state = SQLManager(args)
if config.get_conf().influxdb_enable:
metrics = InfluxDBMetricSender(state)
else:
metrics = LogMetricSender(state)
metrics = StatsManager(state)
metrics.start()
try:
zoe_master.backends.interface.initialize_backend(state)
......@@ -93,7 +90,7 @@ def main():
except KeyboardInterrupt:
pass
except Exception:
log.exception('fatal error')
log.exception('Fatal error in API loop')
finally:
scheduler.quit()
api_server.quit()
......
......@@ -24,7 +24,7 @@ import zoe_lib.config as config
from zoe_lib.state import SQLManager
import zoe_master.preprocessing
from zoe_master.exceptions import ZoeException
from zoe_master.metrics.base import BaseMetricSender
from zoe_master.metrics.base import StatsManager
from zoe_master.scheduler import ZoeBaseScheduler
log = logging.getLogger(__name__)
......@@ -32,7 +32,7 @@ log = logging.getLogger(__name__)
class APIManager:
"""The API Manager."""
def __init__(self, metrics: BaseMetricSender, scheduler: ZoeBaseScheduler, state: SQLManager) -> None:
def __init__(self, metrics: StatsManager, scheduler: ZoeBaseScheduler, state: SQLManager) -> None:
self.context = zmq.Context()
self.zmq_s = self.context.socket(zmq.REP)
self.listen_uri = config.get_conf().api_listen_uri
......@@ -87,6 +87,7 @@ class APIManager:
elif message['command'] == 'scheduler_stats':
try:
data = self.scheduler.stats()
data['platform_stats'] = self.metrics.current_platform_stats.serialize()
except ZoeException as e:
log.error(str(e))
self._reply_error(str(e))
......@@ -100,7 +101,7 @@ class APIManager:
self._reply_error('bug')
raise ZoeException('BUG: command {} does not fill a reply')
self.metrics.metric_api_call(start_time, message['command'])
log.debug('API call {} took {:.2f}s'.format(message['command'], time.time() - start_time))
def quit(self) -> None:
"""Cleanly close the ZMQ resources."""
......
......@@ -18,9 +18,9 @@
import time
import logging
import threading
import queue
from zoe_lib.config import get_conf
from zoe_master.backends.interface import get_platform_state
log = logging.getLogger(__name__)
......@@ -32,54 +32,28 @@ def time_diff_ms(start: float, end: float) -> int:
return int((end - start) * 1000)
class BaseMetricSender:
"""Base class for collecting and sending out metrics."""
BUFFER_MAX_SIZE = 6
class StatsManager(threading.Thread):
"""Class for collecting and sending out metrics and statistics."""
def __init__(self, state):
super().__init__(name='metrics', daemon=True)
self.state = state
self._queue = queue.Queue()
self._buffer = []
self._th = None
self._th = threading.Thread(name='metrics', target=self._metrics_loop, daemon=True)
self.deployment_name = get_conf().deployment_name
def _start(self):
self._th.start()
def metric_api_call(self, time_start, action):
"""Compute and pass the metric point of an API call to the sender thread."""
time_end = time.time()
diff = time_diff_ms(time_start, time_end)
point = "api latency: {} took {} ms".format(action, diff)
self._queue.put(point)
def _send_buffer(self):
"""
Sends the buffered data.
Needs to be redefined in child classes to support other metrics databases. Is called in thread context.
"""
raise NotImplementedError
self.stop = False
self.current_platform_stats = None
def quit(self):
"""Terminates the sender thread."""
self._queue.put("quit")
self._th.join()
self.stop = True
self.join()
def _metrics_loop(self):
stop = False
while not stop:
def run(self):
"""The thread loop."""
while not self.stop:
time_start = time.time()
while not self._queue.empty():
data = self._queue.get(block=False)
if data == 'quit':
stop = True
else:
self._buffer.append(data)
self._send_buffer()
self.current_platform_stats = get_platform_state(self.state, with_usage_stats=True)
sleep_time = METRIC_INTERVAL - (time.time() - time_start)
if sleep_time > 0 and not stop:
if sleep_time > 0 and not self.stop:
time.sleep(sleep_time)
......@@ -290,13 +290,14 @@ class ZoeElasticScheduler:
'running_length': len(self.queue_running),
'termination_threads_count': len(self.async_threads),
'queue': [s.id for s in queue],
'running_queue': [s.id for s in self.queue_running],
'platform_stats': get_platform_state(self.state, with_usage_stats=True).serialize()
'running_queue': [s.id for s in self.queue_running]
}
def _adjust_core_limits(self):
stats = get_platform_state(self.state)
for node in stats.nodes: # type: NodeStats
if len(node.services) == 0:
continue
new_core_allocations = {}
core_sum = 0
for service in node.services: # type: Service
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment