Commit 52a0af7d authored by Daniele Venzano's avatar Daniele Venzano

Use a generic container description to communicate with backends

parent 90ceaee6
......@@ -48,7 +48,7 @@ def gen_environment(service: Service, execution: Execution):
def _create_logs_directories(exec_id, service_name):
path = get_conf().logs_base_path + '/' + get_conf().deployment_name + '/' + str(exec_id) + '/' + service_name
path = os.path.join(get_conf().logs_base_path, get_conf().deployment_name, str(exec_id), service_name)
try:
os.makedirs(path)
except OSError as e:
......@@ -72,3 +72,16 @@ def gen_volumes(service: Service, execution: Execution):
vol_list.append(logs_vol)
return vol_list
def gen_labels(service: Service, execution: Execution):
"""Generate container labels, useful for identifying containers in monitoring systems."""
return {
'zoe_execution_name': execution.name,
'zoe_execution_id': str(execution.id),
'zoe_service_name': service.name,
'zoe_service_id': str(service.id),
'zoe_owner': execution.user_id,
'zoe_deployment_name': get_conf().deployment_name,
'zoe_type': 'app_service'
}
......@@ -22,6 +22,7 @@ from zoe_lib.config import get_conf
from zoe_lib.state import Execution, Service
from zoe_master.backends.base import BaseBackend
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.old_swarm.backend import OldSwarmBackend
import zoe_master.backends.old_swarm.api_client
from zoe_master.backends.old_swarm_new_api.backend import OldSwarmNewAPIBackend
......@@ -67,10 +68,12 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
for service in ordered_service_list:
service.set_starting()
instance = ServiceInstance(execution, service)
try:
backend.spawn_service(execution, service)
backend_id = backend.spawn_service(instance)
except ZoeStartExecutionRetryException as ex:
log.warning('Temporary failure starting service {} of execution {}: {}'.format(service.id, execution.id, ex.message))
service.set_error(ex.message)
execution.set_error_message(ex.message)
terminate_execution(execution)
execution.set_scheduled()
......@@ -89,7 +92,8 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
execution.set_error()
return "fatal"
else:
execution.set_running()
service.set_active(backend_id)
return "ok"
......
......@@ -67,6 +67,7 @@ class DockerContainerOptions:
self.labels = []
self.gelf_log_address = ''
self.constraints = []
self.entrypoint = ''
def add_constraint(self, constraint):
"""Add a placement constraint (use docker syntax)."""
......@@ -102,6 +103,10 @@ class DockerContainerOptions:
"""Getter for the command to run in the container."""
return self.command
def set_entrypoint(self, entrypoint):
"""Setter for the entrypoint."""
self.entrypoint = entrypoint
def set_memory_limit(self, limit: int):
"""Setter for the memory limit of the container."""
self.memory_limit = limit
......@@ -252,6 +257,7 @@ class SwarmClient:
volumes=options.get_volumes(),
command=options.get_command(),
ports=options.ports,
entrypoint=options.entrypoint,
labels=options.labels)
self.cli.start(container=cont.get('Id'))
except docker.errors.APIError as e:
......
......@@ -19,11 +19,11 @@ import logging
from zoe_lib.config import get_conf
from zoe_lib.exceptions import ZoeLibException, ZoeNotEnoughResourcesException
from zoe_lib.state import Execution, Service
from zoe_lib.state import Service
from zoe_master.backends.old_swarm.api_client import DockerContainerOptions, SwarmClient
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException
import zoe_master.backends.common
import zoe_master.backends.base
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.old_swarm.threads import SwarmMonitor, SwarmStateSynchronizer
from zoe_master.stats import NodeStats, ClusterStats # pylint: disable=unused-import
......@@ -53,66 +53,42 @@ class OldSwarmBackend(zoe_master.backends.base.BaseBackend):
_monitor.quit()
_checker.quit()
def spawn_service(self, execution: Execution, service: Service):
def spawn_service(self, service_instance: ServiceInstance):
"""Spawn a service, translating a Zoe Service into a Docker container."""
copts = DockerContainerOptions()
copts.gelf_log_address = get_conf().gelf_address
copts.name = service.dns_name
copts.set_memory_limit(service.resource_reservation.memory)
copts.name = service_instance.hostname
copts.set_memory_limit(service_instance.memory_limit)
copts.network_name = get_conf().overlay_network_name
copts.labels = {
'zoe.execution.name': execution.name,
'zoe.execution.id': str(execution.id),
'zoe.service.name': service.name,
'zoe.service.id': str(service.id),
'zoe.owner': execution.user_id,
'zoe.deployment_name': get_conf().deployment_name,
'zoe.type': 'app_service'
}
if service.is_monitor:
copts.labels['zoe.monitor'] = 'true'
else:
copts.labels['zoe.monitor'] = 'false'
# Always disable autorestart
# if 'disable_autorestart' in execution.description and execution.description['disable_autorestart']:
# log.debug("Autorestart disabled for service {}".format(service.id))
# copts.restart = False
# else:
# copts.restart = not service.is_monitor # Monitor containers should not restart
copts.labels = service_instance.labels
# Always disable auto restart
copts.restart = False
env_vars = zoe_master.backends.common.gen_environment(service, execution)
for name, value in env_vars:
for name, value in service_instance.environment:
copts.add_env_variable(name, value)
for port in service.ports:
for port in service_instance.ports:
if port.expose:
copts.ports.append(port.port_number)
zoe_volumes = zoe_master.backends.common.gen_volumes(service, execution)
for volume in service.volumes + zoe_volumes:
for volume in service_instance.volumes:
if volume.type == "host_directory":
copts.add_volume_bind(volume.path, volume.mount_point, volume.readonly)
else:
log.warning('Docker Swarm backend does not support volume type {}'.format(volume.type))
# if 'constraints' in service.description:
# for constraint in service.description['constraints']:
# copts.add_constraint(constraint)
# The same dictionary is used for templates in the command
copts.set_command(service.command.format(**env_subst_dict))
copts.set_entrypoint(service_instance.entrypoint)
copts.set_command(service_instance.command)
try:
cont_info = self.swarm.spawn_container(service.image_name, copts)
cont_info = self.swarm.spawn_container(service_instance.image_name, copts)
except ZoeNotEnoughResourcesException:
service.set_error('Not enough free resources to satisfy reservation request')
raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service.name))
raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service_instance.name))
except (ZoeException, ZoeLibException) as e:
raise ZoeStartExecutionFatalException(str(e))
service.set_active(cont_info["docker_id"], cont_info['ip_address'][get_conf().overlay_network_name])
return cont_info["docker_id"]
def terminate_service(self, service: Service) -> None:
"""Terminate and delete a container."""
......
......@@ -15,10 +15,9 @@
"""Interface to the low-level Docker API."""
from argparse import Namespace
import time
import logging
from typing import Iterable, Callable, Dict, Any, Union
from typing import Iterable, Callable, Dict, Any
import humanfriendly
......@@ -47,73 +46,14 @@ else:
import requests.packages
from zoe_master.stats import ClusterStats, NodeStats
from zoe_lib.config import get_conf
from zoe_lib.exceptions import ZoeLibException, ZoeNotEnoughResourcesException
from zoe_master.stats import ClusterStats, NodeStats
from zoe_master.backends.service_instance import ServiceInstance
log = logging.getLogger(__name__)
class DockerContainerOptions:
"""Wrapper for the Docker container options."""
def __init__(self):
self.env = {}
self.volumes = {}
self.command = ""
self.memory_limit = 2 * (1024**3)
self.name = ''
self.ports = []
self.network_name = 'bridge'
self.restart = True
self.labels = []
self.gelf_log_address = ''
self.constraints = []
def add_constraint(self, constraint):
"""Add a placement constraint (use docker syntax)."""
self.constraints.append(constraint)
def add_env_variable(self, name: str, value: Union[str, None]) -> None:
"""Add an environment variable to the container definition."""
self.env[name] = value
@property
def environment(self) -> Dict[str, Union[str, None]]:
"""Access the environment variables."""
return self.env
def add_volume_bind(self, path: str, mountpoint: str, readonly=False) -> None:
"""Add a volume to the container."""
self.volumes[path] = {'bind': mountpoint, 'mode': ("ro" if readonly else "rw")}
def get_volumes(self) -> Iterable[str]:
"""Get the volumes in Docker format."""
return self.volumes
def set_command(self, cmd):
"""Setter for the command to run in the container."""
self.command = cmd
def get_command(self) -> str:
"""Getter for the command to run in the container."""
return self.command
def set_memory_limit(self, limit: int):
"""Setter for the memory limit of the container."""
self.memory_limit = limit
def get_memory_limit(self) -> int:
"""Getter for the memory limit of the container."""
return self.memory_limit
@property
def restart_policy(self) -> Dict[str, str]:
"""Getter for the restart policy of the container."""
if self.restart:
return {'Name': 'always'}
else:
return {}
def zookeeper_swarm(zk_server_list: str, path='/docker') -> str:
"""
Given a Zookeeper server list, find the currently active Swarm master.
......@@ -144,13 +84,16 @@ def consul_swarm(consul_ip: str) -> str:
class SwarmClient:
"""The Swarm client class that wraps the Docker API."""
def __init__(self, opts: Namespace) -> None:
self.opts = opts
url = opts.backend_swarm_url
def __init__(self) -> None:
url = get_conf().backend_swarm_url
if 'zk://' in url:
if KazooClient is None:
raise ZoeLibException('ZooKeeper URL for Swarm, but the kazoo package is not installed')
url = url[len('zk://'):]
manager = zookeeper_swarm(url, opts.backend_swarm_zk_path)
manager = zookeeper_swarm(url, get_conf().backend_swarm_zk_path)
elif 'consul://' in url:
if Consul is None:
raise ZoeLibException('Consul URL for Swarm, but the consul package is not installed')
url = url[len('consul://'):]
manager = consul_swarm(url)
elif 'http://' or 'https://' in url:
......@@ -210,22 +153,20 @@ class SwarmClient:
pl_status.timestamp = time.time()
return pl_status
def spawn_container(self, image: str, options: DockerContainerOptions) -> Dict[str, Any]:
def spawn_container(self, service_instance: ServiceInstance) -> Dict[str, Any]:
"""Create and start a new container."""
cont = None
port_bindings = {} # type: Dict[str, Any]
for port in options.ports:
port_bindings[str(port) + '/tcp'] = None
for constraint in options.constraints:
options.add_env_variable(constraint, None)
for port in service_instance.ports:
if port.expose:
port_bindings[str(port.number) + '/tcp'] = None
if options.gelf_log_address != '':
if get_conf().gelf_address != '':
log_config = {
"type": "gelf",
"config": {
'gelf-address': options.gelf_log_address,
'labels': ",".join(options.labels)
'gelf-address': get_conf().gelf_address,
'labels': ",".join(service_instance.labels)
}
}
else:
......@@ -234,23 +175,30 @@ class SwarmClient:
"config": {}
}
environment = {}
for name, value in service_instance.environment:
environment[name] = value
volumes = {}
for volume in service_instance.volumes:
volumes[volume.path] = {'bind': volume.mountpoint, 'mode': ("ro" if volume.readonly else "rw")}
try:
cont = self.cli.containers.run(image=image,
command=options.get_command(),
cont = self.cli.containers.run(image=service_instance.image_name,
command=service_instance.command,
detach=True,
environment=options.environment,
hostname=options.name,
labels=options.labels,
environment=environment,
hostname=service_instance.hostname,
labels=service_instance.labels,
log_config=log_config,
mem_limit=options.get_memory_limit(),
memswap_limit=options.get_memory_limit(),
name=options.name,
networks=[options.network_name],
mem_limit=service_instance.memory_limit,
memswap_limit=service_instance.memory_limit,
name=service_instance.name,
networks=[get_conf().overlay_network_name],
network_disabled=False,
network_mode=options.network_name,
network_mode=get_conf().overlay_network_name,
ports=port_bindings,
restart_policy=options.restart_policy,
volumes=options.get_volumes())
volumes=volumes)
except docker.errors.ImageNotFound:
raise ZoeLibException(message='Image not found')
except docker.errors.APIError as e:
......
......@@ -17,13 +17,12 @@
import logging
from zoe_lib.config import get_conf
from zoe_lib.exceptions import ZoeLibException, ZoeNotEnoughResourcesException
from zoe_lib.state import Execution, Service
from zoe_master.backends.old_swarm_new_api.api_client import DockerContainerOptions, SwarmClient
from zoe_lib.state import Service
from zoe_master.backends.old_swarm_new_api.api_client import SwarmClient
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException
import zoe_master.backends.common
import zoe_master.backends.base
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.old_swarm_new_api.threads import SwarmMonitor, SwarmStateSynchronizer
from zoe_master.stats import NodeStats, ClusterStats # pylint: disable=unused-import
......@@ -53,66 +52,16 @@ class OldSwarmNewAPIBackend(zoe_master.backends.base.BaseBackend):
_monitor.quit()
_checker.quit()
def spawn_service(self, execution: Execution, service: Service):
def spawn_service(self, service_instance: ServiceInstance):
"""Spawn a service, translating a Zoe Service into a Docker container."""
copts = DockerContainerOptions()
copts.gelf_log_address = get_conf().gelf_address
copts.name = service.dns_name
copts.set_memory_limit(service.resource_reservation.memory)
copts.network_name = get_conf().overlay_network_name
copts.labels = {
'zoe.execution.name': execution.name,
'zoe.execution.id': str(execution.id),
'zoe.service.name': service.name,
'zoe.service.id': str(service.id),
'zoe.owner': execution.user_id,
'zoe.deployment_name': get_conf().deployment_name,
'zoe.type': 'app_service'
}
if service.is_monitor:
copts.labels['zoe.monitor'] = 'true'
else:
copts.labels['zoe.monitor'] = 'false'
# Always disable autorestart
# if 'disable_autorestart' in execution.description and execution.description['disable_autorestart']:
# log.debug("Autorestart disabled for service {}".format(service.id))
# copts.restart = False
# else:
# copts.restart = not service.is_monitor # Monitor containers should not restart
copts.restart = False
env_vars = zoe_master.backends.common.gen_environment(service, execution)
for name, value in env_vars:
copts.add_env_variable(name, value)
for port in service.ports:
if port.expose:
copts.ports.append(port.number)
zoe_volumes = zoe_master.backends.common.gen_volumes(service, execution)
for volume in service.volumes + zoe_volumes:
if volume.type == "host_directory":
copts.add_volume_bind(volume.path, volume.mount_point, volume.readonly)
else:
log.warning('Docker Swarm backend does not support volume type {}'.format(volume.type))
# if 'constraints' in service.description:
# for constraint in service.description['constraints']:
# copts.add_constraint(constraint)
# The same dictionary is used for templates in the command
copts.set_command(service.command.format(**env_subst_dict))
try:
cont_info = self.swarm.spawn_container(service.image_name, copts)
cont_info = self.swarm.spawn_container(service_instance)
except ZoeNotEnoughResourcesException:
service.set_error('Not enough free resources to satisfy reservation request')
raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service.name))
raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service_instance.name))
except (ZoeException, ZoeLibException) as e:
raise ZoeStartExecutionFatalException(str(e))
service.set_active(cont_info["id"], cont_info['ip_address'][get_conf().overlay_network_name])
return cont_info["id"]
def terminate_service(self, service: Service) -> None:
"""Terminate and delete a container."""
......
......@@ -207,6 +207,8 @@ class ZoeElasticScheduler:
elif ret == "requeue":
self.queue.insert(0, job)
continue
elif ret == "ok":
job.set_running()
assert ret == "ok"
start_elastic(job)
......
......@@ -108,6 +108,8 @@ class ZoeSimpleScheduler(ZoeBaseScheduler):
ret = start_all(e)
if ret == 'requeue':
self.fifo_queue.append(e)
else:
e.set_running()
def quit(self):
"""Stop the scheduler thread."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment