Commit aa4625e4 authored by Daniele Venzano's avatar Daniele Venzano

Merge remote-tracking branch 'origin/master'

parents 8b93ca0e a4fa73cb
......@@ -11,7 +11,7 @@ variables:
POSTGRES_USER: zoeuser
POSTGRES_PASSWORD: zoepass
ZOE_TEST_IMAGE: zoe-test:$CI_PIPELINE_ID
ZOE_COMMON_OPTIONS: --debug --backend-swarm-url ${SWARM_URL} --deployment-name test${CI_BUILD_REF} --dbuser ${POSTGRES_USER} --dbhost postgres --dbport 5432 --dbname ${POSTGRES_DB} --dbpass ${POSTGRES_PASSWORD} --master-url tcp://localhost:4850 --auth-type text --listen-port 5100 --workspace-base-path /tmp
ZOE_COMMON_OPTIONS: --debug --deployment-name test${CI_BUILD_REF} --dbuser ${POSTGRES_USER} --dbhost postgres --dbport 5432 --dbname ${POSTGRES_DB} --dbpass ${POSTGRES_PASSWORD} --master-url tcp://localhost:4850 --auth-type text --listen-port 5100 --workspace-base-path /tmp
cache:
paths:
......
......@@ -23,7 +23,6 @@ At this time Zoe supports three back-ends:
* DockerEngine: uses one or more Docker Engines. It is simple to install and to scale.
* Kubernetes: the most complex to setup, we suggest using it only if you already have (or need) a Kubernetes setup for running other software.
* Legacy Docker Swarm (deprecated): simple to install, additional features like SSL, high-availability and dynamic host discovery can be added as needed. Please note that Zoe does not support the new Swarm Mode of Docker Engine as the API is too limited.
DockerEngine
^^^^^^^^^^^^
......@@ -309,7 +308,7 @@ A Docker Registry becomes interesting to have if you have lot of image build act
Zoe
^^^
Zoe is written in Python and uses the ``requirements.txt`` file to list the package dependencies needed for all components of Zoe. Not all of them are needed in all cases, for example you need the ``kazoo`` library only if you use Zookeeper to manage Swarm high availability.
Zoe is written in Python and uses the ``requirements.txt`` file to list the package dependencies needed for all components of Zoe. Not all of them are needed in all cases, for example you need the ``pykube`` library only if you use the Kubernetes back-end.
Currently this is the recommended procedure, once the initial Swarm setup has been done:
......
......@@ -2,7 +2,6 @@ Jinja2>=2.8
requests>=2.9.1
docker>=2.1.0
tornado>=4.3
kazoo>=2.2.1
humanfriendly
psycopg2>=2.6.1
pyzmq>=15.2.0
......
#!/usr/bin/python3
"""
Find the Swarm manager by querying ZooKeeper.
"""
import sys
from kazoo.client import KazooClient
def zookeeper_swarm(zk_server_list, path='/swarm'):
    """Return the Swarm leader entry stored in ZooKeeper.

    :param zk_server_list: ZooKeeper server list, passed as ``hosts`` to ``KazooClient``
    :param path: base znode path; ``/docker/swarm/leader`` is appended to it
    :return: the content of the leader znode, decoded as UTF-8
    """
    leader_path = path + '/docker/swarm/leader'
    zk = KazooClient(hosts=zk_server_list)
    zk.start()
    try:
        # get() yields a (data, stat) pair; only the data part is used here.
        master, _ = zk.get(leader_path)
    finally:
        # Always close the ZooKeeper session, even when the lookup fails.
        zk.stop()
    return master.decode('utf-8')
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Provide zookeeper server list")
        # Stop here: falling through would index sys.argv[1], which raises
        # IndexError when no argument was given.
        sys.exit(1)
    print(zookeeper_swarm(sys.argv[1]))
......@@ -17,6 +17,7 @@
import logging
import os
from typing import Mapping
import zoe_api.exceptions
import zoe_api.master_api
......@@ -51,11 +52,19 @@ class APIEndpoint:
raise zoe_api.exceptions.ZoeAuthException()
return e
def execution_list(self, uid, role, **filters):
def execution_list(self, uid: str, role: str, **filters: Mapping[str, str]):
"""Generate a optionally filtered list of executions."""
if role != 'admin':
filters['user_id'] = uid
execs = self.sql.executions.select(**filters)
ret = [e for e in execs if e.user_id == uid or role == 'admin']
return ret
return execs
def execution_count(self, uid: str, role: str, **filters: Mapping[str, str]):
    """Return how many executions match the given filters.

    Callers whose role is not ``admin`` are always restricted to their own
    executions: the ``user_id`` filter is forced to ``uid``, overriding any
    value passed in ``filters``.
    """
    is_admin = role == 'admin'
    if not is_admin:
        filters['user_id'] = uid
    return self.sql.executions.count(**filters)
def zapp_validate(self, application_description):
"""Validates the passed ZApp description against the supported schema."""
......
......@@ -41,6 +41,7 @@ def web_init(api_endpoint) -> List[tornado.web.URLSpec]:
tornado.web.url(r'/logout', zoe_api.web.start.LogoutWeb, route_args, name='logout'),
tornado.web.url(r'/executions', zoe_api.web.executions.ExecutionListWeb, route_args, name='execution_list'),
tornado.web.url(r'/executions/([0-9]+)', zoe_api.web.executions.ExecutionListWeb, route_args, name='execution_list_page'),
tornado.web.url(r'/executions/start', zoe_api.web.executions.ExecutionStartWeb, route_args, name='execution_start'),
tornado.web.url(r'/executions/restart/([0-9]+)', zoe_api.web.executions.ExecutionRestartWeb, route_args, name='execution_restart'),
tornado.web.url(r'/executions/terminate/([0-9]+)', zoe_api.web.executions.ExecutionTerminateWeb, route_args, name='execution_terminate'),
......
......@@ -17,6 +17,7 @@
import datetime
import json
import math
import time
from zoe_lib.config import get_conf
......@@ -53,25 +54,32 @@ class ExecutionStartWeb(ZoeRequestHandler):
class ExecutionListWeb(ZoeRequestHandler):
"""Handler class"""
PAGINATION_ITEM_COUNT = 50
def initialize(self, **kwargs):
"""Initializes the request handler."""
super().initialize(**kwargs)
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
@catch_exceptions
def get(self):
def get(self, page=0):
"""Home page with authentication."""
uid, role = get_auth(self)
if uid is None:
self.redirect(self.get_argument('next', u'/login'))
return
executions = self.api_endpoint.execution_list(uid, role)
page = int(page)
executions_count = self.api_endpoint.execution_count(uid, role)
executions = self.api_endpoint.execution_list(uid, role, base=page*self.PAGINATION_ITEM_COUNT, limit=self.PAGINATION_ITEM_COUNT)
template_vars = {
"uid": uid,
"role": role,
'executions': sorted(executions, key=lambda e: e.id, reverse=True)
'executions': sorted(executions, key=lambda e: e.id, reverse=True),
'current_page': page,
'max_page': math.ceil(executions_count / self.PAGINATION_ITEM_COUNT),
'last_page': len(executions) < self.PAGINATION_ITEM_COUNT
}
self.render('execution_list.html', **template_vars)
......
......@@ -17,6 +17,17 @@
{% block content %}
<div id="my_executions">
<label class="filter">All executions <input class="filter" placeholder="Filter" /></label>
{% if max_page > 0 %}
<p>Pages:
{% for page_n in range(0, max_page) %}
{% if page_n == current_page %}
{{ page_n + 1 }}&nbsp;
{% else %}
<a href="/executions/{{ page_n }}">{{ page_n + 1 }}</a>&nbsp;
{% endif %}
{% endfor %}
</p>
{% endif %}
<table id="exec_list" class="app_list sortable">
<thead>
<tr>
......@@ -61,6 +72,17 @@
{% endfor %}
</tbody>
</table>
{% if max_page > 0 %}
<p>Pages:
{% for page_n in range(0, max_page) %}
{% if page_n == current_page %}
{{ page_n + 1 }}&nbsp;
{% else %}
<a href="/executions/{{ page_n }}">{{ page_n + 1 }}</a>&nbsp;
{% endif %}
{% endfor %}
</p>
{% endif %}
</div>
<script>
......
......@@ -3,6 +3,13 @@
{% block custom_head %}
<script src="/static/Chart.min.js" type="application/javascript"></script>
<script>
Chart.scaleService.updateScaleDefaults('linear', {
ticks: {
min: 0
}
});
</script>
{% endblock %}
{% block content %}
......
......@@ -19,7 +19,6 @@ import logging
from zoe_lib.configargparse import ArgumentParser, Namespace
logging.getLogger('kazoo').setLevel(logging.WARNING)
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('docker').setLevel(logging.INFO)
......@@ -99,16 +98,10 @@ def load_configuration(test_conf=None):
# Scheduler
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeElasticScheduler'], default='ZoeElasticScheduler')
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE', 'DYNSIZE'], default='FIFO')
argparser.add_argument('--placement-policy', help='Placement policy', choices=['waterfill', 'random', 'average'], default='average')
argparser.add_argument('--backend', choices=['Swarm', 'Kubernetes', 'DockerEngine'], default='DockerEngine', help='Which backend to enable')
# Docker Swarm backend options
argparser.add_argument('--backend-swarm-url', help='Swarm/Docker API endpoint (ex.: zk://zk1:2181,zk2:2181 or http://swarm:2380)', default='http://localhost:2375')
argparser.add_argument('--backend-swarm-zk-path', help='Swarm/Docker optional ZooKeeper path for Swarm Znodes', default='/docker')
argparser.add_argument('--backend-swarm-tls-cert', help='Docker TLS certificate file', default='cert.pem')
argparser.add_argument('--backend-swarm-tls-key', help='Docker TLS private key file', default='key.pem')
argparser.add_argument('--backend-swarm-tls-ca', help='Docker TLS CA certificate file', default='ca.pem')
argparser.add_argument('--backend', choices=['Kubernetes', 'DockerEngine'], default='DockerEngine', help='Which backend to enable')
# Docker Engine backend options
argparser.add_argument('--backend-docker-config-file', help='Location of the Docker Engine config file', default='docker.conf')
......
......@@ -67,10 +67,13 @@ class Execution(BaseRecord):
self._status = d['status']
self.error_message = d['error_message']
try:
self.size = self.description['size']
except KeyError:
self.size = self.description['priority'] # zapp format v2
if d['size'] is not None:
self.size = float(d['size'])
else:
try:
self.size = self.description['size']
except KeyError:
self.size = self.description['priority'] # zapp format v2
self.termination_lock = threading.Lock()
......@@ -86,7 +89,8 @@ class Execution(BaseRecord):
'time_end': None if self.time_end is None else (self.time_end - datetime.datetime(1970, 1, 1)) / datetime.timedelta(seconds=1),
'status': self._status,
'error_message': self.error_message,
'services': [s.id for s in self.services]
'services': [s.id for s in self.services],
'size': self.size
}
def __eq__(self, other):
......@@ -130,6 +134,11 @@ class Execution(BaseRecord):
self.error_message = message
self.sql_manager.executions.update(self.id, error_message=self.error_message)
def set_size(self, new_size):
    """Store a new size for this execution, both on the object and in the database.

    Intended for policies that calculate the size automatically.
    """
    self.size = new_size
    executions_table = self.sql_manager.executions
    executions_table.update(self.id, size=new_size)
@property
def is_active(self):
"""
......@@ -225,6 +234,7 @@ class ExecutionTable(BaseTable):
user_id TEXT NOT NULL,
description JSON NOT NULL,
status TEXT NOT NULL,
size NUMERIC NOT NULL,
execution_manager_id TEXT NULL,
time_submit TIMESTAMP NOT NULL,
time_start TIMESTAMP NULL,
......@@ -236,12 +246,12 @@ class ExecutionTable(BaseTable):
"""Create a new execution in the state."""
status = Execution.SUBMIT_STATUS
time_submit = datetime.datetime.utcnow()
query = self.cursor.mogrify('INSERT INTO execution (id, name, user_id, description, status, time_submit) VALUES (DEFAULT, %s,%s,%s,%s,%s) RETURNING id', (name, user_id, description, status, time_submit))
query = self.cursor.mogrify('INSERT INTO execution (id, name, user_id, description, status, size, time_submit) VALUES (DEFAULT, %s,%s,%s,%s,%s,%s) RETURNING id', (name, user_id, description, status, description['size'], time_submit))
self.cursor.execute(query)
self.sql_manager.commit()
return self.cursor.fetchone()[0]
def select(self, only_one=False, limit=-1, **kwargs):
def select(self, only_one=False, limit=-1, base=0, **kwargs):
"""
Return a list of executions.
......@@ -249,6 +259,8 @@ class ExecutionTable(BaseTable):
:type only_one: bool
:param limit: limit the result to this number of entries
:type limit: int
:type base: int
:param base: the base value to use when limiting result count
:param kwargs: filter executions based on their fields/columns
:return: one or more executions
"""
......@@ -275,11 +287,11 @@ class ExecutionTable(BaseTable):
args_list.append(value)
q += ' AND '.join(filter_list)
if limit > 0:
q += ' ORDER BY id DESC LIMIT {}'.format(limit)
q += ' ORDER BY id DESC LIMIT {} OFFSET {}'.format(limit, base)
query = self.cursor.mogrify(q, args_list)
else:
if limit > 0:
q_base += ' ORDER BY id DESC LIMIT {}'.format(limit)
q_base += ' ORDER BY id DESC LIMIT {} OFFSET {}'.format(limit, base)
query = self.cursor.mogrify(q_base)
self.cursor.execute(query)
......@@ -290,3 +302,40 @@ class ExecutionTable(BaseTable):
return Execution(row, self.sql_manager)
else:
return [Execution(x, self.sql_manager) for x in self.cursor]
def count(self, **kwargs):
    """
    Return the number of executions matching the given filters.

    :param kwargs: filter executions based on their fields/columns; the ``earlier_than_*`` and ``later_than_*`` keys compare against the submit/start/end timestamps, any other key is matched for equality against the column of the same name
    :return: the first column of the row returned by the ``SELECT COUNT(*)`` query
    """
    q_base = 'SELECT COUNT(*) FROM execution'
    if not kwargs:
        query = self.cursor.mogrify(q_base)
    else:
        # SQL fragments for the special time-range filters.
        time_filters = {
            'earlier_than_submit': '"time_submit" <= to_timestamp(%s)',
            'earlier_than_start': '"time_start" <= to_timestamp(%s)',
            'earlier_than_end': '"time_end" <= to_timestamp(%s)',
            'later_than_submit': '"time_submit" >= to_timestamp(%s)',
            'later_than_start': '"time_start" >= to_timestamp(%s)',
            'later_than_end': '"time_end" >= to_timestamp(%s)',
        }
        # NOTE(review): any other key is interpolated into the SQL text as a
        # column name; only the values are bound as parameters. Callers must
        # not forward untrusted keys.
        filter_list = [time_filters.get(key, '{} = %s'.format(key)) for key in kwargs]
        # Every filter consumes exactly one bound value, in the same order.
        args_list = list(kwargs.values())
        where_clause = ' AND '.join(filter_list)
        query = self.cursor.mogrify(q_base + " WHERE " + where_clause, args_list)

    self.cursor.execute(query)
    return self.cursor.fetchone()[0]
......@@ -210,12 +210,15 @@ class DockerClient:
except docker.errors.NotFound:
return
cont.stop(timeout=5)
if delete:
try:
try:
if delete:
cont.remove(force=True)
except docker.errors.APIError as e:
log.warning(str(e))
else:
cont.stop(timeout=5)
except docker.errors.NotFound:
pass
except docker.errors.APIError as e:
log.warning(str(e))
def event_listener(self, callback: Callable[[str], bool]) -> None:
"""An infinite loop that listens for events from Swarm."""
......
......@@ -83,7 +83,7 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
if service.backend_id is not None:
engine.terminate_container(service.backend_id, delete=True)
else:
log.error('Cannot terminate service {}, since it has not backend ID'.format(service.name))
log.error('Cannot terminate service {}, since it has no backend ID'.format(service.name))
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
def platform_state(self) -> ClusterStats:
......@@ -193,34 +193,8 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
def list_available_images(self, node_name):
"""List the images available on the specified node."""
host_conf = None
for conf in self.docker_config:
if conf.name == node_name:
host_conf = conf
break
if host_conf is None:
log.error('Unknown node {}, returning empty image list'.format(node_name))
return []
try:
my_engine = DockerClient(host_conf)
except ZoeException as e:
log.error(str(e))
return []
image_list = []
for dk_image in my_engine.list_images():
image = {
'id': dk_image.attrs['Id'],
'size': dk_image.attrs['Size'],
'names': dk_image.tags # type: list
}
for name in image['names']:
if name[-7:] == ':latest': # add an image with the name without 'latest' to fake Docker image lookup algorithm
image['names'].append(name[:-7])
break
image_list.append(image)
return image_list
node_stats = _checker.host_stats[node_name]
return node_stats.images
def update_service(self, service, cores=None, memory=None):
"""Update a service reservation."""
......@@ -239,4 +213,4 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
cpu_quota = int(cores * 100000)
engine.update(service.backend_id, cpu_quota=cpu_quota, mem_reservation=memory)
else:
log.error('Cannot update service {} ({}), since it has no backend ID'.format(service.name, service.id))
log.error('Cannot update reservations for service {} ({}), since it has no backend ID'.format(service.name, service.id))
......@@ -100,6 +100,19 @@ class DockerStateSynchronizer(threading.Thread):
}
self.host_stats[host_config.name].service_stats = stats
self.host_stats[host_config.name].images = []
for dk_image in my_engine.list_images():
image = {
'id': dk_image.attrs['Id'],
'size': dk_image.attrs['Size'],
'names': dk_image.tags # type: list
}
for name in image['names']:
if name[-7:] == ':latest': # add an image with the name without 'latest' to fake Docker image lookup algorithm
image['names'].append(name[:-7])
break
self.host_stats[host_config.name].images.append(image)
sleep_time = CHECK_INTERVAL - (time.time() - time_start)
if sleep_time <= 0:
log.warning('synchro thread for host {} is late by {:.2f} seconds'.format(host_config.name, sleep_time * -1))
......
......@@ -27,11 +27,6 @@ from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException, ZoeException
from zoe_master.stats import ClusterStats # pylint: disable=unused-import
try:
from zoe_master.backends.swarm.backend import SwarmBackend
except ImportError as ex:
SwarmBackend = None
try:
from zoe_master.backends.kubernetes.backend import KubernetesBackend
except ImportError:
......@@ -53,10 +48,6 @@ def _get_backend() -> Union[BaseBackend, None]:
if KubernetesBackend is None:
raise ZoeException('The Kubernetes backend requires the pykube module')
return KubernetesBackend(get_conf())
elif backend_name == 'Swarm':
if SwarmBackend is None:
raise ZoeException('The Swarm backend requires docker python version >= 2.0.2')
return SwarmBackend(get_conf())
elif backend_name == 'DockerEngine':
if DockerEngineBackend is None:
raise ZoeException('The Docker Engine backend requires docker python version >= 2.0.2')
......
This diff is collapsed.
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Zoe back-end implementation for old-style stand-alone Docker Swarm."""
import logging
from zoe_lib.state import Service
from zoe_master.backends.swarm.api_client import SwarmClient
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException, ZoeNotEnoughResourcesException
import zoe_master.backends.base
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.swarm.threads import SwarmStateSynchronizer
from zoe_master.stats import NodeStats, ClusterStats # pylint: disable=unused-import
log = logging.getLogger(__name__)
# This module-level variable holds the reference to the checker thread
_checker = None
class SwarmBackend(zoe_master.backends.base.BaseBackend):
    """Zoe backend implementation for old-style stand-alone Docker Swarm.

    Container operations are delegated to a ``SwarmClient`` instance created
    in the constructor. The state synchronizer thread is shared by all
    instances through the module-level ``_checker`` variable.
    """

    def __init__(self, opts):
        """Create the Swarm API client and log the deprecation warning."""
        super().__init__(opts)
        self.swarm = SwarmClient()
        log.warning('The Docker Swarm back-end is deprecated and will be removed in 2018.03, please upgrade to the Docker Engine back-end.')

    @classmethod
    def init(cls, state):
        """Initializes Swarm backend starting the event monitoring thread."""
        global _checker
        _checker = SwarmStateSynchronizer(state)

    @classmethod
    def shutdown(cls):
        """Performs a clean shutdown of the resources used by Swarm backend."""
        # init() may never have been called (or may have failed), in which
        # case there is no checker thread to stop.
        if _checker is not None:
            _checker.quit()

    def spawn_service(self, service_instance: ServiceInstance):
        """Spawn a service, translating a Zoe Service into a Docker container.

        :param service_instance: the service instance to start
        :return: a tuple (container ID, IP address, ports dictionary)
        :raises ZoeStartExecutionRetryException: when Swarm reports not enough free resources
        :raises ZoeStartExecutionFatalException: on any other ZoeException raised by the Swarm client
        """
        try:
            cont_info = self.swarm.spawn_container(service_instance)
        except ZoeNotEnoughResourcesException as e:
            raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service_instance.name)) from e
        except ZoeException as e:
            raise ZoeStartExecutionFatalException(str(e)) from e

        ip_address = None
        ports = {}
        for port_name, mapping in cont_info['ports'].items():
            if mapping is None:  # no mapping reported for this port, skip it
                continue
            # Each mapping is indexed as (address, port); the address of the
            # last mapped port is the one returned to the caller.
            ip_address = mapping[0]
            ports[port_name] = mapping[1]
        return cont_info["id"], ip_address, ports

    def terminate_service(self, service: Service) -> None:
        """Terminate and delete a container."""
        self.swarm.terminate_container(service.backend_id, delete=True)

    def platform_state(self) -> ClusterStats:
        """Get the platform state.

        For every node the "in use" memory and cores are overwritten with the
        corresponding reserved values before the stats are returned.
        """
        info = self.swarm.info()
        for node in info.nodes:  # type: NodeStats
            node.memory_in_use = node.memory_reserved
            node.cores_in_use = node.cores_reserved
        return info

    def service_log(self, service: Service):
        """Get the log of the container backing the given service."""
        # NOTE(review): the meaning of the two positional flags is defined by
        # SwarmClient.logs(), which is not visible here - confirm before changing.
        return self.swarm.logs(service.backend_id, True, False)

    def preload_image(self, image_name: str) -> None:
        """Make a service image available (not implemented by this back-end)."""
        raise NotImplementedError

    def update_service(self, service, cores=None, memory=None):
        """Update a service reservation (unsupported: only logs an error)."""
        log.error('Reservation update not implemented in the Swarm back-end')
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Monitor for the Swarm event stream."""
import logging
import threading
import time
from zoe_lib.config import get_conf
from zoe_lib.state import SQLManager, Service
from zoe_master.backends.swarm.api_client import SwarmClient
from zoe_master.exceptions import ZoeException
log = logging.getLogger(__name__)
CHECK_INTERVAL = 10
class SwarmStateSynchronizer(threading.Thread):
"""The Swarm Checker."""
def __init__(self, state: SQLManager) -> None:
super().__init__()
self.setName('checker')
self.stop = False
self.state = state
self.setDaemon(True)
self.start()
def _update_service_status(self, service: Service, container):
"""Update the service status."""
if service.backend_status != container['state']:
old_status = service.backend_status
service.set_backend_status(container['state'])
log.debug('Updated service status, {} from {} to {}'.format(service.name, old_status, container['state']))
def run(self):
"""The thread loop."""
log.info("Checker thread started")
while not self.stop:
try:
swarm = SwarmClient()
except ZoeException as e:
log.error(str(e))
time.sleep(CHECK_INTERVAL)
continue
service_list = self.state.services.select()
try:
container_list = swarm.list(only_label={'zoe_deployment_name': get_conf().deployment_name})
except ZoeException:
continue