Commit 83f0552a authored by Daniele Venzano's avatar Daniele Venzano

Force service state to be synchronous when starting and terminating services

parent 9c449c79
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
<a href="{{ reverse_url('execution_inspect', id) }}">{{ id }}</a> ({{ executions_in_queue[id].user_id }}) <a href="{{ reverse_url('execution_inspect', id) }}">{{ id }}</a> ({{ executions_in_queue[id].user_id }})
{% for service in executions_in_queue[id].services %} {% for service in executions_in_queue[id].services %}
{% if service.essential %} {% if service.essential %}
<div class="service essential {{ 'running' if service.status == service.ACTIVE_STATUS }}"> <div class="service essential {{ 'running' if not service.is_dead() }}">
{{ service['name'] }}<br/> {{ service['name'] }}<br/>
M: <script>format_bytes({{ service['resource_reservation']['memory']['max'] }});</script><br/> M: <script>format_bytes({{ service['resource_reservation']['memory']['max'] }});</script><br/>
C: {{ service['resource_reservation']['cores']['max'] }} C: {{ service['resource_reservation']['cores']['max'] }}
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
{% endfor %} {% endfor %}
{% for service in executions_in_queue[id].services %} {% for service in executions_in_queue[id].services %}
{% if not service.essential %} {% if not service.essential %}
<div class="service {{ 'running' if service.status == service.ACTIVE_STATUS }}"> <div class="service {{ 'running' if not service.is_dead() }}">
{{ service['name'] }}<br/> {{ service['name'] }}<br/>
M: <script>format_bytes({{ service['resource_reservation']['memory']['max'] }});</script><br/> M: <script>format_bytes({{ service['resource_reservation']['memory']['max'] }});</script><br/>
C: {{ service['resource_reservation']['cores']['max'] }} C: {{ service['resource_reservation']['cores']['max'] }}
......
...@@ -33,6 +33,7 @@ class Execution: ...@@ -33,6 +33,7 @@ class Execution:
""" """
SUBMIT_STATUS = "submitted" SUBMIT_STATUS = "submitted"
IMAGE_DL_STATUS = "image download"
SCHEDULED_STATUS = "scheduled" SCHEDULED_STATUS = "scheduled"
STARTING_STATUS = "starting" STARTING_STATUS = "starting"
ERROR_STATUS = "error" ERROR_STATUS = "error"
...@@ -96,6 +97,11 @@ class Execution: ...@@ -96,6 +97,11 @@ class Execution:
self._status = self.SCHEDULED_STATUS self._status = self.SCHEDULED_STATUS
self.sql_manager.execution_update(self.id, status=self._status) self.sql_manager.execution_update(self.id, status=self._status)
def set_image_dl(self):
"""The execution has been added to the scheduler queues."""
self._status = self.IMAGE_DL_STATUS
self.sql_manager.execution_update(self.id, status=self._status)
def set_starting(self): def set_starting(self):
"""The services of the execution are being created in Swarm.""" """The services of the execution are being created in Swarm."""
self._status = self.STARTING_STATUS self._status = self.STARTING_STATUS
...@@ -132,10 +138,10 @@ class Execution: ...@@ -132,10 +138,10 @@ class Execution:
@property @property
def is_active(self): def is_active(self):
""" """
Returns True if the execution is in the scheduler Returns False if the execution ended completely
:return: :return:
""" """
return self._status == self.SCHEDULED_STATUS or self._status == self.RUNNING_STATUS or self._status == self.STARTING_STATUS or self._status == self.CLEANING_UP_STATUS return self._status == self.SCHEDULED_STATUS or self._status == self.RUNNING_STATUS or self._status == self.STARTING_STATUS or self._status == self.CLEANING_UP_STATUS or self._status == self.IMAGE_DL_STATUS
@property @property
def is_running(self): def is_running(self):
......
...@@ -89,6 +89,7 @@ class VolumeDescriptionHostPath(VolumeDescription): ...@@ -89,6 +89,7 @@ class VolumeDescriptionHostPath(VolumeDescription):
class Service: class Service:
"""A Zoe Service.""" """A Zoe Service."""
CREATED_STATUS = 'created'
TERMINATING_STATUS = "terminating" TERMINATING_STATUS = "terminating"
INACTIVE_STATUS = "inactive" INACTIVE_STATUS = "inactive"
ACTIVE_STATUS = "active" ACTIVE_STATUS = "active"
...@@ -186,20 +187,17 @@ class Service: ...@@ -186,20 +187,17 @@ class Service:
return self.id == other.id return self.id == other.id
def set_terminating(self): def set_terminating(self):
"""The service is being killed.""" """The service is being terminated."""
self.sql_manager.service_update(self.id, status=self.TERMINATING_STATUS) self.sql_manager.service_update(self.id, status=self.TERMINATING_STATUS)
self.status = self.TERMINATING_STATUS self.status = self.TERMINATING_STATUS
def set_inactive(self): def set_inactive(self):
"""The service is not running.""" """The service must not run."""
self.sql_manager.service_update(self.id, status=self.INACTIVE_STATUS, backend_id=None, ip_address=None, backend_host=None) self.sql_manager.service_update(self.id, status=self.INACTIVE_STATUS)
self.status = self.INACTIVE_STATUS self.status = self.INACTIVE_STATUS
for port in self.ports:
port.reset()
self.backend_host = None
def set_starting(self): def set_starting(self):
"""The service is being created by Docker.""" """The service is being started by the back-end."""
self.sql_manager.service_update(self.id, status=self.STARTING_STATUS) self.sql_manager.service_update(self.id, status=self.STARTING_STATUS)
self.status = self.STARTING_STATUS self.status = self.STARTING_STATUS
...@@ -208,13 +206,19 @@ class Service: ...@@ -208,13 +206,19 @@ class Service:
self.sql_manager.service_update(self.id, status=self.RUNNABLE_STATUS) self.sql_manager.service_update(self.id, status=self.RUNNABLE_STATUS)
self.status = self.RUNNABLE_STATUS self.status = self.RUNNABLE_STATUS
def set_active(self, backend_id, ip_address): def set_active(self, backend_id, ip_address, ports):
"""The service is running and has a valid backend_id.""" """The service is running and has a valid backend_id."""
self.sql_manager.service_update(self.id, status=self.ACTIVE_STATUS, backend_id=backend_id, error_message=None, ip_address=ip_address) self.sql_manager.service_update(self.id, status=self.ACTIVE_STATUS, backend_id=backend_id, error_message=None, ip_address=ip_address, backend_status=self.BACKEND_START_STATUS)
self.error_message = None self.error_message = None
self.ip_address = ip_address self.ip_address = ip_address
self.backend_id = backend_id self.backend_id = backend_id
self.status = self.ACTIVE_STATUS self.status = self.ACTIVE_STATUS
self.backend_status = self.BACKEND_START_STATUS
for port in self.ports:
if port.internal_name in ports and ports[port.internal_name] is not None:
port.activate(ip_address, ports[port.internal_name])
else:
port.reset()
def set_error(self, error_message): def set_error(self, error_message):
"""The service could not be created/started.""" """The service could not be created/started."""
...@@ -223,10 +227,17 @@ class Service: ...@@ -223,10 +227,17 @@ class Service:
self.error_message = error_message self.error_message = error_message
def set_backend_status(self, new_status): def set_backend_status(self, new_status):
"""Docker has emitted an event related to this service.""" """The backend status of the service has changed."""
self.sql_manager.service_update(self.id, backend_status=new_status)
log.debug("service {}, backend status updated to {}".format(self.id, new_status)) log.debug("service {}, backend status updated to {}".format(self.id, new_status))
self.backend_status = new_status self.backend_status = new_status
if self.is_dead():
for port in self.ports:
port.reset()
self.backend_id = None
self.ip_address = None
self.sql_manager.service_update(self.id, backend_status=new_status, ip_address=None, backend_id=None)
else:
self.sql_manager.service_update(self.id, backend_status=new_status)
def assign_backend_host(self, backend_host): def assign_backend_host(self, backend_host):
"""Assign this service to a host in particular.""" """Assign this service to a host in particular."""
...@@ -260,7 +271,7 @@ class Service: ...@@ -260,7 +271,7 @@ class Service:
def is_dead(self): def is_dead(self):
"""Returns True if this service is not running.""" """Returns True if this service is not running."""
return self.backend_status == self.BACKEND_DESTROY_STATUS or self.backend_status == self.BACKEND_OOM_STATUS or self.backend_status == self.BACKEND_DIE_STATUS or self.backend_status == self.BACKEND_UNDEFINED_STATUS return self.backend_status != self.BACKEND_START_STATUS
@property @property
def unique_name(self): def unique_name(self):
......
...@@ -159,7 +159,10 @@ class SQLManager: ...@@ -159,7 +159,10 @@ class SQLManager:
filter_list = [] filter_list = []
args_list = [] args_list = []
for key, value in kwargs.items(): for key, value in kwargs.items():
filter_list.append('{} = %s'.format(key)) if key.startswith('not_'):
filter_list.append('{} != %s'.format(key[4:]))
else:
filter_list.append('{} = %s'.format(key))
args_list.append(value) args_list.append(value)
q += ' AND '.join(filter_list) q += ' AND '.join(filter_list)
query = cur.mogrify(q, args_list) query = cur.mogrify(q, args_list)
...@@ -193,7 +196,7 @@ class SQLManager: ...@@ -193,7 +196,7 @@ class SQLManager:
def service_new(self, execution_id, name, service_group, description, is_essential): def service_new(self, execution_id, name, service_group, description, is_essential):
"""Adds a new service to the state.""" """Adds a new service to the state."""
cur = self._cursor() cur = self._cursor()
status = 'created' status = Service.CREATED_STATUS
query = cur.mogrify('INSERT INTO service (id, status, execution_id, name, service_group, description, essential) VALUES (DEFAULT,%s,%s,%s,%s,%s,%s) RETURNING id', (status, execution_id, name, service_group, description, is_essential)) query = cur.mogrify('INSERT INTO service (id, status, execution_id, name, service_group, description, essential) VALUES (DEFAULT,%s,%s,%s,%s,%s,%s) RETURNING id', (status, execution_id, name, service_group, description, is_essential))
cur.execute(query) cur.execute(query)
self.conn.commit() self.conn.commit()
......
...@@ -42,7 +42,7 @@ class BaseBackend: ...@@ -42,7 +42,7 @@ class BaseBackend:
* raise ``ZoeStartExecutionRetryException`` in case a temporary error is generated * raise ``ZoeStartExecutionRetryException`` in case a temporary error is generated
* raise ``ZoeStartExecutionFatalException`` in case a fatal error is generated * raise ``ZoeStartExecutionFatalException`` in case a fatal error is generated
* return a backend-specific ID that will be used later by Zoe to interact with the running container * return a tuple with three elements: backend-specific ID that will be used later by Zoe to interact with the running container, the externally-reachable ip address for the container and the port mapping
""" """
raise NotImplementedError raise NotImplementedError
......
...@@ -45,6 +45,7 @@ class DockerClient: ...@@ -45,6 +45,7 @@ class DockerClient:
"""The client class that wraps the Docker API.""" """The client class that wraps the Docker API."""
def __init__(self, docker_config: DockerHostConfig, mock_client=None) -> None: def __init__(self, docker_config: DockerHostConfig, mock_client=None) -> None:
self.name = docker_config.name self.name = docker_config.name
self.docker_config = docker_config
if not docker_config.tls: if not docker_config.tls:
tls = None tls = None
else: else:
...@@ -146,20 +147,14 @@ class DockerClient: ...@@ -146,20 +147,14 @@ class DockerClient:
"id": container.id, "id": container.id,
"ip_address": {}, "ip_address": {},
"name": container.name, "name": container.name,
'labels': container.attrs['Config']['Labels'] 'labels': container.attrs['Config']['Labels'],
'external_address': self.docker_config.external_address
} # type: Dict[str, Any] } # type: Dict[str, Any]
try: try:
info['host'] = container.attrs['Node']['Name'], info['host'] = container.attrs['Node']['Name'],
except KeyError: except KeyError:
info['host'] = 'N/A' info['host'] = 'N/A'
if container.attrs["NetworkSettings"]["Networks"] is not None:
for net in container.attrs["NetworkSettings"]["Networks"]:
if len(container.attrs["NetworkSettings"]["Networks"][net]['IPAddress']) > 0:
info["ip_address"][net] = container.attrs["NetworkSettings"]["Networks"][net]['IPAddress']
else:
info["ip_address"][net] = None
if container.status == 'running' or container.status == 'restarting': if container.status == 'running' or container.status == 'restarting':
info["state"] = Service.BACKEND_START_STATUS info["state"] = Service.BACKEND_START_STATUS
info["running"] = True info["running"] = True
......
...@@ -71,13 +71,14 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend): ...@@ -71,13 +71,14 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
except ZoeException as e: except ZoeException as e:
raise ZoeStartExecutionFatalException(str(e)) raise ZoeStartExecutionFatalException(str(e))
return cont_info["id"], cont_info['ip_address'][get_conf().overlay_network_name] return cont_info["id"], cont_info['external_address'], cont_info['ports']
def terminate_service(self, service: Service) -> None: def terminate_service(self, service: Service) -> None:
"""Terminate and delete a container.""" """Terminate and delete a container."""
conf = self._get_config(service.backend_host) conf = self._get_config(service.backend_host)
engine = DockerClient(conf) engine = DockerClient(conf)
engine.terminate_container(service.backend_id, delete=True) engine.terminate_container(service.backend_id, delete=True)
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
def platform_state(self) -> ClusterStats: def platform_state(self) -> ClusterStats:
"""Get the platform state.""" """Get the platform state."""
......
...@@ -77,7 +77,7 @@ class DockerStateSynchronizer(threading.Thread): ...@@ -77,7 +77,7 @@ class DockerStateSynchronizer(threading.Thread):
node_stats.status = 'online' node_stats.status = 'online'
node_stats.labels = host_config.labels node_stats.labels = host_config.labels
service_list = self.state.service_list(backend_host=host_config.name) service_list = self.state.service_list(backend_host=host_config.name, not_status=Service.INACTIVE_STATUS)
try: try:
container_list = my_engine.list(only_label={'zoe_deployment_name': get_conf().deployment_name}) container_list = my_engine.list(only_label={'zoe_deployment_name': get_conf().deployment_name})
except ZoeException: except ZoeException:
...@@ -95,7 +95,7 @@ class DockerStateSynchronizer(threading.Thread): ...@@ -95,7 +95,7 @@ class DockerStateSynchronizer(threading.Thread):
if service.backend_id in containers: if service.backend_id in containers:
self._update_service_status(service, containers[service.backend_id], host_config) self._update_service_status(service, containers[service.backend_id], host_config)
else: else:
if service.backend_status == service.BACKEND_DESTROY_STATUS: if service.status == service.CREATED_STATUS or service.backend_status == service.BACKEND_DESTROY_STATUS:
continue continue
else: else:
service.set_backend_status(service.BACKEND_DESTROY_STATUS) service.set_backend_status(service.BACKEND_DESTROY_STATUS)
...@@ -104,8 +104,6 @@ class DockerStateSynchronizer(threading.Thread): ...@@ -104,8 +104,6 @@ class DockerStateSynchronizer(threading.Thread):
self._update_node_stats(my_engine, node_stats) self._update_node_stats(my_engine, node_stats)
except ZoeException as e: except ZoeException as e:
log.error(str(e)) log.error(str(e))
node_stats.status = 'offline'
log.warning('Node {} is offline'.format(host_config.name))
time.sleep(CHECK_INTERVAL) time.sleep(CHECK_INTERVAL)
......
...@@ -100,7 +100,7 @@ def service_list_to_containers(execution: Execution, service_list: List[Service] ...@@ -100,7 +100,7 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
service.set_starting() service.set_starting()
instance = ServiceInstance(execution, service, env_subst_dict) instance = ServiceInstance(execution, service, env_subst_dict)
try: try:
backend_id, ip_address = backend.spawn_service(instance) backend_id, ip_address, ports = backend.spawn_service(instance)
except ZoeStartExecutionRetryException as ex: except ZoeStartExecutionRetryException as ex:
log.warning('Temporary failure starting service {} of execution {}: {}'.format(service.id, execution.id, ex.message)) log.warning('Temporary failure starting service {} of execution {}: {}'.format(service.id, execution.id, ex.message))
service.set_error(ex.message) service.set_error(ex.message)
...@@ -124,7 +124,7 @@ def service_list_to_containers(execution: Execution, service_list: List[Service] ...@@ -124,7 +124,7 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
return "fatal" return "fatal"
else: else:
log.debug('Service {} started'.format(instance.name)) log.debug('Service {} started'.format(instance.name))
service.set_active(backend_id, ip_address) service.set_active(backend_id, ip_address, ports)
return "ok" return "ok"
...@@ -157,13 +157,20 @@ def terminate_execution(execution: Execution) -> None: ...@@ -157,13 +157,20 @@ def terminate_execution(execution: Execution) -> None:
"""Terminate an execution.""" """Terminate an execution."""
execution.set_cleaning_up() execution.set_cleaning_up()
backend = _get_backend() backend = _get_backend()
for service in execution.services: for service in execution.services: # type: Service
assert isinstance(service, Service) if service.status != Service.INACTIVE_STATUS:
if service.backend_id is not None: if service.status == Service.ERROR_STATUS:
service.set_terminating() continue
backend.terminate_service(service) elif service.status == Service.ACTIVE_STATUS or service.status == Service.TERMINATING_STATUS or service.status == Service.STARTING_STATUS:
service.set_inactive() service.set_terminating()
log.debug('Service {} terminated'.format(service.name)) backend.terminate_service(service)
service.set_inactive()
log.debug('Service {} terminated'.format(service.name))
elif service.status == Service.CREATED_STATUS or service.status == Service.RUNNABLE_STATUS:
service.set_inactive()
else:
log.error('BUG: don\'t know how to terminate a service in status {}'.format(service.status))
execution.set_terminated() execution.set_terminated()
......
...@@ -61,7 +61,7 @@ class KubernetesBackend(zoe_master.backends.base.BaseBackend): ...@@ -61,7 +61,7 @@ class KubernetesBackend(zoe_master.backends.base.BaseBackend):
except ZoeException as e: except ZoeException as e:
raise ZoeStartExecutionFatalException(str(e)) raise ZoeStartExecutionFatalException(str(e))
return rc_info["backend_id"], rc_info['ip_address'] return rc_info["backend_id"], rc_info['ip_address'], None
def terminate_service(self, service: Service) -> None: def terminate_service(self, service: Service) -> None:
"""Terminate and delete a container.""" """Terminate and delete a container."""
......
...@@ -58,7 +58,7 @@ class SwarmBackend(zoe_master.backends.base.BaseBackend): ...@@ -58,7 +58,7 @@ class SwarmBackend(zoe_master.backends.base.BaseBackend):
except ZoeException as e: except ZoeException as e:
raise ZoeStartExecutionFatalException(str(e)) raise ZoeStartExecutionFatalException(str(e))
return cont_info["id"], cont_info['ip_address'][get_conf().overlay_network_name] return cont_info["id"], cont_info['ip_address'][get_conf().overlay_network_name], cont_info['ports']
def terminate_service(self, service: Service) -> None: def terminate_service(self, service: Service) -> None:
"""Terminate and delete a container.""" """Terminate and delete a container."""
......
...@@ -71,6 +71,7 @@ def _digest_application_description(state: SQLManager, execution: Execution): ...@@ -71,6 +71,7 @@ def _digest_application_description(state: SQLManager, execution: Execution):
def _do_execution_submit(state: SQLManager, scheduler: ZoeBaseScheduler, execution: Execution): def _do_execution_submit(state: SQLManager, scheduler: ZoeBaseScheduler, execution: Execution):
execution.set_image_dl()
if _digest_application_description(state, execution): if _digest_application_description(state, execution):
execution.set_scheduled() execution.set_scheduled()
scheduler.incoming(execution) scheduler.incoming(execution)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment