Commit 94e2b695 authored by Daniele Venzano's avatar Daniele Venzano
Browse files

Implement index, home and run pages

parent 434956e4
...@@ -3,7 +3,7 @@ conf = { ...@@ -3,7 +3,7 @@ conf = {
'status_refresh_interval': 10, 'status_refresh_interval': 10,
'scheduler_task_interval': 10, 'scheduler_task_interval': 10,
'db_connection': 'mysql+mysqlconnector://zoe:6sz2tfPuzBcCLdEz@m1.bigfoot.eurecom.fr/zoe', 'db_connection': 'mysql+mysqlconnector://zoe:6sz2tfPuzBcCLdEz@m1.bigfoot.eurecom.fr/zoe',
'redis_server': '192.168.45.25', 'redis_server': '192.168.45.2',
'redis_port': '6379', 'redis_port': '6379',
'redis_db': 0, 'redis_db': 0,
'apache-proxy-config-file': '/tmp/zoe-proxy.conf', 'apache-proxy-config-file': '/tmp/zoe-proxy.conf',
......
...@@ -22,12 +22,21 @@ NOTEBOOK_IMAGE = REGISTRY + "/zoe/spark-notebook-1.4.1:1.2" ...@@ -22,12 +22,21 @@ NOTEBOOK_IMAGE = REGISTRY + "/zoe/spark-notebook-1.4.1:1.2"
class ZoeClient: class ZoeClient:
def __init__(self, rpyc_server=None, rpyc_port=4000): def __init__(self, rpyc_server=None, rpyc_port=4000):
if rpyc_server is None: self.rpyc_server = rpyc_server
self.rpyc_port = rpyc_port
self.state = AlchemySession()
self.server = None
self.server_connection = None
def _connect(self):
if self.rpyc_server is None:
self.server_connection = rpyc.connect_by_service("ZoeSchedulerRPC") self.server_connection = rpyc.connect_by_service("ZoeSchedulerRPC")
else: else:
self.server_connection = rpyc.connect(rpyc_server, rpyc_port) self.server_connection = rpyc.connect(self.rpyc_server, self.rpyc_port)
self.server = self.server_connection.root self.server = self.server_connection.root
self.state = AlchemySession()
def _close(self):
return
# Users # Users
def user_new(self, email: str) -> PlainUser: def user_new(self, email: str) -> PlainUser:
...@@ -36,17 +45,31 @@ class ZoeClient: ...@@ -36,17 +45,31 @@ class ZoeClient:
self.state.commit() self.state.commit()
return user.extract() return user.extract()
def user_get(self, email: str) -> PlainUser: def user_get_by_email(self, email: str) -> PlainUser:
user = self.state.query(User).filter_by(email=email).one() try:
user = self.state.query(User).filter_by(email=email).one()
except NoResultFound:
return None
return user.extract() return user.extract()
def user_get(self, user_id: int) -> bool:
try:
user = self.state.query(User).filter_by(id=user_id).one()
except NoResultFound:
return None
else:
return user.extract()
def user_check(self, user_id: int) -> bool: def user_check(self, user_id: int) -> bool:
user = self.state.query(User).filter_by(id=user_id).one() user = self.state.query(User).filter_by(id=user_id).count()
return user is not None return user == 1
# Platform # Platform
def platform_status(self) -> PlatformStatusReport: def platform_status(self) -> PlatformStatusReport:
return self.server.get_platform_status() self._connect()
ret = self.server.get_platform_status()
self._close()
return ret
# Applications # Applications
def spark_application_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str) -> int: def spark_application_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str) -> int:
...@@ -142,7 +165,10 @@ class ZoeClient: ...@@ -142,7 +165,10 @@ class ZoeClient:
application = self.state.query(Application).filter_by(id=application_id).one() application = self.state.query(Application).filter_by(id=application_id).one()
except NoResultFound: except NoResultFound:
return None return None
return self.server.application_status(application.id) self._connect()
ret = self.server.application_status(application.id)
self._close()
return ret
def application_list(self, user_id) -> [PlainApplication]: def application_list(self, user_id) -> [PlainApplication]:
try: try:
...@@ -174,7 +200,10 @@ class ZoeClient: ...@@ -174,7 +200,10 @@ class ZoeClient:
status="submitted") status="submitted")
self.state.add(execution) self.state.add(execution)
self.state.commit() self.state.commit()
return self.server.execution_schedule(execution.id) self._connect()
ret = self.server.execution_schedule(execution.id)
self._close()
return ret
def execution_get(self, execution_id: int) -> PlainExecution: def execution_get(self, execution_id: int) -> PlainExecution:
try: try:
...@@ -188,7 +217,9 @@ class ZoeClient: ...@@ -188,7 +217,9 @@ class ZoeClient:
self.state.query(Execution).filter_by(id=execution_id).one() self.state.query(Execution).filter_by(id=execution_id).one()
except NoResultFound: except NoResultFound:
pass pass
self._connect()
self.server.terminate_execution(execution_id) self.server.terminate_execution(execution_id)
self._close()
def execution_delete(self, execution_id: int): def execution_delete(self, execution_id: int):
try: try:
...@@ -208,7 +239,10 @@ class ZoeClient: ...@@ -208,7 +239,10 @@ class ZoeClient:
except NoResultFound: except NoResultFound:
return None return None
else: else:
return self.server.log_get(container_id) self._connect()
ret = self.server.log_get(container_id)
self._close()
return ret
def get_zoe_client(): def get_zoe_client():
......
...@@ -23,7 +23,10 @@ class PlatformManager: ...@@ -23,7 +23,10 @@ class PlatformManager:
state = AlchemySession() state = AlchemySession()
execution = state.query(Execution).filter_by(id=execution_id).one() execution = state.query(Execution).filter_by(id=execution_id).one()
execution.assigned_resources = resources execution.assigned_resources = resources
self._application_to_containers(state, execution) try:
self._application_to_containers(state, execution)
except CannotCreateCluster:
return False
execution.set_started() execution.set_started()
state.commit() state.commit()
pm.update_proxy() pm.update_proxy()
......
...@@ -54,6 +54,7 @@ class RPyCAsyncIOServer: ...@@ -54,6 +54,7 @@ class RPyCAsyncIOServer:
if registrar is None: if registrar is None:
registrar = UDPRegistryClient(logger = self.logger) registrar = UDPRegistryClient(logger = self.logger)
self.registrar = registrar self.registrar = registrar
self.register_task = None
# The asyncio Server object # The asyncio Server object
self.server = None self.server = None
...@@ -76,6 +77,7 @@ class RPyCAsyncIOServer: ...@@ -76,6 +77,7 @@ class RPyCAsyncIOServer:
"""returns the listener socket's file descriptor""" """returns the listener socket's file descriptor"""
return self.server.sockets[0] return self.server.sockets[0]
@asyncio.coroutine
def _accept_method(self, reader, writer): def _accept_method(self, reader, writer):
self._authenticate_and_serve_client(reader, writer) self._authenticate_and_serve_client(reader, writer)
......
...@@ -51,23 +51,25 @@ class SwarmClient: ...@@ -51,23 +51,25 @@ class SwarmClient:
return pl_status return pl_status
def spawn_container(self, image, options) -> dict: def spawn_container(self, image, options) -> dict:
host_config = docker.utils.create_host_config(network_mode="bridge", cont = None
binds=options.get_volume_binds(),
mem_limit=options.get_memory_limit())
cont = self.cli.create_container(image=image,
environment=options.get_environment(),
network_disabled=False,
host_config=host_config,
detach=True,
volumes=options.get_volumes(),
command=options.get_command())
try: try:
host_config = docker.utils.create_host_config(network_mode="bridge",
binds=options.get_volume_binds(),
mem_limit=options.get_memory_limit())
cont = self.cli.create_container(image=image,
environment=options.get_environment(),
network_disabled=False,
host_config=host_config,
detach=True,
volumes=options.get_volumes(),
command=options.get_command())
self.cli.start(container=cont.get('Id')) self.cli.start(container=cont.get('Id'))
info = self.inspect_container(cont.get('Id'))
except docker.errors.APIError as e: except docker.errors.APIError as e:
self.cli.remove_container(container=cont.get('Id'), force=True) if cont is not None:
self.cli.remove_container(container=cont.get('Id'), force=True)
log.error(str(e)) log.error(str(e))
return None return None
info = self.inspect_container(cont.get('Id'))
return info return info
def inspect_container(self, docker_id) -> dict: def inspect_container(self, docker_id) -> dict:
......
from flask import Flask, url_for, abort from datetime import datetime
from flask import Flask, url_for
from zoe_web.api import api_bp from zoe_web.api import api_bp
from zoe_web.web import web_bp from zoe_web.web import web_bp
......
from flask import Blueprint, abort, jsonify from zipfile import is_zipfile
api_bp = Blueprint('api', __name__)
from flask import Blueprint, jsonify, request, session
from zoe_client import get_zoe_client from zoe_client import get_zoe_client
from common.exceptions import ApplicationStillRunning
api_bp = Blueprint('api', __name__)
def _api_check_user(zoe_client):
if 'user_id' not in session:
return jsonify(status='error', msg='user not logged in')
user = zoe_client.user_get(session['user_id'])
if user is None:
return jsonify(status='error', msg='unknown user')
else:
return user
@api_bp.route('/status/basic') @api_bp.route('/status/basic')
...@@ -13,3 +27,68 @@ def status_basic(): ...@@ -13,3 +27,68 @@ def status_basic():
'num_containers': platform_report.report["swarm"]["container_count"] 'num_containers': platform_report.report["swarm"]["container_count"]
} }
return jsonify(**ret) return jsonify(**ret)
@api_bp.route('/login', methods=['POST'])
def login():
form_data = request.form
email = form_data["email"]
client = get_zoe_client()
user = client.user_get(email)
if user is None:
user = client.user_new(email)
session["user_id"] = user.id
return jsonify(status="ok")
@api_bp.route('/applications/new', methods=['POST'])
def application_new():
client = get_zoe_client()
user = _api_check_user(client)
form_data = request.form
if form_data['app_type'] == "spark-notebook":
client.spark_notebook_application_new(user.id, int(form_data["num_workers"]), form_data["ram"] + 'g', int(form_data["num_cores"]), form_data["app_name"])
elif form_data['app_type'] == "spark-submit":
file_data = request.files['file']
if not is_zipfile(file_data.stream):
return jsonify(status='error', msg='not a zip file')
client.spark_submit_application_new(user.id, int(form_data["num_workers"]), form_data["ram"] + 'g', int(form_data["num_cores"]), form_data["app_name"], file_data)
else:
return jsonify(status="error", msg='unknown application type')
return jsonify(status="ok")
@api_bp.route('/executions/new', methods=['POST'])
def execution_new():
client = get_zoe_client()
_api_check_user(client)
form_data = request.form
app_id = int(form_data["app_id"])
application = client.application_get(app_id)
if application.type == "spark-notebook":
ret = client.execution_spark_new(app_id, form_data["exec_name"])
else:
ret = client.execution_spark_new(app_id, form_data["exec_name"], form_data["commandline"], form_data["spark_opts"])
if ret:
return jsonify(status="ok")
else:
return jsonify(status="error")
@api_bp.route('/applications/delete/<app_id>', methods=['GET', 'POST'])
def application_delete(app_id):
client = get_zoe_client()
_api_check_user(client)
try:
client.application_remove(app_id)
except ApplicationStillRunning:
return jsonify(status="error", msg="The application has active executions and cannot be deleted")
else:
return jsonify(status="ok")
...@@ -2,9 +2,6 @@ from flask import jsonify, request, send_file, abort, Blueprint ...@@ -2,9 +2,6 @@ from flask import jsonify, request, send_file, abort, Blueprint
from zipfile import is_zipfile from zipfile import is_zipfile
zoeweb_api = Blueprint('zoeweb_api', __name__)
@app.route("/api/<int:user_id>/cluster/<int:cluster_id>/terminate") @app.route("/api/<int:user_id>/cluster/<int:cluster_id>/terminate")
def api_terminate_cluster(user_id, cluster_id): def api_terminate_cluster(user_id, cluster_id):
db = CAaaState() db = CAaaState()
...@@ -47,26 +44,6 @@ def api_container_logs(user_id, container_id): ...@@ -47,26 +44,6 @@ def api_container_logs(user_id, container_id):
return jsonify(**ret) return jsonify(**ret)
@app.route("/api/<int:user_id>/spark-submit", methods=['POST'])
def api_spark_submit(user_id):
state = CAaaState()
ret = {}
if not state.check_user_id(user_id):
ret["status"] = "unauthorized"
return jsonify(**ret)
file_data = request.files['file']
form_data = request.form
if not is_zipfile(file_data.stream):
ret["status"] = "not a zip file"
return jsonify(**ret)
app_id = application_submitted(user_id, form_data["exec_name"], form_data["spark_options"], form_data["cmd_line"], file_data)
setup_volume(user_id, app_id, file_data.stream)
sm.spark_submit(user_id, app_id)
ret["status"] = "ok"
return jsonify(**ret)
@app.route("/api/<int:user_id>/history/<app_id>/logs") @app.route("/api/<int:user_id>/history/<app_id>/logs")
def api_history_log_archive(user_id, app_id): def api_history_log_archive(user_id, app_id):
state = CAaaState() state = CAaaState()
......
...@@ -6,5 +6,8 @@ from zoe_client import ZoeClient ...@@ -6,5 +6,8 @@ from zoe_client import ZoeClient
def check_user(zoeclient: ZoeClient): def check_user(zoeclient: ZoeClient):
if 'user_id' not in session: if 'user_id' not in session:
return redirect(url_for('web_bp.index')) return redirect(url_for('web_bp.index'))
if not zoeclient.user_check(session['user_id']): user = zoeclient.user_get(session['user_id'])
if user is None:
return redirect(url_for('web_bp.index')) return redirect(url_for('web_bp.index'))
else:
return user
from datetime import datetime
from flask import Blueprint from flask import Blueprint
web_bp = Blueprint('web', __name__, template_folder='templates', static_folder='static') web_bp = Blueprint('web', __name__, template_folder='templates', static_folder='static')
import zoe_web.web.start import zoe_web.web.start
import zoe_web.web.status import zoe_web.web.status
import zoe_web.web.applications
@web_bp.app_template_filter('format_timestamp')
def _jinja2_filter_datetime(timestamp):
try:
dt = datetime.fromtimestamp(timestamp)
except TypeError:
return timestamp
return dt.ctime()
from flask import render_template
from zoe_client import get_zoe_client
from zoe_web.web import web_bp
import zoe_web.utils as web_utils
@web_bp.route('/apps/new')
def application_new():
client = get_zoe_client()
user = web_utils.check_user(client)
template_vars = {
"user_id": user.id,
"email": user.email,
}
return render_template('application_new.html', **template_vars)
@web_bp.route('/executions/new/<app_id>')
def execution_new(app_id):
client = get_zoe_client()
user = web_utils.check_user(client)
application = client.application_get(app_id)
template_vars = {
"user_id": user.id,
"email": user.email,
'app': application
}
return render_template('execution_new.html', **template_vars)
@web_bp.route('/executions/terminate/<exec_id>')
def execution_terminate(exec_id):
client = get_zoe_client()
user = web_utils.check_user(client)
execution = client.execution_get(exec_id)
template_vars = {
"user_id": user.id,
"email": user.email,
'execution': execution
}
return render_template('execution_terminate.html', **template_vars)
@web_bp.route('/apps/delete/<app_id>')
def application_delete(app_id):
client = get_zoe_client()
user = web_utils.check_user(client)
application = client.application_get(app_id)
template_vars = {
"user_id": user.id,
"email": user.email,
'app': application
}
return render_template('application_delete.html', **template_vars)
...@@ -14,8 +14,21 @@ def index(): ...@@ -14,8 +14,21 @@ def index():
def home(): def home():
client = get_zoe_client() client = get_zoe_client()
user = web_utils.check_user(client) user = web_utils.check_user(client)
apps = client.application_list(user.id)
template_vars = { template_vars = {
"user_id": user.id, "user_id": user.id,
"email": user.email "email": user.email,
'apps': apps,
} }
reports = [client.application_status(app.id) for app in apps]
active_executions = []
past_executions = []
for r in reports:
for e in r.report['executions']:
if e['status'] == "running" or e['status'] == "scheduled":
active_executions.append((r, e))
else:
past_executions.append((r, e))
template_vars['active_executions'] = active_executions
template_vars['past_executions'] = past_executions
return render_template('home.html', **template_vars) return render_template('home.html', **template_vars)
{% extends "base.html" %}
{% block title %}New application{% endblock %}
{% block content %}
<h1>Delete application</h1>
<p>Warning the application binary and all historic data and logs will be deleted for application {{ app.name }}.</p>
<form enctype="multipart/form-data" id="app_del">
<input type="submit" value="Yes, delete the application" id="submit">
</form>
<script type="application/javascript">
function completeHandler(e) {
if (e.status == "ok") {
location.href = "{{ url_for('web.home') }}";
} else {
alert("Error: " + e.msg);
}
return false;
}
function errorHandler(e) {
alert(e.msg)
}
$('#app_del').on('submit', function(event) {
event.preventDefault();
$("#progress").show();
$.ajax({
url: '{{ url_for("api.application_delete", app_id=app.id) }}', // Server script to process data
type: 'POST',
success: completeHandler,
error: errorHandler,
// Form data
data: new FormData(this),
//Options to tell jQuery not to process data or worry about content-type.
cache: false,
contentType: false,
processData: false
});
return false;
});
</script>
{% endblock %}
\ No newline at end of file
{% extends "base.html" %}
{% block title %}New application{% endblock %}
{% block content %}
<h1>New application</h1>
<p>Use the form below to create a new Zoe application.</p>
<form enctype="multipart/form-data" id="app_new">
<label for="app_name">Name:</label>
<input type="text" autofocus autocomplete="on" required pattern="[a-z0-9_\-]+" name="app_name" id="app_name" placeholder="myapp-25"><br/>
<label for="app_type">Type:</label><br/>
<input type="radio" name="app_type" value="spark-notebook" onclick="handleAppTypeClick(this)" checked>&nbsp;Spark Notebook<br/>
<input type="radio" name="app_type" value="ipython-notebook" onclick="handleAppTypeClick(this)" disabled>&nbsp;iPython Notebook (coming soon)<br/>
<input type="radio" name="app_type" value="spark-submit" onclick="handleAppTypeClick(this)">&nbsp;Spark Application<br/>
<p>Minimum required resources:</p>
<label for="num_workers">Number of worker nodes</label>
<input type="number" min="1" step="1" value="2" required name="num_workers"><br/>
<label for="num_cores">Number of cores per worker node</label>
<input type="number" min="2" step="1" value="2" required name="num_cores"><br/>
<label for="ram">Amount of RAM per worker node (gigabytes)</label>
<input type="number" min="1" step="1" value="2" required name="ram"><br/>
<label for="upload" class="spark-submit">Zip file containing the Spark application:</label>