Commit bd1ac8cd authored by Daniele Venzano's avatar Daniele Venzano

Add ability to send metrics to influxdb

It uses a separate thread to maintain a buffer of metrics that is flushed periodically. For now only API latency metrics are sent.
parent 00291917
......@@ -16,39 +16,72 @@
import time
import requests
import logging
import threading
import queue
log = logging.getLogger(__name__)
_buffer = []
_influxdb_endpoint = None
class InfluxDBMetricSender(threading.Thread):
def __init__(self, conf):
super().__init__(name='influxdb_sender')
def init(influxdb_dbname, influxdb_url):
global _influxdb_endpoint
_influxdb_endpoint = influxdb_url + '/write?precision=ms&db=' + influxdb_dbname
self._buffer = []
self._deployment = conf.container_name_prefix
self._influxdb_endpoint = conf.influxdb_url + '/write?precision=ms&db=' + conf.influxdb_dbname
self._queue = queue.Queue()
def _send_buffer(self):
if self._influxdb_endpoint is not None and len(self._buffer) > 0:
payload = '\n'.join(self._buffer)
try:
r = requests.post(self._influxdb_endpoint, data=payload)
except:
log.exception('error writing metrics to influxdb, data thrown away')
else:
if r.status_code != 204:
log.error('error writing metrics to influxdb, data thrown away')
self._buffer.clear()
def _send_buffer():
if _influxdb_endpoint is not None:
payload = '\n'.join(_buffer)
r = requests.post(_influxdb_endpoint, data=payload)
if r.status_code != 204:
log.error('error writing metrics to influxdb')
_buffer.clear()
def quit(self):
self._queue.put('quit')
def point(self, measurement_name: str, value: int, **kwargs):
ts = time.time()
point_str = measurement_name
for k, v in kwargs.items():
point_str += "," + k + '=' + v
point_str += ',' + 'deployment' + '=' + self._deployment
point_str += " value=" + str(value)
point_str += " " + str(int(ts * 1000))
def point(measurement_name: str, value: int, **kwargs):
ts = time.time()
point_str = measurement_name
for k, v in kwargs.items():
point_str += "," + k + '=' + v
point_str += " value=" + str(value)
point_str += " " + str(int(ts * 1000))
self._queue.put(point_str)
_buffer.append(point_str)
if len(_buffer) > 5:
_send_buffer()
def _time_diff_ms(self, start: float, end: float) -> int:
return (end - start) * 1000
def metric_api_call(self, time_start, api_name, action, calling_user):
time_end = time.time()
td = self._time_diff_ms(time_start, time_end)
self.point("api_latency", td, api_name=api_name, action=action, calling_user_name=calling_user.name)
def time_diff_ms(start: float, end: float) -> int:
return int((end - start) * 1000)
def run(self):
log.info('starting influxdb metric sender thread')
while True:
try:
data = self._queue.get(timeout=1)
except queue.Empty:
if len(self._buffer) > 0:
self._send_buffer()
continue
if data == 'quit':
log.info('influxdb thread got a quit command')
if len(self._buffer) > 0:
self._send_buffer()
break
if data != 'quit':
self._buffer.append(data)
if len(self._buffer) > 6:
self._send_buffer()
......@@ -20,6 +20,10 @@ config_paths = [
'/etc/zoe/zoe-scheduler.conf'
]
singletons = {
'metric': None
}
_conf = None
......@@ -41,6 +45,7 @@ def load_configuration(test_conf=None):
argparser.add_argument('--container-name-prefix', help='String prefixed to all container names generated by this instance of Zoe', default='prod')
argparser.add_argument('--influxdb-dbname', help='Name of the InfluxDB database to use for storing metrics', default='zoe')
argparser.add_argument('--influxdb-url', help='URL of the InfluxDB service (ex. http://localhost:8086)', default='http://localhost:8086')
argparser.add_argument('--influxdb-enable', action="store_true", help='Enable metric output toward influxDB')
argparser.add_argument('--passlib-rounds', type=int, help='Number of hashing rounds for passwords', default=60000)
opts = argparser.parse_args()
......
......@@ -23,10 +23,11 @@ from tornado.ioloop import IOLoop
from zoe_scheduler.platform_manager import PlatformManager
from zoe_scheduler.scheduler_policies import FIFOSchedulerPolicy
from zoe_scheduler.config import load_configuration, get_conf
import zoe_scheduler.config as config
from zoe_scheduler.rest_api import init as api_init
from zoe_scheduler.state.manager import StateManager
from zoe_scheduler.state.blobs.fs import FSBlobs
from zoe_lib.metrics.influxdb import InfluxDBMetricSender
log = logging.getLogger("main")
......@@ -36,8 +37,8 @@ def main():
The entrypoint for the zoe-scheduler script.
:return: int
"""
load_configuration()
args = get_conf()
config.load_configuration()
args = config.get_conf()
if args.debug:
logging.basicConfig(level=logging.DEBUG)
......@@ -68,6 +69,11 @@ def main():
log.info("Initializing API")
app = api_init(state_manager, pm)
if config.get_conf().influxdb_enable:
metrics_th = InfluxDBMetricSender(config.get_conf())
metrics_th.start()
config.singletons['metric'] = metrics_th
log.info("Starting HTTP REST server...")
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
......@@ -77,4 +83,7 @@ def main():
try:
ioloop.start()
except KeyboardInterrupt:
if config.singletons['metric'] is not None:
config.singletons['metric'].quit()
config.singletons['metric'].join()
print("CTRL-C detected, terminating")
......@@ -19,7 +19,7 @@ from werkzeug.exceptions import BadRequest
from flask_restful import Resource, request
from zoe_lib.exceptions import ZoeException, ZoeRestAPIException
from zoe_lib.metrics.influxdb import point, time_diff_ms
from zoe_scheduler.config import singletons
from zoe_scheduler.state.manager import StateManager
from zoe_scheduler.platform_manager import PlatformManager
from zoe_scheduler.rest_api.utils import catch_exceptions
......@@ -49,8 +49,7 @@ class ApplicationAPI(Resource):
is_authorized(calling_user, app, 'get')
ret = app.to_dict(checkpoint=False)
end = time.time()
point('service_time', time_diff_ms(start, end), action='get', object='application', user=calling_user.name)
singletons['metric'].metric_api_call(start, 'application', 'get', calling_user)
return ret
@catch_exceptions
......@@ -75,8 +74,7 @@ class ApplicationAPI(Resource):
self.state.state_updated()
end = time.time()
point('service_time', time_diff_ms(start, end), action='delete', object='application', user=calling_user.name)
singletons['metric'].metric_api_call(start, 'application', 'delete', calling_user)
return '', 204
......@@ -112,6 +110,5 @@ class ApplicationCollectionAPI(Resource):
self.state.new('application', app)
self.state.state_updated()
end = time.time()
point('service_time', time_diff_ms(start, end), action='post', object='application', user=calling_user.name)
singletons['metric'].metric_api_call(start, 'application', 'post', calling_user)
return {'application_id': app.id}, 201
......@@ -14,6 +14,7 @@
# limitations under the License.
import logging
import time
from flask_restful import Resource, request
from zoe_lib.exceptions import ZoeRestAPIException
......@@ -22,6 +23,7 @@ from zoe_scheduler.platform_manager import PlatformManager
from zoe_scheduler.rest_api.utils import catch_exceptions
from zoe_scheduler.rest_api.auth.authentication import authenticate
from zoe_scheduler.rest_api.auth.authorization import is_authorized
from zoe_scheduler.config import singletons
log = logging.getLogger(__name__)
......@@ -37,6 +39,7 @@ class ContainerAPI(Resource):
@catch_exceptions
def get(self, container_id):
start_time = time.time()
calling_user = authenticate(request, self.state)
c = self.state.get_one('container', id=container_id)
......@@ -44,10 +47,12 @@ class ContainerAPI(Resource):
raise ZoeRestAPIException('No such container', 404)
is_authorized(calling_user, c, 'get')
singletons['metric'].metric_api_call(start_time, 'container', 'get', calling_user)
return c.to_dict(checkpoint=False)
@catch_exceptions
def delete(self, container_id):
start_time = time.time()
calling_user = authenticate(request, self.state)
c = self.state.get_one('container', id=container_id)
......@@ -60,10 +65,11 @@ class ContainerAPI(Resource):
log.info("Monitor container died ({}), terminating execution {}".format(c.name, c.execution.name))
self.platform.execution_terminate(c.execution, reason='finished')
self.state.state_updated()
return '', 204
else:
# A non-fundamental container died, nothing we can do?
# We leave everything in place, so when the execution terminates we will
# gather the logs also of the containers that died
log.warning("Container {} died by itself".format(c.name))
return '', 204
singletons['metric'].metric_api_call(start_time, 'container', 'get', calling_user)
return '', 204
......@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from werkzeug.exceptions import BadRequest
from flask_restful import Resource, request
......@@ -23,6 +24,7 @@ from zoe_scheduler.platform_manager import PlatformManager
from zoe_scheduler.state.execution import Execution
from zoe_scheduler.rest_api.auth.authentication import authenticate
from zoe_scheduler.rest_api.auth.authorization import is_authorized, check_quota
from zoe_scheduler.config import singletons
class ExecutionAPI(Resource):
......@@ -36,6 +38,7 @@ class ExecutionAPI(Resource):
@catch_exceptions
def get(self, execution_id):
start_time = time.time()
calling_user = authenticate(request, self.state)
e = self.state.get_one('execution', id=execution_id)
......@@ -45,6 +48,7 @@ class ExecutionAPI(Resource):
is_authorized(calling_user, e, 'get')
ret = e.to_dict(checkpoint=False)
singletons['metric'].metric_api_call(start_time, 'execution', 'get', calling_user)
return ret
@catch_exceptions
......@@ -55,6 +59,7 @@ class ExecutionAPI(Resource):
:param execution_id: the execution to be deleted
:return:
"""
start_time = time.time()
calling_user = authenticate(request, self.state)
e = self.state.get_one('execution', id=execution_id)
......@@ -68,6 +73,7 @@ class ExecutionAPI(Resource):
self.state.state_updated()
singletons['metric'].metric_api_call(start_time, 'execution', 'delete', calling_user)
return '', 204
......@@ -87,6 +93,7 @@ class ExecutionCollectionAPI(Resource):
:return:
"""
start_time = time.time()
calling_user = authenticate(request, self.state)
execs = self.state.get('execution')
ret = []
......@@ -97,6 +104,7 @@ class ExecutionCollectionAPI(Resource):
continue
else:
ret.append(e.to_dict(checkpoint=False))
singletons['metric'].metric_api_call(start_time, 'execution', 'list', calling_user)
return ret
@catch_exceptions
......@@ -105,6 +113,7 @@ class ExecutionCollectionAPI(Resource):
Starts an execution, given an application_id. Takes a JSON object like this: { "application_id": 4 }
:return: the new execution_id
"""
start_time = time.time()
calling_user = authenticate(request, self.state)
try:
......@@ -131,4 +140,5 @@ class ExecutionCollectionAPI(Resource):
self.state.state_updated()
singletons['metric'].metric_api_call(start_time, 'execution', 'start', calling_user)
return {'execution_id': execution.id}, 201
......@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from werkzeug.exceptions import BadRequest
from flask_restful import Resource, request
......@@ -21,6 +23,7 @@ from zoe_scheduler.state.manager import StateManager
from zoe_scheduler.platform_manager import PlatformManager
from zoe_scheduler.rest_api.utils import catch_exceptions
from zoe_scheduler.rest_api.auth.authentication import authenticate
from zoe_scheduler.config import singletons
class QueryAPI(Resource):
......@@ -34,6 +37,7 @@ class QueryAPI(Resource):
@catch_exceptions
def post(self):
start_time = time.time()
calling_user = authenticate(request, self.state)
try:
......@@ -58,23 +62,25 @@ class QueryAPI(Resource):
if what == 'stats swarm':
ret = self.platform.swarm_stats()
return {'stats': ret.to_dict()}
ret = {'stats': ret.to_dict()}
elif what == 'stats scheduler':
ret = self.platform.scheduler_stats()
return {'stats': ret.to_dict()}
ret = {'stats': ret.to_dict()}
elif what == 'execution logs':
pass # TODO
ret = None # TODO
elif what == 'container logs':
c_list = self.state.get('container', **filters)
logs = self._get_container_logs(c_list)
return logs
ret = logs
elif what == 'container stats':
c_list = self.state.get('container', **filters)
stats = self._get_container_stats(c_list)
return [s.to_dict() for s in stats]
ret = [s.to_dict() for s in stats]
else:
ret = self.state.get(what, **filters)
return [o.to_dict(checkpoint=False) for o in ret]
ret = [o.to_dict(checkpoint=False) for o in ret]
singletons['metric'].metric_api_call(start_time, 'query', what, calling_user)
return ret
def _get_container_logs(self, c_list):
logs = []
......
......@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from werkzeug.exceptions import BadRequest
from flask_restful import Resource, request
......@@ -23,6 +25,7 @@ from zoe_scheduler.platform_manager import PlatformManager
from zoe_scheduler.rest_api.auth.authentication import authenticate
from zoe_scheduler.rest_api.auth.authorization import is_authorized
from zoe_scheduler.state.user import User
from zoe_scheduler.config import singletons
class UserAPI(Resource):
......@@ -36,6 +39,7 @@ class UserAPI(Resource):
@catch_exceptions
def get(self, user_id):
start_time = time.time()
calling_user = authenticate(request, self.state)
u = self.state.get_one('user', id=user_id)
......@@ -45,10 +49,12 @@ class UserAPI(Resource):
is_authorized(calling_user, u, 'get')
d = u.to_dict(checkpoint=False)
singletons['metric'].metric_api_call(start_time, 'user', 'get', calling_user)
return d, 200
@catch_exceptions
def delete(self, user_id):
start_time = time.time()
calling_user = authenticate(request, self.state)
user = self.state.get_one('user', id=user_id)
......@@ -67,6 +73,7 @@ class UserAPI(Resource):
self.state.delete('user', user_id)
self.state.state_updated()
singletons['metric'].metric_api_call(start_time, 'user', 'delete', calling_user)
return '', 204
......@@ -85,6 +92,7 @@ class UserCollectionAPI(Resource):
Create a new user
:return:
"""
start_time = time.time()
calling_user = authenticate(request, self.state)
try:
......@@ -114,4 +122,5 @@ class UserCollectionAPI(Resource):
self.state.state_updated()
singletons['metric'].metric_api_call(start_time, 'user', 'post', calling_user)
return {"user_id": user.id}, 201
......@@ -29,6 +29,7 @@ class TestConf:
self.state_dir = '/tmp/zoe'
self.zoeadmin_password = 'test'
self.passlib_rounds = 1
self.enable_influxdb = False
@pytest.fixture(scope='session')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment