Commit e5e6971c authored by Daniele Venzano

Backend abstraction and new docker api backend

parent 91c913ad
......@@ -21,7 +21,7 @@ import psycopg2.extras
import zoe_api.exceptions
from zoe_lib.config import get_conf
SQL_SCHEMA_VERSION = 1 # ---> Increment this value every time the schema changes !!! <---
SQL_SCHEMA_VERSION = 3 # ---> Increment this value every time the schema changes !!! <---
def version_table(cur):
......@@ -73,8 +73,10 @@ def create_tables(cur):
execution_id INT REFERENCES execution,
service_group TEXT NOT NULL,
name TEXT NOT NULL,
docker_id TEXT NULL DEFAULT NULL,
docker_status TEXT NOT NULL DEFAULT 'undefined'
backend_id TEXT NULL DEFAULT NULL,
backend_status TEXT NOT NULL DEFAULT 'undefined',
ip_address CIDR NULL DEFAULT NULL,
essential BOOLEAN NOT NULL DEFAULT FALSE
)''')
......
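No migration script appears in this hunk; SQL_SCHEMA_VERSION simply jumps from 1 to 3. For an already-deployed database, a hand-rolled migration along the following lines would be needed. This is a hypothetical sketch: the connection parameters are assumptions, only the column changes come from the schema above, and the version-tracking table (whose layout is not shown here) would also need its stored version bumped.

import psycopg2

conn = psycopg2.connect('dbname=zoe user=postgres')  # assumed connection parameters
cur = conn.cursor()
cur.execute('SET search_path TO prod,public')  # 'prod' is the default deployment name / schema
cur.execute('ALTER TABLE service RENAME COLUMN docker_id TO backend_id')
cur.execute('ALTER TABLE service RENAME COLUMN docker_status TO backend_status')
cur.execute('ALTER TABLE service ADD COLUMN ip_address CIDR NULL DEFAULT NULL')
cur.execute('ALTER TABLE service ADD COLUMN essential BOOLEAN NOT NULL DEFAULT FALSE')
conn.commit()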
......@@ -157,7 +157,7 @@ def exec_get_cmd(args):
service = cont_api.get(c_id)
print('Service {} (ID: {})'.format(service['name'], service['id']))
print(' - zoe status: {}'.format(service['status']))
print(' - docker status: {}'.format(service['docker_status']))
print(' - backend status: {}'.format(service['docker_status']))
if service['error_message'] is not None:
print(' - error: {}'.format(service['error_message']))
if service['docker_status'] == 'started':
......
......@@ -49,7 +49,6 @@ def load_configuration(test_conf=None):
# Common options
argparser.add_argument('--debug', action='store_true', help='Enable debug output')
argparser.add_argument('--swarm', help='Swarm/Docker API endpoint (ex.: zk://zk1:2181,zk2:2181 or http://swarm:2380)', default='http://localhost:2375')
argparser.add_argument('--deployment-name', help='name of this Zoe deployment', default='prod')
argparser.add_argument('--dbname', help='DB name', default='zoe')
......@@ -87,11 +86,19 @@ def load_configuration(test_conf=None):
argparser.add_argument('--service-log-path', help='Save service logs in this directory, EXPERIMENTAL', default='')
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', default='ZoeSimpleScheduler')
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
argparser.add_argument('--docker-tls-cert', help='Docker TLS certificate file', default='cert.pem')
argparser.add_argument('--docker-tls-key', help='Docker TLS private key file', default='key.pem')
argparser.add_argument('--docker-tls-ca', help='Docker TLS CA certificate file', default='ca.pem')
# Docker Swarm backend options
argparser.add_argument('--backend', choices=['OldSwarm', 'OldSwarmNewAPI'], default='OldSwarmNewAPI')
argparser.add_argument('--backend-swarm-url', help='Swarm/Docker API endpoint (ex.: zk://zk1:2181,zk2:2181 or http://swarm:2380)', default='http://localhost:2375')
argparser.add_argument('--backend-swarm-zk-path', help='Swarm/Docker optional ZooKeeper path for Swarm Znodes', default='/docker')
argparser.add_argument('--cookie-secret', help='secret used to encrypt cookies', default='changeme')
opts = argparser.parse_args()
if opts.debug:
argparser.print_values()
......
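Once parsed, these options are reachable from the rest of Zoe through get_conf(); a minimal sketch of how master-side code would pick up the new backend settings. Attribute names follow the usual dash-to-underscore mapping, so backend_swarm_url and backend_swarm_zk_path are assumptions beyond what this diff shows directly (only get_conf().backend is used later in this commit).

from zoe_lib.config import get_conf

conf = get_conf()
if conf.backend == 'OldSwarmNewAPI':
    # Swarm endpoint and optional ZooKeeper path come from the new options above
    swarm_url = conf.backend_swarm_url
    zk_path = conf.backend_swarm_zk_path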
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interface to PostgresQL for Zoe state."""
import datetime
import logging
import psycopg2
import psycopg2.extras
from zoe_lib.config import get_conf
from zoe_lib.swarm_client import SwarmClient
log = logging.getLogger(__name__)
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
class SQLManager:
"""The SQLManager class, should be used as a singleton."""
def __init__(self, conf):
self.user = conf.dbuser
self.password = conf.dbpass
self.host = conf.dbhost
self.port = conf.dbport
self.dbname = conf.dbname
self.schema = conf.deployment_name
self.conn = None
self._connect()
def _connect(self):
dsn = 'dbname=' + self.dbname + \
' user=' + self.user + \
' password=' + self.password + \
' host=' + self.host + \
' port=' + str(self.port)
self.conn = psycopg2.connect(dsn)
def _cursor(self):
try:
cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
except psycopg2.InterfaceError:
self._connect()
cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
cur.execute('SET search_path TO {},public'.format(self.schema))
return cur
def execution_list(self, only_one=False, **kwargs):
"""
Return a list of executions.
:param only_one: only one result is expected
:type only_one: bool
:param kwargs: filter executions based on their fields/columns
:return: one or more executions
"""
cur = self._cursor()
q_base = 'SELECT * FROM execution'
if len(kwargs) > 0:
q = q_base + " WHERE "
filter_list = []
args_list = []
for key, value in kwargs.items():
filter_list.append('{} = %s'.format(key))
args_list.append(value)
q += ' AND '.join(filter_list)
query = cur.mogrify(q, args_list)
else:
query = cur.mogrify(q_base)
cur.execute(query)
if only_one:
row = cur.fetchone()
if row is None:
return None
return Execution(row, self)
else:
return [Execution(x, self) for x in cur]
def execution_update(self, exec_id, **kwargs):
"""Update the state of an execution."""
cur = self._cursor()
arg_list = []
value_list = []
for key, value in kwargs.items():
arg_list.append('{} = %s'.format(key))
value_list.append(value)
set_q = ", ".join(arg_list)
value_list.append(exec_id)
q_base = 'UPDATE execution SET ' + set_q + ' WHERE id=%s'
query = cur.mogrify(q_base, value_list)
cur.execute(query)
self.conn.commit()
def execution_new(self, name, user_id, description):
"""Create a new execution in the state."""
cur = self._cursor()
status = Execution.SUBMIT_STATUS
time_submit = datetime.datetime.now()
query = cur.mogrify('INSERT INTO execution (id, name, user_id, description, status, time_submit) VALUES (DEFAULT, %s,%s,%s,%s,%s) RETURNING id', (name, user_id, description, status, time_submit))
cur.execute(query)
self.conn.commit()
return cur.fetchone()[0]
def execution_delete(self, execution_id):
"""Delete an execution and its services from the state."""
cur = self._cursor()
query = "DELETE FROM service WHERE execution_id = %s"
cur.execute(query, (execution_id,))
query = "DELETE FROM execution WHERE id = %s"
cur.execute(query, (execution_id,))
self.conn.commit()
def service_list(self, only_one=False, **kwargs):
"""
Return a list of services.
:param only_one: only one result is expected
:type only_one: bool
:param kwargs: filter services based on their fields/columns
:return: one or more services
"""
cur = self._cursor()
q_base = 'SELECT * FROM service'
if len(kwargs) > 0:
q = q_base + " WHERE "
filter_list = []
args_list = []
for key, value in kwargs.items():
filter_list.append('{} = %s'.format(key))
args_list.append(value)
q += ' AND '.join(filter_list)
query = cur.mogrify(q, args_list)
else:
query = cur.mogrify(q_base)
cur.execute(query)
if only_one:
row = cur.fetchone()
if row is None:
return None
return Service(row, self)
else:
return [Service(x, self) for x in cur]
def service_update(self, service_id, **kwargs):
"""Update the state of an existing service."""
cur = self._cursor()
arg_list = []
value_list = []
for key, value in kwargs.items():
arg_list.append('{} = %s'.format(key))
value_list.append(value)
set_q = ", ".join(arg_list)
value_list.append(service_id)
q_base = 'UPDATE service SET ' + set_q + ' WHERE id=%s'
query = cur.mogrify(q_base, value_list)
cur.execute(query)
self.conn.commit()
def service_new(self, execution_id, name, service_group, description):
"""Adds a new service to the state."""
cur = self._cursor()
status = 'created'
query = cur.mogrify('INSERT INTO service (id, status, error_message, execution_id, name, service_group, description) VALUES (DEFAULT, %s,NULL,%s,%s,%s,%s) RETURNING id', (status, execution_id, name, service_group, description))
cur.execute(query)
self.conn.commit()
return cur.fetchone()[0]
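# Usage sketch (illustrative, not part of this module): the keyword arguments
# accepted by the *_list() and *_update() methods above map directly onto SQL
# column filters and assignments, so callers typically look like this
# (variable names below are hypothetical):
#
#     state = SQLManager(get_conf())
#     execution = state.execution_list(only_one=True, id=some_exec_id)
#     for service in state.service_list(execution_id=execution.id):
#         state.service_update(service.id, status='starting')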
class Base:
"""
:type sql_manager: SQLManager
"""
def __init__(self, d, sql_manager):
"""
:type sql_manager: SQLManager
"""
self.sql_manager = sql_manager
self.id = d['id']
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
raise NotImplementedError
class Execution(Base):
"""
A Zoe execution.
:type time_submit: datetime.datetime
:type time_start: datetime.datetime
:type time_end: datetime.datetime
"""
SUBMIT_STATUS = "submitted"
SCHEDULED_STATUS = "scheduled"
STARTING_STATUS = "starting"
ERROR_STATUS = "error"
RUNNING_STATUS = "running"
CLEANING_UP_STATUS = "cleaning up"
TERMINATED_STATUS = "terminated"
def __init__(self, d, sql_manager):
super().__init__(d, sql_manager)
self.user_id = d['user_id']
self.name = d['name']
self.description = d['description']
if isinstance(d['time_submit'], datetime.datetime):
self.time_submit = d['time_submit']
else:
self.time_submit = datetime.datetime.fromtimestamp(d['time_submit'])
if d['time_start'] is None or isinstance(d['time_start'], datetime.datetime):
self.time_start = d['time_start']
else:
self.time_start = datetime.datetime.fromtimestamp(d['time_start'])
if d['time_end'] is None or isinstance(d['time_end'], datetime.datetime):
self.time_end = d['time_end']
else:
self.time_end = datetime.datetime.fromtimestamp(d['time_end'])
self._status = d['status']
self.error_message = d['error_message']
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
return {
'id': self.id,
'user_id': self.user_id,
'name': self.name,
'description': self.description,
'time_submit': self.time_submit.timestamp(),
'time_start': None if self.time_start is None else self.time_start.timestamp(),
'time_end': None if self.time_end is None else self.time_end.timestamp(),
'status': self._status,
'error_message': self.error_message,
'services': [s.id for s in self.services]
}
def __eq__(self, other):
return self.id == other.id
def set_scheduled(self):
"""The execution has been added to the scheduler queues."""
self._status = self.SCHEDULED_STATUS
self.sql_manager.execution_update(self.id, status=self._status)
def set_starting(self):
"""The services of the execution are being created in Swarm."""
self._status = self.STARTING_STATUS
self.sql_manager.execution_update(self.id, status=self._status)
def set_running(self):
"""The execution is running and producing useful work."""
self._status = self.RUNNING_STATUS
self.time_start = datetime.datetime.now()
self.sql_manager.execution_update(self.id, status=self._status, time_start=self.time_start)
def set_cleaning_up(self):
"""The services of the execution are being terminated."""
self._status = self.CLEANING_UP_STATUS
self.sql_manager.execution_update(self.id, status=self._status)
def set_terminated(self):
"""The execution is not running."""
self._status = self.TERMINATED_STATUS
self.time_end = datetime.datetime.now()
self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)
def set_error(self):
"""The scheduler encountered an error starting or running the execution."""
self._status = self.ERROR_STATUS
self.time_end = datetime.datetime.now()
self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)
def set_error_message(self, message):
"""Contains an error message in case the status is 'error'."""
self.error_message = message
self.sql_manager.execution_update(self.id, error_message=self.error_message)
def is_active(self):
"""
Returns True if the execution is in the scheduler
:return:
"""
return self._status in (self.SCHEDULED_STATUS, self.STARTING_STATUS, self.RUNNING_STATUS, self.CLEANING_UP_STATUS)
@property
def status(self):
"""Getter for the execution status."""
return self._status
@property
def services(self):
"""Getter for this execution service list."""
return self.sql_manager.service_list(execution_id=self.id)
class Service(Base):
"""A Zoe Service."""
TERMINATING_STATUS = "terminating"
INACTIVE_STATUS = "inactive"
ACTIVE_STATUS = "active"
STARTING_STATUS = "starting"
ERROR_STATUS = "error"
DOCKER_UNDEFINED_STATUS = 'undefined'
DOCKER_CREATE_STATUS = 'created'
DOCKER_START_STATUS = 'started'
DOCKER_DIE_STATUS = 'dead'
DOCKER_DESTROY_STATUS = 'destroyed'
DOCKER_OOM_STATUS = 'oom-killed'
def __init__(self, d, sql_manager):
super().__init__(d, sql_manager)
self.name = d['name']
self.status = d['status']
self.error_message = d['error_message']
self.execution_id = d['execution_id']
self.description = d['description']
self.service_group = d['service_group']
self.docker_id = d['docker_id']
self.docker_status = d['docker_status']
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
return {
'id': self.id,
'name': self.name,
'status': self.status,
'error_message': self.error_message,
'execution_id': self.execution_id,
'description': self.description,
'service_group': self.service_group,
'docker_id': self.docker_id,
'ip_address': self.ip_address,
'docker_status': self.docker_status
}
def __eq__(self, other):
return self.id == other.id
@property
def dns_name(self):
"""Getter for the DNS name of this service as it will be registered in Docker's DNS."""
return "{}-{}-{}".format(self.name, self.execution_id, get_conf().deployment_name)
def set_terminating(self):
"""The service is being killed."""
self.sql_manager.service_update(self.id, status=self.TERMINATING_STATUS)
self.status = self.TERMINATING_STATUS
def set_inactive(self):
"""The service is not running."""
self.sql_manager.service_update(self.id, status=self.INACTIVE_STATUS, docker_id=None)
self.status = self.INACTIVE_STATUS
def set_starting(self):
"""The service is being created by Docker."""
self.sql_manager.service_update(self.id, status=self.STARTING_STATUS)
self.status = self.STARTING_STATUS
def set_active(self, docker_id):
"""The service is running and has a valid docker_id."""
self.sql_manager.service_update(self.id, status=self.ACTIVE_STATUS, docker_id=docker_id, error_message=None)
self.error_message = None
self.status = self.ACTIVE_STATUS
def set_error(self, error_message):
"""The service could not be created/started."""
self.sql_manager.service_update(self.id, status=self.ERROR_STATUS, error_message=error_message)
self.status = self.ERROR_STATUS
self.error_message = error_message
def set_docker_status(self, new_status):
"""Docker has emitted an event related to this service."""
self.sql_manager.service_update(self.id, docker_status=new_status)
log.debug("service {}, status updated to {}".format(self.id, new_status))
self.docker_status = new_status
@property
def ip_address(self):
"""Getter for the service IP address, queries Swarm as the IP address changes outside our control."""
if self.docker_status != self.DOCKER_START_STATUS:
return {}
swarm = SwarmClient(get_conf())
s_info = swarm.inspect_container(self.docker_id)
return s_info['ip_address'][get_conf().overlay_network_name]
@property
def user_id(self):
"""Getter for the user_id, that is actually taken form the parent execution."""
execution = self.sql_manager.execution_list(only_one=True, id=self.execution_id)
return execution.user_id
......@@ -173,6 +173,7 @@ class Service:
execution = self.sql_manager.execution_list(only_one=True, id=self.execution_id)
return execution.user_id
@property
def is_dead(self):
"""Returns True if this service is not running."""
return self.backend_status in (self.BACKEND_DESTROY_STATUS, self.BACKEND_OOM_STATUS, self.BACKEND_DIE_STATUS)
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The base class that all backends should implement."""
from zoe_lib.state import Service
from zoe_master.stats import ClusterStats
from zoe_master.backends.service_instance import ServiceInstance
class BaseBackend:
"""The base class that all backends should implement."""
def __init__(self, conf):
pass
def init(self, state):
"""Initializes the backend. In general this includes finding the current API endpoint and opening a connection to it, negotiate the API version, etc. Here backend-related threads can be started, too. This method will be called only once at Zoe startup."""
raise NotImplementedError
def shutdown(self):
"""Performs a clean shutdown of the resources used by Swarm backend. Any threads that where started in the init() method should be terminated here. This method will be called when Zoe shuts down."""
raise NotImplementedError
def spawn_service(self, service_instance: ServiceInstance):
"""Create a container for a service.
The backend translates all the configuration parameters given in the ServiceInstance object into backend-specific container options and starts the container.
This function should either:
* raise ``ZoeStartExecutionRetryException`` in case a temporary error is generated
* raise ``ZoeStartExecutionFatalException`` in case a fatal error is generated
* return a backend-specific ID that will be used later by Zoe to interact with the running container
"""
raise NotImplementedError
def terminate_service(self, service: Service) -> None:
"""Terminate the container corresponding to a service."""
raise NotImplementedError
def platform_state(self) -> ClusterStats:
"""Get the platform state. This method should fill-in a new ClusterStats object at each call, with fresh statistics on the available nodes and resource availability. This information will be used for taking scheduling decisions."""
raise NotImplementedError
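To make the contract above concrete, a minimal backend satisfying this interface could look as follows. This is an illustrative sketch only: NullBackend is not a backend shipped by this commit, and the assumption that ClusterStats() can be built with no arguments is flagged in the code.

from zoe_master.backends.base import BaseBackend
from zoe_master.stats import ClusterStats


class NullBackend(BaseBackend):
    """An illustrative backend that pretends every container starts and stops cleanly."""
    def __init__(self, conf):
        super().__init__(conf)
        self._counter = 0

    def init(self, state):
        pass  # nothing to connect to

    def shutdown(self):
        pass  # no threads or connections to tear down

    def spawn_service(self, service_instance):
        self._counter += 1
        return 'null-{}'.format(self._counter)  # backend-specific container ID

    def terminate_service(self, service):
        pass

    def platform_state(self):
        return ClusterStats()  # assumes ClusterStats has a no-argument constructor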
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The high-level interface that Zoe uses to talk to the configured container backend."""
import os
import logging
from zoe_lib.state import Service, Execution, VolumeDescription
from zoe_lib.config import get_conf
from zoe_master.workspace.filesystem import ZoeFSWorkspace
log = logging.getLogger(__name__)
def gen_environment(service: Service, execution: Execution):
"""Return the list of environment variables that needs to be added to all containers."""
fswk = ZoeFSWorkspace()
env_list = [
('ZOE_EXECUTION_ID', execution.id),
('ZOE_EXECUTION_NAME', execution.name),
('ZOE_SERVICE_GROUP', service.service_group),
('ZOE_SERVICE_NAME', service.name),
('ZOE_SERVICE_ID', service.id),
('ZOE_OWNER', execution.user_id),
('ZOE_DEPLOYMENT_NAME', get_conf().deployment_name),
('ZOE_MY_DNS_NAME', service.dns_name),
('ZOE_WORKSPACE', fswk.get_mountpoint())
]
service_list = []
for tmp_service in execution.services:
service_list.append(tmp_service.dns_name)
env_list.append(('ZOE_EXECUTION_SERVICE_LIST', ','.join(service_list)))
return env_list
def _create_logs_directories(exec_id, service_name):
path = os.path.join(get_conf().logs_base_path, get_conf().deployment_name, str(exec_id), service_name)
try:
os.makedirs(path)
except OSError as e:
log.error('Cannot create path {}: {}'.format(path, str(e)))
return None
return path
def gen_volumes(service: Service, execution: Execution):
"""Return the list of default volumes to be added to all containers."""
vol_list = []
fswk = ZoeFSWorkspace()
wk_vol = fswk.get(execution.user_id)
vol_list.append(wk_vol)
logs_path = _create_logs_directories(execution.id, service.name)
if logs_path is not None:
logs_mountpoint = '/logs'
logs_vol = VolumeDescription((logs_path, logs_mountpoint, True))
vol_list.append(logs_vol)
return vol_list
def gen_labels(service: Service, execution: Execution):
"""Generate container labels, useful for identifying containers in monitoring systems."""
return {
'zoe_execution_name': execution.name,
'zoe_execution_id': str(execution.id),
'zoe_service_name': service.name,
'zoe_service_id': str(service.id),
'zoe_owner': execution.user_id,
'zoe_deployment_name': get_conf().deployment_name,
'zoe_type': 'app_service'
}
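A backend's spawn_service() implementation would normally feed these three helpers into whatever container-creation call its API offers; a hedged sketch of that glue follows. The shape of the returned options dictionary is an assumption, not an API defined by this commit.

def build_container_options(service, execution):
    """Illustrative only: bundle the common Zoe settings computed above for one container."""
    return {
        'name': service.dns_name,
        'environment': dict(gen_environment(service, execution)),  # helpers defined above
        'labels': gen_labels(service, execution),
        'volumes': gen_volumes(service, execution),
    }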
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The high-level interface that Zoe uses to talk to the configured container backend."""
import logging
from typing import List
from zoe_lib.config import get_conf
from zoe_lib.state import Execution, Service
from zoe_master.backends.base import BaseBackend
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.old_swarm.backend import OldSwarmBackend
import zoe_master.backends.old_swarm.api_client
from zoe_master.backends.old_swarm_new_api.backend import OldSwarmNewAPIBackend
import zoe_master.backends.old_swarm_new_api.api_client
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException, ZoeException
log = logging.getLogger(__name__)
def _get_backend() -> BaseBackend:
"""Return the right backend instance by reading the global configuration."""
backend_name = get_conf().backend
if backend_name == 'OldSwarm':
if not zoe_master.backends.old_swarm.api_client.AVAILABLE:
raise ZoeException('The OldSwarm backend requires docker-py version <= 1.10.2')
return OldSwarmBackend(get_conf())
elif backend_name == 'OldSwarmNewAPI':
if not zoe_master.backends.old_swarm_new_api.api_client.AVAILABLE:
raise ZoeException('The OldSwarmNewAPI backend requires docker python version >= 2.0.2')
return OldSwarmNewAPIBackend(get_conf())
else:
log.error('Unknown backend selected')
assert False
def initialize_backend(state):
"""Initializes the configured backend."""
backend = _get_backend()
backend.init(state)
def shutdown_backend():
"""Shuts down the configured backend."""
backend = _get_backend()
backend.shutdown()
def service_list_to_containers(execution: Execution, service_list: List[Service]) -> str:
"""Given a subset of services from an execution, tries to start them, return one of 'ok', 'requeue' for temporary failures and 'fatal' for fatal failures."""
backend = _get_backend()
ordered_service_list = sorted(service_list, key=lambda x: x.startup_order)
for service in ordered_service_list:
service.set_starting()
instance = ServiceInstance(execution, service)
try:
backend_id = backend.spawn_service(instance)
except ZoeStartExecutionRetryException as ex:
log.warning('Temporary failure starting service {} of execution {}: {}'.format(service.id, execution.id, ex.message))