api_endpoint.py 12.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The real API, exposed as web pages or REST API."""

import logging
19
import os
20
from typing import List
21

22
23
import zoe_api.exceptions
import zoe_api.master_api
24
25
import zoe_lib.applications
import zoe_lib.exceptions
26
27
import zoe_lib.state
from zoe_lib.config import get_conf
28
29
30
31
32
33
34
35
36
37
38

log = logging.getLogger(__name__)


class APIEndpoint:
    """
    The APIEndpoint class.

    :type master: zoe_api.master_api.APIManager
    :type sql: zoe_lib.sql_manager.SQLManager
    """
39
    def __init__(self, master_api, sql_manager: zoe_lib.state.SQLManager):
40
41
        self.master = master_api
        self.sql = sql_manager
42

43
    def execution_by_id(self, user: zoe_lib.state.User, execution_id: int) -> zoe_lib.state.Execution:
44
        """Lookup an execution by its ID."""
45
        e = self.sql.executions.select(id=execution_id, only_one=True)
46
47
        if e is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
48
        assert isinstance(e, zoe_lib.state.Execution)
49
        if e.user_id != user.id and not user.role.can_operate_others:
50
51
52
            raise zoe_api.exceptions.ZoeAuthException()
        return e

53
    def execution_list(self, user: zoe_lib.state.User, **filters):
54
        """Generate a optionally filtered list of executions."""
55
56
        if not user.role.can_operate_others:
            filters['user_id'] = user.id
57
        execs = self.sql.executions.select(**filters)
58
59
        return execs

60
    def execution_count(self, user: zoe_lib.state.User, **filters):
61
        """Count the number of executions optionally filtered."""
62
63
64
        if not user.role.can_operate_others:
            filters['user_id'] = user.id
        return self.sql.executions.count(**filters)
65

66
67
68
69
70
71
72
    def zapp_validate(self, application_description):
        """Validates the passed ZApp description against the supported schema."""
        try:
            zoe_lib.applications.app_validate(application_description)
        except zoe_lib.exceptions.InvalidApplicationDescription as e:
            raise zoe_api.exceptions.ZoeException('Invalid application description: ' + e.message)

73
74
75
76
77
78
79
80
81
82
83
84
85
86
    def _check_quota(self, user: zoe_lib.state.User, application_description):  # pylint: disable=unused-argument
        """Check quota for given user and execution."""
        quota = self.sql.quota.select(only_one=True, **{'id': user.quota_id})

        running_execs = self.sql.executions.select(**{'status': 'running', 'user_id': user.id})
        running_execs += self.sql.executions.select(**{'status': 'starting', 'user_id': user.id})
        running_execs += self.sql.executions.select(**{'status': 'scheduled', 'user_id': user.id})
        running_execs += self.sql.executions.select(**{'status': 'image download', 'user_id': user.id})
        running_execs += self.sql.executions.select(**{'status': 'submitted', 'user_id': user.id})
        if len(running_execs) >= quota.concurrent_executions:
            raise zoe_api.exceptions.ZoeException('You cannot run more than {} executions at a time, quota exceeded.'.format(quota.concurrent_executions))

        # TODO: implement core and memory quotas

87
    def execution_start(self, user: zoe_lib.state.User, exec_name, application_description):
88
89
90
91
92
93
        """Start an execution."""
        try:
            zoe_lib.applications.app_validate(application_description)
        except zoe_lib.exceptions.InvalidApplicationDescription as e:
            raise zoe_api.exceptions.ZoeException('Invalid application description: ' + e.message)

94
        self._check_quota(user, application_description)
95

96
        new_id = self.sql.executions.insert(exec_name, user.id, application_description)
97
98
99
        success, message = self.master.execution_start(new_id)
        if not success:
            raise zoe_api.exceptions.ZoeException('The Zoe master is unavailable, execution will be submitted automatically when the master is back up ({}).'.format(message))
hxquangnhat's avatar
hxquangnhat committed
100

101
102
        return new_id

103
    def execution_terminate(self, user: zoe_lib.state.User, exec_id: int):
104
        """Terminate an execution."""
105
106
        e = self.sql.executions.select(id=exec_id, only_one=True)
        assert isinstance(e, zoe_lib.state.Execution)
107
108
109
        if e is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such execution')

110
111
        if e.user_id != user.id and not user.role.can_operate_others:
            raise zoe_api.exceptions.ZoeException('You are not authorized to terminate this execution')
112

113
        if e.is_active:
114
115
116
117
            return self.master.execution_terminate(exec_id)
        else:
            raise zoe_api.exceptions.ZoeException('Execution is not running')

118
    def execution_delete(self, user: zoe_lib.state.User, exec_id: int):
119
        """Delete an execution."""
120
        if not user.role.can_delete_executions:
121
122
            raise zoe_api.exceptions.ZoeAuthException()

123
124
        e = self.sql.executions.select(id=exec_id, only_one=True)
        assert isinstance(e, zoe_lib.state.Execution)
125
126
127
        if e is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such execution')

128
129
        if e.user_id != user.id and not user.role.can_operate_others:
            raise zoe_api.exceptions.ZoeException('You are not authorized to terminate this execution')
130

131
        if e.is_active:
132
            raise zoe_api.exceptions.ZoeException('Cannot delete an active execution')
133
134
135

        status, message = self.master.execution_delete(exec_id)
        if status:
136
            self.sql.executions.delete(exec_id)
137
138
139
140
            return True, ''
        else:
            raise zoe_api.exceptions.ZoeException(message)

141
    def service_by_id(self, user: zoe_lib.state.User, service_id: int) -> zoe_lib.state.Service:
142
        """Lookup a service by its ID."""
143
        service = self.sql.services.select(id=service_id, only_one=True)
144
145
        if service is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
146
        if service.user_id != user.id and not user.role.can_operate_others:
147
148
149
            raise zoe_api.exceptions.ZoeAuthException()
        return service

150
    def service_list(self, user: zoe_lib.state.User, **filters):
151
        """Generate a optionally filtered list of services."""
152
153
        if not user.role.can_operate_others:
            filters['user_id'] = user.id
154
        return self.sql.services.select(**filters)
155

156
    def service_logs(self, user: zoe_lib.state.User, service_id):
157
158
159
        """Retrieve the logs for the given service.
        If stream is True, a file object is returned, otherwise the log contents as a str object.
        """
160
        service = self.sql.services.select(id=service_id, only_one=True)
161
162
        if service is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such service')
163
        if service.user_id != user.id and not user.role.can_operate_others:
164
            raise zoe_api.exceptions.ZoeAuthException()
165
166
167
168

        path = os.path.join(get_conf().service_logs_base_path, get_conf().deployment_name, str(service.execution_id), service.name + '.txt')
        if not os.path.exists(path):
            raise zoe_api.exceptions.ZoeNotFoundException('Service log not available')
169
        return open(path, encoding='utf-8')
170

171
    def statistics_scheduler(self):
172
        """Retrieve statistics about the scheduler."""
173
174
        success, message = self.master.scheduler_statistics()
        if success:
175
176
177
178
            for node in message['platform_stats']['nodes']:  # JSON does not like hash keys to be integers, so we need to convert manually
                for str_service_id in list(node['service_stats'].keys()):
                    node['service_stats'][int(str_service_id)] = node['service_stats'][str_service_id]
                    del node['service_stats'][str_service_id]
179
            return message
Daniele Venzano's avatar
Daniele Venzano committed
180
181
        else:
            raise zoe_api.exceptions.ZoeException(message=message)
182

183
    def execution_endpoints(self, user: zoe_lib.state.User, execution: zoe_lib.state.Execution):
184
185
186
187
        """Return a list of the services and public endpoints available for a certain execution."""
        services_info = []
        endpoints = []
        for service in execution.services:
188
            services_info.append(self.service_by_id(user, service.id))
189
            for port in service.description['ports']:
190
                port_key = str(port['port_number']) + "/" + port['protocol']
191
                backend_port = self.sql.ports.select(only_one=True, service_id=service.id, internal_name=port_key)
192
                if backend_port is not None and backend_port.external_ip is not None:
193
                    endpoint = port['url_template'].format(**{"ip_port": backend_port.external_ip + ":" + str(backend_port.external_port)})
194
                    endpoints.append((port['name'], endpoint))
195
196

        return services_info, endpoints
197

198
    def user_by_name(self, username) -> zoe_lib.state.User:
199
200
        """Finds a user in the database looking it up by its username."""
        return self.sql.user.select(only_one=True, **{'username': username})
201
202
203
204
205

    def user_by_id(self, user: zoe_lib.state.User, user_id: int) -> zoe_lib.state.User:
        """Finds a user in the database looking it up by its username."""
        if user.id == user_id:
            return user
206
        if not user.role.can_change_config:
207
208
209
210
211
212
213
214
215
216
            raise zoe_api.exceptions.ZoeAuthException()

        return self.sql.user.select(only_one=True, id=user_id)

    def user_delete(self, user: zoe_lib.state.User, user_id: int):
        """Deletes the user identified by the ID."""
        if not user.role.can_change_config:
            raise zoe_api.exceptions.ZoeAuthException()

        self.sql.user.delete(user_id)
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272

    def user_list(self, user: zoe_lib.state.User, **filters) -> List[zoe_lib.state.User]:
        """Generate a optionally filtered list of executions."""
        if not user.role.can_change_config:
            raise zoe_api.exceptions.ZoeAuthException()
        users = self.sql.user.select(**filters)
        return users

    def user_new(self, user: zoe_lib.state.User, username: str, fs_uid: int, role: str, quota: str, auth_source: str) -> int:
        """Creates a new user."""
        if not user.role.can_change_config:
            raise zoe_api.exceptions.ZoeAuthException()

        return self.sql.user.insert(username, fs_uid, role, quota, auth_source)

    def user_update(self, user: zoe_lib.state.User, **user_data):
        """Update a user."""

        if 'id' not in user_data:
            raise KeyError
        self.user_by_id(user, user_data['id'])

        update_fields = {}

        if not user.role.can_change_config:
            if 'email' in user_data:
                update_fields['email'] = user_data['email']
        else:
            if 'email' in user_data:
                update_fields['email'] = user_data['email']
            if 'priority' in user_data:
                update_fields['priority'] = user_data['priority']
            if 'enabled' in user_data:
                update_fields['enabled'] = user_data['enabled']
            if 'auth_source' in user_data:
                update_fields['auth_source'] = user_data['auth_source']
            if 'quota' in user_data:
                quota = self.quota_by_name(user_data['quota'])
                if quota is None:
                    raise zoe_api.exceptions.ZoeRestAPIException('No quota called {}'.format(user_data['quota']))
                update_fields['quota_id'] = quota.id
            if 'role' in user_data:
                role = self.role_by_name(user_data['role'])
                if role is None:
                    raise zoe_api.exceptions.ZoeRestAPIException('No role called {}'.format(user_data['role']))
                update_fields['role_id'] = role.id

        self.sql.user.update(user_data['id'], **update_fields)

    def quota_by_name(self, quota) -> zoe_lib.state.Quota:
        """Finds a quota in the database looking it up by its name."""
        return self.sql.quota.select(only_one=True, **{'name': quota})

    def role_by_name(self, role) -> zoe_lib.state.Role:
        """Finds a role in the database looking it up by its name."""
        return self.sql.role.select(only_one=True, **{'name': role})