# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The real API, exposed as web pages or REST API."""

import logging
import os

import zoe_api.exceptions
import zoe_api.master_api
import zoe_lib.applications
import zoe_lib.exceptions
import zoe_lib.state
from zoe_lib.config import get_conf

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Maximum number of executions (running, starting, scheduled or submitted)
# that a user with the "guest" role may have at the same time.
GUEST_QUOTA_MAX_EXECUTIONS = 1


class APIEndpoint:
    """
    The API implementation shared by the web pages and the REST API.

    Most methods receive the ``uid`` and ``role`` of the calling user and
    enforce access themselves: an object is accessible to the user whose
    ``uid`` matches the object's ``user_id`` and to any user whose role is
    ``'admin'``.

    :type master: zoe_api.master_api.APIManager
    :type sql: zoe_lib.sql_manager.SQLManager
    """
    def __init__(self):
        """Create the client for the Zoe master and the SQL state manager."""
        self.master = zoe_api.master_api.APIManager()
        self.sql = zoe_lib.state.SQLManager(get_conf())

    @staticmethod
    def _can_access(obj, uid, role) -> bool:
        """Return True when the user owns ``obj`` (``obj.user_id == uid``) or is an admin."""
        return obj.user_id == uid or role == 'admin'

    def execution_by_id(self, uid, role, execution_id) -> zoe_lib.state.sql_manager.Execution:
        """Lookup an execution by its ID.

        :param uid: ID of the calling user
        :param role: role of the calling user
        :param execution_id: ID of the execution to retrieve
        :return: the execution object
        :raises zoe_api.exceptions.ZoeNotFoundException: no execution has this ID
        :raises zoe_api.exceptions.ZoeAuthException: the user is neither the owner nor an admin
        """
        e = self.sql.execution_list(id=execution_id, only_one=True)
        if e is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
        # Type narrowing only; it must stay after the None check, otherwise a
        # missing execution would surface as an AssertionError.
        assert isinstance(e, zoe_lib.state.sql_manager.Execution)
        if not self._can_access(e, uid, role):
            raise zoe_api.exceptions.ZoeAuthException()
        return e

    def execution_list(self, uid, role, **filters):
        """Generate an optionally filtered list of executions.

        :param uid: ID of the calling user
        :param role: role of the calling user
        :param filters: keyword filters passed unchanged to the SQL manager
        :return: the executions the user may access (all of them for an admin)
        """
        execs = self.sql.execution_list(**filters)
        return [e for e in execs if self._can_access(e, uid, role)]

    def zapp_validate(self, application_description):
        """Validates the passed ZApp description against the supported schema.

        :param application_description: the ZApp description to check
        :raises zoe_api.exceptions.ZoeException: the description is not valid
        """
        try:
            zoe_lib.applications.app_validate(application_description)
        except zoe_lib.exceptions.InvalidApplicationDescription as e:
            raise zoe_api.exceptions.ZoeException('Invalid application description: ' + e.message) from e

    def execution_start(self, uid, role, exec_name, application_description):
        """Start an execution.

        The application description is validated, the guest quota is checked,
        a new execution is recorded in the database and the master is asked
        to start it.

        :param uid: ID of the calling user
        :param role: role of the calling user
        :param exec_name: name for the new execution
        :param application_description: the ZApp description to run
        :return: the ID of the new execution
        :raises zoe_api.exceptions.ZoeException: the description is invalid, the
            guest quota is exhausted or the master could not be contacted. In
            the last case the execution has already been recorded in the database.
        """
        self.zapp_validate(application_description)

        # quota check: a guest that already holds the maximum number of
        # not-yet-finished executions cannot submit another one.
        if role == "guest":
            active_execs = []
            for status in ('running', 'starting', 'scheduled', 'submitted'):
                active_execs += self.execution_list(uid, role, status=status)
            if len(active_execs) >= GUEST_QUOTA_MAX_EXECUTIONS:
                raise zoe_api.exceptions.ZoeException('Guest users cannot run more than one execution at a time, quota exceeded.')

        new_id = self.sql.execution_new(exec_name, uid, application_description)
        success, message = self.master.execution_start(new_id)
        if not success:
            raise zoe_api.exceptions.ZoeException('The Zoe master is unavailable, execution will be submitted automatically when the master is back up ({}).'.format(message))

        return new_id

    def execution_terminate(self, uid, role, exec_id):
        """Terminate an execution.

        :param uid: ID of the calling user
        :param role: role of the calling user
        :param exec_id: ID of the execution to terminate
        :return: whatever the master returns for the termination request
        :raises zoe_api.exceptions.ZoeNotFoundException: no execution has this ID
        :raises zoe_api.exceptions.ZoeAuthException: the user is neither the owner nor an admin
        :raises zoe_api.exceptions.ZoeException: the execution is not active
        """
        e = self.execution_by_id(uid, role, exec_id)

        if not e.is_active:
            raise zoe_api.exceptions.ZoeException('Execution is not running')
        return self.master.execution_terminate(exec_id)

    def execution_delete(self, uid, role, exec_id):
        """Delete an execution. Only admins may delete executions.

        :param uid: ID of the calling user
        :param role: role of the calling user
        :param exec_id: ID of the execution to delete
        :return: the tuple ``(True, '')`` on success
        :raises zoe_api.exceptions.ZoeAuthException: the user is not an admin
        :raises zoe_api.exceptions.ZoeNotFoundException: no execution has this ID
        :raises zoe_api.exceptions.ZoeException: the execution is still active or
            the master refused the deletion
        """
        if role != "admin":
            raise zoe_api.exceptions.ZoeAuthException()

        # The role is 'admin' here, so the ownership check inside
        # execution_by_id always passes; only the lookup can fail.
        e = self.execution_by_id(uid, role, exec_id)

        if e.is_active:
            raise zoe_api.exceptions.ZoeException('Cannot delete an active execution')

        status, message = self.master.execution_delete(exec_id)
        if not status:
            raise zoe_api.exceptions.ZoeException(message)
        # Remove the database record only after the master accepted the deletion.
        self.sql.execution_delete(exec_id)
        return True, ''

    def service_by_id(self, uid, role, service_id) -> zoe_lib.state.sql_manager.Service:
        """Lookup a service by its ID.

        :param uid: ID of the calling user
        :param role: role of the calling user
        :param service_id: ID of the service to retrieve
        :return: the service object
        :raises zoe_api.exceptions.ZoeNotFoundException: no service has this ID
        :raises zoe_api.exceptions.ZoeAuthException: the user is neither the owner nor an admin
        """
        service = self.sql.service_list(id=service_id, only_one=True)
        if service is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such service')
        if not self._can_access(service, uid, role):
            raise zoe_api.exceptions.ZoeAuthException()
        return service

    def service_list(self, uid, role, **filters):
        """Generate an optionally filtered list of services.

        :param uid: ID of the calling user
        :param role: role of the calling user
        :param filters: keyword filters passed unchanged to the SQL manager
        :return: the services the user may access (all of them for an admin)
        """
        services = self.sql.service_list(**filters)
        return [s for s in services if self._can_access(s, uid, role)]

    def service_logs(self, uid, role, service_id):
        """Retrieve the logs for the given service.

        The log is read from
        ``<service_logs_base_path>/<deployment_name>/<execution_id>/<service name>.txt``.

        :param uid: ID of the calling user
        :param role: role of the calling user
        :param service_id: ID of the service whose log is requested
        :return: the log file opened for reading as UTF-8 text; the caller is
            responsible for closing it
        :raises zoe_api.exceptions.ZoeNotFoundException: the service or its log
            file does not exist
        :raises zoe_api.exceptions.ZoeAuthException: the user is neither the owner nor an admin
        """
        service = self.service_by_id(uid, role, service_id)

        conf = get_conf()
        path = os.path.join(conf.service_logs_base_path, conf.deployment_name, str(service.execution_id), service.name + '.txt')
        if not os.path.exists(path):
            raise zoe_api.exceptions.ZoeNotFoundException('Service log not available')
        try:
            return open(path, encoding='utf-8')
        except FileNotFoundError:
            # The file disappeared between the existence check and the open.
            raise zoe_api.exceptions.ZoeNotFoundException('Service log not available') from None

    def statistics_scheduler(self, uid_, role_):
        """Retrieve statistics about the scheduler.

        :param uid_: ID of the calling user (unused)
        :param role_: role of the calling user (unused)
        :return: the statistics reported by the master, or None when the
            request to the master did not succeed
        """
        success, message = self.master.scheduler_statistics()
        if success:
            return message
        return None

    def retry_submit_error_executions(self):
        """Resubmit any execution forgotten by the master.

        Only the first execution still in the submit status is resubmitted on
        each call; a failure is logged and not raised.
        """
        waiting_execs = self.sql.execution_list(status=zoe_lib.state.sql_manager.Execution.SUBMIT_STATUS)
        if not waiting_execs:
            return
        e = waiting_execs[0]
        success, message = self.master.execution_start(e.id)
        if not success:
            log.warning('Zoe Master unavailable (%s), execution %s still waiting', message, e.id)

    def cleanup_dead_executions(self):
        """Terminates all executions with dead "monitor" services."""
        log.debug('Starting dead execution cleanup task')
        all_execs = self.sql.execution_list(status='running')
        for execution in all_execs:
            for service in execution.services:
                if service.description['monitor'] and service.status == service.BACKEND_DIE_STATUS:
                    log.info("Service %s (%s) of execution %s died, terminating execution", service.id, service.name, execution.id)
                    self.master.execution_terminate(execution.id)
                    # One dead monitor is enough: the termination request has
                    # been sent, skip the remaining services of this execution.
                    break

        log.debug('Cleanup task finished')

    def execution_endpoints(self, uid: str, role: str, execution: zoe_lib.state.Execution):
        """Return a list of the services and public endpoints available for a certain execution.

        :param uid: ID of the calling user
        :param role: role of the calling user
        :param execution: the execution to inspect
        :return: a tuple ``(services_info, endpoints)``: the service objects of
            the execution and a list of ``(port name, URL)`` pairs for every
            port that has an external IP address assigned
        :raises zoe_api.exceptions.ZoeNotFoundException: a service of the execution cannot be found
        :raises zoe_api.exceptions.ZoeAuthException: the user may not access a service
        """
        services_info = []
        endpoints = []
        for service in execution.services:
            services_info.append(self.service_by_id(uid, role, service.id))
            for port in service.description['ports']:
                port_key = str(port['port_number']) + "/" + port['protocol']
                backend_port = self.sql.port_list(only_one=True, service_id=service.id, internal_name=port_key)
                # Ports without a backend record or an external address are skipped.
                if backend_port is not None and backend_port.external_ip is not None:
                    ip_port = backend_port.external_ip + ":" + str(backend_port.external_port)
                    endpoint = port['url_template'].format(ip_port=ip_port)
                    endpoints.append((port['name'], endpoint))

        return services_info, endpoints