Commit e79aecb9 authored by Daniele Venzano's avatar Daniele Venzano

Include an option to manage logs internally

If activated, docker will be configured to send service logs to the zoe master, who will save them to disk. Then they can be streamed via the command line or the web interface.
parent 3ec68d30
......@@ -18,6 +18,7 @@
from datetime import datetime, timedelta
import logging
import re
import os
import zoe_api.exceptions
import zoe_api.master_api
......@@ -134,16 +135,22 @@ class APIEndpoint:
return ret
def service_logs(self, uid, role, service_id, stream=True):
"""Retrieve the logs for the given service."""
"""Retrieve the logs for the given service.
If stream is True, a file object is returned, otherwise the log contents as a str object.
service = self.sql.service_list(id=service_id, only_one=True)
if service is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such service')
if service.user_id != uid and role != 'admin':
raise zoe_api.exceptions.ZoeAuthException()
if service.docker_id is None:
raise zoe_api.exceptions.ZoeNotFoundException('Container is not running')
swarm = SwarmClient()
return swarm.logs(service.docker_id, stream)
path = os.path.join(get_conf().service_logs_base_path, get_conf().deployment_name, str(service.execution_id), + '.txt')
if not os.path.exists(path):
raise zoe_api.exceptions.ZoeNotFoundException('Service log not available')
if not stream:
return open(path, encoding='utf-8').read()
return open(path, encoding='utf-8')
def statistics_scheduler(self, uid_, role_):
"""Retrieve statistics about the scheduler."""
......@@ -15,11 +15,9 @@
"""The Service API endpoint."""
from concurrent.futures import ThreadPoolExecutor
import logging
from tornado.web import RequestHandler
import tornado.gen
from tornado.web import RequestHandler, asynchronous
import tornado.iostream
from zoe_api.rest_api.utils import catch_exceptions, get_auth, manage_cors_headers
......@@ -27,8 +25,6 @@ from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
log = logging.getLogger(__name__)
THREAD_POOL = ThreadPoolExecutor(20)
class ServiceAPI(RequestHandler):
"""The Service API endpoint."""
......@@ -81,35 +77,25 @@ class ServiceLogsAPI(RequestHandler):
def on_connection_close(self):
"""Tornado callback for clients closing the connection."""
self.connection_closed = True
log.debug('Finished log stream for service {}'.format(self.service_id))
def get(self, service_id):
"""HTTP GET method."""
uid, role = get_auth(self)
log_gen = self.api_endpoint.service_logs(uid, role, service_id, stream=True)
while True:
log_line = yield THREAD_POOL.submit(next, log_gen)
except StopIteration:
self.service_id = service_id
self.log_obj = self.api_endpoint.service_logs(uid, role, service_id, stream=True) = tornado.iostream.PipeIOStream(self.log_obj.fileno())'\n', callback=self._stream_log_line)
yield self.flush()
except tornado.iostream.StreamClosedError:
if self.connection_closed:
log.debug('Finished log stream for service {}'.format(service_id))
def _stream_log_line(self, log_line):
self.flush()'\n', callback=self._stream_log_line)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
......@@ -20,7 +20,7 @@ from typing import List
import tornado.web
import zoe_api.web.start
import zoe_api.web.ajax
import zoe_api.web.websockets
import zoe_api.web.executions
from zoe_lib.version import ZOE_API_VERSION, ZOE_VERSION
......@@ -43,8 +43,9 @@ def web_init(api_endpoint) -> List[tornado.web.URLSpec]:
tornado.web.url(r'/executions/terminate/([0-9]+)', zoe_api.web.executions.ExecutionTerminateWeb, route_args, name='execution_terminate'),
tornado.web.url(r'/executions/delete/([0-9]+)', zoe_api.web.executions.ExecutionDeleteWeb, route_args, name='execution_delete'),
tornado.web.url(r'/executions/inspect/([0-9]+)', zoe_api.web.executions.ExecutionInspectWeb, route_args, name='execution_inspect'),
tornado.web.url(r'/service/logs/([0-9]+)', zoe_api.web.executions.ServiceLogsWeb, route_args, name='service_logs'),
tornado.web.url(r'/ajax', zoe_api.web.ajax.AjaxEndpointWeb, route_args, name='ajax')
tornado.web.url(r'/websocket', zoe_api.web.websockets.WebSocketEndpointWeb, route_args, name='websocket')
return web_routes
......@@ -109,7 +109,8 @@ class ZoeRequestHandler(tornado.web.RequestHandler):
'datetime': datetime,
'locale': self.locale,
'handler': self,
'zoe_version': zoe_lib.version.ZOE_VERSION
'zoe_version': zoe_lib.version.ZOE_VERSION,
......@@ -151,3 +151,26 @@ class ExecutionInspectWeb(ZoeRequestHandler):
"endpoints": endpoints,
self.render('execution_inspect.html', **template_vars)
class ServiceLogsWeb(ZoeRequestHandler):
"""Handler class"""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def get(self, service_id):
"""Gather details about an execution."""
uid, role = get_auth(self)
if uid is None:
return self.redirect(self.get_argument('next', u'/login'))
service = self.api_endpoint.service_by_id(uid, role, service_id)
#log_obj = self.api_endpoint.service_logs(uid, role, service_id, stream=True)
template_vars = {
"service": service,
self.render('service_logs.html', **template_vars)
......@@ -161,3 +161,13 @@ section {
fieldset {
border: 0;
textarea.logoutput {
width: 100%;
height: 40em;
font-size: 0.7em;
font-family: "Lucida Console", Monaco, monospace;
white-space: pre;
overflow-x: auto;
overflow-y: auto;
......@@ -43,13 +43,14 @@
{% endif %}
{% for s in services_info %}
<li class="container_name" id="{{ s['id'] }}">{{ s['name'] }}</li>
<li class="container_name">{{ s['name'] }}</li>
<li>Zoe status: {{ s['status'] }}</li>
<li>Backend status: {{ s['backend_status'] }}</li>
{% if s['error_message'] is not none %}
<li>Error: {{ s['error_message'] }}</li>
{% endif %}
<li><a href="{{ reverse_url("service_logs", s['id']) }}">Log output</a></li>
{% endfor %}
{% extends "base_user.html" %}
{% block title %}Service {{ }} logs{% endblock %}
{% block content %}
<h1>Zoe - Analytics on demand</h1>
<h2>{{ }} console output</h2>
<textarea class="logoutput" id="logoutput" readonly>
<p><a href="{{ reverse_url("execution_inspect", service.execution_id) }}">Back to execution details</a></p>
<script type="application/javascript">
var ws = new WebSocket('ws://{{ server_address }}/websocket');
ws.onopen = function (e) {
command: "service_logs",
service_id: {{ }}
ws.onmessage = function (evt) {
{% endblock %}
......@@ -17,35 +17,54 @@
import datetime
import json
import logging
import tornado.websocket
import tornado.iostream
from tornado.web import asynchronous
from tornado.escape import json_decode
from zoe_lib.config import get_conf
import zoe_api.exceptions
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.web.utils import get_auth, catch_exceptions
from zoe_api.web.custom_request_handler import ZoeRequestHandler
log = logging.getLogger(__name__)
class AjaxEndpointWeb(ZoeRequestHandler):
class WebSocketEndpointWeb(tornado.websocket.WebSocketHandler):
"""Handler class"""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def post(self):
"""AJAX POST requests."""
def open(self, *args, **kwargs):
"""Invoked when a new WebSocket is opened."""
log.debug('WebSocket opened')
uid, role = get_auth(self)
if uid is None:
self.close(401, "Unauthorized")
self.uid = uid
self.role = role
request = json_decode(self.request.body)
def on_message(self, message):
"""WebSocket message handler."""
if request['type'] == 'start':
if message is None:
request = json.loads(message)
if request['command'] == 'start_zapp':
app_descr = json.load(open('contrib/zoeapps/eurecom_aml_lab.json', 'r'))
execution = self.api_endpoint.execution_list(uid, role, name='aml-lab')
execution = self.api_endpoint.execution_list(self.uid, self.role, name='aml-lab')
if len(execution) == 0:
exec_id = self.api_endpoint.execution_start(uid, role, 'aml-lab', app_descr)
exec_id = self.api_endpoint.execution_start(self.uid, self.role, 'aml-lab', app_descr)
execution = execution[0]
exec_id =
......@@ -53,9 +72,10 @@ class AjaxEndpointWeb(ZoeRequestHandler):
'status': 'ok',
'execution_id': exec_id
elif request['type'] == 'query_status':
elif request['command'] == 'query_status':
execution = self.api_endpoint.execution_by_id(uid, role, request['exec_id'])
execution = self.api_endpoint.execution_by_id(self.uid, self.role, request['exec_id'])
except zoe_api.exceptions.ZoeNotFoundException:
response = {
'status': 'ok',
......@@ -68,14 +88,26 @@ class AjaxEndpointWeb(ZoeRequestHandler):
if execution.status == execution.RUNNING_STATUS:
response['ttl'] = ((execution.time_start + datetime.timedelta(hours=get_conf().aml_ttl)) -
services_info_, endpoints = self.api_endpoint.execution_endpoints(uid, role, execution)
services_info_, endpoints = self.api_endpoint.execution_endpoints(self.uid, self.role, execution)
response['endpoints'] = endpoints
elif execution.status == execution.ERROR_STATUS or execution.status == execution.TERMINATED_STATUS:
self.api_endpoint.execution_delete(uid, role,
self.api_endpoint.execution_delete(self.uid, self.role,
elif request['command'] == 'service_logs':
self.log_obj = self.api_endpoint.service_logs(self.uid, self.role, request['service_id'], stream=True) = tornado.iostream.PipeIOStream(self.log_obj.fileno())'\n', callback=self._stream_log_line)
response = {
'status': 'error',
'message': 'unknown request type'
def _stream_log_line(self, log_line):
self.write_message(log_line)'\n', callback=self._stream_log_line)
def on_close(self):
"""Invoked when the WebSocket is closed."""
log.debug("WebSocket closed")
......@@ -66,6 +66,11 @@ def load_configuration(test_conf=None):
argparser.add_argument('--workspace-deployment-path', help='Path appended to the workspace path to distinguish this deployment. If unspecified is equal to the deployment name.', default='--default--')
argparser.add_argument('--overlay-network-name', help='Name of the Swarm overlay network Zoe should use', default='zoe')
# Service logs
argparser.add_argument('--gelf-address', help='Enable Docker GELF log output to this destination (ex. udp://', default='')
argparser.add_argument('--gelf-listener', type=int, help='Enable the internal GELF log listener on this port, set to 0 to disable', default='7896')
argparser.add_argument('--service-logs-base-path', help='Path where service logs coming from the GELF listern will be stored', default='/var/lib/zoe/service-logs')
# API options
argparser.add_argument('--listen-address', type=str, help='Address to listen to for incoming connections', default="")
argparser.add_argument('--listen-port', type=int, help='Port to listen to for incoming connections', default=5001)
......@@ -181,6 +181,13 @@ class SwarmClient:
mem_limit = 0
# Swarm backend does not support cores in a consistent way, see
log_config = {
"type": "gelf",
"config": {
'gelf-address': get_conf().gelf_address,
'labels': ",".join(service_instance.labels)
cont =,
......@@ -188,6 +195,7 @@ class SwarmClient:
......@@ -30,6 +30,7 @@ import zoe_master.backends.interface
from zoe_master.preprocessing import restart_resubmit_scheduler
from zoe_master.master_api import APIManager
from zoe_master.exceptions import ZoeException
from zoe_master.gelf_listener import GELFListener
log = logging.getLogger("main")
LOG_FORMAT = '%(asctime)-15s %(levelname)s %(threadName)s->%(name)s: %(message)s'
......@@ -83,6 +84,11 @@ def main():"Starting ZMQ API server...")
api_server = APIManager(metrics, scheduler, state)
if config.get_conf().gelf_listener != 0:
gelf_listener = GELFListener()
gelf_listener = None
except KeyboardInterrupt:
......@@ -94,3 +100,6 @@ def main():
if gelf_listener is not None:
# Copyright (c) 2016, Daniele Venzano
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Zoe GELF listener."""
import socketserver
import gzip
import json
import logging
import threading
import os
import datetime
from zoe_lib.config import get_conf
log = logging.getLogger(__name__)
class GELFUDPHandler(socketserver.DatagramRequestHandler):
"""The handler for incoming UDP packets."""
def handle(self):
"""Handle one UDP packet (one GELF log line in JSON format)."""
data =
data = gzip.decompress(data)
data = json.loads(data.decode('utf-8'))
deployment_name = data['_zoe_deployment_name']
if deployment_name != get_conf().deployment_name:
execution_id = data['_zoe_execution_id']
service_name = data['_zoe_service_name']
host = data['host']
timestamp = datetime.datetime.fromtimestamp(data['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
message = data['short_message']
log_file_path = os.path.join(get_conf().service_logs_base_path, get_conf().deployment_name, str(execution_id), service_name + '.txt')
if not os.path.exists(log_file_path):
os.makedirs(os.path.join(get_conf().service_logs_base_path, get_conf().deployment_name, str(execution_id)))
open(log_file_path, 'w').write('ZOE HEADER: log file for service {} running on host {}\n'.format(service_name, host))
with open(log_file_path, 'a') as logfile:
logfile.write(timestamp + ' ' + message + '\n')
class ZoeLoggerUDPServer(socketserver.UDPServer):
"""The UDP server"""
def __init__(self, server_address, handler_class):
self.allow_reuse_address = True
super().__init__(server_address, handler_class)
class GELFListener:
"""A thread that listens to UDP GELF and writes logs to a directory tree according to Zoe tags."""
def __init__(self):
self.server = ZoeLoggerUDPServer(("", get_conf().gelf_listener), GELFUDPHandler) = None
def start(self):
"""Starts the GELF thread."""
if is not None:
return'GELF listener starting on {}:{}'.format("", get_conf().gelf_listener)) = threading.Thread(target=self.server.serve_forever, name='GELF server', daemon=True)
def quit(self):
"""Stops the GELF server."""
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment