Commit 3c132ad4 authored by Daniele Venzano's avatar Daniele Venzano
Browse files

Implement an interface to stream logs out of docker

parent e280622c
......@@ -22,6 +22,7 @@ from zoe_lib.config import get_conf
import zoe_lib.sql_manager
import zoe_lib.applications
import zoe_lib.exceptions
from zoe_lib.swarm_client import SwarmClient
import zoe_api.master_api
import zoe_api.exceptions
......@@ -111,8 +112,7 @@ class APIEndpoint:
service = self.sql.service_list(id=service_id, only_one=True)
if service is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
s_exec = self.sql.execution_list(only_one=True, id=service.execution_id)
if s_exec.user_id != uid and role != 'admin':
if service.user_id != uid and role != 'admin':
raise zoe_api.exceptions.ZoeAuthException()
return service
......@@ -122,6 +122,18 @@ class APIEndpoint:
ret = [s for s in services if s.user_id == uid or role == 'admin']
return ret
def service_logs(self, uid, role, service_id, stream=True):
"""Retrieve the logs for the given service."""
service = self.sql.service_list(id=service_id, only_one=True)
if service is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
if service.user_id != uid and role != 'admin':
raise zoe_api.exceptions.ZoeAuthException()
if service.docker_id is None:
raise zoe_api.exceptions.ZoeNotFoundException('Container is not running')
swarm = SwarmClient(get_conf())
return swarm.logs(service.docker_id, stream)
def retry_submit_error_executions(self):
"""Resubmit any execution forgotten by the master."""
waiting_execs = self.sql.execution_list(status=zoe_lib.sql_manager.Execution.SUBMIT_STATUS)
......@@ -23,7 +23,7 @@ from flask_restful import Api
from zoe_api.rest_api.execution import ExecutionAPI, ExecutionCollectionAPI, ExecutionDeleteAPI
from import InfoAPI
from zoe_api.rest_api.service import ServiceAPI
from zoe_api.rest_api.service import ServiceAPI, ServiceLogsAPI
from zoe_api.rest_api.discovery import DiscoveryAPI
from zoe_lib.version import ZOE_API_VERSION
......@@ -41,6 +41,7 @@ def api_init(api_endpoint) -> Blueprint:
api.add_resource(ExecutionDeleteAPI, API_PATH + '/execution/delete/<int:execution_id>', resource_class_kwargs={'api_endpoint': api_endpoint})
api.add_resource(ExecutionCollectionAPI, API_PATH + '/execution', resource_class_kwargs={'api_endpoint': api_endpoint})
api.add_resource(ServiceAPI, API_PATH + '/service/<int:service_id>', resource_class_kwargs={'api_endpoint': api_endpoint})
api.add_resource(ServiceLogsAPI, API_PATH + '/service/logs/<int:service_id>', resource_class_kwargs={'api_endpoint': api_endpoint})
api.add_resource(DiscoveryAPI, API_PATH + '/discovery/by_group/<int:execution_id>/<service_group>', resource_class_kwargs={'api_endpoint': api_endpoint})
......@@ -18,6 +18,7 @@
import logging
from flask_restful import Resource, request
from flask import Response
from zoe_api.rest_api.utils import catch_exceptions, get_auth
import zoe_api.api_endpoint
......@@ -38,3 +39,24 @@ class ServiceAPI(Resource):
service = self.api_endpoint.service_by_id(uid, role, service_id)
return service.serialize()
class ServiceLogsAPI(Resource):
"""The Service logs API endpoint."""
def __init__(self, api_endpoint: zoe_api.api_endpoint.APIEndpoint) -> None:
self.api_endpoint = api_endpoint
def get(self, service_id) -> dict:
"""HTTP GET method."""
uid, role = get_auth(request)
log_gen = self.api_endpoint.service_logs(uid, role, service_id, stream=True)
def flask_stream():
"""Helper function to stream log data."""
for log_line in log_gen:
yield log_line.decode('utf-8')
return Response(flask_stream(), mimetype='text/plain')
......@@ -127,6 +127,13 @@ def exec_rm_cmd(args):
def logs_cmd(args):
"""Retrieves and streams the logs of a service."""
service_api = ZoeServiceAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
for line in service_api.get_logs(args.service_id):
ENV_HELP_TEXT = '''To use this tool you need also to define three environment variables:
ZOE_URL: point to the URL of the Zoe Scheduler (ex.: http://localhost:5000/
ZOE_USER: the username used for authentication
......@@ -171,6 +178,10 @@ def process_arguments() -> Tuple[ArgumentParser, Namespace]:
argparser_execution_kill.add_argument('id', type=int, help="Execution id")
argparser_logs = subparser.add_parser('logs', help="Streams the service logs")
argparser_logs.add_argument('service_id', type=int, help="Service id")
return parser, parser.parse_args()
......@@ -60,6 +60,24 @@ class ZoeAPIBase:
self.user = user
self.password = password
def _rest_get_stream(self, path):
:type path: str
:rtype: (dict, int)
url = self.url + '/api/' + ZOE_API_VERSION + path
req = requests.get(url, auth=(self.user, self.password), stream=True)
except requests.exceptions.Timeout:
raise ZoeAPIException('HTTP connection timeout')
except requests.exceptions.HTTPError:
raise ZoeAPIException('Invalid HTTP response')
except requests.exceptions.ConnectionError as e:
raise ZoeAPIException('Connection error: {}'.format(e))
return req, req.status_code
def _rest_get(self, path):
......@@ -45,3 +45,19 @@ class ZoeServiceAPI(ZoeAPIBase):
raise ZoeAPIException('service "{}" not found'.format(container_id))
raise ZoeAPIException('error retrieving service {}'.format(container_id))
def get_logs(self, container_id):
Retrieve service logs.
:param container_id:
response, status_code = self._rest_get_stream('/service/logs/' + str(container_id))
if status_code == 200:
for line in response.iter_lines():
yield line
elif status_code == 404:
raise ZoeAPIException('service "{}" not found'.format(container_id))
raise ZoeAPIException('error retrieving service {}'.format(container_id))
......@@ -347,3 +347,13 @@ class SwarmClient:
'status': cont_info['Status']
return conts
def logs(self, docker_id: str, stream: bool):
Retrieves the logs of the selected container.
:param docker_id:
:param stream:
return self.cli.logs(docker_id, stdout=True, stderr=True, stream=stream, timestamps=True)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment