Commit ddf1f8a3 authored by Daniele Venzano

Implement a new backend that communicates directly with Docker engines

parent 9be889df
......@@ -61,3 +61,4 @@ target/
state.zoe
/zoe*.conf
zoepass.csv
/docker.conf
......@@ -21,7 +21,7 @@ import psycopg2.extras
import zoe_api.exceptions
from zoe_lib.config import get_conf
SQL_SCHEMA_VERSION = 4 # ---> Increment this value every time the schema changes !!! <---
SQL_SCHEMA_VERSION = 5 # ---> Increment this value every time the schema changes !!! <---
def version_table(cur):
......@@ -75,6 +75,7 @@ def create_tables(cur):
name TEXT NOT NULL,
backend_id TEXT NULL DEFAULT NULL,
backend_status TEXT NOT NULL DEFAULT 'undefined',
backend_host TEXT NULL DEFAULT NULL,
ip_address CIDR NULL DEFAULT NULL,
essential BOOLEAN NOT NULL DEFAULT FALSE
)''')
......
......@@ -95,7 +95,7 @@ def load_configuration(test_conf=None):
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeSimpleScheduler', 'ZoeElasticScheduler'], default='ZoeSimpleScheduler')
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
argparser.add_argument('--backend', choices=['Swarm', 'Kubernetes'], default='Swarm')
argparser.add_argument('--backend', choices=['Swarm', 'Kubernetes', 'DockerEngine'], default='Swarm')
# Docker Swarm backend options
argparser.add_argument('--backend-swarm-url', help='Swarm/Docker API endpoint (ex.: zk://zk1:2181,zk2:2181 or http://swarm:2380)', default='http://localhost:2375')
......@@ -104,6 +104,9 @@ def load_configuration(test_conf=None):
argparser.add_argument('--backend-swarm-tls-key', help='Docker TLS private key file', default='key.pem')
argparser.add_argument('--backend-swarm-tls-ca', help='Docker TLS CA certificate file', default='ca.pem')
# Docker Engine backend options
argparser.add_argument('--backend-docker-config-file', help='Location of the Docker Engine config file', default='docker.conf')
# Kubernetes backend
argparser.add_argument('--kube-config-file', help='Kubernetes configuration file', default='/opt/zoe/kube.conf')
argparser.add_argument('--kube-namespace', help='The namespace that Zoe operates on', default='default')
......
......@@ -115,6 +115,7 @@ class Service:
self.service_group = d['service_group']
self.backend_id = d['backend_id']
self.backend_status = d['backend_status']
self.backend_host = d['backend_host']
self.ip_address = d['ip_address']
if self.ip_address is not None and ('/32' in self.ip_address or '/128' in self.ip_address):
......@@ -176,6 +177,7 @@ class Service:
'backend_id': self.backend_id,
'ip_address': self.ip_address,
'backend_status': self.backend_status,
'backend_host': self.backend_host,
'essential': self.essential,
'proxy_address': self.proxy_address
}
......@@ -190,10 +192,11 @@ class Service:
def set_inactive(self):
"""The service is not running."""
self.sql_manager.service_update(self.id, status=self.INACTIVE_STATUS, backend_id=None, ip_address=None)
self.sql_manager.service_update(self.id, status=self.INACTIVE_STATUS, backend_id=None, ip_address=None, backend_host=None)
self.status = self.INACTIVE_STATUS
for port in self.ports:
port.reset()
self.backend_host = None
def set_starting(self):
"""The service is being created by Docker."""
......@@ -225,6 +228,12 @@ class Service:
log.debug("service {}, backend status updated to {}".format(self.id, new_status))
self.backend_status = new_status
def assign_backend_host(self, backend_host):
"""Assign this service to a host in particular."""
self.sql_manager.service_update(self.id, backend_host=backend_host)
log.debug('service {} assigned to host {}'.format(self.id, backend_host))
self.backend_host = backend_host
@property
def dns_name(self):
"""Getter for the DNS name of this service as it will be registered in Docker's DNS."""
......
......@@ -194,7 +194,7 @@ class SQLManager:
"""Adds a new service to the state."""
cur = self._cursor()
status = 'created'
query = cur.mogrify('INSERT INTO service (id, status, error_message, execution_id, name, service_group, description, essential) VALUES (DEFAULT, %s,NULL,%s,%s,%s,%s,%s) RETURNING id', (status, execution_id, name, service_group, description, is_essential))
query = cur.mogrify('INSERT INTO service (id, status, execution_id, name, service_group, description, essential) VALUES (DEFAULT,%s,%s,%s,%s,%s,%s) RETURNING id', (status, execution_id, name, service_group, description, is_essential))
cur.execute(query)
self.conn.commit()
return cur.fetchone()[0]
......
This diff is collapsed.
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Zoe backend implementation for old-style stand-alone Docker Swarm."""
import logging
from zoe_lib.state import Service
from zoe_lib.config import get_conf
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException, ZoeNotEnoughResourcesException
import zoe_master.backends.base
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.docker.threads import DockerStateSynchronizer
from zoe_master.backends.docker.api_client import DockerClient
from zoe_master.backends.docker.config import DockerConfig, DockerHostConfig # pylint: disable=unused-import
from zoe_master.stats import ClusterStats # pylint: disable=unused-import
log = logging.getLogger(__name__)
# This module-level variable holds the reference to the state synchronizer thread
_checker = None
class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
"""Zoe backend implementation for old-style stand-alone Docker Swarm."""
def __init__(self, opts):
super().__init__(opts)
self.docker_config = DockerConfig().read_config()
def _get_config(self, host) -> DockerHostConfig:
"""Return the configuration of the given host, or None if the host is not listed in the Docker backend configuration file."""
for conf in self.docker_config:
if conf.name == host:
return conf
@classmethod
def init(cls, state):
"""Initializes Swarm backend starting the event monitoring thread."""
global _checker
_checker = DockerStateSynchronizer(state)
@classmethod
def shutdown(cls):
"""Performs a clean shutdown of the resources used by Swarm backend."""
_checker.quit()
def spawn_service(self, service_instance: ServiceInstance):
"""Spawn a service, translating a Zoe Service into a Docker container."""
conf = self._get_config(service_instance.backend_host)
try:
engine = DockerClient(conf)
cont_info = engine.spawn_container(service_instance)
except ZoeNotEnoughResourcesException:
raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service_instance.name))
except ZoeException as e:
raise ZoeStartExecutionFatalException(str(e))
return cont_info["id"], cont_info['ip_address'][get_conf().overlay_network_name]
def terminate_service(self, service: Service) -> None:
"""Terminate and delete a container."""
conf = self._get_config(service.backend_host)
engine = DockerClient(conf)
engine.terminate_container(service.backend_id, delete=True)
def platform_state(self) -> ClusterStats:
"""Get the platform state."""
return _checker.get_platform_stats()
def service_log(self, service: Service):
"""Get the log."""
conf = self._get_config(service.backend_host)
engine = DockerClient(conf)
return engine.logs(service.backend_id, True, False)
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Parses Docker-specific configuration file."""
import configparser
from typing import List
import logging
from zoe_lib.config import get_conf
log = logging.getLogger(__name__)
class DockerHostConfig:
"""A class that holds static information about a host."""
def __init__(self):
self.name = None
self.address = None
self.tls = False
self.tls_cert = None
self.tls_key = None
self.tls_ca = None
self.labels = []
class DockerConfig:
"""A class that holds the configuration for the Docker Engine backend."""
def __init__(self):
self.conffile = get_conf().backend_docker_config_file
def read_config(self) -> List[DockerHostConfig]:
"""Parse the configuration file."""
config = configparser.ConfigParser()
config.read(self.conffile)
hosts = []
for section in config.sections():
host = DockerHostConfig()
host.name = section
try:
host.address = config[section]['address']
host.tls = config.getboolean(section, 'use_tls')
if host.tls:
host.tls_cert = config[section]['tls_cert']
host.tls_ca = config[section]['tls_ca']
host.tls_key = config[section]['tls_key']
host.labels = config[section]['labels'].split(',')
except KeyError as e:
log.error('Error in Docker backend configuration, missing key {} in section {}'.format(e.args[0], section))
continue
hosts.append(host)
if len(hosts) == 0:
log.error('Host list is empty, verify your docker backend configuration!')
return hosts
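For reference, and based only on the keys read by read_config() above, a docker.conf accepted by this parser could look like the following sketch (host names, addresses, labels and certificate paths are purely illustrative):
[host1]
address = tcp://192.168.47.10:2375
use_tls = false
labels = small,ssd
[host2]
address = tcp://192.168.47.11:2376
use_tls = true
tls_cert = /etc/zoe/certs/host2-cert.pem
tls_key = /etc/zoe/certs/host2-key.pem
tls_ca = /etc/zoe/certs/ca.pem
labels = gpu
Each section name becomes the host name recorded in a service's backend_host field and is matched against DockerHostConfig.name by DockerEngineBackend._get_config() when spawning or terminating containers.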
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Monitor for the Swarm event stream."""
import logging
import threading
import time
from copy import deepcopy
from zoe_lib.config import get_conf
from zoe_lib.state import SQLManager, Service
from zoe_master.backends.docker.api_client import DockerClient
from zoe_master.backends.docker.config import DockerConfig, DockerHostConfig # pylint: disable=unused-import
from zoe_master.exceptions import ZoeException
from zoe_master.stats import ClusterStats, NodeStats
log = logging.getLogger(__name__)
CHECK_INTERVAL = 10
THREAD_POOL_SIZE = 10
class DockerStateSynchronizer(threading.Thread):
"""The Docker Checker."""
def __init__(self, state: SQLManager) -> None:
super().__init__()
self.setName('checker')
self.stop = False
self.my_stop = False
self.state = state
self.setDaemon(True)
self._platform_stats = ClusterStats()
self.host_checkers = []
for docker_host in DockerConfig().read_config():
th = threading.Thread(target=self._host_subthread, args=(docker_host,), name='synchro_' + docker_host.name, daemon=True)
th.start()
self.host_checkers.append((th, docker_host))
self.start()
def _host_subthread(self, host_config: DockerHostConfig):
log.info("Checker thread started")
node_stats = None
for node in self._platform_stats.nodes:
if node.name == host_config.name:
node_stats = node
break
if node_stats is None:
node_stats = NodeStats(host_config.name)
self._platform_stats.nodes.append(node_stats)
while not self.stop:
try:
my_engine = DockerClient(host_config)
except ZoeException as e:
log.error(str(e))
node_stats.status = 'offline'
time.sleep(CHECK_INTERVAL)
continue
service_list = self.state.service_list(backend_host=host_config.name)
try:
container_list = my_engine.list(only_label={'zoe_deployment_name': get_conf().deployment_name})
except ZoeException:
continue
containers = {}
for cont in container_list:
containers[cont['id']] = cont
services = {}
for service in service_list:
services[service.backend_id] = service
for service in service_list:
assert isinstance(service, Service)
if service.backend_id in containers:
self._update_service_status(service, containers[service.backend_id])
else:
if service.backend_status == service.BACKEND_DESTROY_STATUS:
continue
else:
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
self._update_node_stats(my_engine, node_stats)
time.sleep(CHECK_INTERVAL)
def _update_node_stats(self, my_engine, node_stats):
try:
container_list = my_engine.list()
info = my_engine.info()
except ZoeException:
return
node_stats.container_count = info['Containers']
node_stats.cores_total = info['NCPU']
node_stats.memory_total = info['MemTotal']
stats = {}
for cont in container_list:
stats[cont['id']] = my_engine.stats(cont['id'], stream=False)
# Containers without an explicit memory limit report the host's total memory as their limit, so they are excluded from the reserved count
node_stats.memory_reserved = sum([stat['memory_stats']['limit'] for stat in stats.values() if 'limit' in stat['memory_stats'] and stat['memory_stats']['limit'] != node_stats.memory_total])
memory_in_use = sum([stat['memory_stats']['usage'] for stat in stats.values() if 'usage' in stat['memory_stats']])
node_stats.memory_free = node_stats.memory_total - memory_in_use
def _update_service_status(self, service: Service, container):
"""Update the service status."""
if service.backend_status != container['state']:
old_status = service.backend_status
service.set_backend_status(container['state'])
log.debug('Updated service status, {} from {} to {}'.format(service.name, old_status, container['state']))
for port in service.ports:
if port.internal_name in container['ports'] and container['ports'][port.internal_name] is not None:
port.activate(container['ports'][port.internal_name][0], container['ports'][port.internal_name][1])
else:
port.reset()
def run(self):
"""The thread loop."""
log.info("Checker thread started")
while not self.my_stop:
to_remove = []
to_add = []
for th, conf in self.host_checkers:
if not th.is_alive():
log.warning('Thread {} has died, starting a new one.'.format(th.name))
to_remove.append((th, conf))
th = threading.Thread(target=self._host_subthread, args=(conf,), name='synchro_' + conf.name, daemon=True)
th.start()
to_add.append((th, conf))
for dead_th in to_remove:
self.host_checkers.remove(dead_th)
for new_th in to_add:
self.host_checkers.append(new_th)
time.sleep(CHECK_INTERVAL)
def quit(self):
"""Stops the thread."""
self.stop = True
for th, conf in self.host_checkers:
th.join()
self.my_stop = True
def get_platform_stats(self):
"""Returns a copy of the platform stats."""
return deepcopy(self._platform_stats)
......@@ -36,6 +36,11 @@ try:
except ImportError:
KubernetesBackend = None
try:
from zoe_master.backends.docker.backend import DockerEngineBackend
except ImportError:
DockerEngineBackend = None
log = logging.getLogger(__name__)
......@@ -50,6 +55,10 @@ def _get_backend() -> BaseBackend:
if SwarmBackend is None:
raise ZoeException('The Swarm backend requires docker python version >= 2.0.2')
return SwarmBackend(get_conf())
elif backend_name == 'DockerEngine':
if DockerEngineBackend is None:
raise ZoeException('The Docker Engine backend requires docker python version >= 2.0.2')
return DockerEngineBackend(get_conf())
else:
log.error('Unknown backend selected')
assert False
......@@ -67,7 +76,7 @@ def shutdown_backend():
backend.shutdown()
def service_list_to_containers(execution: Execution, service_list: List[Service]) -> str:
def service_list_to_containers(execution: Execution, service_list: List[Service], placement=None) -> str:
"""Given a subset of services from an execution, tries to start them, return one of 'ok', 'requeue' for temporary failures and 'fatal' for fatal failures."""
backend = _get_backend()
......@@ -85,6 +94,8 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
for service in ordered_service_list:
env_subst_dict['dns_name#self'] = service.dns_name
if placement is not None:
service.assign_backend_host(placement[service.id])
service.set_starting()
instance = ServiceInstance(execution, service, env_subst_dict)
try:
......@@ -127,18 +138,18 @@ def start_all(execution: Execution) -> str:
return service_list_to_containers(execution, execution.services)
def start_essential(execution) -> str:
def start_essential(execution: Execution, placement) -> str:
"""Start the essential services for this execution"""
log.debug('starting essential services for execution {}'.format(execution.id))
execution.set_starting()
return service_list_to_containers(execution, execution.essential_services)
return service_list_to_containers(execution, execution.essential_services, placement)
def start_elastic(execution) -> str:
def start_elastic(execution: Execution, placement) -> str:
"""Start the runnable elastic services"""
elastic_to_start = [s for s in execution.elastic_services if s.status == Service.RUNNABLE_STATUS]
return service_list_to_containers(execution, elastic_to_start)
return service_list_to_containers(execution, elastic_to_start, placement)
def terminate_execution(execution: Execution) -> None:
......
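The new placement argument threaded through start_essential(), start_elastic() and service_list_to_containers() is assumed to be a plain mapping from service id to the name of the Docker host chosen by the scheduler simulation, along these lines (ids and host names are hypothetical):
# Hypothetical result of cluster_status_snapshot.get_service_allocation():
# keys are Service.id values, values are host names taken from the docker.conf sections
placement = {42: 'host1', 43: 'host2'}
# service_list_to_containers() calls service.assign_backend_host(placement[service.id])
# before starting each service, so the Docker Engine backend knows where to spawn its container.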
......@@ -29,6 +29,7 @@ class ServiceInstance:
def __init__(self, execution: Execution, service: Service, env_subst_dict):
self.name = service.unique_name
self.hostname = service.dns_name
self.backend_host = service.backend_host
if service.resource_reservation.memory.min is None:
self.memory_limit = None
......
......@@ -229,12 +229,13 @@ class ZoeElasticScheduler:
break
free_resources = current_free_resources
log.debug('Allocation after simulation: {}'.format(cluster_status_snapshot.get_service_allocation()))
placements = cluster_status_snapshot.get_service_allocation()
log.debug('Allocation after simulation: {}'.format(placements))
# We apply the results of the simulation to the real cluster
for job in jobs_to_launch: # type: Execution
if not job.essential_services_running:
ret = start_essential(job)
ret = start_essential(job, placements)
if ret == "fatal":
jobs_to_attempt_scheduling.remove(job)
continue # throw away the execution
......@@ -245,7 +246,7 @@ class ZoeElasticScheduler:
job.set_running()
assert ret == "ok"
start_elastic(job)
start_elastic(job, placements)
if job.all_services_active:
log.debug('execution {}: all services are active'.format(job.id))
......
......@@ -65,7 +65,8 @@ class SimulatedPlatform:
def __init__(self, platform_status: ClusterStats):
self.nodes = {}
for node in platform_status.nodes:
self.nodes[node.name] = SimulatedNode(node)
if node.status == 'online':
self.nodes[node.name] = SimulatedNode(node)
def allocate_essential(self, execution: Execution) -> bool:
"""Try to find an allocation for essential services"""
......
......@@ -64,9 +64,6 @@ class ClusterStats(Stats):
"""Stats related to the whole cluster."""
def __init__(self):
super().__init__()
self.container_count = 0
self.memory_total = 0
self.cores_total = 0
self.nodes = []
def serialize(self):
......@@ -77,3 +74,18 @@ class ClusterStats(Stats):
'cores_total': self.cores_total,
'nodes': [x.serialize() for x in self.nodes]
}
@property
def memory_total(self) -> int:
"""Total memory installed in the whole platform."""
return sum([node.memory_total for node in self.nodes])
@property
def cores_total(self) -> int:
"""Total number of cores installed."""
return sum([node.cores_total for node in self.nodes])
@property
def container_count(self) -> int:
"""Total number of containers."""
return sum([node.container_count for node in self.nodes])
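A minimal usage sketch of the reworked ClusterStats, assuming NodeStats exposes the memory_total, cores_total and container_count attributes that the synchronizer above fills in:
# The aggregate figures are now derived on the fly from the per-node stats
stats = ClusterStats()
node = NodeStats('host1')  # NodeStats takes the host name, as in _host_subthread()
node.memory_total = 16 * 1024 ** 3
node.cores_total = 8
node.container_count = 3
stats.nodes.append(node)
assert stats.memory_total == 16 * 1024 ** 3
assert stats.cores_total == 8
assert stats.container_count == 3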