Commit fa0ceb60 authored by Daniele Venzano's avatar Daniele Venzano

Re-implement authentication via the new DB tables, unify API and web auth

parent de9d2175
......@@ -17,7 +17,6 @@
import logging
import os
from typing import Mapping
import zoe_api.exceptions
import zoe_api.master_api
......@@ -40,29 +39,28 @@ class APIEndpoint:
self.master = master_api
self.sql = sql_manager
def execution_by_id(self, uid: zoe_lib.state.User, role: zoe_lib.state.Role, execution_id: int) -> zoe_lib.state.Execution:
def execution_by_id(self, user: zoe_lib.state.User, execution_id: int) -> zoe_lib.state.Execution:
"""Lookup an execution by its ID."""
e = self.sql.executions.select(id=execution_id, only_one=True)
if e is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
assert isinstance(e, zoe_lib.state.Execution)
if e.user_id != uid.id and not role.can_operate_others:
if e.user_id != user.id and not user.role.can_operate_others:
raise zoe_api.exceptions.ZoeAuthException()
return e
def execution_list(self, uid: zoe_lib.state.User, role: zoe_lib.state.Role, **filters: Mapping[str, str]):
def execution_list(self, user: zoe_lib.state.User, **filters):
"""Generate a optionally filtered list of executions."""
if not role.can_operate_others:
filters['user_id'] = uid.id
if not user.role.can_operate_others:
filters['user_id'] = user.id
execs = self.sql.executions.select(**filters)
return execs
def execution_count(self, uid: zoe_lib.state.User, role: zoe_lib.state.Role, **filters: Mapping[str, str]):
def execution_count(self, user: zoe_lib.state.User, **filters):
"""Count the number of executions optionally filtered."""
if not role.can_operate_others:
filters['user_id'] = uid.id
execs = self.sql.executions.count(**filters)
return execs
if not user.role.can_operate_others:
filters['user_id'] = user.id
return self.sql.executions.count(**filters)
def zapp_validate(self, application_description):
"""Validates the passed ZApp description against the supported schema."""
......@@ -85,40 +83,40 @@ class APIEndpoint:
# TODO: implement core and memory quotas
def execution_start(self, uid: zoe_lib.state.User, role: zoe_lib.state.Role, exec_name, application_description): # pylint: disable=unused-argument
def execution_start(self, user: zoe_lib.state.User, exec_name, application_description):
"""Start an execution."""
try:
zoe_lib.applications.app_validate(application_description)
except zoe_lib.exceptions.InvalidApplicationDescription as e:
raise zoe_api.exceptions.ZoeException('Invalid application description: ' + e.message)
self._check_quota(uid, application_description)
self._check_quota(user, application_description)
new_id = self.sql.executions.insert(exec_name, uid.id, application_description)
new_id = self.sql.executions.insert(exec_name, user.id, application_description)
success, message = self.master.execution_start(new_id)
if not success:
raise zoe_api.exceptions.ZoeException('The Zoe master is unavailable, execution will be submitted automatically when the master is back up ({}).'.format(message))
return new_id
def execution_terminate(self, uid: zoe_lib.state.User, role: zoe_lib.state.Role, exec_id: int):
def execution_terminate(self, user: zoe_lib.state.User, exec_id: int):
"""Terminate an execution."""
e = self.sql.executions.select(id=exec_id, only_one=True)
assert isinstance(e, zoe_lib.state.Execution)
if e is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
if e.user_id != uid.id and not role.can_operate_others:
raise zoe_api.exceptions.ZoeAuthException()
if e.user_id != user.id and not user.role.can_operate_others:
raise zoe_api.exceptions.ZoeException('You are not authorized to terminate this execution')
if e.is_active:
return self.master.execution_terminate(exec_id)
else:
raise zoe_api.exceptions.ZoeException('Execution is not running')
def execution_delete(self, uid: zoe_lib.state.User, role: zoe_lib.state.Role, exec_id: int):
def execution_delete(self, user: zoe_lib.state.User, exec_id: int):
"""Delete an execution."""
if not role.can_delete_executions:
if not user.role.can_delete_executions:
raise zoe_api.exceptions.ZoeAuthException()
e = self.sql.executions.select(id=exec_id, only_one=True)
......@@ -126,8 +124,8 @@ class APIEndpoint:
if e is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
if e.user_id != uid.id and not role.can_operate_others:
raise zoe_api.exceptions.ZoeAuthException()
if e.user_id != user.id and not user.role.can_operate_others:
raise zoe_api.exceptions.ZoeException('You are not authorized to terminate this execution')
if e.is_active:
raise zoe_api.exceptions.ZoeException('Cannot delete an active execution')
......@@ -139,29 +137,29 @@ class APIEndpoint:
else:
raise zoe_api.exceptions.ZoeException(message)
def service_by_id(self, uid: zoe_lib.state.User, role: zoe_lib.state.Role, service_id: int) -> zoe_lib.state.Service:
def service_by_id(self, user: zoe_lib.state.User, service_id: int) -> zoe_lib.state.Service:
"""Lookup a service by its ID."""
service = self.sql.services.select(id=service_id, only_one=True)
if service is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
if service.user_id != uid.id and not role.can_operate_others:
if service.user_id != user.id and not user.role.can_operate_others:
raise zoe_api.exceptions.ZoeAuthException()
return service
def service_list(self, uid: zoe_lib.state.User, role: zoe_lib.state.Role, **filters):
def service_list(self, user: zoe_lib.state.User, **filters):
"""Generate a optionally filtered list of services."""
if not role.can_operate_others:
filters['user_id'] = uid.id
if not user.role.can_operate_others:
filters['user_id'] = user.id
return self.sql.services.select(**filters)
def service_logs(self, uid: zoe_lib.state.User, role: zoe_lib.state.Role, service_id):
def service_logs(self, user: zoe_lib.state.User, service_id):
"""Retrieve the logs for the given service.
If stream is True, a file object is returned, otherwise the log contents as a str object.
"""
service = self.sql.services.select(id=service_id, only_one=True)
if service is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such service')
if service.user_id != uid.id and not role.can_operate_others:
if service.user_id != user.id and not user.role.can_operate_others:
raise zoe_api.exceptions.ZoeAuthException()
path = os.path.join(get_conf().service_logs_base_path, get_conf().deployment_name, str(service.execution_id), service.name + '.txt')
......@@ -169,7 +167,7 @@ class APIEndpoint:
raise zoe_api.exceptions.ZoeNotFoundException('Service log not available')
return open(path, encoding='utf-8')
def statistics_scheduler(self, uid_, role_):
def statistics_scheduler(self):
"""Retrieve statistics about the scheduler."""
success, message = self.master.scheduler_statistics()
if success:
......@@ -181,12 +179,12 @@ class APIEndpoint:
else:
raise zoe_api.exceptions.ZoeException(message=message)
def execution_endpoints(self, uid: zoe_lib.state.User, role: zoe_lib.state.Role, execution: zoe_lib.state.Execution):
def execution_endpoints(self, user: zoe_lib.state.User, execution: zoe_lib.state.Execution):
"""Return a list of the services and public endpoints available for a certain execution."""
services_info = []
endpoints = []
for service in execution.services:
services_info.append(self.service_by_id(uid, role, service.id))
services_info.append(self.service_by_id(user, service.id))
for port in service.description['ports']:
port_key = str(port['port_number']) + "/" + port['protocol']
backend_port = self.sql.ports.select(only_one=True, service_id=service.id, internal_name=port_key)
......@@ -195,3 +193,7 @@ class APIEndpoint:
endpoints.append((port['name'], endpoint))
return services_info, endpoints
def user_by_name(self, username):
"""Finds a user in the database looking it up by its username."""
return self.sql.user.select(only_one=True, **{'username': username})
# Copyright (c) 2016, Daniele Venzano
# Copyright (c) 2018, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -15,10 +15,35 @@
"""Base authenticator class."""
import logging
from typing import Union
from zoe_api.auth.file import PlainTextAuthenticator
from zoe_api.auth.ldap import LDAPAuthenticator
from zoe_lib.state import SQLManager, User
from zoe_lib.config import get_conf
log = logging.getLogger(__name__)
class BaseAuthenticator:
"""Base authenticator class."""
def auth(self, username, password):
"""The methods that needs to be overridden by implementations."""
raise NotImplementedError
def __init__(self):
self.state = SQLManager(get_conf())
def full_auth(self, username, password) -> Union[None, User]:
"""This method verifies the username and the password against one of the external auth sources."""
user = self.state.user.select(only_one=True, **{"username": username})
if not user.enabled:
return None
if user.auth_source == "textfile" and PlainTextAuthenticator(get_conf().auth_file).auth(username, password):
return user
elif user.auth_source == "ldap" and LDAPAuthenticator(get_conf(), sasl=False).auth(username, password):
return user
elif user.auth_source == "ldap+sasl" and LDAPAuthenticator(get_conf(), sasl=True).auth(username, password):
return user
else:
log.error('Unknown auth source {} for user {}, cannot authenticate'.format(user.auth_source, user.username))
return None
......@@ -26,10 +26,10 @@ from zoe_lib.config import get_conf
log = logging.getLogger(__name__)
class PlainTextAuthenticator(zoe_api.auth.base.BaseAuthenticator):
class PlainTextAuthenticator:
"""A basic plain text file authenticator."""
def __init__(self):
self.passwd_file = get_conf().auth_file
def __init__(self, filename):
self.passwd_file = filename
if not os.access(self.passwd_file, os.R_OK):
raise zoe_api.exceptions.ZoeNotFoundException('Password file not found at: {}'.format(self.passwd_file))
......
......@@ -34,12 +34,12 @@ from zoe_lib.config import get_conf
log = logging.getLogger(__name__)
class LDAPAuthenticator(zoe_api.auth.base.BaseAuthenticator):
class LDAPAuthenticator:
"""A simple LDAP authenticator."""
def __init__(self, sasl):
self.connection = ldap.initialize(get_conf().ldap_server_uri)
self.base_dn = get_conf().ldap_base_dn
def __init__(self, conf, sasl):
self.connection = ldap.initialize(conf.ldap_server_uri)
self.base_dn = conf.ldap_base_dn
self.sasl = sasl
self.connection.protocol_version = ldap.VERSION3
if self.sasl:
......
# Copyright (c) 2018, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Custom request handler for tornado, subclassed to implement authentication."""
import base64
import logging
import tornado.web
import tornado.escape
from zoe_api.api_endpoint import APIEndpoint
from zoe_api.auth.base import BaseAuthenticator
from zoe_api.exceptions import ZoeAuthException
log = logging.getLogger(__name__)
class ZoeRequestHandler(tornado.web.RequestHandler):
"""Customized request handler."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
super().initialize()
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def get_current_user(self):
"""Get the user making the request from one of several possible locations."""
auth_header = self.request.headers.get('Authorization')
if self.get_secure_cookie('zoe'): # cookie auth
username = tornado.escape.xhtml_escape(self.get_secure_cookie('zoe'))
user = self.api_endpoint.user_by_name(username)
method = "cookie"
elif auth_header is not None and auth_header.startswith('Basic '): # basic auth
auth_decoded = base64.decodebytes(bytes(auth_header[6:], 'ascii')).decode('utf-8')
username, password = auth_decoded.split(':', 2)
user = BaseAuthenticator().full_auth(username, password)
method = "basic_auth"
else:
method = None
user = None
if user is None:
raise ZoeAuthException('Invalid username or password')
if not user.enabled:
raise ZoeAuthException('User has been disabled by the administrator')
log.debug('Authentication done using {} (user {} from {} for {})'.format(method, user.username, self.request.remote_ip, self.request.path))
return user
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
......@@ -29,7 +29,7 @@ import zoe_api.rest_api
import zoe_api.master_api
import zoe_api.web
import zoe_api.auth.ldap
from zoe_api.web.custom_request_handler import JinjaApp
from zoe_api.web.request_handler import JinjaApp
log = logging.getLogger("zoe_api")
LOG_FORMAT = '%(asctime)-15s %(levelname)s %(threadName)s->%(name)s: %(message)s'
......@@ -67,6 +67,7 @@ def zoe_web_main(test_conf=None) -> int:
'static_path': os.path.join(os.path.dirname(__file__), "web", "static"),
'template_path': os.path.join(os.path.dirname(__file__), "web", "templates"),
'cookie_secret': config.get_conf().cookie_secret,
'login_url': '/login',
'debug': args.debug
}
app = Application(zoe_api.web.web_init(api_endpoint) + zoe_api.rest_api.api_init(api_endpoint), **app_settings)
......
......@@ -15,36 +15,22 @@
"""The Discovery API endpoint."""
from tornado.web import RequestHandler
from zoe_api.rest_api.request_handler import ZoeAPIRequestHandler
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.rest_api.utils import catch_exceptions, manage_cors_headers
class DiscoveryAPI(RequestHandler):
class DiscoveryAPI(ZoeAPIRequestHandler):
"""The Discovery API endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
def options(self):
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
def get(self, execution_id: int, service_group: str):
"""HTTP GET method."""
self.api_endpoint.execution_by_id(0, 'admin', execution_id)
if self.current_user is None:
return
self.api_endpoint.execution_by_id(self.current_user, execution_id)
if service_group != 'all':
services = self.api_endpoint.service_list(0, 'admin', service_group=service_group, execution_id=execution_id)
services = self.api_endpoint.service_list(self.current_user, service_group=service_group, execution_id=execution_id)
else:
services = self.api_endpoint.service_list(0, 'admin', execution_id=execution_id)
services = self.api_endpoint.service_list(self.current_user, execution_id=execution_id)
ret = {
'service_type': service_group,
'execution_id': execution_id,
......@@ -52,7 +38,3 @@ class DiscoveryAPI(RequestHandler):
}
self.write(ret)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
......@@ -15,114 +15,62 @@
"""The Execution API endpoints."""
from tornado.web import RequestHandler
import tornado.escape
from zoe_api.rest_api.utils import catch_exceptions, get_auth, manage_cors_headers
from zoe_api.rest_api.request_handler import ZoeAPIRequestHandler
import zoe_api.exceptions
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
class ExecutionAPI(RequestHandler):
class ExecutionAPI(ZoeAPIRequestHandler):
"""The Execution API endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
def options(self, execution_id): # pylint: disable=unused-argument
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
def get(self, execution_id):
"""GET a single execution by its ID."""
uid, role = get_auth(self)
if self.current_user is None:
return
e = self.api_endpoint.execution_by_id(uid, role, execution_id)
e = self.api_endpoint.execution_by_id(self.current_user, execution_id)
self.write(e.serialize())
@catch_exceptions
def delete(self, execution_id: int):
"""
Terminate an execution.
:param execution_id: the execution to be terminated
"""
uid, role = get_auth(self)
if self.current_user is None:
return
success, message = self.api_endpoint.execution_terminate(uid, role, execution_id)
success, message = self.api_endpoint.execution_terminate(self.current_user, execution_id)
if not success:
raise zoe_api.exceptions.ZoeRestAPIException(message, 400)
self.set_status(204)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
class ExecutionDeleteAPI(RequestHandler):
class ExecutionDeleteAPI(ZoeAPIRequestHandler):
"""The ExecutionDelete API endpoints."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
@catch_exceptions
def options(self, execution_id): # pylint: disable=unused-argument
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
def delete(self, execution_id: int):
"""
Delete an execution.
:param execution_id: the execution to be deleted
"""
uid, role = get_auth(self)
if self.current_user is None:
return
success, message = self.api_endpoint.execution_delete(uid, role, execution_id)
success, message = self.api_endpoint.execution_delete(self.current_user, execution_id)
if not success:
raise zoe_api.exceptions.ZoeRestAPIException(message, 400)
self.set_status(204)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
class ExecutionCollectionAPI(RequestHandler):
class ExecutionCollectionAPI(ZoeAPIRequestHandler):
"""The Execution Collection API endpoints."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
@catch_exceptions
def options(self):
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
def get(self):
"""
Returns a list of all active executions.
......@@ -146,7 +94,8 @@ class ExecutionCollectionAPI(RequestHandler):
:return:
"""
uid, role = get_auth(self)
if self.current_user is None:
return
filt_dict = {}
......@@ -169,18 +118,18 @@ class ExecutionCollectionAPI(RequestHandler):
else:
filt_dict[filt[0]] = filt[1](self.request.arguments[filt[0]][0])
execs = self.api_endpoint.execution_list(uid, role, **filt_dict)
execs = self.api_endpoint.execution_list(self.current_user, **filt_dict)
self.write(dict([(e.id, e.serialize()) for e in execs]))
@catch_exceptions
def post(self):
"""
Starts an execution, given an application description. Takes a JSON object.
:return: the new execution_id
"""
uid, role = get_auth(self)
if self.current_user is None:
return
try:
data = tornado.escape.json_decode(self.request.body)
......@@ -190,47 +139,25 @@ class ExecutionCollectionAPI(RequestHandler):
application_description = data['application']
exec_name = data['name']
new_id = self.api_endpoint.execution_start(uid, role, exec_name, application_description)
new_id = self.api_endpoint.execution_start(self.current_user, exec_name, application_description)
self.set_status(201)
self.write({'execution_id': new_id})
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
class ExecutionEndpointsAPI(RequestHandler):
class ExecutionEndpointsAPI(ZoeAPIRequestHandler):
"""The ExecutionEndpoints API endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
@catch_exceptions
def options(self):
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
def get(self, execution_id: int):
"""
Get a list of execution endpoints.
:param execution_id: the execution to be deleted
"""
uid, role = get_auth(self)
if self.current_user is None:
return
execution = self.api_endpoint.execution_by_id(uid, role, execution_id)
services_, endpoints = self.api_endpoint.execution_endpoints(uid, role, execution)
execution = self.api_endpoint.execution_by_id(self.current_user, execution_id)
services_, endpoints = self.api_endpoint.execution_endpoints(self.current_user, execution)
self.write({'endpoints': endpoints})
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
......@@ -15,32 +15,16 @@
"""The Info API endpoint."""
from tornado.web import RequestHandler
from zoe_api.rest_api.utils import catch_exceptions, manage_cors_headers
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.rest_api.request_handler import ZoeAPIRequestHandler
from zoe_lib.config import get_conf
from zoe_lib.version import ZOE_API_VERSION, ZOE_APPLICATION_FORMAT_VERSION, ZOE_VERSION
class InfoAPI(RequestHandler):
class InfoAPI(ZoeAPIRequestHandler):
"""The Info API endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
def options(self):
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
def get(self):
"""HTTP GET method."""
ret = {
......@@ -51,7 +35,3 @@ class InfoAPI(RequestHandler):
}
self.write(ret)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
......@@ -15,44 +15,23 @@
"""The Info API endpoint."""
from tornado.web import RequestHandler
from zoe_api.rest_api.utils import get_auth, catch_exceptions, manage_cors_headers
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.rest_api.request_handler import ZoeAPIRequestHandler
class LoginAPI(RequestHandler):
class LoginAPI(ZoeAPIRequestHandler):
"""The Login API endpoint."""
def initialize(self, **kwargs):
"""Initializes the request handler."""
self.api_endpoint = kwargs['api_endpoint'] # type: APIEndpoint
def set_default_headers(self):
"""Set up the headers for enabling CORS."""
manage_cors_headers(self)
@catch_exceptions
def options(self):
"""Needed for CORS."""
self.set_status(204)
self.finish()
@catch_exceptions
def get(self):
"""HTTP GET method."""
uid, role = get_auth(self)
if self.current_user is None:
return
cookie_val = uid + '.' + role
cookie_val = self.current_user.username
self.set_secure_cookie('zoe', cookie_val)
ret = {
'uid': uid,
'role': role
'user': self.current_user,
}
self.write(ret)
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
# Copyright (c) 2018, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.