Commit 0a275427 authored by Daniele Venzano's avatar Daniele Venzano 🏇

Merge branch 'devel/fixes' into 'master'

Make the Zoe master more resistant to Swarm being offline

See merge request !23
parents 6d1da377 183790cf
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
"""The real API, exposed as web pages or REST API.""" """The real API, exposed as web pages or REST API."""
from datetime import datetime, timedelta
import logging import logging
import os import os
...@@ -175,21 +174,13 @@ class APIEndpoint: ...@@ -175,21 +174,13 @@ class APIEndpoint:
def cleanup_dead_executions(self): def cleanup_dead_executions(self):
"""Terminates all executions with dead "monitor" services.""" """Terminates all executions with dead "monitor" services."""
log.debug('Starting dead execution cleanup task') log.debug('Starting dead execution cleanup task')
all_execs = self.sql.execution_list() all_execs = self.sql.execution_list(status='running')
for execution in all_execs: for execution in all_execs:
if execution.is_running: for service in execution.services:
terminated = False if service.description['monitor'] and service.status == service.BACKEND_DIE_STATUS:
for service in execution.services: log.info("Service {} ({}) of execution {} died, terminating execution".format(service.id, service.name, execution.id))
if service.description['monitor'] and service.is_dead(): self.master.execution_terminate(execution.id)
log.info("Service {} ({}) of execution {} died, terminating execution".format(service.id, service.name, execution.id)) break
self.master.execution_terminate(execution.id)
terminated = True
break
if not terminated and execution.name == "aml-lab":
log.debug('Looking at AML execution {}...'.format(execution.id))
if datetime.now() - execution.time_start > timedelta(hours=get_conf().aml_ttl):
log.info('Terminating AML-LAB execution for user {}, timer expired'.format(execution.user_id))
self.master.execution_terminate(execution.id)
log.debug('Cleanup task finished') log.debug('Cleanup task finished')
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.web.utils import get_auth, catch_exceptions from zoe_api.web.utils import get_auth, catch_exceptions
from zoe_api.web.custom_request_handler import ZoeRequestHandler from zoe_api.web.custom_request_handler import ZoeRequestHandler
from zoe_api.exceptions import ZoeException
class StatusEndpointWeb(ZoeRequestHandler): class StatusEndpointWeb(ZoeRequestHandler):
...@@ -35,6 +36,8 @@ class StatusEndpointWeb(ZoeRequestHandler): ...@@ -35,6 +36,8 @@ class StatusEndpointWeb(ZoeRequestHandler):
return self.redirect(self.get_argument('next', u'/login')) return self.redirect(self.get_argument('next', u'/login'))
stats = self.api_endpoint.statistics_scheduler(uid, role) stats = self.api_endpoint.statistics_scheduler(uid, role)
if stats is None:
raise ZoeException('Cannot retrieve statistics from the Zoe master')
executions_in_queue = {} executions_in_queue = {}
for exec_id in stats['queue']: for exec_id in stats['queue']:
......
...@@ -57,7 +57,12 @@ class SwarmStateSynchronizer(threading.Thread): ...@@ -57,7 +57,12 @@ class SwarmStateSynchronizer(threading.Thread):
"""The thread loop.""" """The thread loop."""
log.info("Checker thread started") log.info("Checker thread started")
while not self.stop: while not self.stop:
swarm = SwarmClient() try:
swarm = SwarmClient()
except ZoeException as e:
log.error(str(e))
time.sleep(CHECK_INTERVAL)
continue
service_list = self.state.service_list() service_list = self.state.service_list()
try: try:
container_list = swarm.list(only_label={'zoe_deployment_name': get_conf().deployment_name}) container_list = swarm.list(only_label={'zoe_deployment_name': get_conf().deployment_name})
......
...@@ -87,8 +87,13 @@ class APIManager: ...@@ -87,8 +87,13 @@ class APIManager:
zoe_master.preprocessing.execution_delete(execution) zoe_master.preprocessing.execution_delete(execution)
self._reply_ok() self._reply_ok()
elif message['command'] == 'scheduler_stats': elif message['command'] == 'scheduler_stats':
data = self.scheduler.stats() try:
self._reply_ok(data=data) data = self.scheduler.stats()
except ZoeException as e:
log.error(str(e))
self._reply_error(str(e))
else:
self._reply_ok(data=data)
else: else:
log.error('Unknown command: {}'.format(message['command'])) log.error('Unknown command: {}'.format(message['command']))
self._reply_error('unknown command') self._reply_error('unknown command')
......
...@@ -24,6 +24,7 @@ import threading ...@@ -24,6 +24,7 @@ import threading
import time import time
from zoe_lib.state import Execution, SQLManager from zoe_lib.state import Execution, SQLManager
from zoe_master.exceptions import ZoeException
from zoe_master.backends.interface import terminate_execution, get_platform_state, start_elastic, start_essential from zoe_master.backends.interface import terminate_execution, get_platform_state, start_elastic, start_essential
from zoe_master.scheduler.simulated_platform import SimulatedPlatform from zoe_master.scheduler.simulated_platform import SimulatedPlatform
...@@ -80,7 +81,11 @@ class ZoeElasticScheduler: ...@@ -80,7 +81,11 @@ class ZoeElasticScheduler:
def async_termination(e): def async_termination(e):
"""Actual termination runs in a thread.""" """Actual termination runs in a thread."""
with e.termination_lock: with e.termination_lock:
terminate_execution(e) try:
terminate_execution(e)
except ZoeException as ex:
log.error('Error in termination thread: {}'.format(ex))
return
self.trigger() self.trigger()
log.debug('Execution {} terminated successfully'.format(e.id)) log.debug('Execution {} terminated successfully'.format(e.id))
...@@ -184,7 +189,15 @@ class ZoeElasticScheduler: ...@@ -184,7 +189,15 @@ class ZoeElasticScheduler:
for job in jobs_to_attempt_scheduling: for job in jobs_to_attempt_scheduling:
log.debug("-> {}".format(job)) log.debug("-> {}".format(job))
platform_state = get_platform_state() try:
platform_state = get_platform_state()
except ZoeException:
log.error('Cannot retrieve platform state, cannot schedule')
for job in jobs_to_attempt_scheduling:
job.termination_lock.release()
self.queue = jobs_to_attempt_scheduling + self.queue
break
cluster_status_snapshot = SimulatedPlatform(platform_state) cluster_status_snapshot = SimulatedPlatform(platform_state)
log.debug(str(cluster_status_snapshot)) log.debug(str(cluster_status_snapshot))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment