Commit 7f36256b authored by Daniele Venzano

Merge branch 'devel/internal-logs' into 'master'

Internal logs

See merge request !8
parents 3ec68d30 cbe30dd4
......@@ -11,7 +11,7 @@ Resources:
- Documentation: http://docs.zoe-analytics.eu
- Roadmap: https://gitlab.eurecom.fr/zoe/main/wikis/home
- Mailing list: http://www.freelists.org/list/zoe
- Issue tracker: https://gitlab.eurecom.fr/zoe/main/issues
- Issue tracker: https://github.com/DistributedSystemsGroup/zoe/issues
- Stable releases: https://github.com/DistributedSystemsGroup/zoe
Zoe applications (ZApps):
......
......@@ -17,14 +17,24 @@ Common options:
* ``debug = <true|false>`` : enable or disable debug log output
* ``api-listen-uri = tcp://*:4850`` : ZeroMQ server connection string, used for the master listening endpoint
* ``deployment-name = devel`` : name of this Zoe deployment. Can be used to have multiple Zoe deployments using the same Swarm (devel and prod, for example)
* ``deployment-name = devel`` : name of this Zoe deployment. Can be used to have multiple Zoe deployments using the same back-end (devel and prod, for example)
Workspaces:
* ``workspace-deployment-path`` : path appended to the ``workspace-base-path`` to distinguish this deployment. If left unspecified it is equal to the deployment name
* ``workspace-base-path = /mnt/zoe-workspaces`` : Base directory where user workspaces will be created. This directory should reside on a shared filesystem visible by all Docker hosts.
Metrics:
* ``influxdb-dbname = zoe`` : Name of the InfluxDB database to use for storing metrics
* ``influxdb-url = http://localhost:8086`` : URL of the InfluxDB service
* ``influxdb-enable = False`` : Enable metric output toward influxDB
* ``workspace-base-path = /mnt/zoe-workspaces`` : Base directory where user workspaces will be created. This directory should reside on a shared filesystem visible by all Docker hosts.
* ``overlay-network-name = zoe`` : name of the pre-configured Docker overlay network Zoe should use (Swarm backend)
* ``backend = Swarm`` : name of the back-end to enable and use
Service logs:
* ``gelf-address = udp://1.2.3.4:7896`` : enable Docker GELF log output to this destination
* ``gelf-listener = 7896`` : enable the internal GELF log listener on this port, set to 0 to disable
* ``service-logs-base-path = /var/lib/zoe/service-logs`` : path where service logs coming from the GELF listener will be stored
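Taken together, a minimal configuration that enables the internal log listener might look like the sketch below. The values are illustrative only (the GELF destination host in particular is made up); pick values appropriate for your deployment.

```ini
# zoe.conf (sketch, illustrative values)
debug = false
deployment-name = devel
workspace-base-path = /mnt/zoe-workspaces
backend = Swarm

# Route container output to the master's internal GELF listener:
gelf-address = udp://zoe-master.example.com:7896
gelf-listener = 7896
service-logs-base-path = /var/lib/zoe/service-logs
```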
PostgresQL database options:
......@@ -65,6 +75,7 @@ Swarm backend options:
* ``backend-swarm-tls-cert = cert.pem`` : Docker TLS certificate file
* ``backend-swarm-tls-key = key.pem`` : Docker TLS private key file
* ``backend-swarm-tls-ca = ca.pem`` : Docker TLS CA certificate file
* ``overlay-network-name = zoe`` : name of the pre-configured Docker overlay network Zoe should use (Swarm backend)
Kubernetes backend:
......
.. _logging:
Container logs
==============
Zoe logs and service output
===========================
By design Zoe does not involve itself with the output of container processes. The logs can be retrieved with the usual ``docker logs`` command while a container is alive, but they are lost forever when the container is deleted. This solution does not scale very well: to examine logs, users need access to the Docker command-line tools and to the Swarm they are running in.
Zoe daemons output log information on the standard error stream. More verbose output can be enabled by setting the ``debug`` option to ``true``.
In production we recommend configuring your back-end to manage the logs according to your policies. Docker Engines, for example, can be configured to send standard output and error to a remote destination in GELF format (other formats are supported) as soon as they are generated.
The command-line option ``--log-file`` can be used to specify a file where the output should be written.
A popular logging stack that supports GELF is `ELK <https://www.elastic.co/products>`_. However, in our experience, web interfaces like Kibana or Graylog are not useful to the Zoe users: they want to quickly dig through logs of their executions to find an error or an interesting number to correlate to some other number in some other log. The web interfaces are slow and cluttered compared to using grep on a text file.
Which alternative is good for you depends on the usage pattern of your users, your log auditing requirements, etc.
Service logs
------------
In this section we focus on the output produced by the ZApps and their services.
Companies and users have a wide variety of requirements for this kind of output:
* It may need to be stored for auditing or research
* Users need to access it for debugging or to check progress of their executions
* ZApps may generate a lot of output in a very short time, which means a lot of data moving around
Because of this, Zoe leaves maximum freedom to the administrators deploying it. By default Zoe does not configure the container back-ends to do anything special with the output of containers, so whatever is configured there is respected by Zoe.
In this case the logs command line, API and web interface will not be operational.
Swarm-only integrated log management
------------------------------------
When using the Swarm back-end, however, Zoe can configure the containers to produce their output in UDP GELF format and send it to a configured destination, via the ``gelf-address`` option. Each message is enriched with labels that help match each log line to the ZApp and service that produced it.
GELF is understood by many tools, like Graylog or the `ELK <https://www.elastic.co/products>`_ stack: it is possible, for example, to store the service output in Elasticsearch and make it searchable via Kibana.
Additionally, the Zoe Master itself can be configured to act as a log collector. This is enabled by setting the ``gelf-listener`` option to the port number specified in ``gelf-address``. In this case the Zoe Master activates a thread that listens on that UDP port. Logs are stored in files, in a directory hierarchy built as follows::
<service-logs-base-path>/<deployment-name>/<execution-id>/<service-name>.txt
In this case the logs command line, API and web interface will work normally.
Please note that the GELF listener implemented in the Zoe Master process is not built to handle high rates of incoming log messages. If the incoming rate is too high, UDP packets (and hence log lines) may be dropped and lost.
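The directory hierarchy above can be reproduced with a few lines of Python. This is a minimal sketch: the helper function name and the example values are made up for illustration; the real values come from the Zoe configuration and the service record.

```python
import os

def service_log_path(base_path, deployment_name, execution_id, service_name):
    """Build the path where the GELF listener stores one service's log file."""
    return os.path.join(base_path, deployment_name, str(execution_id),
                        service_name + '.txt')

# Illustrative values only:
path = service_log_path('/var/lib/zoe/service-logs', 'devel', 42, 'spark-master')
# On a POSIX system: /var/lib/zoe/service-logs/devel/42/spark-master.txt
```

Both the GELF listener (when writing) and the API endpoint (when serving logs) must agree on this layout, which is why the path is derived from configuration values rather than hard-coded.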
......@@ -18,6 +18,7 @@
from datetime import datetime, timedelta
import logging
import re
import os
import zoe_api.exceptions
import zoe_api.master_api
......@@ -25,7 +26,6 @@ import zoe_lib.applications
import zoe_lib.exceptions
import zoe_lib.state
from zoe_lib.config import get_conf
from zoe_master.backends.swarm.api_client import SwarmClient
log = logging.getLogger(__name__)
......@@ -134,16 +134,22 @@ class APIEndpoint:
return ret
def service_logs(self, uid, role, service_id, stream=True):
"""Retrieve the logs for the given service."""
"""Retrieve the logs for the given service.
If stream is True, a file object is returned, otherwise the log contents as a str object.
"""
service = self.sql.service_list(id=service_id, only_one=True)
if service is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such service')
if service.user_id != uid and role != 'admin':
raise zoe_api.exceptions.ZoeAuthException()
if service.docker_id is None:
raise zoe_api.exceptions.ZoeNotFoundException('Container is not running')
swarm = SwarmClient()
return swarm.logs(service.docker_id, stream)
path = os.path.join(get_conf().service_logs_base_path, get_conf().deployment_name, str(service.execution_id), service.name + '.txt')
if not os.path.exists(path):
raise zoe_api.exceptions.ZoeNotFoundException('Service log not available')
if not stream:
return open(path, encoding='utf-8').read()
else:
return open(path, encoding='utf-8')
def statistics_scheduler(self, uid_, role_):
"""Retrieve statistics about the scheduler."""
......
......@@ -15,11 +15,9 @@
"""The Service API endpoint."""
from concurrent.futures import ThreadPoolExecutor
import logging
from tornado.web import RequestHandler
import tornado.gen
from tornado.web import RequestHandler, asynchronous
import tornado.iostream
from zoe_api.rest_api.utils import catch_exceptions, get_auth, manage_cors_headers
......@@ -27,8 +25,6 @@ from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
log = logging.getLogger(__name__)
THREAD_POOL = ThreadPoolExecutor(20)
class ServiceAPI(RequestHandler):
"""The Service API endpoint."""
......@@ -68,6 +64,9 @@ class ServiceLogsAPI(RequestHandler):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
self.connection_closed = False
self.service_id = None
self.stream = None
self.log_obj = None
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
......@@ -81,35 +80,25 @@ class ServiceLogsAPI(RequestHandler):
def on_connection_close(self):
"""Tornado callback for clients closing the connection."""
self.connection_closed = True
log.debug('Finished log stream for service {}'.format(self.service_id))
self.finish()
@catch_exceptions
@tornado.gen.coroutine
@asynchronous
def get(self, service_id):
"""HTTP GET method."""
uid, role = get_auth(self)
log_gen = self.api_endpoint.service_logs(uid, role, service_id, stream=True)
while True:
try:
log_line = yield THREAD_POOL.submit(next, log_gen)
except StopIteration:
break
self.write(log_line)
self.service_id = service_id
self.log_obj = self.api_endpoint.service_logs(uid, role, service_id, stream=True)
self.stream = tornado.iostream.PipeIOStream(self.log_obj.fileno())
self.stream.read_until(b'\n', callback=self._stream_log_line)
try:
yield self.flush()
except tornado.iostream.StreamClosedError:
break
if self.connection_closed:
break
log.debug('Finished log stream for service {}'.format(service_id))
self.finish()
def _stream_log_line(self, log_line):
self.write(log_line)
self.flush()
self.stream.read_until(b'\n', callback=self._stream_log_line)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
......
......@@ -20,7 +20,7 @@ from typing import List
import tornado.web
import zoe_api.web.start
import zoe_api.web.ajax
import zoe_api.web.websockets
import zoe_api.web.executions
from zoe_lib.version import ZOE_API_VERSION, ZOE_VERSION
......@@ -43,8 +43,9 @@ def web_init(api_endpoint) -> List[tornado.web.URLSpec]:
tornado.web.url(r'/executions/terminate/([0-9]+)', zoe_api.web.executions.ExecutionTerminateWeb, route_args, name='execution_terminate'),
tornado.web.url(r'/executions/delete/([0-9]+)', zoe_api.web.executions.ExecutionDeleteWeb, route_args, name='execution_delete'),
tornado.web.url(r'/executions/inspect/([0-9]+)', zoe_api.web.executions.ExecutionInspectWeb, route_args, name='execution_inspect'),
tornado.web.url(r'/service/logs/([0-9]+)', zoe_api.web.executions.ServiceLogsWeb, route_args, name='service_logs'),
tornado.web.url(r'/ajax', zoe_api.web.ajax.AjaxEndpointWeb, route_args, name='ajax')
tornado.web.url(r'/websocket', zoe_api.web.websockets.WebSocketEndpointWeb, route_args, name='websocket')
]
return web_routes
......
......@@ -109,7 +109,8 @@ class ZoeRequestHandler(tornado.web.RequestHandler):
'datetime': datetime,
'locale': self.locale,
'handler': self,
'zoe_version': zoe_lib.version.ZOE_VERSION
'zoe_version': zoe_lib.version.ZOE_VERSION,
'server_address': self.request.host
}
ctx.update(kwargs)
......
......@@ -151,3 +151,26 @@ class ExecutionInspectWeb(ZoeRequestHandler):
"endpoints": endpoints,
}
self.render('execution_inspect.html', **template_vars)
class ServiceLogsWeb(ZoeRequestHandler):
"""Handler class"""
def initialize(self, **kwargs):
"""Initializes the request handler."""
super().initialize(**kwargs)
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
@catch_exceptions
def get(self, service_id):
"""Gather details about an execution."""
uid, role = get_auth(self)
if uid is None:
return self.redirect(self.get_argument('next', u'/login'))
service = self.api_endpoint.service_by_id(uid, role, service_id)
#log_obj = self.api_endpoint.service_logs(uid, role, service_id, stream=True)
template_vars = {
"service": service,
}
self.render('service_logs.html', **template_vars)
......@@ -161,3 +161,13 @@ section {
fieldset {
border: 0;
}
textarea.logoutput {
width: 100%;
height: 40em;
font-size: 0.7em;
font-family: "Lucida Console", Monaco, monospace;
white-space: pre;
overflow-x: auto;
overflow-y: auto;
}
......@@ -43,13 +43,14 @@
{% endif %}
<ul>
{% for s in services_info %}
<li class="container_name" id="{{ s['id'] }}">{{ s['name'] }}</li>
<li class="container_name">{{ s['name'] }}</li>
<ul>
<li>Zoe status: {{ s['status'] }}</li>
<li>Backend status: {{ s['backend_status'] }}</li>
{% if s['error_message'] is not none %}
<li>Error: {{ s['error_message'] }}</li>
{% endif %}
<li><a href="{{ reverse_url("service_logs", s['id']) }}">Log output</a></li>
</ul>
{% endfor %}
</ul>
......
{% extends "base_user.html" %}
{% block title %}Service {{ service.name }} logs{% endblock %}
{% block content %}
<h1>Zoe - Analytics on demand</h1>
<h2>{{ service.name }} console output</h2>
<textarea class="logoutput" id="logoutput" readonly>
</textarea>
<p><a href="{{ reverse_url("execution_inspect", service.execution_id) }}">Back to execution details</a></p>
<script type="application/javascript">
var ws = new WebSocket('ws://{{ server_address }}/websocket');
ws.onopen = function (e) {
ws.send(JSON.stringify({
command: "service_logs",
service_id: {{ service.id }}
}));
};
ws.onmessage = function (evt) {
$('#logoutput').append(evt.data);
};
</script>
{% endblock %}
......@@ -17,35 +17,58 @@
import datetime
import json
import logging
import tornado.websocket
import tornado.iostream
from tornado.web import asynchronous
from tornado.escape import json_decode
from zoe_lib.config import get_conf
import zoe_api.exceptions
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.web.utils import get_auth, catch_exceptions
from zoe_api.web.custom_request_handler import ZoeRequestHandler
log = logging.getLogger(__name__)
class AjaxEndpointWeb(ZoeRequestHandler):
class WebSocketEndpointWeb(tornado.websocket.WebSocketHandler):
"""Handler class"""
def initialize(self, **kwargs):
"""Initializes the request handler."""
super().initialize(**kwargs)
super().initialize()
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
self.uid = None
self.role = None
self.log_obj = None
self.stream = None
@catch_exceptions
def post(self):
"""AJAX POST requests."""
def open(self, *args, **kwargs):
"""Invoked when a new WebSocket is opened."""
log.debug('WebSocket opened')
uid, role = get_auth(self)
if uid is None:
self.close(401, "Unauthorized")
else:
self.uid = uid
self.role = role
@catch_exceptions
@asynchronous
def on_message(self, message):
"""WebSocket message handler."""
if message is None:
return
request = json_decode(self.request.body)
request = json.loads(message)
if request['type'] == 'start':
if request['command'] == 'start_zapp':
app_descr = json.load(open('contrib/zoeapps/eurecom_aml_lab.json', 'r'))
execution = self.api_endpoint.execution_list(uid, role, name='aml-lab')
execution = self.api_endpoint.execution_list(self.uid, self.role, name='aml-lab')
if len(execution) == 0:
exec_id = self.api_endpoint.execution_start(uid, role, 'aml-lab', app_descr)
exec_id = self.api_endpoint.execution_start(self.uid, self.role, 'aml-lab', app_descr)
else:
execution = execution[0]
exec_id = execution.id
......@@ -53,9 +76,10 @@ class AjaxEndpointWeb(ZoeRequestHandler):
'status': 'ok',
'execution_id': exec_id
}
elif request['type'] == 'query_status':
self.write_message(response)
elif request['command'] == 'query_status':
try:
execution = self.api_endpoint.execution_by_id(uid, role, request['exec_id'])
execution = self.api_endpoint.execution_by_id(self.uid, self.role, request['exec_id'])
except zoe_api.exceptions.ZoeNotFoundException:
response = {
'status': 'ok',
......@@ -68,14 +92,30 @@ class AjaxEndpointWeb(ZoeRequestHandler):
}
if execution.status == execution.RUNNING_STATUS:
response['ttl'] = ((execution.time_start + datetime.timedelta(hours=get_conf().aml_ttl)) - datetime.datetime.now()).total_seconds()
services_info_, endpoints = self.api_endpoint.execution_endpoints(uid, role, execution)
services_info_, endpoints = self.api_endpoint.execution_endpoints(self.uid, self.role, execution)
response['endpoints'] = endpoints
elif execution.status == execution.ERROR_STATUS or execution.status == execution.TERMINATED_STATUS:
self.api_endpoint.execution_delete(uid, role, execution.id)
self.api_endpoint.execution_delete(self.uid, self.role, execution.id)
self.write_message(response)
elif request['command'] == 'service_logs':
self.log_obj = self.api_endpoint.service_logs(self.uid, self.role, request['service_id'], stream=True)
self.stream = tornado.iostream.PipeIOStream(self.log_obj.fileno())
self.stream.read_until(b'\n', callback=self._stream_log_line)
else:
response = {
'status': 'error',
'message': 'unknown request type'
}
self.write_message(response)
def _stream_log_line(self, log_line):
self.write_message(log_line)
self.stream.read_until(b'\n', callback=self._stream_log_line)
def on_close(self):
"""Invoked when the WebSocket is closed."""
log.debug("WebSocket closed")
self.write(response)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
......@@ -66,6 +66,11 @@ def load_configuration(test_conf=None):
argparser.add_argument('--workspace-deployment-path', help='Path appended to the workspace path to distinguish this deployment. If unspecified is equal to the deployment name.', default='--default--')
argparser.add_argument('--overlay-network-name', help='Name of the Swarm overlay network Zoe should use', default='zoe')
# Service logs
argparser.add_argument('--gelf-address', help='Enable Docker GELF log output to this destination (ex. udp://1.2.3.4:7896)', default='')
argparser.add_argument('--gelf-listener', type=int, help='Enable the internal GELF log listener on this port, set to 0 to disable', default='7896')
argparser.add_argument('--service-logs-base-path', help='Path where service logs coming from the GELF listener will be stored', default='/var/lib/zoe/service-logs')
# API options
argparser.add_argument('--listen-address', type=str, help='Address to listen to for incoming connections', default="0.0.0.0")
argparser.add_argument('--listen-port', type=int, help='Port to listen to for incoming connections', default=5001)
......
......@@ -181,6 +181,20 @@ class SwarmClient:
mem_limit = 0
# Swarm backend does not support cores in a consistent way, see https://github.com/docker/swarm/issues/475
if get_conf().gelf_address != '':
log_config = {
"type": "gelf",
"config": {
'gelf-address': get_conf().gelf_address,
'labels': ",".join(service_instance.labels)
}
}
else:
log_config = {
"type": "json-file",
"config": {}
}
try:
cont = self.cli.containers.run(image=service_instance.image_name,
command=service_instance.command,
......@@ -188,6 +202,7 @@ class SwarmClient:
environment=environment,
hostname=service_instance.hostname,
labels=service_instance.labels,
log_config=log_config,
mem_limit=mem_limit,
memswap_limit=0,
name=service_instance.name,
......
......@@ -30,6 +30,7 @@ import zoe_master.backends.interface
from zoe_master.preprocessing import restart_resubmit_scheduler
from zoe_master.master_api import APIManager
from zoe_master.exceptions import ZoeException
from zoe_master.gelf_listener import GELFListener
log = logging.getLogger("main")
LOG_FORMAT = '%(asctime)-15s %(levelname)s %(threadName)s->%(name)s: %(message)s'
......@@ -83,6 +84,11 @@ def main():
log.info("Starting ZMQ API server...")
api_server = APIManager(metrics, scheduler, state)
if config.get_conf().gelf_listener != 0:
gelf_listener = GELFListener()
else:
gelf_listener = None
try:
api_server.loop()
except KeyboardInterrupt:
......@@ -94,3 +100,5 @@ def main():
api_server.quit()
zoe_master.backends.interface.shutdown_backend()
metrics.quit()
if gelf_listener is not None:
gelf_listener.quit()
#!/usr/bin/python3
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Zoe GELF listener."""
import socketserver
import gzip
import json
import logging
import threading
import os
import datetime
from zoe_lib.config import get_conf
log = logging.getLogger(__name__)
class GELFUDPHandler(socketserver.DatagramRequestHandler):
"""The handler for incoming UDP packets."""
def handle(self):
"""Handle one UDP packet (one GELF log line in JSON format)."""
data = self.rfile.read()
data = gzip.decompress(data)
data = json.loads(data.decode('utf-8'))
deployment_name = data['_zoe_deployment_name']
if deployment_name != get_conf().deployment_name:
return
execution_id = data['_zoe_execution_id']
service_name = data['_zoe_service_name']
host = data['host']
timestamp = datetime.datetime.fromtimestamp(data['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
message = data['short_message']
log_file_path = os.path.join(get_conf().service_logs_base_path, get_conf().deployment_name, str(execution_id), service_name + '.txt')
if not os.path.exists(log_file_path):
os.makedirs(os.path.join(get_conf().service_logs_base_path, get_conf().deployment_name, str(execution_id)), exist_ok=True)
open(log_file_path, 'w').write('ZOE HEADER: log file for service {} running on host {}\n'.format(service_name, host))
with open(log_file_path, 'a') as logfile:
logfile.write(timestamp + ' ' + message + '\n')
class ZoeLoggerUDPServer(socketserver.UDPServer):
"""The UDP server"""
def __init__(self, server_address, handler_class):
self.allow_reuse_address = True
super().__init__(server_address, handler_class)
class GELFListener:
"""A thread that listens to UDP GELF and writes logs to a directory tree according to Zoe tags."""
def __init__(self):
self.server = ZoeLoggerUDPServer(("0.0.0.0", get_conf().gelf_listener), GELFUDPHandler)
self.th = None
self.start()
def start(self):
"""Starts the GELF thread."""
if self.th is not None:
return
log.info('GELF listener starting on {}:{}'.format("0.0.0.0", get_conf().gelf_listener))
self.th = threading.Thread(target=self.server.serve_forever, name='GELF server', daemon=True)
self.th.start()
def quit(self):
"""Stops the GELF server."""
self.server.shutdown()
self.th.join(0.1)
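The wire format the handler expects can be sketched as a simple round trip: a GELF record is JSON, gzip-compressed, and carried in a single UDP datagram. The record below is hypothetical (host name, message, and values are made up); only the ``_zoe_*`` keys, ``host``, ``timestamp``, and ``short_message`` mirror the fields ``GELFUDPHandler.handle()`` actually reads.

```python
import gzip
import json

# A hypothetical GELF record, with the fields the handler extracts.
record = {
    'version': '1.1',
    'host': 'worker-1',
    'short_message': 'job started',
    'timestamp': 1500000000.0,
    '_zoe_deployment_name': 'devel',
    '_zoe_execution_id': 42,
    '_zoe_service_name': 'spark-master',
}

# What the Docker GELF driver would put on the wire: gzip-compressed JSON.
packet = gzip.compress(json.dumps(record).encode('utf-8'))

# The decoding steps performed by GELFUDPHandler.handle() on each datagram.
data = json.loads(gzip.decompress(packet).decode('utf-8'))
```

Records whose ``_zoe_deployment_name`` does not match the local deployment are silently dropped, which lets several Zoe deployments share one back-end without mixing their logs.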