Commit 93d35c2e authored by Daniele Venzano

Remove Swarm backend

parent f51f24f6
......@@ -23,7 +23,6 @@ At this time Zoe supports three back-ends:
* DockerEngine: uses one or more Docker Engines. It is simple to install and to scale.
* Kubernetes: the most complex to set up; we suggest using it only if you already have (or need) a Kubernetes setup for running other software.
* Legacy Docker Swarm (deprecated): simple to install, additional features like SSL, high-availability and dynamic host discovery can be added as needed. Please note that Zoe does not support the new Swarm Mode of Docker Engine as the API is too limited.
DockerEngine
^^^^^^^^^^^^
......@@ -309,7 +308,7 @@ A Docker Registry becomes interesting to have if you have lot of image build act
Zoe
^^^
Zoe is written in Python and uses the ``requirements.txt`` file to list the package dependencies needed for all components of Zoe. Not all of them are needed in all cases, for example you need the ``kazoo`` library only if you use Zookeeper to manage Swarm high availability.
Zoe is written in Python and uses the ``requirements.txt`` file to list the package dependencies needed for all components of Zoe. Not all of them are needed in all cases, for example you need the ``pykube`` library only if you use the Kubernetes back-end.
Currently this is the recommended procedure, once the initial Swarm setup has been done:
......
......@@ -2,7 +2,6 @@ Jinja2>=2.8
requests>=2.9.1
docker>=2.1.0
tornado>=4.3
kazoo>=2.2.1
humanfriendly
psycopg2>=2.6.1
pyzmq>=15.2.0
......
#!/usr/bin/python3
"""
Find the Swarm manager by querying ZooKeeper.
"""
import sys
from kazoo.client import KazooClient
def zookeeper_swarm(zk_server_list, path='/swarm'):
    """
    Query ZooKeeper for the address of the current Swarm leader.

    :param zk_server_list: ZooKeeper server list, passed as-is to ``KazooClient(hosts=...)``
    :param path: prefix of the Swarm znodes, ``/docker/swarm/leader`` is appended to it
    :return: the content of the leader znode, decoded as UTF-8
    """
    leader_path = path + '/docker/swarm/leader'
    zk = KazooClient(hosts=zk_server_list)
    zk.start()
    try:
        # get() returns a (data, stat) pair, only the data is needed
        master, _ = zk.get(leader_path)
    finally:
        # Always stop the client, even when the znode cannot be read
        zk.stop()
    return master.decode('utf-8')
if __name__ == "__main__":
    # Exactly one command line argument is expected: the ZooKeeper server list
    if len(sys.argv) != 2:
        print("Provide zookeeper server list")
        # Stop here instead of falling through to the lookup with missing or wrong arguments
        sys.exit(1)
    print(zookeeper_swarm(sys.argv[1]))
......@@ -19,7 +19,6 @@ import logging
from zoe_lib.configargparse import ArgumentParser, Namespace
logging.getLogger('kazoo').setLevel(logging.WARNING)
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('docker').setLevel(logging.INFO)
......@@ -101,14 +100,7 @@ def load_configuration(test_conf=None):
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeElasticScheduler'], default='ZoeElasticScheduler')
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
argparser.add_argument('--backend', choices=['Swarm', 'Kubernetes', 'DockerEngine'], default='DockerEngine', help='Which backend to enable')
# Docker Swarm backend options
argparser.add_argument('--backend-swarm-url', help='Swarm/Docker API endpoint (ex.: zk://zk1:2181,zk2:2181 or http://swarm:2380)', default='http://localhost:2375')
argparser.add_argument('--backend-swarm-zk-path', help='Swarm/Docker optional ZooKeeper path for Swarm Znodes', default='/docker')
argparser.add_argument('--backend-swarm-tls-cert', help='Docker TLS certificate file', default='cert.pem')
argparser.add_argument('--backend-swarm-tls-key', help='Docker TLS private key file', default='key.pem')
argparser.add_argument('--backend-swarm-tls-ca', help='Docker TLS CA certificate file', default='ca.pem')
argparser.add_argument('--backend', choices=['Kubernetes', 'DockerEngine'], default='DockerEngine', help='Which backend to enable')
# Docker Engine backend options
argparser.add_argument('--backend-docker-config-file', help='Location of the Docker Engine config file', default='docker.conf')
......
......@@ -27,11 +27,6 @@ from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException, ZoeException
from zoe_master.stats import ClusterStats # pylint: disable=unused-import
try:
from zoe_master.backends.swarm.backend import SwarmBackend
except ImportError as ex:
SwarmBackend = None
try:
from zoe_master.backends.kubernetes.backend import KubernetesBackend
except ImportError:
......@@ -53,10 +48,6 @@ def _get_backend() -> Union[BaseBackend, None]:
if KubernetesBackend is None:
raise ZoeException('The Kubernetes backend requires the pykube module')
return KubernetesBackend(get_conf())
elif backend_name == 'Swarm':
if SwarmBackend is None:
raise ZoeException('The Swarm backend requires docker python version >= 2.0.2')
return SwarmBackend(get_conf())
elif backend_name == 'DockerEngine':
if DockerEngineBackend is None:
raise ZoeException('The Docker Engine backend requires docker python version >= 2.0.2')
......
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interface to the low-level Docker API."""
import time
import logging
from typing import Iterable, Callable, Dict, Any
import humanfriendly
try:
from consul import Consul
except ImportError:
Consul = None
try:
from kazoo.client import KazooClient
except ImportError:
KazooClient = None
import docker
import docker.tls
import docker.errors
import docker.utils
import docker.models.containers
import requests.exceptions
from zoe_lib.config import get_conf
from zoe_lib.state import Service, VolumeDescriptionHostPath
from zoe_master.stats import ClusterStats, NodeStats
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.exceptions import ZoeException, ZoeNotEnoughResourcesException
log = logging.getLogger(__name__)
# Refuse to load this module when the installed docker package does not provide
# the DockerClient class. The attribute is only probed: no client object is
# created (and left unclosed) at import time.
if not hasattr(docker, 'DockerClient'):
    log.error('Docker package does not have the DockerClient attribute')
    raise ImportError('Wrong Docker library version')
def zookeeper_swarm(zk_server_list: str, path='/docker') -> str:
    """
    Given a Zookeeper server list, find the currently active Swarm master.

    :param zk_server_list: Zookeeper server list
    :param path: Swarm path in Zookeeper, ``/docker/swarm/leader`` is appended to it
    :return: Swarm master connection string
    """
    leader_path = path + '/docker/swarm/leader'
    zk_client = KazooClient(hosts=zk_server_list)
    zk_client.start()
    try:
        # get() returns a (data, stat) pair, only the data is needed
        master, _ = zk_client.get(leader_path)
    finally:
        # Always stop the client, even when the znode cannot be read
        zk_client.stop()
    return master.decode('utf-8')
def consul_swarm(consul_ip: str) -> str:
    """
    Using consul as discovery service, find the currently active Swarm master.

    :param consul_ip: consul ip address
    :return: Swarm master connection string
    :raises ZoeException: if the leader key is missing or empty in the Consul key/value store
    """
    leader_key = 'docker/swarm/leader'
    consul_client = Consul(consul_ip)
    # kv.get() returns an (index, data) pair, data is None when the key does not exist
    data = consul_client.kv.get(leader_key)[1]
    if data is None or data.get('Value') is None:
        raise ZoeException('Swarm leader key {} not found in Consul'.format(leader_key))
    return data['Value'].decode('utf-8')
class SwarmClient:
    """The Swarm client class that wraps the Docker API."""

    def __init__(self) -> None:
        """
        Find the Swarm manager according to the configuration and connect to it.

        The scheme of the ``backend_swarm_url`` option selects the method:
        ``zk://`` and ``consul://`` look the leader up in the discovery service,
        ``http://`` and ``https://`` are used directly as the Docker endpoint.
        For ``https://`` the TLS certificate, key and CA options are used.

        :raises ZoeException: unsupported URL scheme, missing optional package or Docker connection failure
        """
        url = get_conf().backend_swarm_url
        tls = False
        # The scheme is matched as a prefix, since it is stripped by slicing from the start of the URL
        if url.startswith('zk://'):
            if KazooClient is None:
                raise ZoeException('ZooKeeper URL for Swarm, but the kazoo package is not installed')
            url = url[len('zk://'):]
            manager = zookeeper_swarm(url, get_conf().backend_swarm_zk_path)
        elif url.startswith('consul://'):
            if Consul is None:
                raise ZoeException('Consul URL for Swarm, but the consul package is not installed')
            url = url[len('consul://'):]
            manager = consul_swarm(url)
        elif url.startswith('http://'):
            manager = url
        elif url.startswith('https://'):
            tls = docker.tls.TLSConfig(client_cert=(get_conf().backend_swarm_tls_cert, get_conf().backend_swarm_tls_key), verify=get_conf().backend_swarm_tls_ca)
            manager = url
        else:
            raise ZoeException('Unsupported URL scheme for Swarm')

        try:
            self.cli = docker.DockerClient(base_url=manager, version="auto", tls=tls)
        except docker.errors.DockerException as ex:
            raise ZoeException("Cannot connect to Docker") from ex

    def info(self) -> ClusterStats:
        """
        Retrieve Swarm statistics. The Docker API returns a mess difficult to parse.

        The ``SystemStatus`` entry of the Docker info is a flat list of pairs
        that is walked by position: a fixed header (role, strategy, filters,
        node count) followed by a group of entries for each node.

        :return: the cluster statistics, timestamped with the current time
        """
        info = self.cli.info()
        entries = info["SystemStatus"]
        pl_status = ClusterStats()

        # SystemStatus is a list...
        idx = 0  # Role, skip
        idx += 1
        assert 'Strategy' in entries[idx][0]
        pl_status.placement_strategy = entries[idx][1]
        idx += 1
        assert 'Filters' in entries[idx][0]
        pl_status.active_filters = [x.strip() for x in entries[idx][1].split(", ")]
        idx += 1
        assert 'Nodes' in entries[idx][0]
        node_count = int(entries[idx][1])
        idx += 1  # At index 4 the nodes begin
        for node in range(node_count):
            # idx + node is the first entry of the current node, idx2 is the offset inside its group
            idx2 = 0
            node_stats = NodeStats(entries[idx + node][0].strip())
            node_stats.docker_endpoint = entries[idx + node][1]
            idx2 += 1  # ID, skip
            idx2 += 1  # Status
            if entries[idx + node + idx2][1] == 'Healthy':
                node_stats.status = 'online'
            else:
                node_stats.status = 'offline'
            idx2 += 1  # Containers
            node_stats.container_count = int(entries[idx + node + idx2][1].split(' ')[0])
            idx2 += 1  # CPUs
            node_stats.cores_reserved = int(entries[idx + node + idx2][1].split(' / ')[0])
            node_stats.cores_total = int(entries[idx + node + idx2][1].split(' / ')[1])
            idx2 += 1  # Memory
            node_stats.memory_reserved = entries[idx + node + idx2][1].split(' / ')[0]
            node_stats.memory_total = entries[idx + node + idx2][1].split(' / ')[1]
            idx2 += 1  # Labels
            node_stats.labels = entries[idx + node + idx2][1].split(', ')
            idx2 += 1  # Last update
            node_stats.last_update = entries[idx + node + idx2][1]
            idx2 += 1  # Docker version
            node_stats.server_version = entries[idx + node + idx2][1]

            # Memory sizes are human-readable strings, convert them to numbers
            node_stats.memory_reserved = humanfriendly.parse_size(node_stats.memory_reserved)
            node_stats.memory_total = humanfriendly.parse_size(node_stats.memory_total)

            pl_status.nodes.append(node_stats)
            # Skip the entries consumed by this node (the loop counter accounts for the first one)
            idx += idx2
        pl_status.timestamp = time.time()
        return pl_status

    def spawn_container(self, service_instance: ServiceInstance) -> Dict[str, Any]:
        """
        Create and start a new container.

        :param service_instance: description of the service to run
        :return: the summary dictionary of the new container, see ``_container_summary``
        :raises ZoeNotEnoughResourcesException: Swarm reports that no resources are available
        :raises ZoeException: the image is missing or any other error happened
        """
        # All declared ports are exposed, the host port is left unspecified (None)
        port_bindings = {str(port.number) + '/' + port.proto: None for port in service_instance.ports}  # type: Dict[str, Any]

        environment = {name: value for name, value in service_instance.environment}

        volumes = {}
        for volume in service_instance.volumes:
            if volume.type == "host_directory":
                assert isinstance(volume, VolumeDescriptionHostPath)
                volumes[volume.path] = {'bind': volume.mount_point, 'mode': ("ro" if volume.readonly else "rw")}
            else:
                log.error('Swarm backend does not support volume type {}'.format(volume.type))

        if service_instance.memory_limit is not None:
            mem_limit = service_instance.memory_limit.max
        else:
            mem_limit = 0
        # Swarm backend does not support cores in a consistent way, see https://github.com/docker/swarm/issues/475

        if get_conf().gelf_address != '':
            log_config = {
                "type": "gelf",
                "config": {
                    'gelf-address': get_conf().gelf_address,
                    'labels': ",".join(service_instance.labels)
                }
            }
        else:
            log_config = {
                "type": "json-file",
                "config": {}
            }

        # When run() raises, no container object has been returned, so the
        # handlers below have nothing to remove and only translate the error.
        try:
            cont = self.cli.containers.run(image=service_instance.image_name,
                                           command=service_instance.command,
                                           detach=True,
                                           environment=environment,
                                           hostname=service_instance.hostname,
                                           labels=service_instance.labels,
                                           log_config=log_config,
                                           mem_limit=mem_limit,
                                           memswap_limit=0,
                                           name=service_instance.name,
                                           network_disabled=False,
                                           network_mode=get_conf().overlay_network_name,
                                           ports=port_bindings,
                                           working_dir=service_instance.work_dir,
                                           volumes=volumes)
        except docker.errors.ImageNotFound:
            raise ZoeException(message='Image not found')
        except docker.errors.APIError as e:
            if e.explanation == b'no resources available to schedule container':
                raise ZoeNotEnoughResourcesException(message=str(e))
            else:
                raise ZoeException(message=str(e))
        except Exception as e:
            raise ZoeException(str(e))

        # Fetch the container again, so that the summary is built from fresh attributes
        cont = self.cli.containers.get(cont.id)
        return self._container_summary(cont)

    def _container_summary(self, container: docker.models.containers.Container) -> Dict[str, Any]:
        """
        Translate a docker-specific container object into a simple dictionary.

        :param container: the Docker container object
        :return: a dictionary with the keys ``id``, ``ip_address``, ``name``, ``labels``,
                 ``host``, ``state``, ``running`` and ``ports``
        """
        info = {
            "id": container.id,
            "ip_address": {},
            "name": container.name,
            'labels': container.attrs['Config']['Labels']
        }  # type: Dict[str, Any]
        try:
            info['host'] = container.attrs['Node']['Name']
        except KeyError:
            info['host'] = 'N/A'

        if container.status in ('running', 'restarting'):
            info["state"] = Service.BACKEND_START_STATUS
            info["running"] = True
        elif container.status in ('paused', 'exited'):
            info["state"] = Service.BACKEND_DIE_STATUS
            info["running"] = False
        elif container.status == 'OOMKilled':
            info["state"] = Service.BACKEND_OOM_STATUS
            info["running"] = False
        elif container.status == 'created':
            info["state"] = Service.BACKEND_CREATE_STATUS
            info["running"] = False
        else:
            log.error('Unknown container status: {}'.format(container.status))
            info["state"] = Service.BACKEND_UNDEFINED_STATUS
            info["running"] = False

        # Each bound port is reported as a (host ip, host port) pair, unbound ports as None
        info['ports'] = {}
        port_map = container.attrs['NetworkSettings']['Ports']
        if port_map is not None:
            for port, bindings in port_map.items():
                if bindings is not None:
                    info['ports'][port] = (bindings[0]['HostIp'], bindings[0]['HostPort'])
                else:
                    info['ports'][port] = None

        return info

    def inspect_container(self, docker_id: str) -> Dict[str, Any]:
        """
        Retrieve information about a running container.

        :param docker_id: the ID of the container to inspect
        :return: the summary dictionary of the container, see ``_container_summary``
        :raises ZoeException: the container cannot be retrieved
        """
        try:
            # The collection attribute of DockerClient is "containers" (plural)
            cont = self.cli.containers.get(docker_id)
        except Exception as e:
            raise ZoeException(str(e)) from e
        return self._container_summary(cont)

    def terminate_container(self, docker_id: str, delete=False) -> None:
        """
        Terminate a container.

        A container that no longer exists is silently ignored.

        :param docker_id: The container to terminate
        :type docker_id: str
        :param delete: If True, also delete the container files
        :type delete: bool
        :return: None
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except docker.errors.NotFound:
            return

        cont.stop(timeout=5)
        if delete:
            try:
                cont.remove(force=True)
            except docker.errors.APIError as e:
                # Removal is best-effort, a failure is only reported
                log.warning(str(e))

    def event_listener(self, callback: Callable[[str], bool]) -> None:
        """
        An infinite loop that listens for events from Swarm.

        :param callback: called with each event, the loop ends when it returns a false value
        """
        event_gen = self.cli.events(decode=True)
        while True:
            try:
                event = next(event_gen)
            except requests.exceptions.RequestException:
                log.warning('Docker closed event connection, retrying...')
                event_gen = self.cli.events(decode=True)
                continue

            try:
                res = callback(event)
            except Exception:
                # A failing callback must not stop the event loop
                log.exception('Uncaught exception in swarm event callback')
                log.warning('event was: {}'.format(event))
                continue
            if not res:
                break

    def list(self, only_label=None) -> Iterable[dict]:
        """
        List running or defined containers.

        :param only_label: dictionary of labels, only the containers carrying all of them
                           with the same values are returned. None disables the filter.
        :return: a list of containers
        :raises ZoeException: the container list cannot be retrieved
        """
        try:
            ret = self.cli.containers.list(all=True)
        except (docker.errors.APIError, requests.exceptions.RequestException) as ex:
            raise ZoeException(str(ex)) from ex

        # The default value of only_label is None, treat it as "no filter"
        wanted = only_label or {}
        conts = []
        for cont_info in ret:
            # Containers without labels may report None instead of a dictionary
            labels = cont_info.attrs['Config']['Labels'] or {}
            if all(key in labels and labels[key] == value for key, value in wanted.items()):
                conts.append(self._container_summary(cont_info))

        return conts

    def logs(self, docker_id: str, stream: bool, follow=None):
        """
        Retrieves the logs of the selected container.

        :param docker_id: the ID of the container
        :param stream: passed to Docker as the ``stream`` option
        :param follow: passed to Docker as the ``follow`` option
        :return: the value returned by Docker, or None if the container or its logs cannot be retrieved
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except (docker.errors.NotFound, docker.errors.APIError):
            return None

        try:
            # The container object already knows its own ID, logs() takes keyword arguments only
            return cont.logs(stdout=True, stderr=True, follow=follow, stream=stream, timestamps=True, tail='all')
        except docker.errors.APIError:
            return None
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Zoe back-end implementation for old-style stand-alone Docker Swarm."""
import logging
from zoe_lib.state import Service
from zoe_master.backends.swarm.api_client import SwarmClient
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException, ZoeNotEnoughResourcesException
import zoe_master.backends.base
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.swarm.threads import SwarmStateSynchronizer
from zoe_master.stats import NodeStats, ClusterStats # pylint: disable=unused-import
log = logging.getLogger(__name__)
# This module-level variable holds the reference to the checker thread
_checker = None
class SwarmBackend(zoe_master.backends.base.BaseBackend):
    """Zoe backend implementation for old-style stand-alone Docker Swarm."""

    def __init__(self, opts):
        """
        Connect to Swarm and warn about the deprecation of this back-end.

        :param opts: the options, passed to the base class constructor
        """
        super().__init__(opts)
        self.swarm = SwarmClient()
        log.warning('The Docker Swarm back-end is deprecated and will be removed in 2018.03, please upgrade to the Docker Engine back-end.')

    @classmethod
    def init(cls, state):
        """Initializes Swarm backend starting the event monitoring thread."""
        global _checker
        _checker = SwarmStateSynchronizer(state)

    @classmethod
    def shutdown(cls):
        """Performs a clean shutdown of the resources used by Swarm backend."""
        # The checker exists only after init() has been called
        if _checker is not None:
            _checker.quit()

    def spawn_service(self, service_instance: ServiceInstance):
        """
        Spawn a service, translating a Zoe Service into a Docker container.

        :param service_instance: the service to start
        :return: a tuple with the container ID, the host IP address of the last
                 bound port (None if no port is bound) and a dictionary that
                 maps each bound port to its host port
        :raises ZoeStartExecutionRetryException: not enough free resources, the start can be retried
        :raises ZoeStartExecutionFatalException: any other error raised by the Swarm client
        """
        try:
            cont_info = self.swarm.spawn_container(service_instance)
        except ZoeNotEnoughResourcesException:
            raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service_instance.name))
        except ZoeException as e:
            raise ZoeStartExecutionFatalException(str(e))

        ip_address = None
        ports = {}
        for port_name, mapping in cont_info['ports'].items():
            # Ports without a binding are reported as None
            if mapping is None:
                continue
            ip_address = mapping[0]
            ports[port_name] = mapping[1]

        return cont_info["id"], ip_address, ports

    def terminate_service(self, service: Service) -> None:
        """Terminate and delete a container."""
        self.swarm.terminate_container(service.backend_id, delete=True)

    def platform_state(self) -> ClusterStats:
        """
        Get the platform state.

        Usage figures are filled in by copying the reserved amounts reported by Swarm.
        """
        info = self.swarm.info()
        for node in info.nodes:  # type: NodeStats
            node.memory_in_use = node.memory_reserved
            node.cores_in_use = node.cores_reserved
        return info

    def service_log(self, service: Service):
        """Get the log."""
        # Positional arguments are stream=True, follow=False
        return self.swarm.logs(service.backend_id, True, False)

    def preload_image(self, image_name: str) -> None:
        """Make a service image available. Not implemented by this back-end."""
        raise NotImplementedError

    def update_service(self, service, cores=None, memory=None):
        """Update a service reservation. Not implemented, an error is logged."""
        log.error('Reservation update not implemented in the Swarm back-end')
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Monitor for the Swarm event stream."""
import logging
import threading
import time
from zoe_lib.config import get_conf
from zoe_lib.state import SQLManager, Service
from zoe_master.backends.swarm.api_client import SwarmClient
from zoe_master.exceptions import ZoeException
log = logging.getLogger(__name__)
CHECK_INTERVAL = 10
class SwarmStateSynchronizer(threading.Thread):
    """The Swarm Checker: a daemon thread that aligns the service status in the Zoe state with the containers listed by Swarm."""

    def __init__(self, state: SQLManager) -> None:
        """
        Create the thread and start it immediately.

        :param state: the state manager used to list the services
        """
        super().__init__(name='checker', daemon=True)
        self.stop = False
        self.state = state
        self.start()

    def _update_service_status(self, service: Service, container):
        """
        Update the service status.

        :param service: the service to update
        :param container: the container summary dictionary, its ``state`` entry is the new status
        """
        if service.backend_status != container['state']:
            old_status = service.backend_status
            service.set_backend_status(container['state'])
            log.debug('Updated service status, {} from {} to {}'.format(service.name, old_status, container['state']))

    def run(self):
        """The thread loop: every CHECK_INTERVAL seconds compare the services with the containers."""
        log.info("Checker thread started")
        while not self.stop:
            try:
                swarm = SwarmClient()
            except ZoeException as e:
                log.error(str(e))
                time.sleep(CHECK_INTERVAL)
                continue

            service_list = self.state.services.select()
            try:
                container_list = swarm.list(only_label={'zoe_deployment_name': get_conf().deployment_name})
            except ZoeException as e:
                log.error(str(e))
                # Wait before retrying, otherwise the loop spins while Swarm is unreachable
                time.sleep(CHECK_INTERVAL)
                continue

            containers = {cont['id']: cont for cont in container_list}

            for service in service_list:
                assert isinstance(service, Service)
                if service.backend_id in containers:
                    self._update_service_status(service, containers[service.backend_id])
                elif service.backend_status != service.BACKEND_DESTROY_STATUS:
                    # The container is not listed any more: mark the service as destroyed
                    service.set_backend_status(service.BACKEND_DESTROY_STATUS)

            time.sleep(CHECK_INTERVAL)

    def quit(self):
        """Stops the thread: the flag is checked at the start of each loop iteration."""
        self.stop = True
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment