Commit fcb279b2 authored by Daniele Venzano's avatar Daniele Venzano

First pass at abstracting the backends

parent d92e83ee
......@@ -27,8 +27,8 @@ import zoe_lib.config as config
from zoe_lib.configargparse import ArgumentParser, FileType
from zoe_lib.state.sql_manager import Execution, Service
from zoe_lib.swarm_client import SwarmClient
from zoe_master.backends.old_swarm import execution_to_containers, terminate_execution
from zoe_master.execution_manager import _digest_application_description
from zoe_master.zapp_to_docker import execution_to_containers, terminate_execution
log = logging.getLogger("main")
LOG_FORMAT = '%(asctime)-15s %(levelname)s %(name)s (%(threadName)s): %(message)s'
......@@ -122,7 +122,7 @@ class FakeSQLManager:
'docker_id': None,
'service_group': service_group,
'error_message': None,
'docker_status': Service.DOCKER_UNDEFINED_STATUS
'docker_status': Service.BACKEND_UNDEFINED_STATUS
}
service = Service(s_dict, self)
self.services.append(service)
......
......@@ -21,7 +21,7 @@ import psycopg2.extras
import zoe_api.exceptions
from zoe_lib.config import get_conf
SQL_SCHEMA_VERSION = 1 # ---> Increment this value every time the schema changes !!! <---
SQL_SCHEMA_VERSION = 3 # ---> Increment this value every time the schema changes !!! <---
def version_table(cur):
......
......@@ -67,6 +67,12 @@ def app_validate(data):
if 'services' not in data:
raise InvalidApplicationDescription(msg='the application should contain a list of services')
if 'disable_autorestart' in data:
try:
bool(data['disable_autorestart'])
except ValueError:
raise InvalidApplicationDescription(msg="disable_autorestart field should be a boolean")
for service in data['services']:
_service_check(service)
......@@ -142,20 +148,10 @@ def _service_check(data):
if not isinstance(volume[2], bool):
raise InvalidApplicationDescription(msg='readonly volume item (third) must be a boolean: {}'.format(volume[2]))
if 'networks' in data:
if not hasattr(data['networks'], '__iter__'):
raise InvalidApplicationDescription(msg='networks should be an iterable')
if 'constraints' in data:
if not hasattr(data['constraints'], '__iter__'):
raise InvalidApplicationDescription(msg='networks should be an iterable')
if 'disable_autorestart' in data:
try:
bool(data['disable_autorestart'])
except ValueError:
raise InvalidApplicationDescription(msg="disable_autorestart field should be a boolean")
def _port_check(data):
required_keys = ['name', 'protocol', 'port_number', 'is_main_endpoint']
......
......@@ -23,6 +23,30 @@ from zoe_lib.swarm_client import SwarmClient
log = logging.getLogger(__name__)
class ResourceReservation:
"""The resources reserved by a Service."""
def __init__(self, data):
self.memory = data['memory']
self.cores = data['cores']
class VolumeDescription:
"""A generic description for container volumes."""
def __init__(self, data):
self.type = "host_directory"
self.path = data[0]
self.mount_point = data[1]
self.readonly = data[2]
class ExposedPort:
"""A port on the container that should be exposed."""
def __init__(self, data):
self.proto = 'tcp' # FIXME UDP ports?
self.number = data['port_number']
self.expose = data['expose'] if 'expose' in data else False
class Service:
"""A Zoe Service."""
......@@ -32,12 +56,12 @@ class Service:
STARTING_STATUS = "starting"
ERROR_STATUS = "error"
DOCKER_UNDEFINED_STATUS = 'undefined'
DOCKER_CREATE_STATUS = 'created'
DOCKER_START_STATUS = 'started'
DOCKER_DIE_STATUS = 'dead'
DOCKER_DESTROY_STATUS = 'destroyed'
DOCKER_OOM_STATUS = 'oom-killed'
BACKEND_UNDEFINED_STATUS = 'undefined'
BACKEND_CREATE_STATUS = 'created'
BACKEND_START_STATUS = 'started'
BACKEND_DIE_STATUS = 'dead'
BACKEND_DESTROY_STATUS = 'destroyed'
BACKEND_OOM_STATUS = 'oom-killed'
def __init__(self, d, sql_manager):
self.sql_manager = sql_manager
......@@ -49,8 +73,24 @@ class Service:
self.execution_id = d['execution_id']
self.description = d['description']
self.service_group = d['service_group']
self.docker_id = d['docker_id']
self.docker_status = d['docker_status']
self.backend_id = d['backend_id']
self.backend_status = d['backend_status']
# Fields parsed from the JSON description
self.image_name = self.description['docker_image']
self.is_monitor = self.description['monitor']
self.startup_order = self.description['startup_order']
self.environment = []
if 'environment' in self.description:
self.environment = self.description['environment']
self.command = ''
if 'command' in self.description:
self.command = self.description['command']
self.resource_reservation = ResourceReservation(self.description['required_resources'])
self.volumes = []
if 'volumes' in self.description:
self.volumes = [VolumeDescription(v) for v in self.description['volumes']]
self.ports = [ExposedPort(p) for p in self.description['ports']]
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
......@@ -62,9 +102,9 @@ class Service:
'execution_id': self.execution_id,
'description': self.description,
'service_group': self.service_group,
'docker_id': self.docker_id,
'backend_id': self.backend_id,
'ip_address': self.ip_address,
'docker_status': self.docker_status
'backend_status': self.backend_status
}
def __eq__(self, other):
......@@ -102,19 +142,19 @@ class Service:
self.status = self.ERROR_STATUS
self.error_message = error_message
def set_docker_status(self, new_status):
def set_backend_status(self, new_status):
"""Docker has emitted an event related to this service."""
self.sql_manager.service_update(self.id, docker_status=new_status)
log.debug("service {}, status updated to {}".format(self.id, new_status))
self.sql_manager.service_update(self.id, backend_status=new_status)
log.debug("service {}, backend status updated to {}".format(self.id, new_status))
self.docker_status = new_status
@property
def ip_address(self):
"""Getter for the service IP address, queries Swarm as the IP address changes outside our control."""
if self.docker_status != self.DOCKER_START_STATUS:
if self.docker_status != self.BACKEND_START_STATUS:
return {}
swarm = SwarmClient(get_conf())
s_info = swarm.inspect_container(self.docker_id)
s_info = swarm.inspect_container(self.backend_id)
return s_info['ip_address'][get_conf().overlay_network_name]
@property
......
......@@ -35,7 +35,7 @@ def execution_to_containers(execution: Execution) -> None:
If an error occurs some containers may have been created and needs to be cleaned-up.
In case of error exceptions are raised.
"""
ordered_service_list = sorted(execution.services, key=lambda x: x.description['startup_order'])
ordered_service_list = sorted(execution.services, key=lambda x: x.startup_order)
env_subst_dict = {
'execution_id': execution.id,
......@@ -57,22 +57,21 @@ def _gen_environment(service, env_subst_dict, copts):
""" Generate a dictionary containing the current cluster status (before the new container is spawned)
This information is used to substitute template strings in the environment variables."""
if 'environment' in service.description:
for env_name, env_value in service.description['environment']:
try:
env_value = env_value.format(**env_subst_dict)
except KeyError:
error_msg = "Unknown variable in environment expression '{}', known variables are: {}".format(env_value, list(env_subst_dict.keys()))
service.set_error(error_msg)
raise ZoeStartExecutionFatalException("Service {} has wrong environment expression")
copts.add_env_variable(env_name, env_value)
for env_name, env_value in service.environment:
try:
env_value = env_value.format(**env_subst_dict)
except KeyError:
error_msg = "Unknown variable in environment expression '{}', known variables are: {}".format(env_value, list(env_subst_dict.keys()))
service.set_error(error_msg)
raise ZoeStartExecutionFatalException("Service {} has wrong environment expression")
copts.add_env_variable(env_name, env_value)
def _spawn_service(execution: Execution, service: Service, env_subst_dict: dict):
copts = DockerContainerOptions()
copts.gelf_log_address = get_conf().gelf_address
copts.name = service.dns_name
copts.set_memory_limit(service.description['required_resources']['memory'])
copts.set_memory_limit(service.resource_reservation.memory)
copts.network_name = get_conf().overlay_network_name
copts.labels = {
'zoe.execution.name': execution.name,
......@@ -83,7 +82,7 @@ def _spawn_service(execution: Execution, service: Service, env_subst_dict: dict)
'zoe.deployment_name': get_conf().deployment_name,
'zoe.type': 'app_service'
}
if service.description['monitor']:
if service.is_monitor:
copts.labels['zoe.monitor'] = 'true'
else:
copts.labels['zoe.monitor'] = 'false'
......@@ -92,21 +91,23 @@ def _spawn_service(execution: Execution, service: Service, env_subst_dict: dict)
log.debug("Autorestart disabled for service {}".format(service.id))
copts.restart = False
else:
copts.restart = not service.description['monitor'] # Monitor containers should not restart
copts.restart = not service.is_monitor # Monitor containers should not restart
_gen_environment(service, env_subst_dict, copts)
for port in service.description['ports']:
if 'expose' in port and port['expose']:
copts.ports.append(port['port_number']) # FIXME UDP ports?
for port in service.ports:
if port.expose:
copts.ports.append(port.port_number) # FIXME UDP ports?
if 'volumes' in service.description:
for path, mount_point, readonly in service.description['volumes']:
copts.add_volume_bind(path, mount_point, readonly)
for volume in service.volumes:
if volume.type == "host_directory":
copts.add_volume_bind(volume.path, volume.mount_point, volume.readonly)
else:
log.warning('Docker Swarm backend does not support volume type {}'.format(volume.type))
if 'constraints' in service.description:
for constraint in service.description['constraints']:
copts.add_constraint(constraint)
# if 'constraints' in service.description:
# for constraint in service.description['constraints']:
# copts.add_constraint(constraint)
fswk = ZoeFSWorkspace()
if fswk.can_be_attached():
......@@ -114,8 +115,7 @@ def _spawn_service(execution: Execution, service: Service, env_subst_dict: dict)
copts.add_env_variable('ZOE_WORKSPACE', fswk.get_mountpoint())
# The same dictionary is used for templates in the command
if 'command' in service.description:
copts.set_command(service.description['command'].format(**env_subst_dict))
copts.set_command(service.command.format(**env_subst_dict))
try:
swarm = SwarmClient(get_conf())
......@@ -123,7 +123,7 @@ def _spawn_service(execution: Execution, service: Service, env_subst_dict: dict)
raise ZoeStartExecutionFatalException(str(e))
try:
cont_info = swarm.spawn_container(service.description['docker_image'], copts)
cont_info = swarm.spawn_container(service.image_name, copts)
except ZoeNotEnoughResourcesException:
service.set_error('Not enough free resources to satisfy reservation request')
raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service.name))
......@@ -132,13 +132,14 @@ def _spawn_service(execution: Execution, service: Service, env_subst_dict: dict)
service.set_active(cont_info["docker_id"])
if 'networks' in service.description:
for net in service.description['networks']:
try:
swarm.connect_to_network(service.docker_id, net)
except ZoeException as e:
service.set_error(str(e))
raise ZoeStartExecutionFatalException("Failed to attach network {} to service {}".format(net, service.name))
# Networks are a Docker-specific feature
net = get_conf().overlay_network_name
try:
swarm.connect_to_network(service.backend_id, net)
except ZoeException as e:
swarm.terminate_container(service.backend_id, True)
service.set_error(str(e))
raise ZoeStartExecutionFatalException("Failed to attach network {} to service {}".format(net, service.name))
return
......@@ -150,9 +151,9 @@ def terminate_execution(execution: Execution) -> None:
swarm = SwarmClient(get_conf())
for service in execution.services:
assert isinstance(service, Service)
if service.docker_id is not None:
if service.backend_id is not None:
service.set_terminating()
swarm.terminate_container(service.docker_id, delete=True)
swarm.terminate_container(service.backend_id, delete=True)
service.set_inactive()
log.debug('Service {} terminated'.format(service.name))
execution.set_terminated()
......@@ -50,17 +50,17 @@ class ZoeSwarmChecker(threading.Thread):
for service in service_list:
assert isinstance(service, Service)
if service.docker_status == service.DOCKER_DESTROY_STATUS or service.docker_status == service.DOCKER_DIE_STATUS:
if service.docker_status == service.BACKEND_DESTROY_STATUS or service.docker_status == service.BACKEND_DIE_STATUS:
continue
found = False
for container in container_list:
if container['id'] == service.docker_id:
if container['id'] == service.backend_id:
found = True
if container['status'] == 'exited':
log.info('resetting status of service {}, died with no event'.format(service.name))
service.set_docker_status(service.DOCKER_DIE_STATUS)
service.set_backend_status(service.BACKEND_DIE_STATUS)
if not found:
service.set_docker_status(service.DOCKER_DESTROY_STATUS)
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
time.sleep(CHECK_INTERVAL)
......
......@@ -76,16 +76,16 @@ class ZoeMonitor(threading.Thread):
if 'exec' in event['Action']:
pass
elif 'create' in event['Action']:
service.set_docker_status(service.DOCKER_CREATE_STATUS)
service.set_backend_status(service.DOCKER_CREATE_STATUS)
elif 'start' in event['Action']:
service.set_docker_status(service.DOCKER_START_STATUS)
service.set_backend_status(service.DOCKER_START_STATUS)
elif 'die' in event['Action'] or 'kill' in event['Action'] or 'stop' in event['Action']:
service.set_docker_status(service.DOCKER_DIE_STATUS)
service.set_backend_status(service.DOCKER_DIE_STATUS)
elif 'oom' in event['Action']:
service.set_docker_status(service.DOCKER_OOM_STATUS)
service.set_backend_status(service.DOCKER_OOM_STATUS)
log.warning('Service {} got killed by an OOM condition'.format(service.id))
elif 'destroy' in event['Action']:
service.set_docker_status(service.DOCKER_DESTROY_STATUS)
service.set_backend_status(service.DOCKER_DESTROY_STATUS)
else:
log.debug('Unmanaged container action: {}'.format(event['Action']))
......
......@@ -19,9 +19,9 @@ import logging
import threading
from zoe_lib.state import Execution
from zoe_master.backends.old_swarm import execution_to_containers, terminate_execution
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException
from zoe_master.scheduler.base_scheduler import ZoeBaseScheduler
from zoe_master.zapp_to_docker import execution_to_containers, terminate_execution
log = logging.getLogger(__name__)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment