Commit eba363c8 authored by Daniele Venzano's avatar Daniele Venzano
Browse files

Various fixes

- remove executions with fatal errors form the scheduling queue
- set services to error state when they fatally fail starting
- re-implement log streaming that was not working
parent eb4a6714
......@@ -17,8 +17,8 @@
import logging
from tornado.web import RequestHandler, asynchronous
import tornado.iostream
from tornado.web import RequestHandler
import tornado.gen
from zoe_api.rest_api.utils import catch_exceptions, get_auth, manage_cors_headers
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
......@@ -84,7 +84,6 @@ class ServiceLogsAPI(RequestHandler):
self.finish()
@catch_exceptions
@asynchronous
def get(self, service_id):
"""HTTP GET method."""
......@@ -92,13 +91,17 @@ class ServiceLogsAPI(RequestHandler):
self.service_id = service_id
self.log_obj = self.api_endpoint.service_logs(uid, role, service_id, stream=True)
self.stream = tornado.iostream.PipeIOStream(self.log_obj.fileno())
self.stream.read_until(b'\n', callback=self._stream_log_line)
def _stream_log_line(self, log_line):
self.write(log_line)
self.flush()
self.stream.read_until(b'\n', callback=self._stream_log_line)
self._stream_log_line()
@tornado.gen.coroutine
def _stream_log_line(self):
while True:
line = self.log_obj.read(4096)
if line is not None:
self.write(line)
self.flush()
else:
yield tornado.gen.sleep(0.2)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
......
......@@ -97,6 +97,7 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
return "requeue"
except ZoeStartExecutionFatalException as ex:
log.error('Fatal error trying to start service {} of execution {}: {}'.format(service.id, execution.id, ex.message))
service.set_error(ex.message)
execution.set_error_message(ex.message)
terminate_execution(execution)
execution.set_error()
......
......@@ -207,6 +207,7 @@ class ZoeElasticScheduler:
if not job.essential_services_running:
ret = start_essential(job)
if ret == "fatal":
jobs_to_attempt_scheduling.remove(job)
continue # trow away the execution
elif ret == "requeue":
self.queue.insert(0, job)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment