Commit 0d98f7d7 authored by Daniele Venzano's avatar Daniele Venzano

Port changes from the 0.10.3-aml branch

parent 296244c0
......@@ -78,7 +78,7 @@ images:
- docker build --pull -t ${DOCKER_REGISTRY}/ci/${ZOE_TEST_IMAGE} .
- docker push ${DOCKER_REGISTRY}/ci/${ZOE_TEST_IMAGE}
only:
- /^.*master$/
- master
docs:
stage: deploy
......@@ -96,7 +96,7 @@ docs:
environment:
name: staging
only:
- /^.*master$/
- master
frontend:
stage: deploy
......@@ -123,4 +123,4 @@ frontend:
environment:
name: staging
only:
- /^.*master$/
- master
......@@ -15,6 +15,7 @@
"""The real API, exposed as web pages or REST API."""
from datetime import datetime, timedelta
import logging
import re
......@@ -24,6 +25,7 @@ import zoe_lib.applications
import zoe_lib.exceptions
import zoe_lib.state
from zoe_lib.config import get_conf
from zoe_master.backends.swarm.api_client import SwarmClient
log = logging.getLogger(__name__)
......@@ -124,6 +126,18 @@ class APIEndpoint:
ret = [s for s in services if s.user_id == uid or role == 'admin']
return ret
def service_logs(self, uid, role, service_id, stream=True):
    """Return the log output of the container backing a service.

    The service is looked up by ID. The caller must either own the service
    or have the ``admin`` role, and the service must have a Docker
    container ID recorded.

    :param uid: ID of the user making the request
    :param role: role of the user making the request
    :param service_id: ID of the service whose logs are requested
    :param stream: passed through unchanged to ``SwarmClient.logs``
    :return: whatever ``SwarmClient.logs`` returns for the container
    :raises ZoeNotFoundException: the service does not exist or has no container ID
    :raises ZoeAuthException: the user is neither the owner nor an admin
    """
    svc = self.sql.service_list(id=service_id, only_one=True)
    if svc is None:
        raise zoe_api.exceptions.ZoeNotFoundException('No such service')

    # Owners and administrators are the only ones allowed to read the logs.
    if not (svc.user_id == uid or role == 'admin'):
        raise zoe_api.exceptions.ZoeAuthException()

    container_id = svc.docker_id
    if container_id is None:
        raise zoe_api.exceptions.ZoeNotFoundException('Container is not running')

    return SwarmClient().logs(container_id, stream)
def statistics_scheduler(self, uid_, role_):
"""Retrieve statistics about the scheduler."""
success, message = self.master.scheduler_statistics()
......@@ -146,9 +160,33 @@ class APIEndpoint:
all_execs = self.sql.execution_list()
for execution in all_execs:
if execution.is_running:
terminated = False
for service in execution.services:
if service.description['monitor'] and service.is_dead():
log.info("Service {} ({}) of execution {} died, terminating execution".format(service.id, service.name, execution.id))
self.master.execution_terminate(execution.id)
terminated = True
break
if not terminated and execution.name == "aml-lab":
log.debug('Looking at AML execution {}...'.format(execution.id))
if datetime.now() - execution.time_start > timedelta(hours=get_conf().aml_ttl):
log.info('Terminating AML-LAB execution for user {}, timer expired'.format(execution.user_id))
self.master.execution_terminate(execution.id)
log.debug('Cleanup task finished')
def execution_endpoints(self, uid: str, role: str, execution: zoe_lib.state.sql_manager.Execution):
    """Collect service details and exposed endpoint URLs for an execution.

    For every service of the execution, the service information is fetched
    through ``service_by_id``. Each port of the service description that is
    flagged with ``expose`` and has an entry in the service port mappings
    contributes one endpoint URL.

    :param uid: ID of the user making the request
    :param role: role of the user making the request
    :param execution: the execution whose services are inspected
    :return: a tuple ``(services_info, endpoints)`` where ``endpoints`` is a
             list of ``(port name, URL)`` tuples
    """
    services_info = []
    endpoints = []
    for service in execution.services:
        services_info.append(self.service_by_id(uid, role, service.id))
        mappings = service.ports
        for port in service.description['ports']:
            # Only ports explicitly marked as exposed produce an endpoint.
            if 'expose' not in port or not port['expose']:
                continue
            mapping_key = str(port['port_number']) + "/tcp"
            if mapping_key not in mappings:
                continue
            # The mapping entry is indexed as [0] = host part, [1] = port part of the URL.
            url = port['protocol'] + "://" + mappings[mapping_key][0] + ":" + mappings[mapping_key][1] + port['path']
            endpoints.append((port['name'], url))
    return services_info, endpoints
......@@ -19,10 +19,10 @@ from typing import List
import tornado.web
from zoe_api.rest_api.execution import ExecutionAPI, ExecutionCollectionAPI, ExecutionDeleteAPI
from zoe_api.rest_api.execution import ExecutionAPI, ExecutionCollectionAPI, ExecutionDeleteAPI, ExecutionEndpointsAPI
from zoe_api.rest_api.info import InfoAPI
from zoe_api.rest_api.userinfo import UserInfoAPI
from zoe_api.rest_api.service import ServiceAPI
from zoe_api.rest_api.service import ServiceAPI, ServiceLogsAPI
from zoe_api.rest_api.discovery import DiscoveryAPI
from zoe_api.rest_api.statistics import SchedulerStatsAPI
from zoe_api.rest_api.oauth import OAuthGetAPI, OAuthRevokeAPI
......@@ -46,9 +46,11 @@ def api_init(api_endpoint) -> List[tornado.web.URLSpec]:
tornado.web.url(API_PATH + r'/execution/([0-9]+)', ExecutionAPI, route_args),
tornado.web.url(API_PATH + r'/execution/delete/([0-9]+)', ExecutionDeleteAPI, route_args),
tornado.web.url(API_PATH + r'/execution/endpoints/([0-9]+)', ExecutionEndpointsAPI, route_args),
tornado.web.url(API_PATH + r'/execution', ExecutionCollectionAPI, route_args),
tornado.web.url(API_PATH + r'/service/([0-9]+)', ServiceAPI, route_args),
tornado.web.url(API_PATH + r'/service/logs/([0-9]+)', ServiceLogsAPI, route_args),
tornado.web.url(API_PATH + r'/discovery/by_group/([0-9]+)/([a-z0-9A-Z\-]+)', DiscoveryAPI, route_args),
......
......@@ -175,3 +175,39 @@ class ExecutionCollectionAPI(RequestHandler):
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
class ExecutionEndpointsAPI(RequestHandler):
    """REST handler that lists the public endpoints of an execution."""

    def initialize(self, **kwargs):
        """Store the API endpoint object passed through the route arguments."""
        self.api_endpoint = kwargs['api_endpoint']  # type: APIEndpoint

    def set_default_headers(self):
        """Set up the headers for enabling CORS."""
        manage_cors_headers(self)

    @catch_exceptions
    def options(self, execution_id=None):  # pylint: disable=unused-argument
        """
        Needed for CORS.

        Tornado passes the groups captured by the URL pattern to every HTTP
        verb method, so the execution ID has to be accepted here even though
        it is not used; without it a preflight request raises a TypeError.

        :param execution_id: execution ID captured from the URL, ignored
        """
        self.set_status(204)
        self.finish()

    @catch_exceptions
    def get(self, execution_id: int):
        """
        Get the list of endpoints exposed by an execution.

        The response body is a JSON object with a single ``endpoints`` key.

        :param execution_id: the execution whose endpoints are listed
        """
        uid, role = get_auth(self)
        execution = self.api_endpoint.execution_by_id(uid, role, execution_id)
        # Only the endpoints are returned, the per-service details are discarded.
        _, endpoints = self.api_endpoint.execution_endpoints(uid, role, execution)
        self.write({'endpoints': endpoints})

    def data_received(self, chunk):
        """Not implemented as we do not use stream uploads"""
        pass
......@@ -19,6 +19,8 @@ from concurrent.futures import ThreadPoolExecutor
import logging
from tornado.web import RequestHandler
import tornado.gen
import tornado.iostream
from zoe_api.rest_api.utils import catch_exceptions, get_auth, manage_cors_headers
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
......@@ -57,3 +59,58 @@ class ServiceAPI(RequestHandler):
def data_received(self, chunk):
"""Not implemented as we do not use stream uploads"""
pass
class ServiceLogsAPI(RequestHandler):
    """REST handler that streams the logs of a service to the client."""

    def initialize(self, **kwargs):
        """Store the API endpoint object and reset the connection-closed flag."""
        self.api_endpoint = kwargs['api_endpoint']  # type: APIEndpoint
        self.connection_closed = False

    def set_default_headers(self):
        """Set up the headers for enabling CORS."""
        manage_cors_headers(self)

    @catch_exceptions
    def options(self, service_id):  # pylint: disable=unused-argument
        """Needed for CORS."""
        self.set_status(204)
        self.finish()

    def on_connection_close(self):
        """Tornado callback: remember that the client closed the connection."""
        self.connection_closed = True

    @catch_exceptions
    @tornado.gen.coroutine
    def get(self, service_id):
        """
        HTTP GET method: write the service log to the response, one item at a time.

        Each item is fetched from the log generator in the thread pool, written
        and flushed. The loop ends when the generator is exhausted, when the
        flush reports a closed stream, or when the client has disconnected.

        :param service_id: ID of the service whose logs are streamed
        """
        uid, role = get_auth(self)
        log_source = self.api_endpoint.service_logs(uid, role, service_id, stream=True)

        keep_streaming = True
        while keep_streaming:
            # next() is submitted to the thread pool and awaited as a future.
            try:
                chunk = yield THREAD_POOL.submit(next, log_source)
            except StopIteration:
                break

            self.write(chunk)
            try:
                yield self.flush()
            except tornado.iostream.StreamClosedError:
                break

            # The flag is set by on_connection_close().
            keep_streaming = not self.connection_closed

        log.debug('Finished log stream for service {}'.format(service_id))
        self.finish()

    def data_received(self, chunk):
        """Not implemented as we do not use stream uploads"""
        pass
......@@ -20,6 +20,7 @@ from typing import List
import tornado.web
import zoe_api.web.start
import zoe_api.web.ajax
import zoe_api.web.executions
from zoe_lib.version import ZOE_API_VERSION, ZOE_VERSION
......@@ -27,6 +28,7 @@ from zoe_lib.version import ZOE_API_VERSION, ZOE_VERSION
def web_init(api_endpoint) -> List[tornado.web.URLSpec]:
"""Tornado init for the web interface."""
route_args = {
'api_endpoint': api_endpoint
}
......@@ -40,7 +42,9 @@ def web_init(api_endpoint) -> List[tornado.web.URLSpec]:
tornado.web.url(r'/executions/restart/([0-9]+)', zoe_api.web.executions.ExecutionRestartWeb, route_args, name='execution_restart'),
tornado.web.url(r'/executions/terminate/([0-9]+)', zoe_api.web.executions.ExecutionTerminateWeb, route_args, name='execution_terminate'),
tornado.web.url(r'/executions/delete/([0-9]+)', zoe_api.web.executions.ExecutionDeleteWeb, route_args, name='execution_delete'),
tornado.web.url(r'/executions/inspect/([0-9]+)', zoe_api.web.executions.ExecutionInspectWeb, route_args, name='execution_inspect')
tornado.web.url(r'/executions/inspect/([0-9]+)', zoe_api.web.executions.ExecutionInspectWeb, route_args, name='execution_inspect'),
tornado.web.url(r'/ajax', zoe_api.web.ajax.AjaxEndpointWeb, route_args, name='ajax')
]
return web_routes
......
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Ajax API for the Zoe web interface."""
import datetime
import json
from tornado.escape import json_decode
from zoe_lib.config import get_conf
import zoe_api.exceptions
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.web.utils import get_auth, catch_exceptions
from zoe_api.web.custom_request_handler import ZoeRequestHandler
class AjaxEndpointWeb(ZoeRequestHandler):
    """Handler for the AJAX requests of the Zoe web interface."""

    def initialize(self, **kwargs):
        """Initializes the request handler."""
        super().initialize(**kwargs)
        self.api_endpoint = kwargs['api_endpoint']  # type: APIEndpoint

    @catch_exceptions
    def post(self):
        """
        AJAX POST requests.

        The body is a JSON object whose ``type`` key selects the operation:
        ``start`` or ``query_status`` (the latter also needs ``exec_id``).
        Any other type produces an error response. The response is written
        as a JSON object with at least a ``status`` key.
        """
        uid, role = get_auth(self)
        request = json_decode(self.request.body)

        if request['type'] == 'start':
            response = self._start_aml_lab(uid, role)
        elif request['type'] == 'query_status':
            response = self._query_status(uid, role, request['exec_id'])
        else:
            response = {
                'status': 'error',
                'message': 'unknown request type'
            }

        self.write(response)

    def _start_aml_lab(self, uid, role):
        """Return the ID of the user's 'aml-lab' execution, starting one if none is listed."""
        executions = self.api_endpoint.execution_list(uid, role, name='aml-lab')
        if executions:
            exec_id = executions[0].id
        else:
            # The description is read only when a new execution is needed, and
            # the file is closed as soon as it has been parsed.
            with open('contrib/zoeapps/eurecom_aml_lab.json', 'r') as app_file:
                app_descr = json.load(app_file)
            exec_id = self.api_endpoint.execution_start(uid, role, 'aml-lab', app_descr)

        return {
            'status': 'ok',
            'execution_id': exec_id
        }

    def _query_status(self, uid, role, exec_id):
        """Build the status response for an execution, deleting it when it ended or failed."""
        try:
            execution = self.api_endpoint.execution_by_id(uid, role, exec_id)
        except zoe_api.exceptions.ZoeNotFoundException:
            return {
                'status': 'ok',
                'exec_status': 'none'
            }

        response = {
            'status': 'ok',
            'exec_status': execution.status
        }
        if execution.status == execution.RUNNING_STATUS:
            # Seconds left before the configured time-to-live (in hours) expires.
            expiry = execution.time_start + datetime.timedelta(hours=get_conf().aml_ttl)
            response['ttl'] = (expiry - datetime.datetime.now()).total_seconds()
            _, endpoints = self.api_endpoint.execution_endpoints(uid, role, execution)
            response['endpoints'] = endpoints
        elif execution.status in (execution.ERROR_STATUS, execution.TERMINATED_STATUS):
            self.api_endpoint.execution_delete(uid, role, execution.id)
        return response
......@@ -28,7 +28,6 @@ from tornado.escape import squeeze, linkify, url_escape, xhtml_escape
import tornado.web
import zoe_lib.version
import zoe_api.web.utils
......
......@@ -129,12 +129,11 @@ class ExecutionInspectWeb(ZoeRequestHandler):
e = self.api_endpoint.execution_by_id(uid, role, execution_id)
services_info = []
for service in e.services:
services_info.append(self.api_endpoint.service_by_id(uid, role, service.id))
services_info, endpoints = self.api_endpoint.execution_endpoints(uid, role, e)
template_vars = {
"e": e,
"endpoints": endpoints,
"services_info": services_info
}
self.render('execution_inspect.html', **template_vars)
......@@ -15,9 +15,6 @@
"""Main points of entry for the Zoe web interface."""
from random import randint
import json
from zoe_api.api_endpoint import APIEndpoint # pylint: disable=unused-import
from zoe_api.web.utils import get_auth_login, get_auth, catch_exceptions
from zoe_api.web.custom_request_handler import ZoeRequestHandler
......@@ -73,33 +70,20 @@ class HomeWeb(ZoeRequestHandler):
"""Home page with authentication."""
uid, role = get_auth(self)
if role == 'user' or role == 'admin':
executions = self.api_endpoint.execution_list(uid, role)
template_vars = {
'executions': sorted(executions, key=lambda e: e.id),
'is_admin': role == 'admin',
}
self.render('home_user.html', **template_vars)
else:
template_vars = {
'refresh': randint(2, 8),
'execution_status': 'Please wait...',
'execution_urls': [],
}
app_descr = json.load(open('contrib/zoeapps/eurecom_aml_lab.json', 'r'))
execution = self.api_endpoint.execution_list(uid, role, name='aml-lab')
if len(execution) == 0 or execution[0]['status'] == 'terminated' or execution[0]['status'] == 'finished':
self.api_endpoint.execution_start(uid, role, 'aml-lab', app_descr)
template_vars['execution_status'] = 'submitted'
return self.render('home_guest.html', **template_vars)
else:
execution = execution[0]
if execution['status'] != 'running':
template_vars['execution_status'] = execution['status']
return self.render('home_guest.html', **template_vars)
else:
template_vars['refresh'] = -1
template_vars['execution_status'] = execution['status']
return self.render('home_guest.html', **template_vars)
if role == 'guest':
return self._aml_homepage(uid)
executions = self.api_endpoint.execution_list(uid, role)
template_vars = {
'executions': sorted(executions, key=lambda e: e.id),
'is_admin': role == 'admin',
}
self.render('home_user.html', **template_vars)
def _aml_homepage(self, uid):
    """Render the home page for students of the AML course.

    :param uid: user ID, made available to the template as ``uid``
    :return: the value returned by ``self.render``
    """
    return self.render('home_guest.html', uid=uid)
This diff is collapsed.
......@@ -62,84 +62,84 @@ span.fakelink {
}
#wrapper {
width: 800px;
width: 800px;
}
#navigation {
background-color: #fff;
border: #ddd 1px solid;
border-radius: 10px;
margin: 10px;
padding: 10px;
background-color: #fff;
border: #ddd 1px solid;
border-radius: 10px;
margin: 10px;
padding: 10px;
}
#navigation li {
margin: 2px 0;
margin: 2px 0;
}
label.error {
color: #ff0000;
margin-left: 10px;
position: relative;
color: #ff0000;
margin-left: 10px;
position: relative;
}
.wizard {
background-color: #fff;
border: #ddd 1px solid;
border-radius: 10px;
margin: 10px;
padding: 10px;
background-color: #fff;
border: #ddd 1px solid;
border-radius: 10px;
margin: 10px;
padding: 10px;
}
.wizard .wizard-header {
background-color: #f4f4f4;
border-bottom: #ddd 1px solid;
border-top-left-radius: 10px;
border-top-right-radius: 10px;
padding: 5px 10px;
margin: 0 0 10px 0;
background-color: #f4f4f4;
border-bottom: #ddd 1px solid;
border-top-left-radius: 10px;
border-top-right-radius: 10px;
padding: 5px 10px;
margin: 0 0 10px 0;
}
.wizard .wizard-step {
margin: 10px 0;
margin: 10px 0;
}
.wizard .wizard-step p {
padding: 5px;
padding: 5px;
}
.navigation {
border-top: #ddd 1px solid;
margin-top: 10px;
padding-top: 10px;
border-top: #ddd 1px solid;
margin-top: 10px;
padding-top: 10px;
}
.navigation ul {
margin: 0;
padding: 0;
list-style: none;
margin: 0;
padding: 0;
list-style: none;
}
.navigation li {
float: left;
margin-right: 10px;
float: left;
margin-right: 10px;
}
.clearfix:before, .clearfix:after {
content: "\0020";
display: block;
height: 0;
visibility: hidden;
content: "\0020";
display: block;
height: 0;
visibility: hidden;
}
.clearfix:after {
clear: both;
clear: both;
}
input {
margin-top: 5px;
margin-top: 5px;
}
section {
padding-bottom: 10px;
}
\ No newline at end of file
}
......@@ -23,6 +23,17 @@
<p>Error message: <code>{{ e.error_message }}</code></p>
{% endif %}
<div id="endpoints">
{% if endpoints|length > 0 %}
<h3>Endpoints:</h3>
{% endif %}
<ul>
{% for e in endpoints %}
<li><a href="{{ e[1] }}">{{ e[0] }}</a></li>
{% endfor %}
</ul>
</div>
<div id="container_list">
{% if services_info|length > 0 %}
<h3>Services:</h3>
......@@ -32,19 +43,10 @@
<li class="container_name" id="{{ s['id'] }}">{{ s['name'] }}</li>
<ul>
<li>Zoe status: {{ s['status'] }}</li>
<li>Docker status: {{ s['backend_status'] }}</li>
<li>Docker status: {{ s['docker_status'] }}</li>
{% if s['error_message'] is not none %}
<li>Error: {{ s['error_message'] }}</li>
{% endif %}
{% if s['backend_status'] == 'started' %}
{% for p in s['description']['ports'] %}
{% if s['proxy_address'] is not none %}
<li><a href="{{ p['protocol'] }}://{{ s['proxy_address'] }}/{{ p['port_number'] }}">{{ p['name'] }}</a></li>
{% else %}
<li><a> {{ p['name'] }} IP: {{ s['ip_address'] }}</a></li>
{% endif %}
{% endfor %}
{% endif %}
</ul>
{% endfor %}
</ul>
......
......@@ -3,44 +3,144 @@
{% block title %}Home{% endblock %}
{% block custom_head %}
{% if refresh > 0 %}
<meta http-equiv="refresh" content="{{ refresh }}">
{% endif %}
<script type="application/javascript">
const AJAX_URL = "{{ reverse_url('ajax') }}";
const SLOW_UPDATE = 60000;
const FAST_UPDATE = 1000;
let update_interval = null;
/**
 * POST a JSON request to the Zoe AJAX endpoint.
 *
 * @param data       object sent as the JSON request body
 * @param success_cb called with the decoded JSON response on success
 * @param error_cb   optional, called when the request fails; when omitted
 *                   a generic communication error is shown
 */
function ajax(data, success_cb, error_cb) {
    $.ajax({
        url: AJAX_URL,
        type: 'POST',
        data: JSON.stringify(data),
        contentType: 'application/json; charset=utf-8',
        dataType: 'json',
        async: true,
        success: success_cb,
        // Callers may pass their own error handler; it used to be silently ignored.
        error: error_cb || function () {
            show_error('AJAX communication error, the operation will be retried');
        }
    });
}
/**
 * Display an error message in the AJAX error box.
 *
 * @param msg text appended after the "Error: " prefix
 */
function show_error(msg) {
    $("#ajax-error").text("Error: " + msg).show();
}
let state = "init";
let execution_id = -1;
/**
 * Update the page according to the current value of `state`.
 *
 * "init" switches to fast polling and asks the server to start the
 * execution, "starting" only updates the visible box, "started" switches
 * to slow polling. Any other value leaves the page untouched.
 */
function state_machine() {
    // Show the state box with the given selector and hide the other two.
    function show_only(active) {
        for (const box of ["#state-init", "#state-starting", "#state-started"]) {
            if (box == active) {
                $(box).show();
            } else {
                $(box).hide();
            }
        }
    }

    const poll = function () { update_zoe_status(); };

    switch (state) {
    case "init":
        clearInterval(update_interval);
        update_interval = setInterval(poll, FAST_UPDATE);
        show_only("#state-init");
        ajax({'type': 'start'},
            function (data) {
                if (data['status'] != 'ok') {
                    show_error(data.message);
                    return;
                }
                $("#ajax-error").hide();
                state = "starting";
                execution_id = data['execution_id'];
            },
            function () {
                show_error('AJAX communication error, the operation will be retried');
            }
        );
        break;
    case "starting":
        show_only("#state-starting");
        break;
    case "started":
        clearInterval(update_interval);
        update_interval = setInterval(poll, SLOW_UPDATE);
        show_only("#state-started");
        break;
    }
}
/**
 * Poll the server for the status of the current execution and update the
 * page, then run the state machine.
 *
 * Without a known execution the state goes back to "init". Otherwise the
 * reported status is displayed; "running" moves to the "started" state and
 * fills in the remaining time and the endpoint list, while "terminated",
 * "none" and "error" go back to "init" with fast polling.
 */
function update_zoe_status() {
    if (execution_id < 0) {
        $("#zoe-status").text('off');
        state = "init";
    } else {
        ajax({'type': 'query_status', 'exec_id': execution_id},
            function (data) {
                if (data['status'] == 'ok') {
                    $("#ajax-error").hide();
                    $("#zoe-status").text(data['exec_status']);
                    if (data['exec_status'] == 'running') {
                        state = "started";
                        // ttl is multiplied by 1000 to obtain the milliseconds moment expects.
                        $('#time_remaining').text(moment.duration(data['ttl'] * 1000).humanize());
                        let s = "";
                        for (let ep of data['endpoints']) {
                            s += "<li><a href=\"" + ep[1] + "\">" + ep[0] + "</a></li>\n";
                        }
                        $("#endpoints").html(s);
                    } else if (data['exec_status'] == 'terminated' || data['exec_status'] == 'none' || data['exec_status'] == 'error') {
                        state = "init";
                        // Stop the timer being replaced, otherwise it keeps
                        // firing and one more polling timer leaks each time.
                        clearInterval(update_interval);
                        update_interval = setInterval(function(){update_zoe_status();}, FAST_UPDATE);
                    }
                } else {
                    show_error(data.message);
                }
            }
        );
    }
    state_machine();
}
state_machine();
</script>
<style>
body {
width: 80%;
}
.state-box {
border: 1px solid black;
margin-top: 2em;
margin-bottom: 2em;
width: 40%;
padding-left: 10px;
padding-right: 10px;
}
</style>
{% endblock %}
{% block content %}
<h2>Guest cluster access page</h2>
<h2>Algorithmic Machine Learning cluster management</h2>
{% if request.remote_addr != gateway_ip %}
<p>You are logged in as {{ uid }}.</p>
<p>To access your Jupyter Notebook and your Spark cluster, you need to open a browser with a socks proxy configured. To do that, use the following commands.</p>
<p>Through this page you will be able to access the Jupyter notebook web interface, which you will use to upload and work on the notebooks provided on the <a href="https://github.com/DistributedSystemsGroup/Algorithmic-Machine-Learning">Algorithmic Machine Learning course GitHub page</a>.</p>
<p>On <b>Linux</b> copy and paste this command in a terminal window:</p>
<pre>google-chrome --proxy-server={{ user_gateway }} --user-data-dir=/tmp/chrome-zoe-$USER {{ request.url }}</pre>
<p>The work environment also contains an Apache Spark cluster and is created dynamically when you first access this page. After a fixed amount of time, the resources are freed and the Notebook and Spark are terminated. The files you saved in your workspace will be available for your next session.</p>
<p>On <b>Windows</b> copy and paste this command in a cmd window (Start & cmd):</p>
<pre>cd +chrome install directory+
chrome.exe --proxy-server={{ user_gateway }} --user-data-dir=%TEMP%\chrome-zoe {{ request.url }}</pre>
<span style="color: darkred; display: none;" id="ajax-error">AJAX communication error, retrying...</span>
{% else %}
<div class="state-box">