Commit 3d21e03d authored by Daniele Venzano's avatar Daniele Venzano

Refactor the metrics subsytem

parent 3ae7ff8d
......@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base class for metrics."""
import time
import logging
import threading
......@@ -21,23 +23,60 @@ import queue
log = logging.getLogger(__name__)
class BaseMetricSender(threading.Thread):
def __init__(self, name, conf):
super().__init__(name=name)
def time_diff_ms(start: float, end: float) -> int:
"""Return a time difference in milliseconds."""
return (end - start) * 1000
self._deployment = conf.deployment_name
self._queue = queue.Queue()
def quit(self):
pass
class BaseMetricSender:
"""Base class for collecting and sending out metrics."""
BUFFER_MAX_SIZE = 6
def __init__(self, deployment_name):
self._queue = queue.Queue()
self._buffer = []
self._th = None
self._th = threading.Thread(name='metrics', target=self._metrics_loop, daemon=True)
self.deployment_name = deployment_name
def _time_diff_ms(self, start: float, end: float) -> int:
return (end - start) * 1000
def _start(self):
self._th.start()
def metric_api_call(self, time_start, action):
"""Compute and pass the metric point of an API call to the sender thread."""
time_end = time.time()
td = self._time_diff_ms(time_start, time_end)
log.debug("api latency: {} took {} ms".format(action, td))
diff = time_diff_ms(time_start, time_end)
point = "api latency: {} took {} ms".format(action, diff)
self._queue.put(point)
def _send_buffer(self):
"""
Sends the buffered data.
Needs to be redefined in child classes to support other metrics databases. Is called in thread context.
"""
raise NotImplementedError
def quit(self):
"""Terminates the sender thread."""
self._queue.put("quit")
self._th.join()
def _metrics_loop(self):
while True:
try:
data = self._queue.get(timeout=1)
except queue.Empty:
self._send_buffer()
continue
if data == 'quit':
if len(self._buffer) > 0:
self._send_buffer()
break
def run(self):
pass
if data != 'quit':
self._buffer.append(data)
if len(self._buffer) > self.BUFFER_MAX_SIZE:
self._send_buffer()
......@@ -13,10 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""InfluxDB implementation of the metrics system."""
import time
import requests
import logging
import queue
import requests
import zoe_lib.metrics.base
......@@ -24,71 +26,47 @@ log = logging.getLogger(__name__)
class InfluxDBMetricSender(zoe_lib.metrics.base.BaseMetricSender):
def __init__(self, conf):
super().__init__('influxdb_sender', conf)
"""Sends metrics to InfluxDB."""
RETRIES = 5
self._buffer = []
self._influxdb_endpoint = conf.influxdb_url + '/write?precision=ms&db=' + conf.influxdb_dbname
self.retries = 5
def __init__(self, deployment_name, influxdb_url, influxdb_dbname):
super().__init__(deployment_name)
self._influxdb_endpoint = influxdb_url + '/write?precision=ms&db=' + influxdb_dbname
self._retries = self.RETRIES
self._start()
def _send_buffer(self):
error = False
if self._influxdb_endpoint is not None and len(self._buffer) > 0:
payload = '\n'.join(self._buffer)
try:
r = requests.post(self._influxdb_endpoint, data=payload)
except:
log.exception('error writing metrics to influxdb, will retry {} times'.format(self.retries))
req = requests.post(self._influxdb_endpoint, data=payload)
except Exception:
log.exception('error writing metrics to influxdb, will retry {} times'.format(self._retries))
error = True
else:
if r.status_code != 204:
log.error('error writing metrics to influxdb, will retry {} times'.format(self.retries))
if req.status_code != 204:
log.error('error writing metrics to influxdb, will retry {} times'.format(self._retries))
error = True
if error:
if self.retries <= 0:
self.retries = 5
if self._retries <= 0:
self._retries = self.RETRIES
self._buffer.clear()
else:
self.retries -= 1
self._retries -= 1
else:
self._buffer.clear()
def quit(self):
self._queue.put('quit')
def point(self, measurement_name: str, value: int, **kwargs):
ts = time.time()
point_str = measurement_name
for k, v in kwargs.items():
point_str += "," + k + '=' + v
point_str += ',' + 'deployment' + '=' + self._deployment
point_str += " value=" + str(value)
point_str += " " + str(int(ts * 1000))
self._queue.put(point_str)
def metric_api_call(self, time_start, action):
"""Compute and emit the run time of an API call."""
time_end = time.time()
td = self._time_diff_ms(time_start, time_end)
self.point("api_latency", td, api_call=action)
diff = zoe_lib.metrics.base.time_diff_ms(time_start, time_end)
def run(self):
log.info('starting influxdb metric sender thread')
while True:
try:
data = self._queue.get(timeout=1)
except queue.Empty:
if len(self._buffer) > 0:
self._send_buffer()
continue
if data == 'quit':
log.info('influxdb thread got a quit command')
if len(self._buffer) > 0:
self._send_buffer()
break
point_str = "api_latency"
point_str += ",api_call=" + action
point_str += ',' + 'deployment' + '=' + self.deployment_name
point_str += " value=" + str(diff)
point_str += " " + str(int(time_end * 1000))
if data != 'quit':
self._buffer.append(data)
if len(self._buffer) > 6:
self._send_buffer()
self._queue.put(point_str)
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Default metrics output to the logging system."""
import logging
import zoe_lib.metrics.base
log = logging.getLogger(__name__)
class LogMetricSender(zoe_lib.metrics.base.BaseMetricSender):
"""Send metrics to the configured logging sink."""
def __init__(self, deployment_name):
super().__init__(deployment_name)
self._start()
def _send_buffer(self):
"""
Sends the buffered data.
Needs to be redefined in child classes to support other metrics databases. Is called in thread context.
"""
if len(self._buffer) == 0:
return
for point in self._buffer:
log.debug(point)
self._buffer.clear()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment