# api_endpoint.py
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The real API, exposed as web pages or REST API."""

18
from datetime import datetime, timedelta
19
20
import logging
import re
21
import os
22

23
24
import zoe_api.exceptions
import zoe_api.master_api
25
26
import zoe_lib.applications
import zoe_lib.exceptions
27
28
import zoe_lib.state
from zoe_lib.config import get_conf
29
30
31
32
33
34
35
36
37
38
39
40
41

log = logging.getLogger(__name__)


class APIEndpoint:
    """
    The APIEndpoint class.

    Groups the operations on executions and services that are backed by the
    SQL state store (``self.sql``) and by the Zoe master (``self.master``).
    Methods that look up a single execution or service grant access when the
    caller owns the object or has the ``'admin'`` role.

    :type master: zoe_api.master_api.APIManager
    :type sql: zoe_lib.state.SQLManager
    """

    # Length bounds quoted in the user-facing validation message.
    _EXEC_NAME_MIN_LEN = 4
    _EXEC_NAME_MAX_LEN = 128
    # Letters, digits and dashes only; used with fullmatch(), so no anchors are needed.
    _EXEC_NAME_RE = re.compile(r'[a-zA-Z0-9\-]+')

    def __init__(self):
        self.master = zoe_api.master_api.APIManager()
        self.sql = zoe_lib.state.SQLManager(get_conf())

    @classmethod
    def _exec_name_problem(cls, exec_name):
        """
        Explain why an execution name is not acceptable.

        :param exec_name: the name proposed for a new execution
        :return: the error message to report, or None when the name is valid
        """
        if not cls._EXEC_NAME_MIN_LEN <= len(exec_name) <= cls._EXEC_NAME_MAX_LEN:
            return "Execution name must be between {} and {} characters long".format(cls._EXEC_NAME_MIN_LEN, cls._EXEC_NAME_MAX_LEN)
        # fullmatch (instead of match with a trailing '$') also rejects a name ending in a newline.
        if cls._EXEC_NAME_RE.fullmatch(exec_name) is None:
            return "Execution name can contain only letters, numbers and dashes. '{}' is not valid.".format(exec_name)
        return None

    def execution_by_id(self, uid, role, execution_id) -> zoe_lib.state.sql_manager.Execution:
        """
        Lookup an execution by its ID.

        :param uid: ID of the user making the request
        :param role: role of the user making the request
        :param execution_id: ID of the execution to fetch
        :return: the matching execution
        :raises zoe_api.exceptions.ZoeNotFoundException: no execution has this ID
        :raises zoe_api.exceptions.ZoeAuthException: the caller neither owns the execution nor is an admin
        """
        e = self.sql.execution_list(id=execution_id, only_one=True)
        # The existence check must come before the type assertion, otherwise a
        # missing execution surfaces as an AssertionError.
        if e is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
        assert isinstance(e, zoe_lib.state.sql_manager.Execution)
        if e.user_id != uid and role != 'admin':
            raise zoe_api.exceptions.ZoeAuthException()
        return e

    def execution_list(self, uid, role, **filters):
        """
        Generate an optionally filtered list of executions.

        :param uid: ID of the user making the request
        :param role: role of the user making the request
        :param filters: keyword filters forwarded unchanged to the SQL layer
        :return: the executions owned by ``uid``, or all matching executions for admins
        """
        execs = self.sql.execution_list(**filters)
        return [e for e in execs if e.user_id == uid or role == 'admin']

    def zapp_validate(self, application_description):
        """
        Validates the passed ZApp description against the supported schema.

        :param application_description: the ZApp description to check
        :raises zoe_api.exceptions.ZoeException: the description is not valid
        """
        try:
            zoe_lib.applications.app_validate(application_description)
        except zoe_lib.exceptions.InvalidApplicationDescription as e:
            raise zoe_api.exceptions.ZoeException('Invalid application description: ' + e.message) from e

    def execution_start(self, uid, role, exec_name, application_description):  # pylint: disable=unused-argument
        """
        Start an execution.

        The application description is validated first, then the execution
        name. The execution is recorded in the SQL store before the master is
        asked to start it, so it is not lost if the master is unreachable.

        :param uid: ID of the user that will own the execution
        :param role: role of the user making the request (not used)
        :param exec_name: name of the new execution: 4 to 128 letters, digits or dashes
        :param application_description: the ZApp description to run
        :return: the ID of the newly created execution
        :raises zoe_api.exceptions.ZoeException: invalid description, invalid name, or master unavailable
        """
        self.zapp_validate(application_description)

        problem = self._exec_name_problem(exec_name)
        if problem is not None:
            raise zoe_api.exceptions.ZoeException(problem)

        new_id = self.sql.execution_new(exec_name, uid, application_description)
        success, message = self.master.execution_start(new_id)
        if not success:
            raise zoe_api.exceptions.ZoeException('The Zoe master is unavailable, execution will be submitted automatically when the master is back up ({}).'.format(message))

        return new_id

    def execution_terminate(self, uid, role, exec_id):
        """
        Terminate an execution.

        :param uid: ID of the user making the request
        :param role: role of the user making the request
        :param exec_id: ID of the execution to terminate
        :return: whatever the master returns for the termination request
        :raises zoe_api.exceptions.ZoeNotFoundException: no execution has this ID
        :raises zoe_api.exceptions.ZoeAuthException: the caller neither owns the execution nor is an admin
        :raises zoe_api.exceptions.ZoeException: the execution is not active
        """
        e = self.execution_by_id(uid, role, exec_id)
        if not e.is_active:
            raise zoe_api.exceptions.ZoeException('Execution is not running')
        return self.master.execution_terminate(exec_id)

    def execution_delete(self, uid, role, exec_id):
        """
        Delete an execution. Only admins are allowed to do this.

        :param uid: ID of the user making the request
        :param role: role of the user making the request
        :param exec_id: ID of the execution to delete
        :return: the tuple ``(True, '')`` on success
        :raises zoe_api.exceptions.ZoeAuthException: the caller is not an admin
        :raises zoe_api.exceptions.ZoeNotFoundException: no execution has this ID
        :raises zoe_api.exceptions.ZoeException: the execution is still active, or the master refused
        """
        if role != "admin":
            raise zoe_api.exceptions.ZoeAuthException()

        e = self.execution_by_id(uid, role, exec_id)
        if e.is_active:
            raise zoe_api.exceptions.ZoeException('Cannot delete an active execution')

        status, message = self.master.execution_delete(exec_id)
        if not status:
            raise zoe_api.exceptions.ZoeException(message)
        # The database row is dropped only after the master confirmed the deletion.
        self.sql.execution_delete(exec_id)
        return True, ''

    def service_by_id(self, uid, role, service_id) -> zoe_lib.state.sql_manager.Service:
        """
        Lookup a service by its ID.

        :param uid: ID of the user making the request
        :param role: role of the user making the request
        :param service_id: ID of the service to fetch
        :return: the matching service
        :raises zoe_api.exceptions.ZoeNotFoundException: no service has this ID
        :raises zoe_api.exceptions.ZoeAuthException: the caller neither owns the service nor is an admin
        """
        service = self.sql.service_list(id=service_id, only_one=True)
        if service is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such service')
        if service.user_id != uid and role != 'admin':
            raise zoe_api.exceptions.ZoeAuthException()
        return service

    def service_list(self, uid, role, **filters):
        """
        Generate an optionally filtered list of services.

        :param uid: ID of the user making the request
        :param role: role of the user making the request
        :param filters: keyword filters forwarded unchanged to the SQL layer
        :return: the services owned by ``uid``, or all matching services for admins
        """
        services = self.sql.service_list(**filters)
        return [s for s in services if s.user_id == uid or role == 'admin']

    def service_logs(self, uid, role, service_id):
        """
        Retrieve the logs for the given service.

        The log file is looked up at
        ``<service_logs_base_path>/<deployment_name>/<execution id>/<service name>.txt``.

        :param uid: ID of the user making the request
        :param role: role of the user making the request
        :param service_id: ID of the service whose log is requested
        :return: the log file, opened for reading as UTF-8 text; the caller must close it
        :raises zoe_api.exceptions.ZoeNotFoundException: unknown service, or its log file does not exist
        :raises zoe_api.exceptions.ZoeAuthException: the caller neither owns the service nor is an admin
        """
        service = self.service_by_id(uid, role, service_id)

        conf = get_conf()
        path = os.path.join(conf.service_logs_base_path, conf.deployment_name, str(service.execution_id), service.name + '.txt')
        # Open directly instead of testing for existence first: the file could
        # disappear between the check and the open.
        try:
            return open(path, encoding='utf-8')
        except FileNotFoundError:
            raise zoe_api.exceptions.ZoeNotFoundException('Service log not available')

    def statistics_scheduler(self, uid_, role_):
        """
        Retrieve statistics about the scheduler.

        :param uid_: ID of the user making the request (not used)
        :param role_: role of the user making the request (not used)
        :return: the statistics reported by the master, or None if the request failed
        """
        success, message = self.master.scheduler_statistics()
        if success:
            return message
        return None

    def retry_submit_error_executions(self):
        """
        Resubmit an execution forgotten by the master.

        Only the first execution still in the submitted state is retried on
        each call; presumably this method is invoked periodically, so the
        others are picked up by later calls (confirm against the caller).
        """
        waiting_execs = self.sql.execution_list(status=zoe_lib.state.sql_manager.Execution.SUBMIT_STATUS)
        if not waiting_execs:
            return
        e = waiting_execs[0]
        success, message = self.master.execution_start(e.id)
        if not success:
            log.warning('Zoe Master unavailable (%s), execution %s still waiting', message, e.id)

    def cleanup_dead_executions(self):
        """
        Terminates all executions with dead "monitor" services.

        Running executions named "aml-lab" are also terminated once they have
        been running for more than ``aml_ttl`` hours (from the configuration).
        """
        log.debug('Starting dead execution cleanup task')
        for execution in self.sql.execution_list():
            if not execution.is_running:
                continue

            for service in execution.services:
                if service.description['monitor'] and service.is_dead():
                    log.info("Service %s (%s) of execution %s died, terminating execution", service.id, service.name, execution.id)
                    self.master.execution_terminate(execution.id)
                    break
            else:
                # Reached only when no dead monitor service triggered a termination above.
                if execution.name == "aml-lab":
                    log.debug('Looking at AML execution %s...', execution.id)
                    if datetime.now() - execution.time_start > timedelta(hours=get_conf().aml_ttl):
                        log.info('Terminating AML-LAB execution for user %s, timer expired', execution.user_id)
                        self.master.execution_terminate(execution.id)

        log.debug('Cleanup task finished')

    def execution_endpoints(self, uid: str, role: str, execution: zoe_lib.state.Execution):
        """
        Return a list of the services and public endpoints available for a certain execution.

        :param uid: ID of the user making the request
        :param role: role of the user making the request
        :param execution: the execution to inspect
        :return: a tuple ``(services, endpoints)``; ``endpoints`` holds one
                 ``(port name, URL)`` pair for each port that has an external IP
        :raises zoe_api.exceptions.ZoeNotFoundException: a service of the execution cannot be found
        :raises zoe_api.exceptions.ZoeAuthException: the caller may not access a service of the execution
        """
        services_info = []
        endpoints = []
        for service in execution.services:
            services_info.append(self.service_by_id(uid, role, service.id))
            for port in service.description['ports']:
                # Ports are looked up in the database under "<port number>/<protocol>".
                port_key = str(port['port_number']) + "/" + port['protocol']
                backend_port = self.sql.port_list(only_one=True, service_id=service.id, internal_name=port_key)
                if backend_port is None or backend_port.external_ip is None:
                    continue
                ip_port = backend_port.external_ip + ":" + str(backend_port.external_port)
                endpoints.append((port['name'], port['url_template'].format(ip_port=ip_port)))

        return services_info, endpoints