api_endpoint.py 9.91 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The real API, exposed as web pages or REST API."""

import logging
19
import os
20

21
22
import zoe_api.exceptions
import zoe_api.master_api
23
24
import zoe_lib.applications
import zoe_lib.exceptions
25
26
import zoe_lib.state
from zoe_lib.config import get_conf
27
28
29
30
31
32
33
34
35
36
37

log = logging.getLogger(__name__)


class APIEndpoint:
    """
    The APIEndpoint class.

    :type master: zoe_api.master_api.APIManager
    :type sql: zoe_lib.state.SQLManager
    """
38
    def __init__(self, master_api, sql_manager: zoe_lib.state.SQLManager):
39
40
        self.master = master_api
        self.sql = sql_manager
41

42
    def execution_by_id(self, user: zoe_lib.state.User, execution_id: int) -> zoe_lib.state.Execution:
        """Fetch one execution by ID, enforcing ownership.

        :raises zoe_api.exceptions.ZoeNotFoundException: the lookup returned nothing
        :raises zoe_api.exceptions.ZoeAuthException: the execution belongs to another user and
            the caller's role does not have ``can_operate_others``
        """
        execution = self.sql.executions.select(id=execution_id, only_one=True)
        if execution is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
        assert isinstance(execution, zoe_lib.state.Execution)

        is_owner = execution.user_id == user.id
        if not (is_owner or user.role.can_operate_others):
            raise zoe_api.exceptions.ZoeAuthException()
        return execution

52
    def execution_list(self, user: zoe_lib.state.User, **filters):
        """Return the executions matching ``filters``.

        When the caller's role lacks ``can_operate_others`` the ``user_id`` filter is forced
        to the caller's own ID, whatever value was passed in.
        """
        if user.role.can_operate_others:
            query = filters
        else:
            query = dict(filters, user_id=user.id)
        return self.sql.executions.select(**query)

59
    def execution_count(self, user: zoe_lib.state.User, **filters):
        """Return how many executions match ``filters``.

        As with listing, a caller whose role lacks ``can_operate_others`` has the
        ``user_id`` filter overwritten with their own ID.
        """
        sees_everything = user.role.can_operate_others
        if not sees_everything:
            filters.update(user_id=user.id)
        count = self.sql.executions.count(**filters)
        return count
64

65
66
67
68
69
70
71
    def zapp_validate(self, application_description):
        """Validate the passed ZApp description against the supported schema.

        :param application_description: the ZApp description to check
        :raises zoe_api.exceptions.ZoeException: ``zoe_lib.applications.app_validate`` rejected
            the description; the original validation error is attached as ``__cause__``
        """
        try:
            zoe_lib.applications.app_validate(application_description)
        except zoe_lib.exceptions.InvalidApplicationDescription as err:
            # format() instead of '+' so a non-str message cannot turn this into a TypeError.
            raise zoe_api.exceptions.ZoeException('Invalid application description: {}'.format(err.message)) from err

72
73
74
75
76
77
78
79
80
81
82
83
84
85
    def _check_quota(self, user: zoe_lib.state.User, application_description):  # pylint: disable=unused-argument
        """Check quota for given user and execution."""
        quota = self.sql.quota.select(only_one=True, **{'id': user.quota_id})

        running_execs = self.sql.executions.select(**{'status': 'running', 'user_id': user.id})
        running_execs += self.sql.executions.select(**{'status': 'starting', 'user_id': user.id})
        running_execs += self.sql.executions.select(**{'status': 'scheduled', 'user_id': user.id})
        running_execs += self.sql.executions.select(**{'status': 'image download', 'user_id': user.id})
        running_execs += self.sql.executions.select(**{'status': 'submitted', 'user_id': user.id})
        if len(running_execs) >= quota.concurrent_executions:
            raise zoe_api.exceptions.ZoeException('You cannot run more than {} executions at a time, quota exceeded.'.format(quota.concurrent_executions))

        # TODO: implement core and memory quotas

86
    def execution_start(self, user: zoe_lib.state.User, exec_name, application_description):
        """Validate, quota-check, record and launch a new execution.

        :param user: the user submitting the execution
        :param exec_name: name passed to the executions table insert
        :param application_description: ZApp description to run
        :return: the value returned by ``self.sql.executions.insert`` for the new execution
        :raises zoe_api.exceptions.ZoeException: the description is invalid, the quota check
            fails, or the master reports that the start request did not succeed
        """
        # Reuse the public validator so both entry points report invalid ZApps identically.
        self.zapp_validate(application_description)
        self._check_quota(user, application_description)

        new_id = self.sql.executions.insert(exec_name, user.id, application_description)

        # The execution is inserted before the master is contacted, so it is already stored
        # when the error below is raised.
        success, message = self.master.execution_start(new_id)
        if not success:
            raise zoe_api.exceptions.ZoeException('The Zoe master is unavailable, execution will be submitted automatically when the master is back up ({}).'.format(message))

        return new_id

102
    def execution_terminate(self, user: zoe_lib.state.User, exec_id: int):
        """Ask the master to terminate an active execution.

        :return: whatever ``self.master.execution_terminate`` returns
        :raises zoe_api.exceptions.ZoeNotFoundException: no execution has this ID
        :raises zoe_api.exceptions.ZoeException: the caller neither owns the execution nor has
            ``can_operate_others``, or the execution is not active
        """
        e = self.sql.executions.select(id=exec_id, only_one=True)
        # The None test must come before the type assertion: otherwise an unknown ID surfaces
        # as AssertionError instead of the "not found" error.
        if e is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
        assert isinstance(e, zoe_lib.state.Execution)

        if e.user_id != user.id and not user.role.can_operate_others:
            raise zoe_api.exceptions.ZoeException('You are not authorized to terminate this execution')

        if not e.is_active:
            raise zoe_api.exceptions.ZoeException('Execution is not running')
        return self.master.execution_terminate(exec_id)

117
    def execution_delete(self, user: zoe_lib.state.User, exec_id: int):
        """Delete an inactive execution, first on the master and then from the database.

        :return: ``(True, '')`` once the master confirmed and the row was deleted
        :raises zoe_api.exceptions.ZoeAuthException: the caller's role lacks
            ``can_delete_executions``
        :raises zoe_api.exceptions.ZoeNotFoundException: no execution has this ID
        :raises zoe_api.exceptions.ZoeException: the caller neither owns the execution nor has
            ``can_operate_others``, the execution is still active, or the master reported a
            failure (its message is used as the error text)
        """
        if not user.role.can_delete_executions:
            raise zoe_api.exceptions.ZoeAuthException()

        e = self.sql.executions.select(id=exec_id, only_one=True)
        # The None test must come before the type assertion: otherwise an unknown ID surfaces
        # as AssertionError instead of the "not found" error.
        if e is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
        assert isinstance(e, zoe_lib.state.Execution)

        if e.user_id != user.id and not user.role.can_operate_others:
            raise zoe_api.exceptions.ZoeException('You are not authorized to delete this execution')

        if e.is_active:
            raise zoe_api.exceptions.ZoeException('Cannot delete an active execution')

        status, message = self.master.execution_delete(exec_id)
        if not status:
            raise zoe_api.exceptions.ZoeException(message)

        # Only drop the database row after the master has confirmed the deletion.
        self.sql.executions.delete(exec_id)
        return True, ''

140
    def service_by_id(self, user: zoe_lib.state.User, service_id: int) -> zoe_lib.state.Service:
        """Lookup a service by its ID, enforcing ownership.

        :raises zoe_api.exceptions.ZoeNotFoundException: the lookup returned nothing
        :raises zoe_api.exceptions.ZoeAuthException: the service belongs to another user and
            the caller's role does not have ``can_operate_others``
        """
        service = self.sql.services.select(id=service_id, only_one=True)
        if service is None:
            # Same wording as service_logs uses for a missing service.
            raise zoe_api.exceptions.ZoeNotFoundException('No such service')
        if service.user_id != user.id and not user.role.can_operate_others:
            raise zoe_api.exceptions.ZoeAuthException()
        return service

149
    def service_list(self, user: zoe_lib.state.User, **filters):
        """Return the services matching ``filters``.

        A caller whose role lacks ``can_operate_others`` has the ``user_id`` filter
        overwritten with their own ID before the query runs.
        """
        if user.role.can_operate_others:
            return self.sql.services.select(**filters)
        own_only = {**filters, 'user_id': user.id}
        return self.sql.services.select(**own_only)
154

155
    def service_logs(self, user: zoe_lib.state.User, service_id):
        """Open the log file of the given service for reading.

        The path is built from the configured ``service_logs_base_path`` and
        ``deployment_name``, the service's execution ID and ``<service name>.txt``.

        :return: a file object opened in text mode with UTF-8 encoding; this method does not
            close it
        :raises zoe_api.exceptions.ZoeNotFoundException: the service does not exist or its log
            file is not there
        :raises zoe_api.exceptions.ZoeAuthException: the service belongs to another user and
            the caller's role does not have ``can_operate_others``
        """
        service = self.sql.services.select(id=service_id, only_one=True)
        if service is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such service')
        if service.user_id != user.id and not user.role.can_operate_others:
            raise zoe_api.exceptions.ZoeAuthException()

        conf = get_conf()
        path = os.path.join(conf.service_logs_base_path, conf.deployment_name, str(service.execution_id), service.name + '.txt')
        # Open directly rather than checking os.path.exists() first: the file could vanish
        # between the check and the open.
        try:
            return open(path, encoding='utf-8')
        except (FileNotFoundError, NotADirectoryError) as err:
            raise zoe_api.exceptions.ZoeNotFoundException('Service log not available') from err
169

170
    def statistics_scheduler(self):
        """Retrieve statistics about the scheduler.

        :return: the message returned by ``self.master.scheduler_statistics()``, with the keys
            of every node's ``service_stats`` mapping converted to ``int``
        :raises zoe_api.exceptions.ZoeException: the master reported a failure
        """
        success, message = self.master.scheduler_statistics()
        if not success:
            raise zoe_api.exceptions.ZoeException(message=message)

        # JSON does not like hash keys to be integers, so we need to convert manually.
        # Rebuilding the mapping (instead of assign-then-delete) keeps entries whose key is
        # already an int.
        for node in message['platform_stats']['nodes']:
            node['service_stats'] = {int(service_id): stats for service_id, stats in node['service_stats'].items()}
        return message
181

182
    def execution_endpoints(self, user: zoe_lib.state.User, execution: zoe_lib.state.Execution):
        """Collect the services of an execution and the endpoints that have an external address.

        Each service is fetched through ``service_by_id``. A port of a service contributes a
        ``(name, url)`` pair only when a matching row exists in the ports table and that row
        has an ``external_ip``.

        :return: a ``(services, endpoints)`` tuple of two lists
        """
        collected_services = []
        collected_endpoints = []
        for svc in execution.services:
            collected_services.append(self.service_by_id(user, svc.id))
            for port_desc in svc.description['ports']:
                # Ports are looked up by "<port number>/<protocol>".
                internal_name = str(port_desc['port_number']) + "/" + port_desc['protocol']
                mapping = self.sql.ports.select(only_one=True, service_id=svc.id, internal_name=internal_name)
                if mapping is None or mapping.external_ip is None:
                    continue
                address = mapping.external_ip + ":" + str(mapping.external_port)
                url = port_desc['url_template'].format(ip_port=address)
                collected_endpoints.append((port_desc['name'], url))

        return collected_services, collected_endpoints
196

197
    def user_by_name(self, username) -> zoe_lib.state.User:
        """Return the result of a single-row user lookup filtered by ``username``."""
        found = self.sql.user.select(only_one=True, username=username)
        return found
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215

    def user_by_id(self, user: zoe_lib.state.User, user_id: int) -> zoe_lib.state.User:
        """Find a user by numeric ID on behalf of ``user``.

        Asking for one's own ID returns the ``user`` object that was passed in, without a
        database query. Any other ID requires the ``can_operate_others`` role flag.

        :raises zoe_api.exceptions.ZoeAuthException: a different user's ID was requested
            without ``can_operate_others``
        """
        if user.id != user_id:
            if not user.role.can_operate_others:
                raise zoe_api.exceptions.ZoeAuthException()
            return self.sql.user.select(only_one=True, id=user_id)
        return user

    def user_delete(self, user: zoe_lib.state.User, user_id: int):
        """Delete the user with the given ID from the user table.

        :raises zoe_api.exceptions.ZoeAuthException: the caller's role lacks
            ``can_change_config``
        """
        may_delete = user.role.can_change_config
        if not may_delete:
            raise zoe_api.exceptions.ZoeAuthException()

        users = self.sql.user
        users.delete(user_id)