Commit 6302418e authored by Daniele Venzano's avatar Daniele Venzano
Browse files

Start work on users

parent 9c4cda74
......@@ -21,7 +21,7 @@ import psycopg2.extras
import zoe_api.exceptions
from zoe_lib.config import get_conf
SQL_SCHEMA_VERSION = 4 # ---> Increment this value every time the schema changes !!! <---
SQL_SCHEMA_VERSION = 5 # ---> Increment this value every time the schema changes !!! <---
def version_table(cur):
......@@ -47,16 +47,52 @@ def check_schema_version(cur, deployment_name):
else:
if row[0] == SQL_SCHEMA_VERSION:
return True
if row[0] == 4:
_create_tables_v5(cur)
_update_schema_v5(cur)
return True
else:
raise zoe_api.exceptions.ZoeException('SQL database schema version mismatch: need {}, found {}'.format(SQL_SCHEMA_VERSION, row[0]))
def create_tables(cur):
"""Create the Zoe database tables."""
_create_tables_v5(cur)
_create_tables_v4(cur)
_create_tables_v3(cur)
_update_schema_v5(cur)
def _create_tables_v5(cur):
cur.execute('''CREATE TABLE quotas (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
concurrent_executions INT NOT NULL,
memory BIGINT NOT NULL,
cores INT NOT NULL,
volume_size BIGINT NOT NULL
)''')
cur.execute('''INSERT INTO quotas (id, name, concurrent_executions, memory, cores, volume_size) VALUES (DEFAULT, 'default', 5, 34359738368, 20, 34359738368)''')
cur.execute('''CREATE TABLE users (
id SERIAL PRIMARY KEY,
username TEXT NOT NULL,
email TEXT,
priority SMALLINT NOT NULL DEFAULT 0,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
quota_id INT REFERENCES quotas
)''')
def _update_schema_v5(cur):
cur.execute('''ALTER TABLE execution ALTER COLUMN id TYPE BIGINT''')
cur.execute('''ALTER TABLE service ALTER COLUMN id TYPE BIGINT''')
def _create_tables_v4(cur):
cur.execute('''CREATE TABLE execution (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
user_id TEXT NOT NULL,
user_id INT REFERENCES users,
description JSON NOT NULL,
status TEXT NOT NULL,
execution_manager_id TEXT NULL,
......@@ -78,6 +114,9 @@ def create_tables(cur):
ip_address CIDR NULL DEFAULT NULL,
essential BOOLEAN NOT NULL DEFAULT FALSE
)''')
def _create_tables_v3(cur):
cur.execute('''CREATE TABLE port (
id SERIAL PRIMARY KEY,
service_id INT REFERENCES service ON DELETE CASCADE,
......@@ -86,7 +125,7 @@ def create_tables(cur):
external_port INT NULL,
description JSON NOT NULL
)''')
#Create oauth_client and oauth_token tables for oAuth2
# Create oauth_client and oauth_token tables for oAuth2
cur.execute('''CREATE TABLE oauth_client (
identifier TEXT PRIMARY KEY,
secret TEXT,
......@@ -104,7 +143,7 @@ def create_tables(cur):
refresh_token TEXT,
refresh_token_expires_at TIMESTAMP,
scopes TEXT,
user_id TEXT
user_id INT REFERENCES users
)''')
......@@ -126,8 +165,15 @@ def init(force=False):
cur.execute("DELETE FROM public.versions WHERE deployment = %s", (get_conf().deployment_name,))
cur.execute('DROP SCHEMA IF EXISTS {} CASCADE'.format(get_conf().deployment_name))
if not check_schema_version(cur, get_conf().deployment_name):
create_tables(cur)
try:
if not check_schema_version(cur, get_conf().deployment_name):
create_tables(cur)
except zoe_api.exceptions.ZoeException as e:
if force:
cur.execute("DELETE FROM public.versions WHERE deployment = %s", (get_conf().deployment_name,))
cur.execute('DROP SCHEMA IF EXISTS {} CASCADE'.format(get_conf().deployment_name))
else:
raise e
conn.commit()
cur.close()
......
......@@ -61,9 +61,11 @@ def zoe_web_main() -> int:
'static_path': os.path.join(os.path.dirname(__file__), "web", "static"),
'template_path': os.path.join(os.path.dirname(__file__), "web", "templates"),
'cookie_secret': config.get_conf().cookie_secret,
'login_url': '/login', # used only by the web interface
'debug': args.debug
}
app = Application(zoe_api.web.web_init(api_endpoint) + zoe_api.rest_api.api_init(api_endpoint), **app_settings)
app = Application(zoe_api.web.web_init() + zoe_api.rest_api.api_init(api_endpoint), **app_settings)
app.api_endpoint = api_endpoint
JinjaApp.init_app(app)
log.info("Starting HTTP server...")
......
......@@ -42,12 +42,16 @@ def api_init(api_endpoint) -> List[tornado.web.URLSpec]:
api_routes = [
tornado.web.url(API_PATH + r'/info', InfoAPI, route_args),
tornado.web.url(API_PATH + r'/login', LoginAPI, route_args),
tornado.web.url(API_PATH + r'/userinfo', UserInfoAPI, route_args),
tornado.web.url(API_PATH + r'/zapp_validate', ZAppValidateAPI, route_args),
tornado.web.url(API_PATH + r'/user/login', UserLoginAPI, route_args),
tornado.web.url(API_PATH + r'/user/logout', UserLogoutAPI, route_args),
tornado.web.url(API_PATH + r'/user/([0-9]+)', UserInfoAPI, route_args),
tornado.web.url(API_PATH + r'/quota', QuotaCollectionAPI, route_args),
tornado.web.url(API_PATH + r'/quota/([0-9]+)', QuotaAPI, route_args),
tornado.web.url(API_PATH + r'/zapp/validate', ZAppValidateAPI, route_args),
tornado.web.url(API_PATH + r'/execution/([0-9]+)', ExecutionAPI, route_args),
tornado.web.url(API_PATH + r'/execution/delete/([0-9]+)', ExecutionDeleteAPI, route_args),
tornado.web.url(API_PATH + r'/execution/endpoints/([0-9]+)', ExecutionEndpointsAPI, route_args),
tornado.web.url(API_PATH + r'/execution', ExecutionCollectionAPI, route_args),
......
......@@ -22,33 +22,20 @@ import tornado.web
import zoe_api.web.start
import zoe_api.web.executions
from zoe_lib.version import ZOE_API_VERSION, ZOE_VERSION
def web_init(api_endpoint) -> List[tornado.web.URLSpec]:
def web_init() -> List[tornado.web.URLSpec]:
"""Tornado init for the web interface."""
route_args = {
'api_endpoint': api_endpoint
}
web_routes = [
tornado.web.url(r'/', zoe_api.web.start.RootWeb, route_args, name='root'),
tornado.web.url(r'/user', zoe_api.web.start.HomeWeb, route_args, name='home_user'),
tornado.web.url(r'/login', zoe_api.web.start.LoginWeb, route_args, name='login'),
tornado.web.url(r'/executions/new', zoe_api.web.executions.ExecutionDefineWeb, route_args, name='execution_define'),
tornado.web.url(r'/executions/start', zoe_api.web.executions.ExecutionStartWeb, route_args, name='execution_start'),
tornado.web.url(r'/executions/restart/([0-9]+)', zoe_api.web.executions.ExecutionRestartWeb, route_args, name='execution_restart'),
tornado.web.url(r'/executions/terminate/([0-9]+)', zoe_api.web.executions.ExecutionTerminateWeb, route_args, name='execution_terminate'),
tornado.web.url(r'/executions/delete/([0-9]+)', zoe_api.web.executions.ExecutionDeleteWeb, route_args, name='execution_delete'),
tornado.web.url(r'/executions/inspect/([0-9]+)', zoe_api.web.executions.ExecutionInspectWeb, route_args, name='execution_inspect')
tornado.web.url(r'/', zoe_api.web.start.RootWeb, name='root'),
tornado.web.url(r'/user', zoe_api.web.start.HomeWeb, name='home_user'),
tornado.web.url(r'/login', zoe_api.web.start.LoginWeb, name='login'),
tornado.web.url(r'/executions/new', zoe_api.web.executions.ExecutionDefineWeb, name='execution_define'),
tornado.web.url(r'/executions/start', zoe_api.web.executions.ExecutionStartWeb, name='execution_start'),
tornado.web.url(r'/executions/restart/([0-9]+)', zoe_api.web.executions.ExecutionRestartWeb, name='execution_restart'),
tornado.web.url(r'/executions/terminate/([0-9]+)', zoe_api.web.executions.ExecutionTerminateWeb, name='execution_terminate'),
tornado.web.url(r'/executions/delete/([0-9]+)', zoe_api.web.executions.ExecutionDeleteWeb, name='execution_delete'),
tornado.web.url(r'/executions/inspect/([0-9]+)', zoe_api.web.executions.ExecutionInspectWeb, name='execution_inspect')
]
return web_routes
def inject_version():
"""Inject some template variables in all templates."""
return {
'zoe_version': ZOE_VERSION,
'zoe_api_version': ZOE_API_VERSION,
}
......@@ -83,10 +83,17 @@ def tojson_filter(obj, **kwargs):
class ZoeRequestHandler(tornado.web.RequestHandler):
"""usage:
class JinjaPoweredHandler(JinjaTemplateMixin, tornado.web.RequestHandler):
pass
"""
"""Custom Zoe Tornado handler."""
def get_current_user(self):
"""Implement cookie-auth the Tornado way."""
user_id = self.get_secure_cookie("zoeweb_user")
if not user_id:
return None
user = self.application.api_endpoint.user_get(user_id)
if user is None or not user.enabled:
return None
return user
def initialize(self, *args_, **kwargs_):
"""Initialize the Jinja template system."""
......
......@@ -18,6 +18,8 @@
from random import randint
import json
import tornado.web
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.web.utils import get_auth_login, get_auth, catch_exceptions
from zoe_api.web.custom_request_handler import ZoeRequestHandler
......@@ -28,11 +30,11 @@ class RootWeb(ZoeRequestHandler):
def initialize(self, **kwargs):
"""Initializes the request handler."""
super().initialize(**kwargs)
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
self.api_endpoint = self.application.api_endpoint # type: APIEndpoint
@catch_exceptions
@tornado.web.authenticated
def get(self):
"""Home page without authentication."""
"""Home page."""
self.render('index.html')
......@@ -41,7 +43,7 @@ class LoginWeb(ZoeRequestHandler):
def initialize(self, **kwargs):
"""Initializes the request handler."""
super().initialize(**kwargs)
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
self.api_endpoint = self.application.api_endpoint # type: APIEndpoint
@catch_exceptions
def get(self):
......
......@@ -16,6 +16,7 @@
"""Interface to PostgresQL for Zoe state."""
import logging
from typing import Dict
log = logging.getLogger(__name__)
......@@ -24,13 +25,16 @@ class Base:
"""
:type sql_manager: SQLManager
"""
def __init__(self, d, sql_manager):
def __init__(self, d: Dict, sql_manager):
"""
:type sql_manager: SQLManager
"""
self.sql_manager = sql_manager
self.id = d['id']
def serialize(self):
def serialize(self) -> Dict:
"""Generates a dictionary that can be serialized in JSON."""
raise NotImplementedError
def __eq__(self, other):
return self.id == other.id
......@@ -19,10 +19,12 @@ import datetime
import logging
import threading
from zoe_lib.state.base import Base
log = logging.getLogger(__name__)
class Execution:
class Execution(Base):
"""
A Zoe execution.
......@@ -40,8 +42,7 @@ class Execution:
TERMINATED_STATUS = "terminated"
def __init__(self, d, sql_manager):
self.sql_manager = sql_manager
self.id = d['id']
super().__init__(d, sql_manager)
self.user_id = d['user_id']
self.name = d['name']
......@@ -84,9 +85,6 @@ class Execution:
'services': [s.id for s in self.services]
}
def __eq__(self, other):
return self.id == other.id
def set_scheduled(self):
"""The execution has been added to the scheduler queues."""
self._status = self.SCHEDULED_STATUS
......
......@@ -17,6 +17,8 @@
import logging
from zoe_lib.state.base import Base
log = logging.getLogger(__name__)
......@@ -37,12 +39,11 @@ class ExposedPort:
return NotImplemented
class Port:
class Port(Base):
"""A tcp or udp port that should be exposed by the backend."""
def __init__(self, d, sql_manager):
self.sql_manager = sql_manager
self.id = d['id']
super().__init__(d, sql_manager)
self.internal_name = d['internal_name']
self.external_ip = d['external_ip']
......@@ -63,9 +64,6 @@ class Port:
'description': self.description
}
def __eq__(self, other):
return self.id == other.id
def activate(self, ext_ip, ext_port):
"""The backend has exposed the port."""
self.sql_manager.port_update(self.id, external_ip=ext_ip, external_port=ext_port)
......
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interface to PostgresQL for Zoe state."""
import logging
from zoe_lib.state.base import Base
log = logging.getLogger(__name__)
class Quota(Base):
"""A quota object describes limits imposed to users on resource usage."""
def __init__(self, d, sql_manager):
super().__init__(d, sql_manager)
self.name = d['name']
self._concurrent_executions = d['concurrent_executions']
self._memory = d['memory']
self._cores = d['cores']
self._volume_size = d['volume_size']
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
return {
'id': self.id,
'name': self.name,
'concurrent_executions': self._concurrent_executions,
'cores': self._cores,
'volume_size': self._volume_size
}
@property
def concurrent_executions(self):
"""Getter for concurrent executions limit."""
return self._concurrent_executions
@concurrent_executions.setter
def concurrent_executions(self, value):
"""Setter for concurrent execution limit."""
self._concurrent_executions = value
self.sql_manager.quota_update(self.id, concurrent_executions=value)
@property
def memory(self):
"""Getter for memory limit."""
return self._memory
@memory.setter
def memory(self, value):
"""Setter for memory limit."""
self._memory = value
self.sql_manager.quota_update(self.id, memory=value)
@property
def cores(self):
"""Getter for cores limit."""
return self._cores
@cores.setter
def cores(self, value):
"""Setter for cores limit."""
self._cores = value
self.sql_manager.quota_update(self.id, cores=value)
@property
def volume_size(self):
"""Getter for volume_size limit."""
return self._volume_size
@volume_size.setter
def volume_size(self, value):
"""Setter for volume_size limit."""
self._volume_size = value
self.sql_manager.quota_update(self.id, volume_size=value)
......@@ -18,6 +18,7 @@
import logging
from zoe_lib.config import get_conf
from zoe_lib.state.base import Base
log = logging.getLogger(__name__)
......@@ -70,7 +71,7 @@ class VolumeDescription:
self.readonly = data[2]
class Service:
class Service(Base):
"""A Zoe Service."""
TERMINATING_STATUS = "terminating"
......@@ -88,8 +89,7 @@ class Service:
BACKEND_OOM_STATUS = 'oom-killed'
def __init__(self, d, sql_manager):
self.sql_manager = sql_manager
self.id = d['id']
super().__init__(d, sql_manager)
self.name = d['name']
self.status = d['status']
......@@ -133,9 +133,6 @@ class Service:
'proxy_address': self.proxy_address
}
def __eq__(self, other):
return self.id == other.id
def set_terminating(self):
"""The service is being killed."""
self.sql_manager.service_update(self.id, status=self.TERMINATING_STATUS)
......
......@@ -24,6 +24,8 @@ import psycopg2.extras
from .service import Service
from .execution import Execution
from .port import Port
from .quota import Quota
from .user import User
log = logging.getLogger(__name__)
......@@ -254,6 +256,125 @@ class SQLManager:
self.conn.commit()
return cur.fetchone()[0]
def quota_list(self, only_one=False, **kwargs):
"""
Return a list of ports.
:param only_one: only one result is expected
:type only_one: bool
:param kwargs: filter services based on their fields/columns
:return: one or more ports
"""
cur = self._cursor()
q_base = 'SELECT * FROM quotas'
if len(kwargs) > 0:
q = q_base + " WHERE "
filter_list = []
args_list = []
for key, value in kwargs.items():
filter_list.append('{} = %s'.format(key))
args_list.append(value)
q += ' AND '.join(filter_list)
query = cur.mogrify(q, args_list)
else:
query = cur.mogrify(q_base)
cur.execute(query)
if only_one:
row = cur.fetchone()
if row is None:
return None
return Quota(row, self)
else:
return [Quota(x, self) for x in cur]
def quota_update(self, port_id, **kwargs):
"""Update the state of an existing port."""
cur = self._cursor()
arg_list = []
value_list = []
for key, value in kwargs.items():
arg_list.append('{} = %s'.format(key))
value_list.append(value)
set_q = ", ".join(arg_list)
value_list.append(port_id)
q_base = 'UPDATE quota SET ' + set_q + ' WHERE id=%s'
query = cur.mogrify(q_base, value_list)
cur.execute(query)
self.conn.commit()
def quota_new(self, name, concurrent_executions, memory, cores, volume_size):
"""Adds a new port to the state."""
cur = self._cursor()
query = cur.mogrify('INSERT INTO quotas (id, name, concurrent_executions, memory, cores, volume_size) VALUES (DEFAULT, %s, %s, %s, %s, %s) RETURNING id', (name, concurrent_executions, memory, cores, volume_size))
cur.execute(query)
self.conn.commit()
return cur.fetchone()[0]
def quota_delete(self, quota_id):
"""Delete an execution and its services from the state."""
cur = self._cursor()
query = "UPDATE users SET quota_id = (SELECT id from quotas WHERE name='default') WHERE quota_id=%s"
cur.execute(query, (quota_id,))
query = "DELETE FROM quotas WHERE id = %s"
cur.execute(query, (quota_id,))
self.conn.commit()
def user_list(self, only_one=False, **kwargs):
"""
Return a list of ports.
:param only_one: only one result is expected
:type only_one: bool
:param kwargs: filter services based on their fields/columns
:return: one or more ports
"""
cur = self._cursor()
q_base = 'SELECT * FROM users'
if len(kwargs) > 0:
q = q_base + " WHERE "
filter_list = []
args_list = []
for key, value in kwargs.items():
filter_list.append('{} = %s'.format(key))
args_list.append(value)
q += ' AND '.join(filter_list)
query = cur.mogrify(q, args_list)
else:
query = cur.mogrify(q_base)
cur.execute(query)
if only_one:
row = cur.fetchone()
if row is None:
return None
return User(row, self)
else:
return [User(x, self) for x in cur]
def user_update(self, port_id, **kwargs):
"""Update the state of an existing port."""
cur = self._cursor()
arg_list = []
value_list = []
for key, value in kwargs.items():
arg_list.append('{} = %s'.format(key))
value_list.append(value)
set_q = ", ".join(arg_list)
value_list.append(port_id)
q_base = 'UPDATE users SET ' + set_q + ' WHERE id=%s'
query = cur.mogrify(q_base, value_list)
cur.execute(query)
self.conn.commit()
def user_new(self, username):
"""Adds a new port to the state."""
cur = self._cursor()
query = cur.mogrify('INSERT INTO users (id, username, email, priority, enabled, quota_id) VALUES (DEFAULT, %s, NULL, DEFAULT, DEFAULT, (SELECT id FROM quotas WHERE name=\'default\')) RETURNING id', (username,))
cur.execute(query)
self.conn.commit()
return cur.fetchone()[0]
# The section below is used for Oauth2 authentication mechanism
def fetch_by_refresh_token(self, refresh_token):
......@@ -301,7 +422,7 @@ class SQLManager:
return cur.fetchone()
def save_token(self, client_id, grant_type, token, data, expires_at, refresh_token, refresh_expires_at, scopes, user_id): #pylint: disable=too-many-arguments
def save_token(self, client_id, grant_type, token, data, expires_at, refresh_token, refresh_expires_at, scopes, user_id): # pylint: disable=too-many-arguments
""" save token to db """
cur = self._cursor()
expires_at = datetime.datetime.fromtimestamp(expires_at)
......
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#