Commit a0adc5ee authored by Daniele Venzano's avatar Daniele Venzano

Properly implement reserved, allocated and in-use statistics

Small usability fixes for the execution inspect page
Move all the database access into the state package
Remove image management, it is too slow to download images while the user waits for the ZApp to start
Make several thread loops more robust in case of uncaught exceptions
parent 70034ec7
# Zoe Changelog
## Version 2017.12
* Status page for the administrator
* More information about authentications in the log output of zoe-api
## Version 2017.09
* Major web UI redesign
......
......@@ -15,6 +15,7 @@
"""Create the DB tables needed by Zoe. This script is used in the CI pipeline to prevent race conditions with zoe-api automatically creating the tables while zoe-master is starting at the same time."""
import sys
import time
import zoe_lib.config as config
......@@ -23,8 +24,13 @@ import zoe_lib.state.sql_manager
config.load_configuration()
print("Warning, this script will delete the database tables for the deployment '{}' before creating new ones".format(config.get_conf().deployment_name))
print("If you are installing Zoe for the first time, you have nothing to worry about")
print("Sleeping 5 seconds before continuing, hit CTRL-C to stop and think.")
time.sleep(5)
state = zoe_lib.state.sql_manager.SQLManager(config.get_conf())
state.init_db(force=True)
try:
time.sleep(5)
except KeyboardInterrupt:
print("Aborted.")
sys.exit(1)
zoe_lib.state.sql_manager.SQLManager(config.get_conf()).init_db(force=True)
# Copyright (c) 2017, Quang-Nhat HOANG-XUAN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Store adapters to read/write data to from/to PostgresSQL. """
import zoe_lib.state
from zoe_lib.config import get_conf
from oauth2.store import AccessTokenStore, ClientStore
from oauth2.datatype import AccessToken, Client
from oauth2.error import AccessTokenNotFound, ClientNotFoundError
class AccessTokenStorePg(AccessTokenStore):
    """Access token store that keeps its records in the Zoe SQL state.

    Each method builds a fresh ``SQLManager`` from the current Zoe
    configuration and forwards the request to it.
    """

    def fetch_by_refresh_token(self, refresh_token):
        """Look up the access token that owns a refresh token.

        :param refresh_token: The refresh token to search for.
        :return: An ``AccessToken`` built from the stored record.
        :raises AccessTokenNotFound: If the state manager returns ``None``.
        """
        manager = zoe_lib.state.SQLManager(get_conf())
        row = manager.fetch_by_refresh_token(refresh_token)
        if row is None:
            raise AccessTokenNotFound

        # Both expiry values are converted with .timestamp() before being
        # handed to AccessToken.
        # NOTE(review): unlike fetch_existing_token_of_user, no user_id is
        # passed to AccessToken here - confirm this is intended.
        fields = {
            "client_id": row["client_id"],
            "grant_type": row["grant_type"],
            "token": row["token"],
            "data": row["data"],
            "expires_at": row["expires_at"].timestamp(),
            "refresh_token": row["refresh_token"],
            "refresh_expires_at": row["refresh_token_expires_at"].timestamp(),
            "scopes": row["scopes"],
        }
        return AccessToken(**fields)

    def delete_refresh_token(self, refresh_token):
        """Delete (invalidate) an old refresh token after use.

        :param refresh_token: The refresh token.
        :return: Whatever the state manager returns for the deletion.
        """
        manager = zoe_lib.state.SQLManager(get_conf())
        return manager.delete_refresh_token(refresh_token)

    def get_client_id_by_refresh_token(self, refresh_token):
        """Return the state manager's client ID lookup result for a refresh token."""
        manager = zoe_lib.state.SQLManager(get_conf())
        return manager.get_client_id_by_refresh_token(refresh_token)

    def get_client_id_by_access_token(self, access_token):
        """Return the state manager's client ID lookup result for an access token."""
        manager = zoe_lib.state.SQLManager(get_conf())
        return manager.get_client_id_by_access_token(access_token)

    def fetch_existing_token_of_user(self, client_id, grant_type, user_id):
        """Fetch the token already stored for a client / grant type / user.

        :param client_id: Identifier of the client.
        :param grant_type: Grant type of the token.
        :param user_id: Identifier of the user.
        :return: An ``AccessToken`` built from the stored record.
        :raises AccessTokenNotFound: If the state manager returns ``None``.
        """
        manager = zoe_lib.state.SQLManager(get_conf())
        row = manager.fetch_existing_token_of_user(client_id, grant_type, user_id)
        if row is None:
            raise AccessTokenNotFound

        fields = {
            "client_id": row["client_id"],
            "grant_type": row["grant_type"],
            "token": row["token"],
            "data": row["data"],
            "expires_at": row["expires_at"].timestamp(),
            "refresh_token": row["refresh_token"],
            "refresh_expires_at": row["refresh_token_expires_at"].timestamp(),
            "scopes": row["scopes"],
            "user_id": row["user_id"],
        }
        return AccessToken(**fields)

    def save_token(self, access_token):
        """Persist an access token through the state manager.

        :param access_token: The token object whose fields are stored.
        :return: Always ``True``.
        """
        manager = zoe_lib.state.SQLManager(get_conf())
        manager.save_token(
            access_token.client_id,
            access_token.grant_type,
            access_token.token,
            access_token.data,
            access_token.expires_at,
            access_token.refresh_token,
            access_token.refresh_expires_at,
            access_token.scopes,
            access_token.user_id,
        )
        return True
class ClientStorePg(ClientStore):
    """Client store that keeps its records in the Zoe SQL state.

    Each method builds a fresh ``SQLManager`` from the current Zoe
    configuration and forwards the request to it.
    """

    def save_client(self, identifier, secret, role, redirect_uris, authorized_grants, authorized_response_types):
        """Save a client through the state manager.

        :param identifier: Identifier of the client.
        :param secret: Secret of the client.
        :param role: Role associated with the client.
        :param redirect_uris: Redirect URIs of the client.
        :param authorized_grants: Grants the client is allowed to use.
        :param authorized_response_types: Response types the client may request.
        :return: Always ``True``.
        """
        sql = zoe_lib.state.SQLManager(get_conf())
        sql.save_client(identifier,
                        secret,
                        role,
                        redirect_uris,
                        authorized_grants,
                        authorized_response_types)
        return True

    def fetch_by_client_id(self, client_id):
        """Fetch a client by its identifier.

        :param client_id: Identifier of the client.
        :return: A ``Client`` built from the stored record.
        :raises ClientNotFoundError: If the state manager returns ``None``.
        """
        sql = zoe_lib.state.SQLManager(get_conf())
        client_data = sql.fetch_by_client_id(client_id)
        # The missing-record check must come before any field access:
        # subscripting None would raise TypeError instead of the
        # ClientNotFoundError that callers are expected to handle.
        if client_data is None:
            raise ClientNotFoundError

        # The record's authorized grants are a single ':'-separated string.
        client_data_grants = client_data["authorized_grants"].split(':')
        return Client(identifier=client_data["identifier"],
                      secret=client_data["secret"],
                      redirect_uris=client_data["redirect_uris"],
                      authorized_grants=client_data_grants,
                      authorized_response_types=client_data["authorized_response_types"])

    def get_role_by_client_id(self, client_id):
        """Return the role stored for a client.

        :param client_id: Identifier of the client.
        :return: The ``role`` field of the stored record.
        :raises ClientNotFoundError: If the state manager returns ``None``.
        """
        sql = zoe_lib.state.SQLManager(get_conf())
        client_data = sql.fetch_by_client_id(client_id)
        if client_data is None:
            raise ClientNotFoundError
        return client_data["role"]
# Copyright (c) 2016, Quang-Nhat HOANG-XUAN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Token generator for oauth2."""
import hashlib
import os
import uuid
class TokenGenerator(object):
    """
    Common base for all token generators.

    Subclasses provide ``generate()``; this class assembles the token data.
    """

    def __init__(self):
        """
        Set up a generator with no per-grant lifetimes configured.
        """
        # Refresh token lifetime; not read anywhere inside this class.
        self.refresh_expires_in = 3600
        # Maps a grant type to its access token lifetime.
        self.expires_in = {}

    def create_access_token_data(self, grant_type):
        """
        Build the data that makes up an access token.

        :param grant_type: The grant type the token is issued for.
        :type grant_type: str
        :return: A ``dict`` containing the ``access_token`` and the
                 ``token_type``. If ``expires_in`` holds a value larger
                 than 0 for ``grant_type``, a ``refresh_token`` and the
                 ``expires_in`` value are included too.
        :rtype: dict
        """
        # The 'password' grant lifetime is forced on every call.
        if grant_type == 'password':
            self.expires_in['password'] = 36000

        token_data = {"access_token": self.generate(), "token_type": "Bearer"}
        lifetime = self.expires_in.get(grant_type, 0)
        if lifetime > 0:
            token_data.update(
                refresh_token=self.generate(),
                expires_in=self.expires_in[grant_type],
            )
        return token_data

    def generate(self):
        """
        Produce a token; must be overridden by subclasses.

        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
class URandomTokenGenerator(TokenGenerator):
    """
    Token generator that derives tokens from ``os.urandom()``.
    """

    def __init__(self, length=40):
        """
        :param length: Number of characters kept from the hex digest.
        """
        TokenGenerator.__init__(self)
        self.token_length = length

    def generate(self):
        """
        Hash 100 random bytes with SHA-512 and truncate the hex digest.

        :return: A new token
        :rtype: str
        """
        digest = hashlib.new("sha512", os.urandom(100)).hexdigest()
        return digest[:self.token_length]
class Uuid4(TokenGenerator):
    """
    Token generator backed by ``uuid.uuid4()``.
    """

    def generate(self):
        """
        Return the string form of a freshly generated UUID4.

        :return: A new token
        :rtype: str
        """
        token = uuid.uuid4()
        return str(token)
......@@ -15,7 +15,6 @@
"""Utility functions needed by the Zoe REST API."""
import time
import base64
import logging
import functools
......
......@@ -21,6 +21,7 @@
import json
import datetime
import logging
from jinja2 import Environment, FileSystemLoader, Markup, TemplateSyntaxError
......@@ -30,6 +31,8 @@ import tornado.web
import zoe_lib.version
import zoe_api.web.utils
log = logging.getLogger(__name__)
class JinjaApp(object):
"""A Jinja2-capable Tornado application."""
......@@ -125,6 +128,7 @@ class ZoeRequestHandler(tornado.web.RequestHandler):
zoe_api.web.utils.error_page(self, 'Template syntax error at {}:{}:<br> {}'.format(e.name, e.lineno, e.message), 500)
return
except Exception as e:
log.exception('Jinja2 exception while rendering the template {}'.format(template_name))
zoe_api.web.utils.error_page(self, 'Jinja2 template exception: {}'.format(e), 500)
return
self.finish(html)
......
......@@ -45,13 +45,18 @@ class StatusEndpointWeb(ZoeRequestHandler):
for exec_id in stats['running_queue']:
executions_in_queue[exec_id] = self.api_endpoint.execution_by_id(uid, role, exec_id)
max_service_count = max([len(node['services']) for node in stats['platform_stats']['nodes']])
# Map each node name to the services selected for it (backend status
# 'started'); the value is whatever services.select() returns.
services_per_node = {
    node['name']: self.api_endpoint.sql.services.select(backend_host=node['name'], backend_status='started')
    for node in stats['platform_stats']['nodes']
}
# default=0 keeps the status page working when the platform reports no
# nodes: max() on an empty sequence would raise ValueError.
max_service_count = max((len(services) for services in services_per_node.values()), default=0)
template_vars = {
"uid": uid,
"role": role,
"stats": stats,
"executions_in_queue": executions_in_queue,
"services_per_node": services_per_node,
"max_service_count": max_service_count
}
......
......@@ -36,13 +36,15 @@
<li>Time finished: <script>format_timestamp("{{ e.time_end }}")</script></li>
{% endif %}
<li>Status: <span style="font-weight: bold;">{{ e.status }}</span></li>
{% if e.is_running %}
<li>Actions:
{% if e.is_active %}
<a href="/executions/terminate/{{ e.id }}">Terminate</a>
{% else %}
<a href="/executions/restart/{{ e.id }}">Restart</a>
{% endif %}
<a href="/executions/terminate/{{ e.id }}">Terminate</a>
</li>
{% elif e.status == "terminated" %}
<li>Actions:
<a href="/executions/restart/{{ e.id }}">Restart</a>
</li>
{% endif %}
</ul>
{% if e.status == 'error' %}
<p>Error message: <code>{{ e.error_message }}</code></p>
......@@ -57,7 +59,13 @@
{% endfor %}
</ul>
{% else %}
<p>This execution does not have any active endpoint</p>
{% if e.status == "running" %}
<p>This execution does not have any active endpoint</p>
{% elif e.status == "submitted" or e.status == "starting" or e.status == "scheduled" %}
<p>Please wait, the execution is starting up. This page will refresh automatically.</p>
{% else %}
<p>No endpoints are available</p>
{% endif %}
{% endif %}
</div>
......@@ -85,7 +93,11 @@
<td>{{ s['status'] }}</td>
<td>{{ s['backend_status'] }}</td>
<td>{{ s['backend_host'] }}</td>
<td><a href="{{ reverse_url("service_logs", s['id']) }}">available</a></td>
{% if s['status'] != 'created' %}
<td><a href="{{ reverse_url("service_logs", s['id']) }}">open</a></td>
{% else %}
<td></td>
{% endif %}
<td>{{ s['error_message'] }}</td>
</tr>
{% endfor %}
......@@ -95,10 +107,12 @@
</div>
</div>
{% if e.status != 'terminated' %}
<script>
function refresh_page() {
document.location.reload();
}
setInterval(refresh_page, 15000);
</script>
{% endif %}
{% endblock %}
......@@ -128,10 +128,10 @@
data = {
datasets: [{
label: 'Memory',
data: [{{ node['memory_reserved'] }}, {{ node['memory_in_use'] }}],
backgroundColor: ['rgba(0, 169, 225, 1.0)', 'rgba(145, 192, 46, 1.0)']
data: [{{ node['memory_reserved'] }}, {{ node['memory_allocated'] }}, {{ node['memory_in_use'] }}],
backgroundColor: ['rgba(0, 169, 225, 1.0)', 'rgba(53, 51, 144, 1.0)', 'rgba(145, 192, 46, 1.0)']
}],
'labels': ['Reserved', 'In use']
'labels': ['Reserved', 'Allocated', 'In use']
};
ctx = document.getElementById("{{ node.name }}-mem").getContext('2d');
new Chart(ctx,{
......@@ -175,11 +175,11 @@
<script>
data = {
datasets: [{
label: 'Reserved cores',
data: [{{ node['cores_reserved'] }}, {{ node['cores_in_use'] }}],
backgroundColor: ['rgba(0, 169, 225, 1.0)', 'rgba(145, 192, 46, 1.0)']
label: 'Cores',
data: [{{ node['cores_reserved'] }}, {{ node['cores_allocated'] }}, {{ node['cores_in_use'] }}],
backgroundColor: ['rgba(0, 169, 225, 1.0)', 'rgba(53, 51, 144, 1.0)', 'rgba(145, 192, 46, 1.0)']
}],
'labels': ['Reserved', 'In use']
'labels': ['Reserved', 'Allocated', 'In use']
};
ctx = document.getElementById("{{ node.name }}-cpu").getContext('2d');
new Chart(ctx,{
......@@ -203,25 +203,6 @@
}
}
});
data = {
datasets: [{
label: 'Used memory',
data: [{{ node['cores_in_use'] }}, {{ node['cores_total'] - node['cores_in_use'] }}],
backgroundColor: ['rgba(0, 169, 225, 1.0)', 'rgba(145, 192, 46, 1.0)']
}],
'labels': ['In-use', 'Free']
};
ctx = document.getElementById("{{ node.name }}-cpu-use").getContext('2d');
myPieChart = new Chart(ctx,{
type: 'pie',
data: data,
options: {
animation: {
animateRotate: false
}
}
});
</script>
</div>
{% endfor %}
......@@ -232,14 +213,14 @@
<thead>
<tr>
<th class="cell-host">Host</th>
<th colspan="{{ max_service_count }}">Services and reserved resources</th>
<th colspan="{{ max_service_count }}">Services and allocated resources</th>
</tr>
</thead>
<tbody>
{% for node in stats.platform_stats.nodes %}
<tr>
<td class="cell-host">{{ node.name }}</td>
{% for service in node.services %}
{% for service in services_per_node[node.name] %}
<td><a href="{{ reverse_url('execution_inspect', service['execution_id']) }}">{{ service['name'] }}</a> (M: <script>format_bytes({{ node.service_stats[service['id']|string]['mem_limit'] }});</script> C: {{ node.service_stats[service['id']|string]['core_limit'] }})</td>
{% endfor %}
</tr>
......
......@@ -91,7 +91,7 @@ def get_auth(handler: ZoeRequestHandler):
if handler.get_secure_cookie('zoe'):
cookie_val = str(handler.get_secure_cookie('zoe'))
uid, role = cookie_val[2:-1].split('.')
log.debug('Authentication done using cookie')
log.debug('Authentication done using cookie (user {} from {} for {})'.format(uid, handler.request.remote_ip, handler.request.path))
return uid, role
else:
return None, None
......
......@@ -40,6 +40,9 @@ class ZAppParameter:
self.type = "text"
elif param_manifest['type'] == 'int':
self.type = "number"
self.max = param_manifest['max']
self.min = param_manifest['min']
self.step = param_manifest['step']
else:
self.type = "text"
......
......@@ -62,7 +62,7 @@ def load_configuration(test_conf=None):
# Master options
argparser.add_argument('--api-listen-uri', help='ZMQ API listen address', default='tcp://*:4850')
argparser.add_argument('--kairosdb-enable', action="store_true", help='Enable metric input from KairosDB')
argparser.add_argument('--kairosdb-enable', action="store_true", help='Enable usage metric input from KairosDB')
argparser.add_argument('--kairosdb-url', help='URL of the KairosDB service (ex. http://localhost:8086)', default='http://localhost:8090')
argparser.add_argument('--workspace-base-path', help='Base directory where user workspaces will be created. Must be visible at this path on all hosts.', default='/mnt/zoe-workspaces')
......@@ -99,7 +99,6 @@ def load_configuration(test_conf=None):
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
argparser.add_argument('--backend', choices=['Swarm', 'Kubernetes', 'DockerEngine'], default='Swarm', help='Which backend to enable')
argparser.add_argument('--backend-image-management', action='store_true', help='Enable image management (not implemented for all backends, check the documentation')
# Docker Swarm backend options
argparser.add_argument('--backend-swarm-url', help='Swarm/Docker API endpoint (ex.: zk://zk1:2181,zk2:2181 or http://swarm:2380)', default='http://localhost:2375')
......
......@@ -38,10 +38,10 @@ class BaseRecord:
class BaseTable:
"""Common abstraction for all tables."""
def __init__(self, connection, cursor, table_name):
def __init__(self, sql_manager, table_name):
self.table_name = table_name
self.connection = connection
self.cursor = cursor
self.sql_manager = sql_manager
self.cursor = self.sql_manager.cursor()
def create(self):
"""Create this table."""
......@@ -55,7 +55,7 @@ class BaseTable:
"""Delete a record from this table."""
query = "DELETE FROM {} WHERE id = %s".format(self.table_name)
self.cursor.execute(query, (record_id,))
self.connection.commit()
self.sql_manager.commit()
def update(self, record_id, **kwargs):
"""Update the state of an execution."""
......@@ -69,9 +69,8 @@ class BaseTable:
q_base = 'UPDATE {} SET '.format(self.table_name) + set_q + ' WHERE id=%s'
query = self.cursor.mogrify(q_base, value_list)
self.cursor.execute(query)
self.connection.commit()
self.sql_manager.commit()
def select(self, only_one=False, limit=-1, **kwargs):
"""Select records."""
raise NotImplementedError
......@@ -35,7 +35,6 @@ class Execution(BaseRecord):
"""
SUBMIT_STATUS = "submitted"
IMAGE_DL_STATUS = "image download"
SCHEDULED_STATUS = "scheduled"
STARTING_STATUS = "starting"
ERROR_STATUS = "error"
......@@ -98,11 +97,6 @@ class Execution(BaseRecord):
self._status = self.SCHEDULED_STATUS
self.sql_manager.executions.update(self.id, status=self._status)
def set_image_dl(self):
"""The execution has been added to the scheduler queues."""
self._status = self.IMAGE_DL_STATUS
self.sql_manager.executions.update(self.id, status=self._status)
def set_starting(self):
"""The services of the execution are being created in Swarm."""
self._status = self.STARTING_STATUS
......@@ -142,7 +136,7 @@ class Execution(BaseRecord):
Returns False if the execution ended completely
:return:
"""
return self._status == self.SCHEDULED_STATUS or self._status == self.RUNNING_STATUS or self._status == self.STARTING_STATUS or self._status == self.CLEANING_UP_STATUS or self._status == self.IMAGE_DL_STATUS
return self._status == self.SUBMIT_STATUS and self._status == self.SCHEDULED_STATUS or self._status == self.RUNNING_STATUS or self._status == self.STARTING_STATUS or self._status == self.CLEANING_UP_STATUS
@property
def is_running(self):
......@@ -218,8 +212,8 @@ class Execution(BaseRecord):
class ExecutionTable(BaseTable):
"""Abstraction for the execution table in the database."""
def __init__(self, connection, cursor):
super().__init__(connection, cursor, "execution")
def __init__(self, sql_manager):
super().__init__(sql_manager, "execution")
def create(self):
"""Create the execution table."""
......@@ -242,7 +236,7 @@ class ExecutionTable(BaseTable):
time_submit = datetime.datetime.utcnow()
query = self.cursor.mogrify('INSERT INTO execution (id, name, user_id, description, status, time_submit) VALUES (DEFAULT, %s,%s,%s,%s,%s) RETURNING id', (name, user_id, description, status, time_submit))
self.cursor.execute(query)
self.connection.commit()
self.sql_manager.commit()
return self.cursor.fetchone()[0]
def select(self, only_one=False, limit=-1, **kwargs):
......@@ -291,6 +285,6 @@ class ExecutionTable(BaseTable):
row = self.cursor.fetchone()
if row is None:
return None
return Execution(row, self)
return Execution(row, self.sql_manager)
else:
return [Execution(x, self) for x in self.cursor]
return [Execution(x, self.sql_manager) for x in self.cursor]
......@@ -65,8 +65,8 @@ class Port(BaseRecord):
class PortTable(BaseTable):
"""Abstraction for the port table in the database."""
def __init__(self, connection, cursor):
super().__init__(connection, cursor, "port")
def __init__(self, sql_manager):
super().__init__(sql_manager, "port")
def create(self):
"""Create the Port table."""
......@@ -83,7 +83,7 @@ class PortTable(BaseTable):
"""Adds a new port to the state."""
query = self.cursor.mogrify('INSERT INTO port (id, service_id, internal_name, external_ip, external_port, description) VALUES (DEFAULT, %s, %s, NULL, NULL, %s) RETURNING id', (service_id, internal_name, description))
self.cursor.execute(query)
self.connection.commit()
self.sql_manager.commit()
return self.cursor.fetchone()[0]
def select(self, only_one=False, limit=-1, **kwargs):
......@@ -119,6 +119,6 @@ class PortTable(BaseTable):
row = self.cursor.fetchone()
if row is None:
return None
return Port(row, self)
return Port(row, self.sql_manager)
else:
return [Port(x, self) for x in self.cursor]
return [Port(x, self.sql_manager) for x in self.cursor]
......@@ -270,8 +270,8 @@ class Service(BaseRecord):
class ServiceTable(BaseTable):
"""Abstraction for the service table in the database."""
def __init__(self, connection, cursor):
super().__init__(connection, cursor, "service")
def __init__(self, sql_manager):
super().__init__(sql_manager, "service")
def create(self):
"""Create the service table."""
......@@ -295,7 +295,7 @@ class ServiceTable(BaseTable):
status = Service.CREATED_STATUS
query = self.cursor.mogrify('INSERT INTO service (id, status, execution_id, name, service_group, description, essential) VALUES (DEFAULT,%s,%s,%s,%s,%s,%s) RETURNING id', (status, execution_id, name, service_group, description, is_essential))
self.cursor.execute(query)
self.connection.commit()
self.sql_manager.commit()
return self.cursor.fetchone()[0]
def select(self, only_one=False, limit=-1, **kwargs):
......@@ -334,6 +334,6 @@ class ServiceTable(BaseTable):
row = self.cursor.fetchone()
if row is None:
return None
return Service(row, self)
return Service(row, self.sql_manager)
else:
return [Service(x, self) for x in self.cursor]
return [Service(x, self.sql_manager) for x in self.cursor]
......@@ -20,14 +20,14 @@ import logging
import psycopg2
import psycopg2.extras
from .service import ServiceTable
from .execution import ExecutionTable
from .port import PortTable
from zoe_lib.config import get_conf
from zoe_lib.version import SQL_SCHEMA_VERSION
import zoe_lib.exceptions
from .service import ServiceTable
from .execution import ExecutionTable
from .port import PortTable
log = logging.getLogger(__name__)
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
......@@ -54,7 +54,8 @@ class SQLManager:
self.conn = psycopg2.connect(dsn)
def _cursor(self):
def cursor(self):
"""Get a cursor, making sure the connection to the database is established."""
try:
cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
except psycopg2.InterfaceError:
......@@ -63,23 +64,24 @@ class SQLManager:
cur.execute('SET search_path TO {},public'.format(self.schema))
return cur
def commit(self):
"""Commit a transaction."""
self.conn.commit()
@property
def executions(self) -> ExecutionTable:
"""Access the execution state."""
cur = self._cursor()
return ExecutionTable(self.conn, cur)
return ExecutionTable(self)
@property
def services(self) -> ServiceTable:
"""Access the service state."""
cur = self._cursor()
return ServiceTable(self.conn, cur)
return ServiceTable(self)
@property
def ports(self) -> PortTable:
"""Access the port state."""
cur = self._cursor()
return PortTable(self.conn, cur)
return PortTable(self)
def _create_tables(self):
self.executions.create()
......@@ -101,7 +103,7 @@ class SQLManager:
if not self._check_schema_version(cur, get_conf().deployment_name):
self._create_tables()
self.conn.commit()
self.commit()
cur.close()
def _check_schema_version(self, cur, deployment_name):
......
......@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and