Commit 43b1eb39 authored by Daniele Venzano's avatar Daniele Venzano
Browse files

New backend based on the new Python Docker library

parent e331f298
......@@ -83,7 +83,7 @@ class APIEndpoint:
if e.user_id != uid and role != 'admin':
raise zoe_api.exceptions.ZoeAuthException()
if e.is_active():
if e.is_active:
return self.master.execution_terminate(exec_id)
else:
raise zoe_api.exceptions.ZoeException('Execution is not running')
......@@ -98,7 +98,7 @@ class APIEndpoint:
if e.user_id != uid and role != 'admin':
raise zoe_api.exceptions.ZoeAuthException()
if e.is_active():
if e.is_active:
raise zoe_api.exceptions.ZoeException('Cannot delete an active execution')
status, message = self.master.execution_delete(exec_id)
......@@ -144,7 +144,7 @@ class APIEndpoint:
log.debug('Starting dead execution cleanup task')
all_execs = self.sql.execution_list()
for execution in all_execs:
if execution.is_running():
if execution.is_running:
for service in execution.services:
if service.description['monitor'] and service.is_dead():
log.info("Service {} ({}) of execution {} died, terminating execution".format(service.id, service.name, execution.id))
......
......@@ -32,11 +32,11 @@
<li class="container_name" id="{{ s['id'] }}">{{ s['name'] }}</li>
<ul>
<li>Zoe status: {{ s['status'] }}</li>
<li>Docker status: {{ s['docker_status'] }}</li>
<li>Docker status: {{ s['backend_status'] }}</li>
{% if s['error_message'] is not none %}
<li>Error: {{ s['error_message'] }}</li>
{% endif %}
{% if s['docker_status'] == 'started' %}
{% if s['backend_status'] == 'started' %}
{% for p in s['description']['ports'] %}
<li><a href="{{ p['protocol'] }}://{{ s['ip_address'] }}:{{ p['port_number'] }}{{ p['path'] }}">{{ p['name'] }}</a></li>
{% endfor %}
......
......@@ -49,7 +49,6 @@ def load_configuration(test_conf=None):
# Common options
argparser.add_argument('--debug', action='store_true', help='Enable debug output')
argparser.add_argument('--swarm', help='Swarm/Docker API endpoint (ex.: zk://zk1:2181,zk2:2181 or http://swarm:2380)', default='http://localhost:2375')
argparser.add_argument('--deployment-name', help='name of this Zoe deployment', default='prod')
argparser.add_argument('--dbname', help='DB name', default='zoe')
......@@ -87,7 +86,11 @@ def load_configuration(test_conf=None):
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeSimpleScheduler', 'ZoeElasticScheduler'], default='ZoeSimpleScheduler')
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
argparser.add_argument('--backend', choices=['OldSwarm'], default='OldSwarm')
argparser.add_argument('--backend', choices=['OldSwarm', 'OldSwarmNewAPI'], default='OldSwarmNewAPI')
# Docker Swarm backend options
argparser.add_argument('--backend-swarm-url', help='Swarm/Docker API endpoint (ex.: zk://zk1:2181,zk2:2181 or http://swarm:2380)', default='http://localhost:2375')
argparser.add_argument('--backend-swarm-zk-path', help='Swarm/Docker optional ZooKeeper path for Swarm Znodes', default='/docker')
argparser.add_argument('--cookie-secret', help='secret used to encrypt cookies', default='changeme')
......
......@@ -23,6 +23,7 @@ from zoe_lib.state import Execution, Service
from zoe_master.backends.base import BaseBackend
from zoe_master.backends.old_swarm.backend import OldSwarmBackend
from zoe_master.backends.old_swarm_new_api.backend import OldSwarmNewAPIBackend
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException
log = logging.getLogger(__name__)
......@@ -33,6 +34,8 @@ def _get_backend() -> BaseBackend:
backend_name = get_conf().backend
if backend_name == 'OldSwarm':
return OldSwarmBackend(get_conf())
elif backend_name == 'OldSwarmNewAPI':
return OldSwarmNewAPIBackend(get_conf())
else:
log.error('Unknown backend selected')
assert False
......@@ -92,7 +95,7 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
return "fatal"
else:
execution.set_running()
return "ok"
return "ok"
def start_all(execution: Execution) -> str:
......
......@@ -128,11 +128,11 @@ def zookeeper_swarm(zk_server_list: str, path='/docker') -> str:
zk_client.stop()
return master.decode('utf-8')
def consul_swarm(consul_ip: str, path='/docker') -> str:
def consul_swarm(consul_ip: str) -> str:
"""
Using consul as discovery service, find the currently active Swarm master.
:param consul_ip: consul ip address
:param path: Swarm path in Consul
:return: Swarm master connection string
"""
leader_key = 'docker/swarm/leader'
......@@ -141,6 +141,7 @@ def consul_swarm(consul_ip: str, path='/docker') -> str:
master = key_val[1]['Value']
return master.decode('utf-8')
class SwarmClient:
"""The Swarm client class that wraps the Docker API."""
def __init__(self, opts: Namespace) -> None:
......@@ -148,7 +149,7 @@ class SwarmClient:
url = opts.swarm
if 'zk://' in url:
url = url[len('zk://'):]
manager = zookeeper_swarm(url)
manager = zookeeper_swarm(url, opts.backend_swarm_zk_path)
elif 'consul://' in url:
url = url[len('consul://'):]
manager = consul_swarm(url)
......
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interface to the low-level Docker API."""
from argparse import Namespace
import time
import logging
from typing import Iterable, Callable, Dict, Any, Union
import humanfriendly
try:
from consul import Consul
except ImportError:
Consul = None
try:
from kazoo.client import KazooClient
except ImportError:
KazooClient = None
try:
import docker
import docker.errors
import docker.utils
except ImportError:
pass
import requests.packages
from zoe_master.stats import ClusterStats, NodeStats
from zoe_lib.exceptions import ZoeLibException, ZoeNotEnoughResourcesException
log = logging.getLogger(__name__)
class DockerContainerOptions:
    """Collects the settings that describe a Docker container before it is created."""

    def __init__(self):
        # Identity and placement
        self.name = ''
        self.network_name = 'bridge'
        self.constraints = []
        self.labels = []
        # What runs inside the container and what it sees
        self.command = ""
        self.env = {}
        self.volumes = {}
        self.ports = []
        # Resource limits and lifecycle
        self.memory_limit = 2 * (1024**3)
        self.restart = True
        # Logging: an empty address means no GELF endpoint is configured
        self.gelf_log_address = ''

    def add_constraint(self, constraint):
        """Register one more placement constraint (docker syntax)."""
        self.constraints.append(constraint)

    def add_env_variable(self, name: str, value: Union[str, None]) -> None:
        """Define (or overwrite) an environment variable for the container."""
        self.env[name] = value

    @property
    def environment(self) -> Dict[str, Union[str, None]]:
        """The environment variables defined so far, as a name -> value dictionary."""
        return self.env

    def add_volume_bind(self, path: str, mountpoint: str, readonly=False) -> None:
        """
        Bind ``path`` inside the container at ``mountpoint``.

        :param path: key under which the volume is registered
        :param mountpoint: location of the volume inside the container
        :param readonly: when True the volume is bound with mode "ro", otherwise "rw"
        """
        mode = "ro" if readonly else "rw"
        self.volumes[path] = {'bind': mountpoint, 'mode': mode}

    def get_volumes(self) -> Iterable[str]:
        """Return the registered volumes (dictionary keyed by path)."""
        return self.volumes

    def set_command(self, cmd):
        """Store the command to run in the container."""
        self.command = cmd

    def get_command(self) -> str:
        """Return the command to run in the container."""
        return self.command

    def set_memory_limit(self, limit: int):
        """Store the memory limit of the container."""
        self.memory_limit = limit

    def get_memory_limit(self) -> int:
        """Return the memory limit of the container."""
        return self.memory_limit

    @property
    def restart_policy(self) -> Dict[str, str]:
        """The restart policy: ``{'Name': 'always'}`` when restart is enabled, otherwise empty."""
        return {'Name': 'always'} if self.restart else {}
def zookeeper_swarm(zk_server_list: str, path='/docker') -> str:
    """
    Given a Zookeeper server list, find the currently active Swarm master.

    :param zk_server_list: Zookeeper server list
    :param path: Swarm path in Zookeeper; ``/docker/swarm/leader`` is appended to it
    :return: Swarm master connection string
    """
    leader_path = path + '/docker/swarm/leader'
    zk_client = KazooClient(hosts=zk_server_list)
    zk_client.start()
    try:
        master, _ = zk_client.get(leader_path)
    finally:
        # Always stop the client, even when the leader node cannot be read,
        # so a failed lookup does not leave a ZooKeeper session open.
        zk_client.stop()
    return master.decode('utf-8')
def consul_swarm(consul_ip: str) -> str:
    """
    Using consul as discovery service, find the currently active Swarm master.

    :param consul_ip: consul ip address
    :return: Swarm master connection string
    :raises ZoeLibException: if the leader key is missing or has no value in Consul
    """
    leader_key = 'docker/swarm/leader'
    consul_client = Consul(consul_ip)
    key_val = consul_client.kv.get(leader_key)
    # kv.get() returns an (index, data) pair; data is None when the key does not exist
    entry = key_val[1]
    if entry is None or entry.get('Value') is None:
        raise ZoeLibException('Swarm leader key {} not found in Consul'.format(leader_key))
    return entry['Value'].decode('utf-8')
class SwarmClient:
    """The Swarm client class that wraps the Docker API."""

    def __init__(self, opts: Namespace) -> None:
        """
        Locate the Swarm manager and open a Docker client connection to it.

        :param opts: configuration namespace; ``backend_swarm_url`` and
                     ``backend_swarm_zk_path`` are read from it
        :raises ZoeLibException: if the URL scheme is not zk, consul, http or https
        """
        self.opts = opts
        url = opts.backend_swarm_url
        if 'zk://' in url:
            url = url[len('zk://'):]
            manager = zookeeper_swarm(url, opts.backend_swarm_zk_path)
        elif 'consul://' in url:
            url = url[len('consul://'):]
            manager = consul_swarm(url)
        elif 'http://' in url or 'https://' in url:
            # Each scheme needs its own membership test: "'http://' or ..." is always true
            manager = url
        else:
            raise ZoeLibException('Unsupported URL scheme for Swarm')
        log.debug('Connecting to Swarm at {}'.format(manager))
        self.cli = docker.DockerClient(base_url=manager, version="auto")

    def info(self) -> ClusterStats:
        """Retrieve Swarm statistics. The Docker API returns a mess difficult to parse."""
        info = self.cli.info()
        pl_status = ClusterStats()
        pl_status.container_count = info["Containers"]
        pl_status.memory_total = info["MemTotal"]
        pl_status.cores_total = info["NCPU"]

        # SystemStatus is a flat list of [label, value] rows, parsed by position
        system_status = info["SystemStatus"]
        idx = 0  # Role, skip
        idx += 1
        assert 'Strategy' in system_status[idx][0]
        pl_status.placement_strategy = system_status[idx][1]
        idx += 1
        assert 'Filters' in system_status[idx][0]
        pl_status.active_filters = [x.strip() for x in system_status[idx][1].split(", ")]
        idx += 1
        assert 'Nodes' in system_status[idx][0]
        node_count = int(system_status[idx][1])
        idx += 1  # At index 4 the nodes begin
        for node in range(node_count):
            # Each node takes one "name / endpoint" row followed by eight detail rows.
            # idx grows by the eight detail rows per iteration and `node` accounts
            # for the name rows already consumed.
            base = idx + node
            idx2 = 0
            node_stats = NodeStats(system_status[base][0].strip())
            node_stats.docker_endpoint = system_status[base][1]
            idx2 += 1  # ID, skip
            idx2 += 1  # Status
            node_stats.status = system_status[base + idx2][1]
            idx2 += 1  # Containers
            node_stats.container_count = int(system_status[base + idx2][1].split(' ')[0])
            idx2 += 1  # CPUs
            node_stats.cores_reserved = int(system_status[base + idx2][1].split(' / ')[0])
            node_stats.cores_total = int(system_status[base + idx2][1].split(' / ')[1])
            idx2 += 1  # Memory
            node_stats.memory_reserved = system_status[base + idx2][1].split(' / ')[0]
            node_stats.memory_total = system_status[base + idx2][1].split(' / ')[1]
            idx2 += 1  # Labels
            node_stats.labels = system_status[base + idx2][1].split(', ')
            idx2 += 1  # Last update
            node_stats.last_update = system_status[base + idx2][1]
            idx2 += 1  # Docker version
            node_stats.server_version = system_status[base + idx2][1]

            # Memory figures arrive as human-readable strings, convert them to numbers
            node_stats.memory_reserved = humanfriendly.parse_size(node_stats.memory_reserved)
            node_stats.memory_total = humanfriendly.parse_size(node_stats.memory_total)

            pl_status.nodes.append(node_stats)
            idx += idx2
        pl_status.timestamp = time.time()
        return pl_status

    @staticmethod
    def _explanation_text(error) -> str:
        """Return the explanation of a Docker API error as text, whether it is bytes, str or missing."""
        explanation = error.explanation
        if explanation is None:
            return str(error)
        if isinstance(explanation, bytes):
            return explanation.decode('utf-8')
        return str(explanation)

    def spawn_container(self, image: str, options: DockerContainerOptions) -> Dict[str, Any]:
        """
        Create and start a new container.

        :param image: name of the image to run
        :param options: the container settings; placement constraints are copied into
                        its environment as variables with no value
        :return: the summary dictionary built by ``_container_summary``
        :raises ZoeNotEnoughResourcesException: if Swarm reports that no resources are available
        :raises ZoeLibException: on any other failure
        """
        cont = None
        port_bindings = {}  # type: Dict[str, Any]
        for port in options.ports:
            port_bindings[str(port) + '/tcp'] = None

        for constraint in options.constraints:
            options.add_env_variable(constraint, None)

        if options.gelf_log_address != '':
            log_config = {
                "type": "gelf",
                "config": {
                    'gelf-address': options.gelf_log_address,
                    'labels': ",".join(options.labels)
                }
            }
        else:
            log_config = {
                "type": "json-file",
                "config": {}
            }

        try:
            cont = self.cli.containers.run(image=image,
                                           command=options.get_command(),
                                           detach=True,
                                           environment=options.environment,
                                           hostname=options.name,
                                           labels=options.labels,
                                           log_config=log_config,
                                           mem_limit=options.get_memory_limit(),
                                           memswap_limit=options.get_memory_limit(),
                                           name=options.name,
                                           networks=[options.network_name],
                                           network_disabled=False,
                                           network_mode=options.network_name,
                                           ports=port_bindings,
                                           restart_policy=options.restart_policy,
                                           volumes=options.get_volumes())
        except docker.errors.ImageNotFound:
            raise ZoeLibException(message='Image not found')
        except docker.errors.APIError as e:
            if cont is not None:
                # cont is a Container object: remove it through its own method
                cont.remove(force=True)
            explanation = self._explanation_text(e)
            if explanation == 'no resources available to schedule container':
                raise ZoeNotEnoughResourcesException(message=explanation)
            else:
                raise ZoeLibException(message=explanation)
        except Exception as e:
            if cont is not None:
                cont.remove(force=True)
            raise ZoeLibException(str(e))

        # Fetch the container again before building the summary
        cont = self.cli.containers.get(cont.id)
        return self._container_summary(cont)

    def _container_summary(self, container: 'docker.models.containers.Container') -> Dict[str, Any]:
        """
        Translate a docker-specific container object into a simple dictionary.

        The annotation is a string so that defining the class does not fail when the
        optional docker package could not be imported.

        :param container: the container object to summarize
        :return: dictionary with the keys id, name, host, labels, ip_address, state,
                 running and ports
        """
        info = {
            "id": container.id,
            "ip_address": {},
            "name": container.name,
            'host': container.attrs['Node']['Name'],
            'labels': container.attrs['Config']['Labels']
        }  # type: Dict[str, Any]

        # One entry per attached network; an empty address is reported as None
        networks = container.attrs["NetworkSettings"]["Networks"]
        for net in networks:
            if len(networks[net]['IPAddress']) > 0:
                info["ip_address"][net] = networks[net]['IPAddress']
            else:
                info["ip_address"][net] = None

        if container.status == 'running':
            info["state"] = "running"
            info["running"] = True
        elif container.status == "paused":
            info["state"] = "paused"
            info["running"] = False
        elif container.status == 'restarting':
            info["state"] = "restarting"
            info["running"] = True
        elif container.status == 'OOMKilled' or container.status == 'exited':
            info["state"] = "killed"
            info["running"] = False
        else:
            log.error('Unknown container status: {}'.format(container.status))
            info["state"] = "unknown"
            info["running"] = False

        info['ports'] = {}
        ports = container.attrs['NetworkSettings']['Ports']
        if ports is not None:
            for port in ports:
                if ports[port] is not None:
                    # Only the first host binding of each port is reported
                    mapping = (
                        ports[port][0]['HostIp'],
                        ports[port][0]['HostPort']
                    )
                    info['ports'][port] = mapping
                else:
                    info['ports'][port] = None

        return info

    def inspect_container(self, docker_id: str) -> Dict[str, Any]:
        """
        Retrieve information about a running container.

        :param docker_id: the container to look up
        :return: the summary dictionary built by ``_container_summary``
        :raises ZoeLibException: if the container cannot be retrieved
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except Exception as e:
            raise ZoeLibException(str(e))
        return self._container_summary(cont)

    def terminate_container(self, docker_id: str, delete=False) -> None:
        """
        Terminate a container. A container that does not exist is silently ignored.

        :param docker_id: The container to terminate
        :type docker_id: str
        :param delete: If True, also delete the container files
        :type delete: bool
        :return: None
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except docker.errors.NotFound:
            return

        cont.stop(timeout=5)
        if delete:
            cont.remove(force=True)

    def event_listener(self, callback: Callable[[str], bool]) -> None:
        """
        An infinite loop that listens for events from Swarm.

        :param callback: called with each event; the loop ends when it returns a false
                         value. Exceptions raised by the callback are logged and the
                         loop continues.
        """
        event_gen = self.cli.events(decode=True)
        while True:
            try:
                event = next(event_gen)
            except requests.packages.urllib3.exceptions.ProtocolError:
                # The event stream was closed: open a new one and keep listening
                log.warning('Docker closed event connection, retrying...')
                event_gen = self.cli.events(decode=True)
                continue

            try:
                res = callback(event)
            except Exception:
                log.exception('Uncaught exception in swarm event callback')
                log.warning('event was: {}'.format(event))
                continue
            if not res:
                break

    def connect_to_network(self, container_id: str, network_id: str) -> None:
        """Connect a container to a network. An unknown network is logged and ignored."""
        try:
            net = self.cli.networks.get(network_id)
        except docker.errors.NotFound:
            log.error('Trying to connect to a non-existent network')
            return
        net.connect(container_id)

    def disconnect_from_network(self, container_id: str, network_id: str) -> None:
        """Disconnects a container from a network. An unknown network is logged and ignored."""
        try:
            net = self.cli.networks.get(network_id)
        except docker.errors.NotFound:
            log.error('Trying to disconnect from a non-existent network')
            return
        net.disconnect(container_id)

    def list(self, only_label=None) -> Iterable[dict]:
        """
        List running or defined containers.

        :param only_label: dictionary of label name -> value that a container must carry
                           to be included; None (the default) disables the filter
        :return: a list of containers
        """
        if only_label is None:
            only_label = {}
        conts = []
        for cont_info in self.cli.containers.list(all=True):
            # A container without labels must not break the membership tests below
            labels = cont_info.attrs['Config']['Labels'] or {}
            if all(key in labels and labels[key] == value for key, value in only_label.items()):
                conts.append(self._container_summary(cont_info))
        return conts
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Zoe backend implementation for old-style stand-alone Docker Swarm."""
import logging
from typing import Dict
from zoe_lib.config import get_conf
from zoe_lib.exceptions import ZoeLibException, ZoeNotEnoughResourcesException
from zoe_lib.state import Execution, Service
from zoe_master.backends.old_swarm.api_client import DockerContainerOptions, SwarmClient
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException
from zoe_master.workspace.filesystem import ZoeFSWorkspace
import zoe_master.backends.common
import zoe_master.backends.base
from zoe_master.backends.old_swarm.threads import SwarmMonitor, SwarmStateSynchronizer
from zoe_master.stats import NodeStats, ClusterStats # pylint: disable=unused-import
log = logging.getLogger(__name__)
# Module-level references to the monitor and checker thread objects.
# Both stay None until the backend's init() assigns them.
_monitor = _checker = None
class OldSwarmNewAPIBackend(zoe_master.backends.base.BaseBackend):
"""Zoe backend implementation for old-style stand-alone Docker Swarm."""
def __init__(self, opts):
    """
    Initialize the base backend and create the Swarm client.

    :param opts: options object, handed unchanged to both the base class and SwarmClient
    """
    super().__init__(opts)
    self.swarm = SwarmClient(opts)
@classmethod
def init(cls, state):
    """
    Create the Swarm monitor and state synchronizer and keep them in the module-level references.

    :param state: object passed to both SwarmMonitor and SwarmStateSynchronizer
    """
    global _monitor
    global _checker
    # NOTE(review): presumably the constructors start their own threads - confirm in threads.py
    _monitor = SwarmMonitor(state)
    _checker = SwarmStateSynchronizer(state)
@classmethod
def shutdown(cls):
    """Performs a clean shutdown of the resources used by Swarm backend."""
    for worker in (_monitor, _checker):
        # Either reference is still None when init() was never called or failed
        # part-way; skip it instead of failing the whole shutdown.
        if worker is not None:
            worker.quit()
def spawn_service(self, execution: Execution, service: Service, env_subst_dict: Dict):
"""Spawn a service, translating a Zoe Service into a Docker container."""
copts = DockerContainerOptions()