websockets.py 4.83 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Ajax API for the Zoe web interface."""

import datetime
import json
20
import logging
21
from concurrent.futures import ThreadPoolExecutor
22 23 24

import tornado.websocket
import tornado.iostream
25
import tornado.gen
26 27 28 29 30

from zoe_lib.config import get_conf
import zoe_api.exceptions
from zoe_api.api_endpoint import APIEndpoint  # pylint: disable=unused-import
from zoe_api.web.utils import get_auth, catch_exceptions
31 32

log = logging.getLogger(__name__)
33

34 35
THREAD_POOL = ThreadPoolExecutor(20)

36

37
class WebSocketEndpointWeb(tornado.websocket.WebSocketHandler):
38 39 40
    """Handler class"""
    def initialize(self, **kwargs):
        """Initializes the request handler."""
41
        super().initialize()
42
        self.api_endpoint = kwargs['api_endpoint']  # type: APIEndpoint
43 44
        self.uid = None
        self.role = None
45
        self.connection_closed = None
46 47

    @catch_exceptions
48 49 50
    def open(self, *args, **kwargs):
        """Invoked when a new WebSocket is opened."""
        log.debug('WebSocket opened')
51
        uid, role = get_auth(self)
52 53 54 55 56
        if uid is None:
            self.close(401, "Unauthorized")
        else:
            self.uid = uid
            self.role = role
57

58
    @catch_exceptions
59
    @tornado.gen.coroutine
60 61
    def on_message(self, message):
        """WebSocket message handler."""
62

63 64 65 66 67 68
        if message is None:
            return

        request = json.loads(message)

        if request['command'] == 'start_zapp':
69
            app_descr = json.load(open('contrib/zoeapps/eurecom_aml_lab.json', 'r'))
70
            execution = self.api_endpoint.execution_list(self.uid, self.role, name='aml-lab')
71
            if len(execution) == 0:
72
                exec_id = self.api_endpoint.execution_start(self.uid, self.role, 'aml-lab', app_descr)
73 74 75 76 77 78 79
            else:
                execution = execution[0]
                exec_id = execution.id
            response = {
                'status': 'ok',
                'execution_id': exec_id
            }
80 81
            self.write_message(response)
        elif request['command'] == 'query_status':
82
            try:
83
                execution = self.api_endpoint.execution_by_id(self.uid, self.role, request['exec_id'])
84 85 86 87 88 89 90 91 92 93 94 95
            except zoe_api.exceptions.ZoeNotFoundException:
                response = {
                    'status': 'ok',
                    'exec_status': 'none'
                }
            else:
                response = {
                    'status': 'ok',
                    'exec_status': execution.status
                }
                if execution.status == execution.RUNNING_STATUS:
                    response['ttl'] = ((execution.time_start + datetime.timedelta(hours=get_conf().aml_ttl)) - datetime.datetime.now()).total_seconds()
96
                    services_info_, endpoints = self.api_endpoint.execution_endpoints(self.uid, self.role, execution)
97 98
                    response['endpoints'] = endpoints
                elif execution.status == execution.ERROR_STATUS or execution.status == execution.TERMINATED_STATUS:
99
                    self.api_endpoint.execution_delete(self.uid, self.role, execution.id)
100
            self.write_message(response)
101
        elif request['command'] == 'service_logs':
102 103 104 105 106 107 108 109 110 111
            log_obj = self.api_endpoint.service_logs(self.uid, self.role, request['service_id'])

            while not self.connection_closed:
                try:
                    log_line = yield THREAD_POOL.submit(next, log_obj)
                except StopIteration:
                    yield tornado.gen.sleep(0.2)
                    continue

                self.write_message(log_line)
112 113 114
        elif request['command'] == 'system_status':
            stats = self.api_endpoint.statistics_scheduler(self.uid, self.role)
            self.write_message(json.dumps(stats))
115 116 117 118 119
        else:
            response = {
                'status': 'error',
                'message': 'unknown request type'
            }
120 121 122 123 124
            self.write_message(response)

    def _stream_log_line(self, log_line):
        self.write_message(log_line)
        self.stream.read_until(b'\n', callback=self._stream_log_line)
125

126 127 128
    def on_close(self):
        """Invoked when the WebSocket is closed."""
        log.debug("WebSocket closed")
129
        self.connection_closed = True
130 131 132 133

    def data_received(self, chunk):
        """Not implemented as we do not use stream uploads"""
        pass