Commit 2acb9dba authored by Daniele Venzano's avatar Daniele Venzano

Add a reason for execution termination, so the user knows what happened

parent 6bf8f471
......@@ -120,7 +120,7 @@ class APIEndpoint:
return new_id
def execution_terminate(self, user: zoe_lib.state.User, exec_id: int):
def execution_terminate(self, user: zoe_lib.state.User, exec_id: int, reason: str):
"""Terminate an execution."""
e = self.sql.executions.select(id=exec_id, only_one=True)
assert isinstance(e, zoe_lib.state.Execution)
......@@ -131,7 +131,7 @@ class APIEndpoint:
raise zoe_api.exceptions.ZoeAuthException('You are not authorized to terminate this execution')
if e.is_active:
success, message = self.master.execution_terminate(exec_id)
success, message = self.master.execution_terminate(exec_id, reason)
if not success:
raise zoe_api.exceptions.ZoeRestAPIException(message)
else:
......@@ -419,4 +419,4 @@ class APIEndpoint:
runtime_limit = timedelta(hours=runtime_limit)
if e.time_submit + runtime_limit < datetime.utcnow():
log.info('Automatically terminating execution {} that has exceeded the run time limit'.format(e.id))
self.execution_terminate(e.owner, e.id)
self.execution_terminate(e.owner, e.id, 'Run time quota exceeded')
......@@ -86,11 +86,12 @@ class APIManager:
}
return self._request_reply(msg)
def execution_terminate(self, exec_id: int) -> APIReturnType:
def execution_terminate(self, exec_id: int, reason: str) -> APIReturnType:
"""Terminate an execution."""
msg = {
'command': 'execution_terminate',
'exec_id': exec_id
'exec_id': exec_id,
'reason': reason
}
return self._request_reply(msg)
......
......@@ -47,7 +47,7 @@ class ExecutionAPI(ZoeAPIRequestHandler):
return
try:
self.api_endpoint.execution_terminate(self.current_user, execution_id)
self.api_endpoint.execution_terminate(self.current_user, execution_id, 'user {} request from API'.format(self.current_user))
except ZoeException as e:
self.set_status(e.status_code, e.message)
else:
......
......@@ -97,7 +97,7 @@ class ExecutionTerminateWeb(ZoeWebRequestHandler):
return
try:
self.api_endpoint.execution_terminate(self.current_user, execution_id)
self.api_endpoint.execution_terminate(self.current_user, execution_id, 'user {} request from web interface'.format(self.current_user))
except zoe_api.exceptions.ZoeException as e:
self.set_status(e.status_code, e.message)
return
......
......@@ -55,6 +55,9 @@
{% if e.status == 'error' %}
<p>Error message: <code>{{ e.error_message }}</code></p>
{% endif %}
{% if e.status == 'terminated' %}
<p>Termination reason: <code>{{ e.error_message }}</code></p>
{% endif %}
<div id="endpoints">
<h3>Endpoints</h3>
......
......@@ -119,11 +119,14 @@ class Execution(BaseRecord):
self._status = self.CLEANING_UP_STATUS
self.sql_manager.executions.update(self.id, status=self._status)
def set_terminated(self):
def set_terminated(self, reason=None):
"""The execution is not running."""
self._status = self.TERMINATED_STATUS
self.time_end = datetime.datetime.utcnow()
self.sql_manager.executions.update(self.id, status=self._status, time_end=self.time_end)
if reason is not None:
self.sql_manager.executions.update(self.id, status=self._status, time_end=self.time_end, error_message=reason)
else:
self.sql_manager.executions.update(self.id, status=self._status, time_end=self.time_end)
def set_error(self):
"""The scheduler encountered an error starting or running the execution."""
......@@ -242,7 +245,6 @@ class ExecutionTable(BaseTable):
description JSON NOT NULL,
status TEXT NOT NULL,
size NUMERIC NOT NULL,
execution_manager_id TEXT NULL,
time_submit TIMESTAMP NOT NULL,
time_start TIMESTAMP NULL,
time_end TIMESTAMP NULL,
......
......@@ -96,22 +96,19 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
except ZoeStartExecutionRetryException as ex:
log.warning('Temporary failure starting service {} of execution {}: {}'.format(service.id, execution.id, ex.message))
service.set_error(ex.message)
execution.set_error_message(ex.message)
terminate_execution(execution)
terminate_execution(execution, reason=ex.message)
execution.set_scheduled()
return "requeue"
except ZoeStartExecutionFatalException as ex:
log.error('Fatal error trying to start service {} of execution {}: {}'.format(service.id, execution.id, ex.message))
service.set_error(ex.message)
execution.set_error_message(ex.message)
terminate_execution(execution)
terminate_execution(execution, reason=ex.message)
execution.set_error()
return "fatal"
except Exception as ex:
log.error('Fatal error trying to start service {} of execution {}'.format(service.id, execution.id))
log.exception('BUG, this error should have been caught earlier')
execution.set_error_message(str(ex))
terminate_execution(execution)
terminate_execution(execution, reason=str(ex))
execution.set_error()
return "fatal"
else:
......@@ -169,11 +166,11 @@ def terminate_service(service: Service) -> None:
log.debug('Service {} terminated'.format(service.name))
def terminate_execution(execution: Execution) -> None:
def terminate_execution(execution: Execution, reason: Union[None, str]=None) -> None:
"""Terminate an execution."""
for service in execution.services: # type: Service
terminate_service(service)
execution.set_terminated()
execution.set_terminated(reason)
def get_platform_state() -> ClusterStats:
......
......@@ -71,12 +71,13 @@ class APIManager:
zoe_master.preprocessing.execution_submit(self.state, self.scheduler, execution)
elif message['command'] == 'execution_terminate':
exec_id = message['exec_id']
reason = message['reason']
execution = self.state.executions.select(id=exec_id, only_one=True)
if execution is None:
self._reply_error('Execution ID {} not found'.format(message['exec_id']))
else:
self._reply_ok()
zoe_master.preprocessing.execution_terminate(self.scheduler, execution)
zoe_master.preprocessing.execution_terminate(self.scheduler, execution, reason)
elif message['command'] == 'execution_delete':
exec_id = message['exec_id']
execution = self.state.executions.select(id=exec_id, only_one=True)
......
......@@ -88,15 +88,16 @@ def execution_submit(state: SQLManager, scheduler: ZoeBaseScheduler, execution:
scheduler.incoming(execution)
def execution_terminate(scheduler: ZoeBaseScheduler, execution: Execution):
"""Remove an execution form the scheduler."""
def execution_terminate(scheduler: ZoeBaseScheduler, execution: Execution, reason: str):
"""Remove an execution from the scheduler."""
if execution.is_running or execution.status == execution.SCHEDULED_STATUS:
execution.set_cleaning_up()
execution.set_error_message(reason)
scheduler.terminate(execution)
elif execution.status == execution.SUBMIT_STATUS or execution.status == execution.STARTING_STATUS:
return # It is unsafe to terminate executions in these statuses
elif execution.status == execution.ERROR_STATUS or execution.status == execution.CLEANING_UP_STATUS:
terminate_execution(execution)
terminate_execution(execution, reason)
elif execution.status == execution.TERMINATED_STATUS:
return
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment