...
 
Commits (29)
......@@ -89,6 +89,7 @@ api-test:
- zoe-api-${CI_BUILD_REF}.log
- zoe-master-${CI_BUILD_REF}.log
- tests
when: always
images:
image: docker:latest
......@@ -133,7 +134,7 @@ zoe:
- 'echo -e "Host *\n\tStrictHostKeyChecking no\n\n" > ~/.ssh/config'
- apt-get update -y && apt-get install rsync -y
script:
- rsync -avz . ubuntu@${STAGING_IP}:${ZOE_STAGING_PATH}
- rsync -avz --delete . ubuntu@${STAGING_IP}:${ZOE_STAGING_PATH}
- ssh ubuntu@${STAGING_IP} sudo pip install --upgrade -r /srv/zoe/requirements.txt
- ssh ubuntu@${STAGING_IP} /home/ubuntu/clean_zoe_db.sh
- ssh ubuntu@${STAGING_IP} sudo supervisorctl restart zoe-api
......
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Database upgrade."""
import sys
import psycopg2
import psycopg2.extras
from zoe_lib.config import get_conf, load_configuration
from zoe_api.db_init import SQL_SCHEMA_VERSION
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
def _get_schema_version(dsn):
conn = psycopg2.connect(dsn)
cur = conn.cursor()
cur.execute('SET search_path TO {},public'.format(get_conf().deployment_name))
cur.execute("SELECT version FROM public.versions WHERE deployment = %s", (get_conf().deployment_name,))
row = cur.fetchone()
cur.close()
conn.close()
if row is None:
return None
return row[0]
def check_schema_version(dsn):
"""Check if the schema version matches this source code version."""
current_version = _get_schema_version(dsn)
if current_version is None:
print('No database schema found for this deployment, use the "create_db_tables.py" script to create one.')
sys.exit(0)
else:
print("Detected schema version {}".format(current_version))
if current_version < 1:
print('Schema version {} is too old, cannot upgrade')
sys.exit(1)
if current_version == SQL_SCHEMA_VERSION:
print('DB schema already at latest supported version, no upgrade to perform.')
sys.exit(1)
elif current_version < SQL_SCHEMA_VERSION:
upgrade_schema_from(current_version, dsn)
else:
print('SQL database schema version mismatch: need {}, found {}, cannot downgrade'.format(SQL_SCHEMA_VERSION, current_version))
sys.exit(1)
def upgrade_schema_from(start_version, dsn):
"""Main schema upgrader, calls specific version upgraders as needed"""
print('Upgrading database from version {} to version {}'.format(start_version, SQL_SCHEMA_VERSION))
while start_version < SQL_SCHEMA_VERSION:
new_version = start_version + 1
UPGRADERS[new_version](dsn)
start_version = new_version
def upgrade_to_2(dsn):
"""Perform schema upgrade from version 2 to version 3."""
conn = psycopg2.connect(dsn)
cur = conn.cursor()
cur.execute('SET search_path TO {},public'.format(get_conf().deployment_name))
print('Applying schema version 2...')
cur.execute("UPDATE public.versions SET version = 2 WHERE deployment = %s", (get_conf().deployment_name,))
conn.commit()
cur.close()
conn.close()
return
def upgrade_to_3(dsn):
"""Perform schema upgrade from version 2 to version 3."""
conn = psycopg2.connect(dsn)
cur = conn.cursor()
cur.execute('SET search_path TO {},public'.format(get_conf().deployment_name))
print('Applying schema version 3...')
cur.execute("ALTER TABLE service RENAME COLUMN docker_id TO backend_id")
cur.execute("ALTER TABLE service RENAME COLUMN docker_status TO backend_status")
cur.execute("ALTER TABLE service ADD ip_address CIDR DEFAULT NULL NULL")
cur.execute("ALTER TABLE service ADD essential BOOLEAN DEFAULT FALSE NOT NULL")
cur.execute('''CREATE TABLE oauth_client (
identifier TEXT PRIMARY KEY,
secret TEXT,
role TEXT,
redirect_uris TEXT,
authorized_grants TEXT,
authorized_response_types TEXT
)''')
cur.execute('''CREATE TABLE oauth_token (
client_id TEXT PRIMARY KEY,
grant_type TEXT,
token TEXT,
data TEXT,
expires_at TIMESTAMP,
refresh_token TEXT,
refresh_token_expires_at TIMESTAMP,
scopes TEXT,
user_id TEXT
)''')
cur.execute("UPDATE public.versions SET version = 3 WHERE deployment = %s", (get_conf().deployment_name,))
conn.commit()
cur.close()
conn.close()
return
def upgrade_to_4(dsn):
"""Perform schema upgrade from version 3 to version 4."""
conn = psycopg2.connect(dsn)
cur = conn.cursor()
cur2 = conn.cursor()
cur.execute('SET search_path TO {},public'.format(get_conf().deployment_name))
cur2.execute('SET search_path TO {},public'.format(get_conf().deployment_name))
print('Applying schema version 4...')
cur.execute('''CREATE TABLE port (
id SERIAL PRIMARY KEY,
service_id INT REFERENCES service ON DELETE CASCADE,
internal_name TEXT NOT NULL,
external_ip INET NULL,
external_port INT NULL,
description JSON NOT NULL
)''')
print('Filling the new port table with data from old service descriptions')
cur.execute("SELECT id, description FROM service")
for service_id, service_descr in cur:
for port_descr in service_descr['ports']:
port_internal = str(port_descr['port_number']) + '/' + port_descr['protocol']
cur2.execute('INSERT INTO port (id, service_id, internal_name, external_ip, external_port, description) VALUES (DEFAULT, %s, %s, NULL, NULL, %s) RETURNING id', (service_id, port_internal, port_descr))
cur.execute("UPDATE public.versions SET version = 4 WHERE deployment = %s", (get_conf().deployment_name,))
conn.commit()
cur.close()
cur2.close()
conn.close()
return
def upgrade_to_5(dsn):
"""Perform schema upgrade from version 4 to version 5."""
conn = psycopg2.connect(dsn)
cur = conn.cursor()
cur2 = conn.cursor()
cur.execute('SET search_path TO {},public'.format(get_conf().deployment_name))
cur2.execute('SET search_path TO {},public'.format(get_conf().deployment_name))
print('Applying schema version 5...')
print('-> changing type of service id to BIGINT')
cur.execute("ALTER TABLE service ALTER COLUMN id TYPE BIGINT")
print('-> changing type of execution id to BIGINT')
cur.execute("ALTER TABLE execution ALTER COLUMN id TYPE BIGINT")
print('-> create table quotas')
cur.execute('''CREATE TABLE quotas (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
concurrent_executions INT NOT NULL,
memory BIGINT NOT NULL,
cores INT NOT NULL
)''')
print('-> create default quota')
cur.execute('''INSERT INTO quotas (id, name, concurrent_executions, memory, cores) VALUES (DEFAULT, 'default', 5, 34359738368, 20)''')
print('-> create table users')
cur.execute('''CREATE TABLE users (
id SERIAL PRIMARY KEY,
username TEXT NOT NULL,
role TEXT NOT NULL,
email TEXT,
priority SMALLINT NOT NULL DEFAULT 0,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
quota_id INT REFERENCES quotas
)''')
cur.execute('CREATE UNIQUE INDEX users_username_uindex ON users (username)')
print('Filling the user table from the execution data...')
print('-> The default quota will be assigned to all users')
cur.execute("SELECT user_id FROM execution")
users = set([u[0] for u in cur])
for user_name in users:
cur2.execute("INSERT INTO users (id, username, role, email, priority, enabled, quota_id) VALUES (DEFAULT, %s, 'user', NULL, DEFAULT, DEFAULT, currval('quotas_id_seq'))", (user_name, ))
cur2.execute("UPDATE execution SET user_id=currval('users_id_seq') WHERE user_id=%s", (user_name, ))
print('-> change type of user_id to INT')
cur.execute("ALTER TABLE execution ALTER COLUMN user_id TYPE INT USING CAST(user_id AS INT)")
print('-> create foreign key for executions.user_id pointing to users.id')
cur.execute('ALTER TABLE execution ADD CONSTRAINT execution_user_id_fk FOREIGN KEY (user_id) REFERENCES users (id)')
print('-> converting oauth tables')
cur.execute('DROP TABLE oauth_client')
cur.execute('DROP TABLE oauth_token')
cur.execute("UPDATE public.versions SET version = 5 WHERE deployment = %s", (get_conf().deployment_name,))
conn.commit()
cur.close()
cur2.close()
conn.close()
return
UPGRADERS = [
None,
None,
upgrade_to_2,
upgrade_to_3,
upgrade_to_4,
upgrade_to_5
]
def init():
"""DB upgrade entrypoint."""
load_configuration()
dsn = 'dbname=' + get_conf().dbname + \
' user=' + get_conf().dbuser + \
' password=' + get_conf().dbpass + \
' host=' + get_conf().dbhost + \
' port=' + str(get_conf().dbport)
check_schema_version(dsn)
return
if __name__ == "__main__":
init()
# Copyright (c) 2016, Quang-Nhat Hoang-Xuan
#!/usr/bin/env python3
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -13,42 +15,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Info API endpoint."""
from tornado.web import RequestHandler
from zoe_api.rest_api.utils import get_auth, catch_exceptions, manage_cors_headers
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
class UserInfoAPI(RequestHandler):
"""The UserInfo API endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
@catch_exceptions
def options(self):
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
def get(self):
"""HTTP GET method."""
uid, role = get_auth(self)
ret = {
'uid': uid,
'role': role
}
"""Command line client entry point."""
self.write(ret)
from zoe_cmd.entrypoint_admin import zoe
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
if __name__ == "__main__":
zoe()
This diff is collapsed.
# Copyright (c) 2017, Quang-Nhat HOANG-XUAN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Store adapters to read/write data to from/to PostgresSQL. """
import zoe_lib.state
from zoe_lib.config import get_conf
from oauth2.store import AccessTokenStore, ClientStore
from oauth2.datatype import AccessToken, Client
from oauth2.error import AccessTokenNotFound, ClientNotFoundError
class AccessTokenStorePg(AccessTokenStore):
""" AccessTokenStore for postgresql """
def fetch_by_refresh_token(self, refresh_token):
""" get accesstoken from refreshtoken """
sql = zoe_lib.state.SQLManager(get_conf())
data = sql.fetch_by_refresh_token(refresh_token)
if data is None:
raise AccessTokenNotFound
return AccessToken(client_id=data["client_id"],
grant_type=data["grant_type"],
token=data["token"],
data=data["data"],
expires_at=data["expires_at"].timestamp(),
refresh_token=data["refresh_token"],
refresh_expires_at=data["refresh_token_expires_at"].timestamp(),
scopes=data["scopes"])
def delete_refresh_token(self, refresh_token):
"""
Deletes (invalidates) an old refresh token after use
:param refresh_token: The refresh token.
"""
sql = zoe_lib.state.SQLManager(get_conf())
res = sql.delete_refresh_token(refresh_token)
return res
def get_client_id_by_refresh_token(self, refresh_token):
""" get clientID from refreshtoken """
sql = zoe_lib.state.SQLManager(get_conf())
data = sql.get_client_id_by_refresh_token(refresh_token)
return data
def get_client_id_by_access_token(self, access_token):
""" get clientID from accesstoken """
sql = zoe_lib.state.SQLManager(get_conf())
data = sql.get_client_id_by_access_token(access_token)
return data
def fetch_existing_token_of_user(self, client_id, grant_type, user_id):
""" get accesstoken from userid """
sql = zoe_lib.state.SQLManager(get_conf())
data = sql.fetch_existing_token_of_user(client_id, grant_type, user_id)
if data is None:
raise AccessTokenNotFound
return AccessToken(client_id=data["client_id"],
grant_type=data["grant_type"],
token=data["token"],
data=data["data"],
expires_at=data["expires_at"].timestamp(),
refresh_token=data["refresh_token"],
refresh_expires_at=data["refresh_token_expires_at"].timestamp(),
scopes=data["scopes"],
user_id=data["user_id"])
def save_token(self, access_token):
""" save accesstoken """
sql = zoe_lib.state.SQLManager(get_conf())
sql.save_token(access_token.client_id,
access_token.grant_type,
access_token.token,
access_token.data,
access_token.expires_at,
access_token.refresh_token,
access_token.refresh_expires_at,
access_token.scopes,
access_token.user_id)
return True
class ClientStorePg(ClientStore):
""" ClientStore for postgres """
def save_client(self, identifier, secret, role, redirect_uris, authorized_grants, authorized_response_types):
""" save client to db """
sql = zoe_lib.state.SQLManager(get_conf())
sql.save_client(identifier,
secret,
role,
redirect_uris,
authorized_grants,
authorized_response_types)
return True
def fetch_by_client_id(self, client_id):
""" get client by clientid """
sql = zoe_lib.state.SQLManager(get_conf())
client_data = sql.fetch_by_client_id(client_id)
client_data_grants = client_data["authorized_grants"].split(':')
if client_data is None:
raise ClientNotFoundError
return Client(identifier=client_data["identifier"],
secret=client_data["secret"],
redirect_uris=client_data["redirect_uris"],
authorized_grants=client_data_grants,
authorized_response_types=client_data["authorized_response_types"])
def get_role_by_client_id(self, client_id):
""" get client role by clientid """
sql = zoe_lib.state.SQLManager(get_conf())
client_data = sql.fetch_by_client_id(client_id)
if client_data is None:
raise ClientNotFoundError
return client_data["role"]
# Copyright (c) 2016, Quang-Nhat HOANG-XUAN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Token generator for oauth2."""
import hashlib
import os
import uuid
class TokenGenerator(object):
"""
Base class of every token generator.
"""
def __init__(self):
"""
Create a new instance of a token generator.
"""
self.expires_in = {}
self.refresh_expires_in = 3600
def create_access_token_data(self, grant_type):
"""
Create data needed by an access token.
:param grant_type:
:type grant_type: str
:return: A ``dict`` containing he ``access_token`` and the
``token_type``. If the value of ``TokenGenerator.expires_in``
is larger than 0, a ``refresh_token`` will be generated too.
:rtype: dict
"""
if grant_type == 'password':
self.expires_in['password'] = 36000
result = {"access_token": self.generate(), "token_type": "Bearer"}
if self.expires_in.get(grant_type, 0) > 0:
result["refresh_token"] = self.generate()
result["expires_in"] = self.expires_in[grant_type]
return result
def generate(self):
"""
Implemented by generators extending this base class.
:raises NotImplementedError:
"""
raise NotImplementedError
class URandomTokenGenerator(TokenGenerator):
"""
Create a token using ``os.urandom()``.
"""
def __init__(self, length=40):
self.token_length = length
TokenGenerator.__init__(self)
def generate(self):
"""
:return: A new token
:rtype: str
"""
random_data = os.urandom(100)
hash_gen = hashlib.new("sha512")
hash_gen.update(random_data)
return hash_gen.hexdigest()[:self.token_length]
class Uuid4(TokenGenerator):
"""
Generate a token using uuid4.
"""
def generate(self):
"""
:return: A new token
:rtype: str
"""
return str(uuid.uuid4())
......@@ -21,7 +21,7 @@ import psycopg2.extras
import zoe_api.exceptions
from zoe_lib.config import get_conf
SQL_SCHEMA_VERSION = 4 # ---> Increment this value every time the schema changes !!! <---
SQL_SCHEMA_VERSION = 5 # ---> Increment this value every time the schema changes !!! <---
def version_table(cur):
......@@ -47,16 +47,42 @@ def check_schema_version(cur, deployment_name):
else:
if row[0] == SQL_SCHEMA_VERSION:
return True
elif row[0] < SQL_SCHEMA_VERSION:
raise zoe_api.exceptions.ZoeException('The database schema needs to be upgraded, run the "db_upgrade.py" script.')
else:
raise zoe_api.exceptions.ZoeException('SQL database schema version mismatch: need {}, found {}'.format(SQL_SCHEMA_VERSION, row[0]))
raise zoe_api.exceptions.ZoeException('SQL database schema version mismatch: need {}, found {}, cannot downgrade'.format(SQL_SCHEMA_VERSION, row[0]))
def create_tables(cur):
"""Create the Zoe database tables."""
# Quotas
cur.execute('''CREATE TABLE quotas (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
concurrent_executions INT NOT NULL,
memory BIGINT NOT NULL,
cores INT NOT NULL
)''')
cur.execute('''INSERT INTO quotas (id, name, concurrent_executions, memory, cores) VALUES (DEFAULT, 'default', 5, 34359738368, 20)''')
# Users
cur.execute('''CREATE TABLE users (
id SERIAL PRIMARY KEY,
username TEXT NOT NULL,
role TEXT NOT NULL,
email TEXT,
priority SMALLINT NOT NULL DEFAULT 0,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
quota_id INT REFERENCES quotas
)''')
cur.execute('CREATE UNIQUE INDEX users_username_uindex ON users (username)')
# Executions
cur.execute('''CREATE TABLE execution (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
user_id TEXT NOT NULL,
user_id INT REFERENCES users,
description JSON NOT NULL,
status TEXT NOT NULL,
execution_manager_id TEXT NULL,
......@@ -65,6 +91,9 @@ def create_tables(cur):
time_end TIMESTAMP NULL,
error_message TEXT NULL
)''')
cur.execute('''ALTER TABLE execution ALTER COLUMN id TYPE BIGINT''')
# Services
cur.execute('''CREATE TABLE service (
id SERIAL PRIMARY KEY,
status TEXT NOT NULL,
......@@ -78,6 +107,9 @@ def create_tables(cur):
ip_address CIDR NULL DEFAULT NULL,
essential BOOLEAN NOT NULL DEFAULT FALSE
)''')
cur.execute('''ALTER TABLE service ALTER COLUMN id TYPE BIGINT''')
# Ports
cur.execute('''CREATE TABLE port (
id SERIAL PRIMARY KEY,
service_id INT REFERENCES service ON DELETE CASCADE,
......@@ -86,26 +118,6 @@ def create_tables(cur):
external_port INT NULL,
description JSON NOT NULL
)''')
#Create oauth_client and oauth_token tables for oAuth2
cur.execute('''CREATE TABLE oauth_client (
identifier TEXT PRIMARY KEY,
secret TEXT,
role TEXT,
redirect_uris TEXT,
authorized_grants TEXT,
authorized_response_types TEXT
)''')
cur.execute('''CREATE TABLE oauth_token (
client_id TEXT PRIMARY KEY,
grant_type TEXT,
token TEXT,
data TEXT,
expires_at TIMESTAMP,
refresh_token TEXT,
refresh_token_expires_at TIMESTAMP,
scopes TEXT,
user_id TEXT
)''')
def init(force=False):
......@@ -126,8 +138,11 @@ def init(force=False):
cur.execute("DELETE FROM public.versions WHERE deployment = %s", (get_conf().deployment_name,))
cur.execute('DROP SCHEMA IF EXISTS {} CASCADE'.format(get_conf().deployment_name))
if not check_schema_version(cur, get_conf().deployment_name):
create_tables(cur)
try:
if not check_schema_version(cur, get_conf().deployment_name):
create_tables(cur)
except zoe_api.exceptions.ZoeException as e:
print(e)
conn.commit()
cur.close()
......
......@@ -61,9 +61,11 @@ def zoe_web_main() -> int:
'static_path': os.path.join(os.path.dirname(__file__), "web", "static"),
'template_path': os.path.join(os.path.dirname(__file__), "web", "templates"),
'cookie_secret': config.get_conf().cookie_secret,
'login_url': '/login', # used only by the web interface
'debug': args.debug
}
app = Application(zoe_api.web.web_init(api_endpoint) + zoe_api.rest_api.api_init(api_endpoint), **app_settings)
app = Application(zoe_api.web.web_init() + zoe_api.rest_api.api_init(api_endpoint), **app_settings)
app.api_endpoint = api_endpoint
JinjaApp.init_app(app)
log.info("Starting HTTP server...")
......
......@@ -21,13 +21,12 @@ import tornado.web
from zoe_api.rest_api.execution import ExecutionAPI, ExecutionCollectionAPI, ExecutionDeleteAPI, ExecutionEndpointsAPI
from zoe_api.rest_api.info import InfoAPI
from zoe_api.rest_api.userinfo import UserInfoAPI
from zoe_api.rest_api.service import ServiceAPI, ServiceLogsAPI
from zoe_api.rest_api.discovery import DiscoveryAPI
from zoe_api.rest_api.statistics import SchedulerStatsAPI
from zoe_api.rest_api.oauth import OAuthGetAPI, OAuthRevokeAPI
from zoe_api.rest_api.login import LoginAPI
from zoe_api.rest_api.user import LoginAPI, UserAPI, UserCollectionAPI
from zoe_api.rest_api.validation import ZAppValidateAPI
from zoe_api.rest_api.quota import QuotaAPI, QuotaCollectionAPI
from zoe_lib.version import ZOE_API_VERSION
......@@ -43,11 +42,15 @@ def api_init(api_endpoint) -> List[tornado.web.URLSpec]:
api_routes = [
tornado.web.url(API_PATH + r'/info', InfoAPI, route_args),
tornado.web.url(API_PATH + r'/login', LoginAPI, route_args),
tornado.web.url(API_PATH + r'/userinfo', UserInfoAPI, route_args),
tornado.web.url(API_PATH + r'/zapp_validate', ZAppValidateAPI, route_args),
tornado.web.url(API_PATH + r'/user', UserCollectionAPI, route_args),
tornado.web.url(API_PATH + r'/user/([a-zA-Z0-9]+)', UserAPI, route_args),
tornado.web.url(API_PATH + r'/quota', QuotaCollectionAPI, route_args),
tornado.web.url(API_PATH + r'/quota/([0-9]+)', QuotaAPI, route_args),
tornado.web.url(API_PATH + r'/zapp/validate', ZAppValidateAPI, route_args),
tornado.web.url(API_PATH + r'/execution/([0-9]+)', ExecutionAPI, route_args),
tornado.web.url(API_PATH + r'/execution/delete/([0-9]+)', ExecutionDeleteAPI, route_args),
tornado.web.url(API_PATH + r'/execution/endpoints/([0-9]+)', ExecutionEndpointsAPI, route_args),
tornado.web.url(API_PATH + r'/execution', ExecutionCollectionAPI, route_args),
......@@ -56,10 +59,7 @@ def api_init(api_endpoint) -> List[tornado.web.URLSpec]:
tornado.web.url(API_PATH + r'/discovery/by_group/([0-9]+)/([a-z0-9A-Z\-]+)', DiscoveryAPI, route_args),
tornado.web.url(API_PATH + r'/statistics/scheduler', SchedulerStatsAPI, route_args),
tornado.web.url(API_PATH + r'/oauth/token', OAuthGetAPI, route_args),
tornado.web.url(API_PATH + r'/oauth/revoke/([a-z0-9A-Z\-]+)', OAuthRevokeAPI, route_args)
tornado.web.url(API_PATH + r'/statistics/scheduler', SchedulerStatsAPI, route_args)
]
return api_routes
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Customized Tornado request handler for the Zoe REST API."""
import logging
from typing import Union
from tornado.web import RequestHandler
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.rest_api.utils import try_basic_auth
import zoe_api.exceptions
from zoe_lib.state import User
log = logging.getLogger(__name__)
class BaseRequestHandler(RequestHandler):
"""Customized Tornado request handler for the Zoe REST API."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = self.application.api_endpoint # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
if self.request.headers.get('Origin') is None:
self.set_header("Access-Control-Allow-Origin", "*")
else:
self.set_header("Access-Control-Allow-Origin", self.request.headers.get('Origin'))
self.set_header("Access-Control-Allow-Credentials", "true")
self.set_header("Access-Control-Allow-Headers", "x-requested-with, Content-Type, origin, authorization, accept, client-security-token")
self.set_header("Access-Control-Allow-Methods", "OPTIONS, GET, DELETE")
self.set_header("Access-Control-Max-Age", "1000")
def options(self):
"""Needed for CORS."""
self.set_status(204)
self.finish()
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
def _uid_to_user(self, uid: str) -> Union[User, None]:
try:
return self.application.api_endpoint.user_identify(uid)
except zoe_api.exceptions.ZoeNotFoundException:
return None
def get_current_user(self) -> Union[User, None]:
"""Authenticate each request."""
if self.get_secure_cookie('zoe_web_user'):
uid = self.get_secure_cookie('zoe').decode('uft-8')
log.info('Authentication done using cookie')
return self._uid_to_user(uid)
auth_header = self.request.headers.get('Authorization')
if auth_header is None or not auth_header.startswith('Basic '):
return None
try:
uid, role = try_basic_auth(self)
except zoe_api.exceptions.ZoeAuthException:
return None
log.debug('Authentication done using auth-mechanism')
user = self._uid_to_user(uid)
if user is None:
self.api_endpoint.user_new(uid, role)
log.info('New user created: {}'.format(uid))
user = self._uid_to_user(uid)
if not user.enabled:
return None
if user.role != role:
log.info('Role for user {} updated, set to {}'.format(user.username, user.role))
user.set_role(role)
return user
......@@ -15,28 +15,13 @@
"""The Discovery API endpoint."""
from tornado.web import RequestHandler
from zoe_api.rest_api.utils import catch_exceptions
from zoe_api.rest_api.custom_request_handler import BaseRequestHandler
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.rest_api.utils import catch_exceptions, manage_cors_headers
class DiscoveryAPI(RequestHandler):
class DiscoveryAPI(BaseRequestHandler):
"""The Discovery API endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
def options(self):
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
def get(self, execution_id: int, service_group: str):
"""HTTP GET method."""
......@@ -52,7 +37,3 @@ class DiscoveryAPI(RequestHandler):
}
self.write(ret)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
......@@ -15,114 +15,61 @@
"""The Execution API endpoints."""
from tornado.web import RequestHandler
import tornado.escape
from zoe_api.rest_api.utils import catch_exceptions, get_auth, manage_cors_headers
from zoe_api.rest_api.utils import catch_exceptions, needs_auth
import zoe_api.exceptions
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.rest_api.custom_request_handler import BaseRequestHandler
class ExecutionAPI(RequestHandler):
class ExecutionAPI(BaseRequestHandler):
"""The Execution API endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
def options(self, execution_id): # pylint: disable=unused-argument
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
@needs_auth
def get(self, execution_id):
"""GET a single execution by its ID."""
uid, role = get_auth(self)
e = self.api_endpoint.execution_by_id(uid, role, execution_id)
e = self.api_endpoint.execution_by_id(self.current_user, execution_id)
self.write(e.serialize())
@catch_exceptions
@needs_auth
def delete(self, execution_id: int):
"""
Terminate an execution.
:param execution_id: the execution to be terminated
"""
uid, role = get_auth(self)
success, message = self.api_endpoint.execution_terminate(uid, role, execution_id)
success, message = self.api_endpoint.execution_terminate(self.current_user, execution_id)
if not success:
raise zoe_api.exceptions.ZoeRestAPIException(message, 400)
self.set_status(204)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
class ExecutionDeleteAPI(RequestHandler):
class ExecutionDeleteAPI(BaseRequestHandler):
"""The ExecutionDelete API endpoints."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
@catch_exceptions
def options(self, execution_id): # pylint: disable=unused-argument
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
@needs_auth
def delete(self, execution_id: int):
"""
Delete an execution.
:param execution_id: the execution to be deleted
"""
uid, role = get_auth(self)
success, message = self.api_endpoint.execution_delete(uid, role, execution_id)
success, message = self.api_endpoint.execution_delete(self.current_user, execution_id)
if not success:
raise zoe_api.exceptions.ZoeRestAPIException(message, 400)
self.set_status(204)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
class ExecutionCollectionAPI(RequestHandler):
class ExecutionCollectionAPI(BaseRequestHandler):
"""The Execution Collection API endpoints."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
@catch_exceptions
def options(self):
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
@needs_auth
def get(self):
"""
Returns a list of all active executions.
......@@ -146,8 +93,6 @@ class ExecutionCollectionAPI(RequestHandler):
:return:
"""
uid, role = get_auth(self)
filt_dict = {}
filters = [
......@@ -169,19 +114,18 @@ class ExecutionCollectionAPI(RequestHandler):
else:
filt_dict[filt[0]] = filt[1](self.request.arguments[filt[0]][0])
execs = self.api_endpoint.execution_list(uid, role, **filt_dict)
execs = self.api_endpoint.execution_list(self.current_user, **filt_dict)
self.write(dict([(e.id, e.serialize()) for e in execs]))
@catch_exceptions
@needs_auth
def post(self):
"""
Starts an execution, given an application description. Takes a JSON object.
:return: the new execution_id
"""
uid, role = get_auth(self)
try:
data = tornado.escape.json_decode(self.request.body)
except ValueError:
......@@ -190,47 +134,24 @@ class ExecutionCollectionAPI(RequestHandler):
application_description = data['application']
exec_name = data['name']
new_id = self.api_endpoint.execution_start(uid, role, exec_name, application_description)
new_id = self.api_endpoint.execution_start(self.current_user, exec_name, application_description)
self.set_status(201)
self.write({'execution_id': new_id})
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
class ExecutionEndpointsAPI(RequestHandler):
class ExecutionEndpointsAPI(BaseRequestHandler):
"""The ExecutionEndpoints API endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
@catch_exceptions
def options(self):
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
@needs_auth
def get(self, execution_id: int):
"""
Get a list of execution endpoints.
:param execution_id: the execution to be deleted
"""
uid, role = get_auth(self)
execution = self.api_endpoint.execution_by_id(uid, role, execution_id)
services_, endpoints = self.api_endpoint.execution_endpoints(uid, role, execution)
execution = self.api_endpoint.execution_by_id(self.current_user, execution_id)
services_, endpoints = self.api_endpoint.execution_endpoints(self.current_user, execution)
self.write({'endpoints': endpoints})
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
......@@ -15,31 +15,16 @@
"""The Info API endpoint."""
from tornado.web import RequestHandler
from zoe_api.rest_api.utils import catch_exceptions, manage_cors_headers
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.rest_api.utils import catch_exceptions
from zoe_lib.config import get_conf
from zoe_lib.version import ZOE_API_VERSION, ZOE_APPLICATION_FORMAT_VERSION, ZOE_VERSION
from zoe_api.rest_api.custom_request_handler import BaseRequestHandler
class InfoAPI(RequestHandler):
class InfoAPI(BaseRequestHandler):
"""The Info API endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
def options(self):
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
def get(self):
"""HTTP GET method."""
......@@ -51,7 +36,3 @@ class InfoAPI(RequestHandler):
}
self.write(ret)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The oAuth2 API endpoints."""
import logging
import json
import psycopg2
from tornado.web import RequestHandler
import oauth2.grant
from zoe_api.rest_api.utils import catch_exceptions, get_auth
from zoe_api.rest_api.oauth_utils import auth_controller, client_store, token_store
from zoe_api.rest_api.utils import manage_cors_headers
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
log = logging.getLogger(__name__)
"""
Example of using:
*To request a new token of type:
Input: curl -u 'admin:admin' http://localhost:5001/api/0.6/oauth/token -X POST -H 'Content-Type: application/json' -d '{"grant_type": "password"}'
Output: {"token_type": "Bearer", "access_token": "3ddbe9ba-6a21-4e4d-993b-70556390c5d3", "refresh_token": "9bab190f-e211-42aa-917e-20ce987e355e", "expires_in": 36000}
*To refresh a token
Input: curl -H 'Authorization: Bearer 9bab190f-e211-42aa-917e-20ce987e355e' http://localhost:5001/api/0.6/oauth/token -X POST -H 'Content-Type: application/json' -d '{"grant_type": "refresh_token"}'
Output: {"token_type": "Bearer", "access_token": "378f8d5f-2eb5-4181-b632-ad23c4534d32", "expires_in": 36000}
*To revoke a token, the passed token could be the access token or refresh token
curl -u 'admin:admin' -X DELETE http://localhost:5001/api/0.6/oauth/revoke/378f8d5f-2eb5-4181-b632-ad23c4534d32
*To authenticate with other rest api services, using a header with: "Authorization: Bearer access_token"
curl -H 'Authorization: Bearer 378f8d5f-2eb5-4181-b632-ad23c4534d32' http://localhost:5001/api/0.6/execution
"""
class OAuthGetAPI(RequestHandler):
"""The OAuthGetAPI endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
self.auth_controller = auth_controller
self.client_store = client_store
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
@catch_exceptions
def options(self): # pylint: disable=unused-argument
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
def post(self):
"""REQUEST/REFRESH token"""
uid, role = get_auth(self)
grant_type = oauth2.grant.RefreshToken.grant_type + ':' + oauth2.grant.ResourceOwnerGrant.grant_type
try:
self.client_store.save_client(uid, '', role, '', grant_type, '')
except psycopg2.IntegrityError:
log.info('User is already had')
response = self._dispatch_request(uid)
self._map_response(response)
def _dispatch_request(self, uid):
request = self.request
params = json.loads(request.body.decode())
if params['grant_type'] == 'refresh_token':
auth_header = self.request.headers.get('Authorization')
refresh_token = auth_header[7:]
params['refresh_token'] = refresh_token
params['password'] = ''
params['username'] = ''
params['client_secret'] = ''
params['scope'] = ''
params['client_id'] = uid
request.post_param = lambda key: params[key]
return self.auth_controller.dispatch(request, environ={})
def _map_response(self, response):
for name, value in list(response.headers.items()):
self.set_header(name, value)
self.set_status(response.status_code)
if response.status_code == 200:
log.info("New token granted...")
self.write(response.body)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
class OAuthRevokeAPI(RequestHandler):
"""The OAuthRevokeAPI endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
self.auth_controller = auth_controller
self.token_store = token_store
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
@catch_exceptions
def options(self, execution_id): # pylint: disable=unused-argument
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
def delete(self, token):
"""DELETE token (logout)"""
get_auth(self)
res = self.token_store.delete_refresh_token(token)
if res == 0:
ret = {'ret' :'No token found in database.'}
else:
ret = {'res': 'Revoked token.'}
self.write(ret)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Quota API endpoint."""
import tornado.escape
from zoe_api.rest_api.utils import catch_exceptions, needs_auth
import zoe_api.exceptions
from zoe_api.rest_api.custom_request_handler import BaseRequestHandler
class QuotaAPI(BaseRequestHandler):
"""The Quota API endpoint."""
@catch_exceptions
@needs_auth
def get(self, quota_id):
"""Get one quota object."""
quota = self.api_endpoint.quota_by_id(self.current_user, quota_id)
self.write(quota.serialize())
@catch_exceptions
@needs_auth
def put(self, quota_id):
"""Update a quota object."""
try:
data = tornado.escape.json_decode(self.request.body)
except ValueError:
raise zoe_api.exceptions.ZoeRestAPIException('Error decoding JSON data')
self.api_endpoint.quota_update(self.current_user, quota_id, data)
self.set_status(204)
@catch_exceptions
@needs_auth
def delete(self, quota_id):
"""Delete a quota."""
self.api_endpoint.quota_delete(self.current_user, quota_id)
self.set_status(204)
class QuotaCollectionAPI(BaseRequestHandler):
"""The Quota API endpoint."""
@catch_exceptions
@needs_auth
def get(self):
"""Retrieve a possibly filtered list of quotas."""
filt_dict = {}
quotas = self.api_endpoint.quota_list(self.current_user, **filt_dict)
self.write(dict([(q.id, q.serialize()) for q in quotas]))
@catch_exceptions
@needs_auth
def post(self):
"""
Creates a new quota. Takes a JSON object.
:return: the new quota_id
"""
try:
data = tornado.escape.json_decode(self.request.body)
except ValueError:
raise zoe_api.exceptions.ZoeRestAPIException('Error decoding JSON data')
name = data['name']
cc_exec = int(data['cc_exec'])
memory = int(data['memory'])
cores = int(data['cores'])
new_id = self.api_endpoint.quota_new(self.current_user, name, cc_exec, memory, cores)
self.set_status(201)
self.write({'quota_id': new_id})
......@@ -18,79 +18,47 @@
from concurrent.futures import ThreadPoolExecutor
import logging
from tornado.web import RequestHandler
import tornado.gen
import tornado.iostream
from zoe_api.rest_api.utils import catch_exceptions, get_auth, manage_cors_headers
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.rest_api.utils import catch_exceptions, needs_auth
from zoe_api.rest_api.custom_request_handler import BaseRequestHandler
log = logging.getLogger(__name__)
THREAD_POOL = ThreadPoolExecutor(20)
class ServiceAPI(RequestHandler):
class ServiceAPI(BaseRequestHandler):
"""The Service API endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
@catch_exceptions
def options(self, service_id): # pylint: disable=unused-argument
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
@needs_auth
def get(self, service_id):
"""HTTP GET method."""
uid, role = get_auth(self)
service = self.api_endpoint.service_by_id(uid, role, service_id)
service = self.api_endpoint.service_by_id(self.current_user, service_id)
self.write(service.serialize())
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
class ServiceLogsAPI(RequestHandler):
class ServiceLogsAPI(BaseRequestHandler):
"""The Service logs API endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
super().initialize(**kwargs)
self.connection_closed = False
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
@catch_exceptions
def options(self, service_id): # pylint: disable=unused-argument
"""Needed for CORS."""
self.set_status(204)
self.finish()
def on_connection_close(self):
"""Tornado callback for clients closing the connection."""
self.connection_closed = True
@catch_exceptions
@needs_auth
@tornado.gen.coroutine
def get(self, service_id):
"""HTTP GET method."""
uid, role =