api_endpoint.py 9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The real API, exposed as web pages or REST API."""

18
from datetime import datetime, timedelta
19
20
import logging
import re
21
import os
22

23
24
import zoe_api.exceptions
import zoe_api.master_api
25
26
import zoe_lib.applications
import zoe_lib.exceptions
27
28
import zoe_lib.state
from zoe_lib.config import get_conf
29
30
31
32
33
34
35
36
37
38
39
40
41

log = logging.getLogger(__name__)


class APIEndpoint:
    """
    The APIEndpoint class.

    :type master: zoe_api.master_api.APIManager
    :type sql: zoe_lib.sql_manager.SQLManager
    """
    def __init__(self):
        """Create the master API client and the SQL state manager used by the endpoint methods."""
        self.master = zoe_api.master_api.APIManager()
        configuration = get_conf()
        self.sql = zoe_lib.state.SQLManager(configuration)
43

44
    def execution_by_id(self, uid, role, execution_id) -> zoe_lib.state.sql_manager.Execution:
        """Fetch a single execution and check that the caller may access it.

        :param uid: ID of the user making the request
        :param role: role of the user making the request; 'admin' bypasses the ownership check
        :param execution_id: ID of the execution to look up
        :return: the matching execution
        :raises zoe_api.exceptions.ZoeNotFoundException: when no execution has the given ID
        :raises zoe_api.exceptions.ZoeAuthException: when the caller is neither the owner nor an admin
        """
        execution = self.sql.execution_list(id=execution_id, only_one=True)
        if execution is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
        assert isinstance(execution, zoe_lib.state.sql_manager.Execution)

        # Access is granted to the owner and to administrators only.
        if not (execution.user_id == uid or role == 'admin'):
            raise zoe_api.exceptions.ZoeAuthException()
        return execution

    def execution_list(self, uid, role, **filters):
        """Return the executions matching ``filters`` that the caller is allowed to see.

        :param uid: ID of the user making the request
        :param role: role of the user making the request; 'admin' sees every execution
        :param filters: keyword filters forwarded unchanged to the SQL manager
        :return: a list of executions owned by ``uid``, or all of them for an admin
        """
        visible = []
        for candidate in self.sql.execution_list(**filters):
            if candidate.user_id == uid or role == 'admin':
                visible.append(candidate)
        return visible

60
61
62
63
64
65
66
    def zapp_validate(self, application_description):
        """Check a ZApp description with ``zoe_lib.applications.app_validate``.

        :param application_description: the ZApp description to validate
        :raises zoe_api.exceptions.ZoeException: when the library reports an invalid description
        """
        try:
            zoe_lib.applications.app_validate(application_description)
        except zoe_lib.exceptions.InvalidApplicationDescription as err:
            reason = 'Invalid application description: ' + err.message
            raise zoe_api.exceptions.ZoeException(reason)

qhoangxuan's avatar
qhoangxuan committed
67
    def execution_start(self, uid, role, exec_name, application_description): # pylint: disable=unused-argument
        """Validate, record and submit a new execution.

        :param uid: ID of the user that will own the execution
        :param role: role of the user making the request (currently unused)
        :param exec_name: name of the execution: 4 to 128 letters, digits or dashes
        :param application_description: the ZApp description to run
        :return: the ID of the newly created execution
        :raises zoe_api.exceptions.ZoeException: when the description or the name is invalid, or when
            the master could not be reached (the execution record has already been created in that case)
        """
        self.zapp_validate(application_description)

        # The previous check, ``3 > len(exec_name) > 128``, is a chained comparison that can never be
        # true, so the length was never enforced. The bounds below follow the error message.
        if not 4 <= len(exec_name) <= 128:
            raise zoe_api.exceptions.ZoeException("Execution name must be between 4 and 128 characters long")
        # fullmatch() is used because '$' with match() also accepts a name ending in a newline.
        if re.fullmatch(r'[a-zA-Z0-9\-]+', exec_name) is None:
            raise zoe_api.exceptions.ZoeException("Execution name can contain only letters, numbers and dashes. '{}' is not valid.".format(exec_name))

        new_id = self.sql.execution_new(exec_name, uid, application_description)
        success, message = self.master.execution_start(new_id)
        if not success:
            raise zoe_api.exceptions.ZoeException('The Zoe master is unavailable, execution will be submitted automatically when the master is back up ({}).'.format(message))

        return new_id

    def execution_terminate(self, uid, role, exec_id):
        """Terminate an active execution.

        :param uid: ID of the user making the request
        :param role: role of the user making the request; 'admin' may terminate any execution
        :param exec_id: ID of the execution to terminate
        :return: whatever the master's ``execution_terminate`` call returns
        :raises zoe_api.exceptions.ZoeNotFoundException: when no execution has the given ID
        :raises zoe_api.exceptions.ZoeAuthException: when the caller is neither the owner nor an admin
        :raises zoe_api.exceptions.ZoeException: when the execution is not active
        """
        # The lookup is delegated to execution_by_id(), which tests for None *before* asserting the
        # type. The old inline code asserted first, so a missing execution raised AssertionError
        # instead of ZoeNotFoundException.
        e = self.execution_by_id(uid, role, exec_id)

        if not e.is_active:
            raise zoe_api.exceptions.ZoeException('Execution is not running')
        return self.master.execution_terminate(exec_id)

    def execution_delete(self, uid, role, exec_id):
        """Delete an execution that is no longer active.

        :param uid: ID of the user making the request
        :param role: role of the user making the request; 'admin' may delete any execution
        :param exec_id: ID of the execution to delete
        :return: the tuple ``(True, '')`` on success
        :raises zoe_api.exceptions.ZoeNotFoundException: when no execution has the given ID
        :raises zoe_api.exceptions.ZoeAuthException: when the caller is neither the owner nor an admin
        :raises zoe_api.exceptions.ZoeException: when the execution is still active, or when the master
            reports a failure (its message is used as the exception text)
        """
        # The lookup is delegated to execution_by_id(), which tests for None *before* asserting the
        # type. The old inline code asserted first, so a missing execution raised AssertionError
        # instead of ZoeNotFoundException.
        e = self.execution_by_id(uid, role, exec_id)

        if e.is_active:
            raise zoe_api.exceptions.ZoeException('Cannot delete an active execution')

        status, message = self.master.execution_delete(exec_id)
        if not status:
            raise zoe_api.exceptions.ZoeException(message)

        # The database record is removed only after the master has confirmed the deletion.
        self.sql.execution_delete(exec_id)
        return True, ''

121
    def service_by_id(self, uid, role, service_id) -> zoe_lib.state.sql_manager.Service:
        """Fetch a single service and check that the caller may access it.

        :param uid: ID of the user making the request
        :param role: role of the user making the request; 'admin' bypasses the ownership check
        :param service_id: ID of the service to look up
        :return: the matching service
        :raises zoe_api.exceptions.ZoeNotFoundException: when no service has the given ID
        :raises zoe_api.exceptions.ZoeAuthException: when the caller is neither the owner nor an admin
        """
        service = self.sql.service_list(id=service_id, only_one=True)
        if service is None:
            # The message used to say 'No such execution', a copy/paste slip from execution_by_id().
            raise zoe_api.exceptions.ZoeNotFoundException('No such service')
        if service.user_id != uid and role != 'admin':
            raise zoe_api.exceptions.ZoeAuthException()
        return service

    def service_list(self, uid, role, **filters):
        """Return the services matching ``filters`` that the caller is allowed to see.

        :param uid: ID of the user making the request
        :param role: role of the user making the request; 'admin' sees every service
        :param filters: keyword filters forwarded unchanged to the SQL manager
        :return: a list of services owned by ``uid``, or all of them for an admin
        """
        visible = []
        for candidate in self.sql.service_list(**filters):
            if candidate.user_id == uid or role == 'admin':
                visible.append(candidate)
        return visible

136
    def service_logs(self, uid, role, service_id):
        """Open the log file of the given service.

        The file is looked up under the configured ``service_logs_base_path``, in the directory
        ``<deployment_name>/<execution_id>/`` and with the name ``<service name>.txt``.

        :param uid: ID of the user making the request
        :param role: role of the user making the request; 'admin' may read any service log
        :param service_id: ID of the service whose log is requested
        :return: a text file object opened for reading with UTF-8 encoding; the caller must close it
        :raises zoe_api.exceptions.ZoeNotFoundException: when the service or its log file does not exist
        :raises zoe_api.exceptions.ZoeAuthException: when the caller is neither the owner nor an admin
        """
        service = self.sql.service_list(id=service_id, only_one=True)
        if service is None:
            raise zoe_api.exceptions.ZoeNotFoundException('No such service')
        if service.user_id != uid and role != 'admin':
            raise zoe_api.exceptions.ZoeAuthException()

        conf = get_conf()
        path = os.path.join(conf.service_logs_base_path, conf.deployment_name, str(service.execution_id), service.name + '.txt')
        if not os.path.exists(path):
            raise zoe_api.exceptions.ZoeNotFoundException('Service log not available')
        try:
            return open(path, encoding='utf-8')
        except FileNotFoundError as err:
            # The file can disappear between the existence check and the open() call.
            raise zoe_api.exceptions.ZoeNotFoundException('Service log not available') from err
150

151
152
153
154
155
156
157
158
    def statistics_scheduler(self, uid_, role_):
        """Ask the master for the scheduler statistics.

        :param uid_: ID of the user making the request (unused)
        :param role_: role of the user making the request (unused)
        :return: the payload returned by the master on success, ``None`` when the master reports failure
        """
        succeeded, payload = self.master.scheduler_statistics()
        return payload if succeeded else None

    def retry_submit_error_executions(self):
        """Resubmit an execution that is still in the submit status.

        Only the first execution returned by the SQL manager is resubmitted on each call. If the master
        refuses it, a warning is logged and the execution is left untouched.
        """
        pending = self.sql.execution_list(status=zoe_lib.state.sql_manager.Execution.SUBMIT_STATUS)
        if not pending:
            return

        oldest = pending[0]
        accepted, reason = self.master.execution_start(oldest.id)
        if accepted:
            return
        log.warning('Zoe Master unavailable ({}), execution {} still waiting'.format(reason, oldest.id))

    def cleanup_dead_executions(self):
        """Terminate running executions that should no longer be alive.

        A running execution is terminated when one of its services is flagged as 'monitor' in its
        description and reports itself dead. Otherwise, an execution named "aml-lab" is terminated
        once more than ``aml_ttl`` hours (from the configuration) have passed since its start time.
        """
        log.debug('Starting dead execution cleanup task')
        for execution in self.sql.execution_list():
            if not execution.is_running:
                continue

            # First service, in iteration order, that is both a monitor and dead (None if there is none).
            dead_monitor = next((svc for svc in execution.services if svc.description['monitor'] and svc.is_dead()), None)
            if dead_monitor is not None:
                log.info("Service {} ({}) of execution {} died, terminating execution".format(dead_monitor.id, dead_monitor.name, execution.id))
                self.master.execution_terminate(execution.id)
                continue

            if execution.name != "aml-lab":
                continue
            log.debug('Looking at AML execution {}...'.format(execution.id))
            if datetime.now() - execution.time_start > timedelta(hours=get_conf().aml_ttl):
                log.info('Terminating AML-LAB execution for user {}, timer expired'.format(execution.user_id))
                self.master.execution_terminate(execution.id)

        log.debug('Cleanup task finished')
187

188
    def execution_endpoints(self, uid: str, role: str, execution: zoe_lib.state.Execution):
        """Return a list of the services and public endpoints available for a certain execution.

        :param uid: ID of the user making the request
        :param role: role of the user making the request
        :param execution: the execution whose services are inspected
        :return: a tuple ``(services_info, endpoints)``; ``services_info`` holds the result of
            ``service_by_id`` for each service, ``endpoints`` holds ``(port name, URL)`` tuples for
            every port that has an external IP
        :raises zoe_api.exceptions.ZoeNotFoundException: propagated from ``service_by_id``
        :raises zoe_api.exceptions.ZoeAuthException: propagated from ``service_by_id``
        """
        services_info = []
        endpoints = []
        for service in execution.services:
            services_info.append(self.service_by_id(uid, role, service.id))
            for port in service.description['ports']:
                port_key = str(port['port_number']) + "/" + port['protocol']
                backend_port = self.sql.port_list(only_one=True, service_id=service.id, internal_name=port_key)
                # Other only_one=True lookups in this class return None when nothing matches, so a
                # port without a stored mapping is skipped instead of raising AttributeError.
                if backend_port is None or backend_port.external_ip is None:
                    continue
                ip_port = backend_port.external_ip + ":" + str(backend_port.external_port)
                endpoints.append((port['name'], port['url_template'].format(ip_port=ip_port)))

        return services_info, endpoints