Commit 5fd816f0 authored by Daniele Venzano

Move the check for dead services and executions into the zoe master, so that rescheduling can happen

parent 9f0a384b
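
Context for the hunks that follow: until now zoe_api ran a periodic cleanup task (removed in the first hunk) that could only terminate a running execution once its monitor service died. With this commit the check lives in the zoe master's scheduler loop, where essential and elastic services are treated differently. The sketch below condenses that decision logic; it restates the scheduler hunk further down, and the standalone helper and its signature are purely illustrative, not part of the commit:

# Condensed sketch of the new master-side policy (names follow the diff;
# the helper function itself is illustrative only).
def handle_dead_service(scheduler, execution, service):
    if service.backend_status != service.BACKEND_DIE_STATUS:
        return
    service.restarted()  # record the event in the new restart_count column
    if service.essential:
        # A dead essential service makes the whole execution unusable.
        execution.set_cleaning_up()
        scheduler.terminate(execution)
    else:
        # A dead elastic service only degrades the execution: put it back in
        # the waiting queue so the scheduler can start a replacement.
        scheduler.queue_running.remove(execution)
        scheduler.queue.append(execution)
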
@@ -168,19 +168,6 @@ class APIEndpoint:
         else:
             raise zoe_api.exceptions.ZoeException(message=message)
-    def cleanup_dead_executions(self):
-        """Terminates all executions with dead "monitor" services."""
-        log.debug('Starting dead execution cleanup task')
-        all_execs = self.sql.executions.select(status='running')
-        for execution in all_execs:
-            for service in execution.services:
-                if service.description['monitor'] and service.backend_status == service.BACKEND_DIE_STATUS:
-                    log.info("Service {} ({}) of execution {} died, terminating execution".format(service.id, service.name, execution.id))
-                    self.master.execution_terminate(execution.id)
-                    break
-        log.debug('Cleanup task finished')
     def execution_endpoints(self, uid: str, role: str, execution: zoe_lib.state.Execution):
         """Return a list of the services and public endpoints available for a certain execution."""
         services_info = []

@@ -77,9 +77,6 @@ def zoe_web_main(test_conf=None) -> int:
     http_server.bind(args.listen_port, args.listen_address)
     http_server.start(num_processes=1)
-    retry_cb = PeriodicCallback(api_endpoint.cleanup_dead_executions, 60000)
-    retry_cb.start()
     try:
         IOLoop.current().start()
     except KeyboardInterrupt:

@@ -118,6 +118,7 @@ class Service(BaseRecord):
         self.backend_id = d['backend_id']
         self.backend_status = d['backend_status']
         self.backend_host = d['backend_host']
+        self.restart_count = d['restart_count']
         self.ip_address = d['ip_address']
         if self.ip_address is not None and ('/32' in self.ip_address or '/128' in self.ip_address):
@@ -160,7 +161,8 @@ class Service(BaseRecord):
             'backend_status': self.backend_status,
             'backend_host': self.backend_host,
             'essential': self.essential,
-            'proxy_address': self.proxy_address
+            'proxy_address': self.proxy_address,
+            'restart_count': self.restart_count
         }
     def __eq__(self, other):
@@ -266,6 +268,11 @@ class Service(BaseRecord):
         """Return the parent execution."""
         return self.sql_manager.executions.select(only_one=True, id=self.execution_id)
+    def restarted(self):
+        """The service has restarted, keep track in the database."""
+        self.restart_count += 1
+        self.sql_manager.services.update(self.id, restart_count=self.restart_count)
 class ServiceTable(BaseTable):
     """Abstraction for the service table in the database."""
@@ -286,7 +293,8 @@ class ServiceTable(BaseTable):
             backend_status TEXT NOT NULL DEFAULT 'undefined',
             backend_host TEXT NULL DEFAULT NULL,
             ip_address CIDR NULL DEFAULT NULL,
-            essential BOOLEAN NOT NULL DEFAULT FALSE
+            essential BOOLEAN NOT NULL DEFAULT FALSE,
+            restart_count INT DEFAULT 0
         )''')
     def insert(self, execution_id, name, service_group, description, is_essential):

@@ -18,4 +18,4 @@
 ZOE_VERSION = '2017.12-beta'
 ZOE_API_VERSION = '0.7'
 ZOE_APPLICATION_FORMAT_VERSION = 3
-SQL_SCHEMA_VERSION = 5 # ---> Increment this value every time the SQL schema changes !!! <---
+SQL_SCHEMA_VERSION = 6 # ---> Increment this value every time the SQL schema changes !!! <---
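
The new column is only created for fresh installations; bumping SQL_SCHEMA_VERSION from 5 to 6 flags existing databases as outdated. An in-place upgrade would need a manual migration along these lines, sketched here under the assumptions that the table is named service (the CREATE TABLE header is not visible in this hunk), that the database is PostgreSQL 9.6 or later, and that psycopg2 is available; verify against the actual schema before running anything similar:

import psycopg2

# Manual, illustrative migration for an existing database; adjust the DSN and
# check the real table name for your deployment before using.
conn = psycopg2.connect("dbname=zoe user=zoeuser host=localhost")
with conn, conn.cursor() as cur:
    cur.execute("ALTER TABLE service ADD COLUMN IF NOT EXISTS restart_count INT DEFAULT 0")
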
@@ -157,12 +157,12 @@ class ZoeElasticScheduler:
     def _pop_all_with_same_size(self):
         out_list = []
         while len(self.queue) > 0:
-            job = self.queue.pop(0) # type: Execution
-            ret = job.termination_lock.acquire(blocking=False)
-            if ret and job.status != Execution.TERMINATED_STATUS:
-                out_list.append(job)
+            execution = self.queue.pop(0) # type: Execution
+            ret = execution.termination_lock.acquire(blocking=False)
+            if ret and execution.status != Execution.TERMINATED_STATUS:
+                out_list.append(execution)
             else:
-                log.debug('While popping, throwing away execution {} that has the termination lock held'.format(job.id))
+                log.debug('While popping, throwing away execution {} that has the termination lock held'.format(execution.id))
         return out_list
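
Beyond the job/execution rename, the method's behaviour is unchanged: each execution's termination_lock is claimed with a non-blocking acquire, so an execution currently being terminated is skipped rather than blocking the scheduler loop. A generic illustration of that pattern, with standard-library primitives only (not Zoe code):

import threading

lock = threading.Lock()
results = []

def try_to_work(item):
    # Non-blocking acquire: returns False immediately if another thread
    # (for example, one terminating the execution) already holds the lock.
    if not lock.acquire(blocking=False):
        return False  # skip the item instead of waiting
    try:
        results.append(item)  # stand-in for the real work on the item
        return True
    finally:
        lock.release()
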
@@ -185,6 +185,27 @@ class ZoeElasticScheduler:
             if len(self.queue) == 0:
                 log.debug("Scheduler loop has been triggered, but the queue is empty")
                 self.core_limit_recalc_trigger.set()
+                # Check for executions that are no longer viable since an essential service died
+                for execution in self.queue_running:
+                    for service in execution.services:
+                        if service.essential and service.backend_status == service.BACKEND_DIE_STATUS:
+                            log.info("Essential service {} ({}) of execution {} died, terminating execution".format(service.id, service.name, execution.id))
+                            service.restarted()
+                            execution.set_cleaning_up()
+                            self.terminate(execution)
+                            break
+                # Check for executions that need to be rescheduled because one of the elastic components died
+                # Do it in two loops to prevent rescheduling executions that need to be terminated
+                for execution in self.queue_running:
+                    for service in execution.services:
+                        if not service.essential and service.backend_status == service.BACKEND_DIE_STATUS:
+                            log.info("Elastic service {} ({}) of execution {} died, rescheduling".format(service.id, service.name, execution.id))
+                            service.restarted()
+                            self.queue_running.remove(execution)
+                            self.queue.append(execution)
+                            break
                 continue
             log.debug("Scheduler loop has been triggered")

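
One detail worth keeping in mind about the rescheduling loop above: it calls self.queue_running.remove(execution) while iterating over self.queue_running itself, and mutating a Python list during iteration can silently skip the element that follows the removed one. A defensive variant, shown here only as a sketch and not as code from this commit, iterates over a snapshot of the list:

# Sketch: iterate over a copy so that remove() cannot disturb the iteration.
for execution in list(self.queue_running):
    for service in execution.services:
        if not service.essential and service.backend_status == service.BACKEND_DIE_STATUS:
            service.restarted()
            self.queue_running.remove(execution)
            self.queue.append(execution)
            break
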