Commit 18b4893a authored by Zoe Jenkins's avatar Zoe Jenkins
Browse files

New Swarm backend based on the latest Docker Python API

Adds as a benefit:
- support for TLS
- no more monitor thread, as the event stream was unreliable
parent a47a16f6
......@@ -151,18 +151,12 @@ def _service_check(data):
if not isinstance(volume[2], bool):
raise InvalidApplicationDescription(msg='readonly volume item (third) must be a boolean: {}'.format(volume[2]))
if 'constraints' in data and not hasattr(data['constraints'], '__iter__'):
raise InvalidApplicationDescription(msg='networks should be an iterable')
if get_conf().backend == 'Kubernetes':
if 'replicas' not in data:
data['replicas'] = 1
try:
int(data['replicas'])
except ValueError:
raise InvalidApplicationDescription(msg="replicas field should be an int")
if 'replicas' not in data:
data['replicas'] = 1
try:
int(data['replicas'])
except ValueError:
raise InvalidApplicationDescription(msg="replicas field should be an int")
def _port_check(data):
......
......@@ -49,7 +49,6 @@ def load_configuration(test_conf=None):
# Common options
argparser.add_argument('--debug', action='store_true', help='Enable debug output')
argparser.add_argument('--swarm', help='Swarm/Docker API endpoint (ex.: zk://zk1:2181,zk2:2181 or http://swarm:2380)', default='http://localhost:2375')
argparser.add_argument('--deployment-name', help='name of this Zoe deployment', default='prod')
argparser.add_argument('--dbname', help='DB name', default='zoe')
......@@ -94,7 +93,16 @@ def load_configuration(test_conf=None):
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeSimpleScheduler', 'ZoeElasticScheduler'], default='ZoeSimpleScheduler')
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
argparser.add_argument('--backend', choices=['OldSwarm', 'Kubernetes'], default='OldSwarm')
argparser.add_argument('--backend', choices=['Swarm', 'OldSwarm', 'Kubernetes'], default='Swarm')
# Docker Swarm backend options
argparser.add_argument('--backend-swarm-url', help='Swarm/Docker API endpoint (ex.: zk://zk1:2181,zk2:2181 or http://swarm:2380)', default='http://localhost:2375')
argparser.add_argument('--backend-swarm-zk-path', help='Swarm/Docker optional ZooKeeper path for Swarm Znodes', default='/docker')
argparser.add_argument('--backend-swarm-tls-cert', help='Docker TLS certificate file', default='cert.pem')
argparser.add_argument('--backend-swarm-tls-key', help='Docker TLS private key file', default='key.pem')
argparser.add_argument('--backend-swarm-tls-ca', help='Docker TLS CA certificate file', default='ca.pem')
# Kubernetes backend
argparser.add_argument('--kube-config-file', help='Kubernetes configuration file', default='/opt/zoe/kube.conf')
argparser.add_argument('--cookie-secret', help='secret used to encrypt cookies', default='changeme')
......
......@@ -46,8 +46,3 @@ class InvalidApplicationDescription(ZoeAPIException):
"""
def __init__(self, msg):
super().__init__("Error: " + msg)
class ZoeNotEnoughResourcesException(ZoeLibException):
"""Service failed to start due to not enough available resources."""
pass
......@@ -18,4 +18,4 @@
from zoe_lib.state.base import Base
from zoe_lib.state.execution import Execution
from zoe_lib.state.sql_manager import SQLManager
from zoe_lib.state.service import Service
from zoe_lib.state.service import Service, VolumeDescription
......@@ -196,3 +196,8 @@ class Service:
def is_dead(self):
"""Returns True if this service is not running."""
return self.backend_status == self.BACKEND_DESTROY_STATUS or self.backend_status == self.BACKEND_OOM_STATUS or self.backend_status == self.BACKEND_DIE_STATUS
@property
def unique_name(self):
"""Returns a name for this service that is unique across multiple Zoe instances running on the same backend."""
return self.name + '-' + str(self.execution_id) + '-' + get_conf().deployment_name
......@@ -15,10 +15,9 @@
"""The base class that all backends should implement."""
from typing import Dict
from zoe_lib.state import Execution, Service
from zoe_lib.state import Service
from zoe_master.stats import ClusterStats
from zoe_master.backends.service_instance import ServiceInstance
class BaseBackend:
......@@ -34,8 +33,17 @@ class BaseBackend:
"""Performs a clean shutdown of the resources used by Swarm backend. Any threads that where started in the init() method should be terminated here. This method will be called when Zoe shuts down."""
raise NotImplementedError
def spawn_service(self, execution: Execution, service: Service, env_subst_dict: Dict):
"""Create a container for a service."""
def spawn_service(self, service_instance: ServiceInstance):
"""Create a container for a service.
The backend translates all the configuration parameters given in the ServiceInstance object into backend-specific container options and starts the container.
This function should either:
* raise ``ZoeStartExecutionRetryException`` in case a temporary error is generated
* raise ``ZoeStartExecutionFatalException`` in case a fatal error is generated
* return a backend-specific ID that will be used later by Zoe to interact with the running container
"""
raise NotImplementedError
def terminate_service(self, service: Service) -> None:
......
......@@ -21,6 +21,8 @@ from zoe_lib.config import get_conf
from zoe_lib.state import Service, Execution
from zoe_master.backends.proxy import gen_proxypath, JUPYTER_NOTEBOOK, MONGO_EXPRESS, JUPYTER_PORT, MONGO_PORT
from zoe_master.exceptions import ZoeStartExecutionFatalException
from zoe_master.workspace.filesystem import ZoeFSWorkspace
def gen_environment(execution: Execution, service: Service, env_subst_dict: Dict):
""" Generate a dictionary containing the current cluster status (before the new container is spawned)
......@@ -36,6 +38,7 @@ def gen_environment(execution: Execution, service: Service, env_subst_dict: Dict
raise ZoeStartExecutionFatalException("Service {} has wrong environment expression")
env_list.append((env_name, env_value))
# FIXME this code needs to be removed/changed to be generic
#if 'jupyter' in service.image_name:
env_list.append((JUPYTER_NOTEBOOK, gen_proxypath(execution, service) + '/' + JUPYTER_PORT))
#elif 'mongo-express' in service.image_name:
......@@ -47,4 +50,30 @@ def gen_environment(execution: Execution, service: Service, env_subst_dict: Dict
env_list.append(('SERVICE_NAME', service.name))
env_list.append(('PROXY_PATH', get_conf().proxy_path))
fswk = ZoeFSWorkspace()
env_list.append(('ZOE_WORKSPACE', fswk.get_mountpoint()))
return env_list
def gen_volumes(service_: Service, execution: Execution):
"""Return the list of default volumes to be added to all containers."""
vol_list = []
fswk = ZoeFSWorkspace()
wk_vol = fswk.get(execution.user_id)
vol_list.append(wk_vol)
return vol_list
def gen_labels(service: Service, execution: Execution):
"""Generate container labels, useful for identifying containers in monitoring systems."""
return {
'zoe_execution_name': execution.name,
'zoe_execution_id': str(execution.id),
'zoe_service_name': service.name,
'zoe_service_id': str(service.id),
'zoe_owner': execution.user_id,
'zoe_deployment_name': get_conf().deployment_name,
'zoe_type': 'app_service'
}
......@@ -22,9 +22,23 @@ from zoe_lib.config import get_conf
from zoe_lib.state import Execution, Service
from zoe_master.backends.base import BaseBackend
from zoe_master.backends.old_swarm.backend import OldSwarmBackend
from zoe_master.backends.kubernetes.backend import KubernetesBackend
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException, ZoeException
try:
from zoe_master.backends.old_swarm.backend import OldSwarmBackend
except ImportError as ex:
OldSwarmBackend = None
try:
from zoe_master.backends.swarm.backend import SwarmBackend
except ImportError as ex:
SwarmBackend = None
try:
from zoe_master.backends.kubernetes.backend import KubernetesBackend
except ImportError:
KubernetesBackend = None
log = logging.getLogger(__name__)
......@@ -33,9 +47,17 @@ def _get_backend() -> BaseBackend:
"""Return the right backend instance by reading the global configuration."""
backend_name = get_conf().backend
if backend_name == 'OldSwarm':
if OldSwarmBackend is None:
raise ZoeException('The OldSwarm backend requires docker python version < 2')
return OldSwarmBackend(get_conf())
elif backend_name == 'Kubernetes':
if KubernetesBackend is None:
raise ZoeException('The Kubernetes backend requires the pykube module')
return KubernetesBackend(get_conf())
elif backend_name == 'Swarm':
if SwarmBackend is None:
raise ZoeException('The Swarm backend requires docker python version >= 2.0.2')
return SwarmBackend(get_conf())
else:
log.error('Unknown backend selected')
assert False
......@@ -72,10 +94,12 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
for service in ordered_service_list:
env_subst_dict['dns_name#self'] = service.dns_name
service.set_starting()
instance = ServiceInstance(execution, service, env_subst_dict)
try:
backend.spawn_service(execution, service, env_subst_dict)
backend_id, ip_address = backend.spawn_service(instance)
except ZoeStartExecutionRetryException as ex:
log.warning('Temporary failure starting service {} of execution {}: {}'.format(service.id, execution.id, ex.message))
service.set_error(ex.message)
execution.set_error_message(ex.message)
terminate_execution(execution)
execution.set_scheduled()
......@@ -94,7 +118,8 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
execution.set_error()
return "fatal"
else:
execution.set_running()
log.debug('Service {} started'.format(instance.name))
service.set_active(backend_id, ip_address)
return "ok"
......
......@@ -16,15 +16,11 @@
"""Zoe backend implementation for Kubernetes with docker."""
import logging
from typing import Dict
from zoe_lib.config import get_conf
from zoe_lib.exceptions import ZoeLibException, ZoeNotEnoughResourcesException
from zoe_lib.state import Execution, Service
from zoe_master.backends.kubernetes.api_client import DockerContainerOptions, KubernetesClient
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException
from zoe_master.workspace.filesystem import ZoeFSWorkspace
import zoe_master.backends.common
from zoe_lib.state import Service
from zoe_master.backends.kubernetes.api_client import KubernetesClient
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException, ZoeNotEnoughResourcesException
from zoe_master.backends.service_instance import ServiceInstance
import zoe_master.backends.base
from zoe_master.backends.kubernetes.threads import KubernetesMonitor, KubernetesStateSynchronizer
from zoe_master.stats import NodeStats, ClusterStats # pylint: disable=unused-import
......@@ -55,74 +51,17 @@ class KubernetesBackend(zoe_master.backends.base.BaseBackend):
_monitor.quit()
_checker.quit()
def spawn_service(self, execution: Execution, service: Service, env_subst_dict: Dict):
def spawn_service(self, service_instance: ServiceInstance):
"""Spawn a service, translating a Zoe Service into a Docker container."""
copts = DockerContainerOptions()
copts.gelf_log_address = get_conf().gelf_address
copts.name = service.dns_name
copts.set_memory_limit(service.resource_reservation.memory)
copts.set_cores_limit(service.resource_reservation.cores)
copts.network_name = get_conf().overlay_network_name
copts.labels = {
'zoe.execution.name': execution.name,
'zoe.execution.id': str(execution.id),
'zoe.service.name': service.name,
'zoe.service.id': str(service.id),
'zoe.owner': execution.user_id,
'zoe.deployment_name': get_conf().deployment_name,
'zoe.type': 'app_service'
}
if service.is_monitor:
copts.labels['kubernetes.monitor'] = 'true'
else:
copts.labels['kubernetes.monitor'] = 'false'
# Always disable autorestart
# if 'disable_autorestart' in execution.description and execution.description['disable_autorestart']:
# log.debug("Autorestart disabled for service {}".format(service.id))
# copts.restart = False
# else:
# copts.restart = not service.is_monitor # Monitor containers should not restart
copts.restart = False
env_vars = zoe_master.backends.common.gen_environment(execution, service, env_subst_dict)
for name, value in env_vars:
copts.add_env_variable(name, value)
for port in service.ports:
if port.expose:
copts.ports.append(port.number)
for volume in service.volumes:
if volume.type == "host_directory":
copts.add_volume_bind(volume.path, volume.mount_point, volume.readonly)
else:
log.warning('Kubernetes backend does not support volume type {}'.format(volume.type))
# if 'constraints' in service.description:
# for constraint in service.description['constraints']:
# copts.add_constraint(constraint)
fswk = ZoeFSWorkspace()
if fswk.can_be_attached():
copts.add_volume_bind(fswk.get_path(execution.user_id), fswk.get_mountpoint(), False)
copts.add_env_variable('ZOE_WORKSPACE', fswk.get_mountpoint())
# The same dictionary is used for templates in the command
copts.set_command(service.command.format(**env_subst_dict))
copts.set_replicas(service.replicas)
try:
self.kube.spawn_service(copts)
rc_info = self.kube.spawn_replication_controller(service.description['docker_image'], copts)
self.kube.spawn_service(service_instance)
rc_info = self.kube.spawn_replication_controller(service_instance)
except ZoeNotEnoughResourcesException:
service.set_error('Not enough free resources to satisfy reservation request')
raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service.name))
except (ZoeException, ZoeLibException) as e:
raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service_instance.name))
except ZoeException as e:
raise ZoeStartExecutionFatalException(str(e))
service.set_active(rc_info["backend_id"], rc_info['ip_address'])
return rc_info["backend_id"], rc_info['ip_address']
def terminate_service(self, service: Service) -> None:
"""Terminate and delete a container."""
......
......@@ -22,8 +22,10 @@ import time
from zoe_lib.config import get_conf
from zoe_lib.state import SQLManager, Service
from zoe_master.backends.kubernetes.api_client import KubernetesClient
log = logging.getLogger(__name__)
class KubernetesMonitor(threading.Thread):
"""The monitor."""
......@@ -40,7 +42,7 @@ class KubernetesMonitor(threading.Thread):
def run(self):
"""An infinite loop that listens for events from Kubernetes."""
log.info("Monitor thread started")
while True: # pylint: disable=too-many-nested-blocks
while True: # pylint: disable=too-many-nested-blocks
for event in self.kube.replication_controller_event():
log.debug('%s: %s', event.object.name, event.type)
if event.type != 'DELETED' and event.type != 'ADDED':
......@@ -79,8 +81,10 @@ class KubernetesMonitor(threading.Thread):
"""Stops the thread."""
self.stop = True
CHECK_INTERVAL = 300
class KubernetesStateSynchronizer(threading.Thread):
"""The Kubernetes Checker."""
......
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""When a service from the application description needs to be instantiated, it is transformed into a ServiceInstance, an internal representation of a generic container. This class is used to gather all the attributes that describe a container and to provide a clear interface with the backend."""
from zoe_lib.state import Service, Execution
from zoe_lib.config import get_conf
import zoe_master.backends.common
class ServiceInstance:
"""The ServiceInstance class, a Service that is going to be instantiated into a container."""
def __init__(self, execution: Execution, service: Service, env_subst_dict):
self.name = service.unique_name
self.hostname = service.dns_name
self.memory_limit = service.resource_reservation.memory
self.core_limit = service.resource_reservation.cores
self.labels = {
'zoe.execution.name': execution.name,
'zoe.execution.id': str(execution.id),
'zoe.service.name': service.name,
'zoe.service.id': str(service.id),
'zoe.owner': execution.user_id,
'zoe.deployment_name': get_conf().deployment_name,
'zoe.type': 'app_service'
}
if service.is_monitor:
self.labels['zoe_monitor'] = 'true'
else:
self.labels['zoe_monitor'] = 'false'
self.labels = zoe_master.backends.common.gen_labels(service, execution)
self.environment = service.environment + zoe_master.backends.common.gen_environment(execution, service, env_subst_dict)
self.volumes = service.volumes + zoe_master.backends.common.gen_volumes(service, execution)
self.command = service.command
self.image_name = service.image_name
self.ports = service.ports
self.replicas_count = service.replicas
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interface to the low-level Docker API."""
import time
import logging
from typing import Iterable, Callable, Dict, Any
import humanfriendly
try:
from consul import Consul
except ImportError:
Consul = None
try:
from kazoo.client import KazooClient
except ImportError:
KazooClient = None
import docker
import docker.tls
import docker.errors
import docker.utils
import docker.models.containers
import requests.packages
from zoe_lib.config import get_conf
from zoe_lib.state import Service
from zoe_master.stats import ClusterStats, NodeStats
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.exceptions import ZoeException, ZoeNotEnoughResourcesException
log = logging.getLogger(__name__)
try:
docker.DockerClient()
except AttributeError:
log.error('Docker package does not have the DockerClient attribute')
raise ImportError('Wrong Docker library version')
def zookeeper_swarm(zk_server_list: str, path='/docker') -> str:
"""
Given a Zookeeper server list, find the currently active Swarm master.
:param zk_server_list: Zookeeper server list
:param path: Swarm path in Zookeeper
:return: Swarm master connection string
"""
path += '/docker/swarm/leader'
zk_client = KazooClient(hosts=zk_server_list)
zk_client.start()
master, stat_ = zk_client.get(path)
zk_client.stop()
return master.decode('utf-8')
def consul_swarm(consul_ip: str) -> str:
"""
Using consul as discovery service, find the currently active Swarm master.
:param consul_ip: consul ip address
:return: Swarm master connection string
"""
leader_key = 'docker/swarm/leader'
consul_client = Consul(consul_ip)
key_val = consul_client.kv.get(leader_key)
master = key_val[1]['Value']
return master.decode('utf-8')
class SwarmClient:
"""The Swarm client class that wraps the Docker API."""
def __init__(self) -> None:
url = get_conf().backend_swarm_url
tls = False
if 'zk://' in url:
if KazooClient is None:
raise ZoeException('ZooKeeper URL for Swarm, but the kazoo package is not installed')
url = url[len('zk://'):]
manager = zookeeper_swarm(url, get_conf().backend_swarm_zk_path)
elif 'consul://' in url:
if Consul is None:
raise ZoeException('Consul URL for Swarm, but the consul package is not installed')
url = url[len('consul://'):]
manager = consul_swarm(url)
elif 'http://' in url:
manager = url
elif 'https://' in url:
tls = docker.tls.TLSConfig(client_cert=(get_conf().backend_swarm_tls_cert, get_conf().backend_swarm_tls_key), verify=get_conf().backend_swarm_tls_ca)
manager = url
else:
raise ZoeException('Unsupported URL scheme for Swarm')
self.cli = docker.DockerClient(base_url=manager, version="auto", tls=tls)
def info(self) -> ClusterStats:
"""Retrieve Swarm statistics. The Docker API returns a mess difficult to parse."""
info = self.cli.info()
pl_status = ClusterStats()
pl_status.container_count = info["Containers"]
pl_status.memory_total = info["MemTotal"]
pl_status.cores_total = info["NCPU"]
# SystemStatus is a list...
idx = 0 # Role, skip
idx += 1
assert 'Strategy' in info["SystemStatus"][idx][0]
pl_status.placement_strategy = info["SystemStatus"][idx][1]
idx += 1
assert 'Filters' in info["SystemStatus"][idx][0]
pl_status.active_filters = [x.strip() for x in info["SystemStatus"][idx][1].split(", ")]
idx += 1
assert 'Nodes' in info["SystemStatus"][idx][0]
node_count = int(info["SystemStatus"][idx][1])
idx += 1 # At index 4 the nodes begin
for node in range(node_count):
idx2 = 0
node_stats = NodeStats(info["SystemStatus"][idx + node][0].strip())
node_stats.docker_endpoint = info["SystemStatus"][idx + node][1]
idx2 += 1 # ID, skip
idx2 += 1 # Status
node_stats.status = info["SystemStatus"][idx + node + idx2][1]
idx2 += 1 # Containers
node_stats.container_count = int(info["SystemStatus"][idx + node + idx2][1].split(' ')[0])
idx2 += 1 # CPUs
node_stats.cores_reserved = int(info["SystemStatus"][idx + node + idx2][1].split(' / ')[0])
node_stats.cores_total = int(info["SystemStatus"][idx + node + idx2][1].split(' / ')[1])
idx2 += 1 # Memory
node_stats.memory_reserved = info["SystemStatus"][idx + node + idx2][1].split(' / ')[0]
node_stats.memory_total = info["SystemStatus"][idx + node + idx2][1].split(' / ')[1]
idx2 += 1 # Labels
node_stats.labels = info["SystemStatus"][idx + node + idx2][1].split(', ')
idx2 += 1 # Last update
node_stats.last_update = info["SystemStatus"][idx + node + idx2][1]
idx2 += 1 # Docker version
node_stats.server_version = info["SystemStatus"][idx + node + idx2][1]
node_stats.memory_reserved = humanfriendly.parse_size(node_stats.memory_reserved)
node_stats.memory_total = humanfriendly.parse_size(node_stats.memory_total)
pl_status.nodes.append(node_stats)
idx += idx2
pl_status.timestamp = time.time()
return pl_status
def spawn_container(self, service_instance: ServiceInstance) -> Dict[str, Any]:
"""Create and start a new container."""
cont = None
port_bindings = {} # type: Dict[str, Any]
for port in service_instance.ports:
if port.expose:
port_bindings[str(port.number) + '/tcp'] = None
if get_conf().gelf_address != '':