Commit 59472bab authored by Daniele Venzano's avatar Daniele Venzano

Log streaming was blocking tornado, fix using thread pools

parent eba363c8
......@@ -133,7 +133,7 @@ class APIEndpoint:
ret = [s for s in services if s.user_id == uid or role == 'admin']
return ret
def service_logs(self, uid, role, service_id, stream=True):
def service_logs(self, uid, role, service_id):
"""Retrieve the logs for the given service.
If stream is True, a file object is returned, otherwise the log contents as a str object.
"""
......@@ -146,10 +146,7 @@ class APIEndpoint:
path = os.path.join(get_conf().service_logs_base_path, get_conf().deployment_name, str(service.execution_id), service.name + '.txt')
if not os.path.exists(path):
raise zoe_api.exceptions.ZoeNotFoundException('Service log not available')
if not stream:
return open(path, encoding='utf-8').read()
else:
return open(path, encoding='utf-8')
return open(path, encoding='utf-8')
def statistics_scheduler(self, uid_, role_):
"""Retrieve statistics about the scheduler."""
......
......@@ -15,16 +15,20 @@
"""The Service API endpoint."""
from concurrent.futures import ThreadPoolExecutor
import logging
from tornado.web import RequestHandler
import tornado.gen
import tornado.iostream
from zoe_api.rest_api.utils import catch_exceptions, get_auth, manage_cors_headers
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
log = logging.getLogger(__name__)
THREAD_POOL = ThreadPoolExecutor(20)
class ServiceAPI(RequestHandler):
"""The Service API endpoint."""
......@@ -81,27 +85,31 @@ class ServiceLogsAPI(RequestHandler):
def on_connection_close(self):
"""Tornado callback for clients closing the connection."""
log.debug('Finished log stream for service {}'.format(self.service_id))
self.connection_closed = True
self.finish()
@catch_exceptions
@tornado.gen.coroutine
def get(self, service_id):
"""HTTP GET method."""
uid, role = get_auth(self)
self.service_id = service_id
self.log_obj = self.api_endpoint.service_logs(uid, role, service_id, stream=True)
self._stream_log_line()
log_obj = self.api_endpoint.service_logs(uid, role, service_id)
@tornado.gen.coroutine
def _stream_log_line(self):
while True:
line = self.log_obj.read(4096)
if line is not None:
self.write(line)
self.flush()
else:
while not self.connection_closed:
try:
log_line = yield THREAD_POOL.submit(next, log_obj)
except StopIteration:
yield tornado.gen.sleep(0.2)
continue
self.write(log_line)
try:
yield self.flush()
except tornado.iostream.StreamClosedError:
break
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
......
......@@ -19,9 +19,12 @@ ws.onopen = function (e) {
}));
};
var log_element = $('#logoutput');
ws.onmessage = function (evt) {
$('#logoutput').append(evt.data);
log_element.append(evt.data);
log_element.scrollTop(log_element[0].scrollHeight);
};
</script>
{% endblock %}
......@@ -18,11 +18,11 @@
import datetime
import json
import logging
from concurrent.futures import ThreadPoolExecutor
import tornado.websocket
import tornado.iostream
from tornado.web import asynchronous
import tornado.gen
from zoe_lib.config import get_conf
import zoe_api.exceptions
......@@ -31,6 +31,8 @@ from zoe_api.web.utils import get_auth, catch_exceptions
log = logging.getLogger(__name__)
THREAD_POOL = ThreadPoolExecutor(20)
class WebSocketEndpointWeb(tornado.websocket.WebSocketHandler):
"""Handler class"""
......@@ -40,8 +42,7 @@ class WebSocketEndpointWeb(tornado.websocket.WebSocketHandler):
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
self.uid = None
self.role = None
self.log_obj = None
self.stream = None
self.connection_closed = None
@catch_exceptions
def open(self, *args, **kwargs):
......@@ -55,7 +56,7 @@ class WebSocketEndpointWeb(tornado.websocket.WebSocketHandler):
self.role = role
@catch_exceptions
@asynchronous
@tornado.gen.coroutine
def on_message(self, message):
"""WebSocket message handler."""
......@@ -96,11 +97,18 @@ class WebSocketEndpointWeb(tornado.websocket.WebSocketHandler):
response['endpoints'] = endpoints
elif execution.status == execution.ERROR_STATUS or execution.status == execution.TERMINATED_STATUS:
self.api_endpoint.execution_delete(self.uid, self.role, execution.id)
self.write_message(response)
self.write_message(response)
elif request['command'] == 'service_logs':
self.log_obj = self.api_endpoint.service_logs(self.uid, self.role, request['service_id'], stream=True)
self.stream = tornado.iostream.PipeIOStream(self.log_obj.fileno())
self.stream.read_until(b'\n', callback=self._stream_log_line)
log_obj = self.api_endpoint.service_logs(self.uid, self.role, request['service_id'])
while not self.connection_closed:
try:
log_line = yield THREAD_POOL.submit(next, log_obj)
except StopIteration:
yield tornado.gen.sleep(0.2)
continue
self.write_message(log_line)
else:
response = {
'status': 'error',
......@@ -115,6 +123,7 @@ class WebSocketEndpointWeb(tornado.websocket.WebSocketHandler):
def on_close(self):
"""Invoked when the WebSocket is closed."""
log.debug("WebSocket closed")
self.connection_closed = True
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment