Commit 1d1a99eb authored by Daniele Venzano
Browse files

Notebooks can be started and opened through the proxy

parent fcaed6f3
......@@ -15,5 +15,6 @@ conf = {
'email_task_interval': 300,
'client_rpyc_autodiscovery': True,
'client_rpyc_server': None,
'client_rpyc_port': None
'client_rpyc_port': None,
'proxy_path_prefix': '/proxy'
}
......@@ -64,7 +64,8 @@ class Execution(Base):
ret.time_finished = self.time_finished
ret.status = self.status
ret.termination_notice = self.termination_notice
ret.cluster_id = self.cluster.id
if self.cluster is not None:
ret.cluster_id = self.cluster.id
ret.type = self.type
return ret
......
......@@ -5,6 +5,7 @@ from common.state import AlchemySession
from common.state.application import Application, SparkNotebookApplication, SparkSubmitApplication, SparkApplication, PlainApplication
from common.state.container import Container
from common.state.execution import Execution, SparkSubmitExecution, PlainExecution
from common.state.proxy import Proxy
from common.state.user import User, PlainUser
from common.application_resources import SparkApplicationResources
from common.status import PlatformStatusReport, ApplicationStatusReport
......@@ -155,7 +156,7 @@ class ZoeClient:
storage.application_data_delete(application)
for e in application.executions:
self.execution_delete(e)
self.execution_delete(e.id)
self.state.delete(application)
self.state.commit()
......@@ -212,13 +213,31 @@ class ZoeClient:
return None
return ret.extract()
def execution_get_proxy_path(self, execution_id):
    """
    Return the proxy path of the web interface exposed by an execution.

    :param execution_id: ID of the execution to look up
    :return: ``conf['proxy_path_prefix'] + '/<proxy id>'``, or None when the
             execution does not exist, its application type exposes no
             known web interface, or the container / proxy entry backing
             that interface cannot be found
    """
    try:
        execution = self.state.query(Execution).filter_by(id=execution_id).one()
    except NoResultFound:
        return None

    # Each supported application type exposes its interface through one
    # named container and one named proxy entry.
    if isinstance(execution.application, SparkNotebookApplication):
        container_name = "spark-notebook"
        service_name = "Spark Notebook interface"
    elif isinstance(execution.application, SparkSubmitApplication):
        container_name = "spark-submit"
        service_name = "Spark application web interface"
    else:
        return None

    container = execution.find_container(container_name)
    if container is None:
        # find_container() can come back empty (other callers guard against
        # it too) - presumably when the execution has not been started yet.
        return None

    try:
        proxy = self.state.query(Proxy).filter_by(container_id=container.id, service_name=service_name).one()
    except NoResultFound:
        # The container exists but no proxy entry has been registered for it.
        return None
    return conf['proxy_path_prefix'] + '/{}'.format(proxy.id)
def execution_terminate(self, execution_id: int):
    """
    Ask the scheduler to terminate an execution.

    The execution is first looked up in the local state; if it does not
    exist there is nothing to terminate and no RPC call is made.

    :param execution_id: ID of the execution to terminate
    :return: None
    """
    try:
        self.state.query(Execution).filter_by(id=execution_id).one()
    except NoResultFound:
        # Unknown execution: do not bother the scheduler, which performs the
        # same lookup and would fail on it.
        return
    self._connect()
    try:
        self.server.execution_terminate(execution_id)
    finally:
        # Release the connection even when the remote call raises.
        self._close()
def execution_delete(self, execution_id: int):
......
......@@ -21,7 +21,10 @@ class PeriodicTaskManager:
def _generic_task(self, name, delay, func):
    """
    Run ``func`` repeatedly until termination is requested.

    :param name: task name, used only in log messages
    :param delay: timeout passed to ``self.terminate.wait()`` between runs
    :param func: callable invoked with no arguments on every cycle
    """
    log.info("Task {} started".format(name))
    while True:
        try:
            func()
        except Exception:
            # A failing run must not kill the periodic task: log the
            # traceback and try again on the next cycle. SystemExit and
            # KeyboardInterrupt are deliberately left to propagate.
            log.exception("Task {} raised an exception".format(name))
        # The wait acts both as the pause between runs and as the stop
        # signal (self.terminate is presumably a threading.Event - confirm).
        stop = self.terminate.wait(delay)
        if stop:
            break
......
from datetime import datetime, timedelta
import logging
log = logging.getLogger(__name__)
from io import BytesIO
import zipfile
from zoe_scheduler.swarm_client import SwarmClient, ContainerOptions
from zoe_scheduler.proxy_manager import pm
from common.state import AlchemySession, Application, Cluster, Container, SparkApplication, Proxy, Execution, SparkNotebookApplication, SparkSubmitApplication, SparkSubmitExecution
from common.state import AlchemySession, Cluster, Container, SparkApplication, Proxy, Execution, SparkNotebookApplication, SparkSubmitApplication, SparkSubmitExecution
from common.application_resources import ApplicationResources
from common.exceptions import CannotCreateCluster
from common.configuration import conf
from common.object_storage import logs_archive_upload
log = logging.getLogger(__name__)
class PlatformManager:
def __init__(self):
......@@ -28,6 +30,7 @@ class PlatformManager:
except CannotCreateCluster:
return False
execution.set_started()
print(execution.status)
state.commit()
pm.update_proxy()
return True
......@@ -96,14 +99,16 @@ class PlatformManager:
resources = execution.assigned_resources
nb_requirements = resources.notebook_resources
# Do this here so we can use the ID later for building proxy URLs
# Create this proxy entry here as we need to pass the ID in the container environment
container = Container(readable_name="spark-notebook", cluster=cluster)
state.add(container)
nb_url_proxy = Proxy(service_name="Spark Notebook interface", container=container, cluster=cluster)
state.add(nb_url_proxy)
state.flush()
nb_opts = ContainerOptions()
nb_opts.add_env_variable("SPARK_MASTER_IP", master.ip_address)
nb_opts.add_env_variable("PROXY_ID", container.id)
nb_opts.add_env_variable("PROXY_ID", nb_url_proxy.id)
if "memory_limit" in execution.assigned_resources.worker_resources:
nb_opts.add_env_variable("SPARK_EXECUTOR_RAM", execution.assigned_resources.worker_resources["memory_limit"])
if "memory_limit" in nb_requirements:
......@@ -120,9 +125,7 @@ class PlatformManager:
nb_app_url = "http://" + nb_info["ip_address"] + ":4040"
nb_app_proxy = Proxy(service_name="Spark application web interface", container=container, cluster=cluster, internal_url=nb_app_url)
state.add(nb_app_proxy)
nb_url = "http://" + nb_info["ip_address"] + ":9000/proxy/%d" % container.id
nb_url_proxy = Proxy(service_name="Spark Notebook interface", container=container, cluster=cluster, internal_url=nb_url)
state.add(nb_url_proxy)
nb_url_proxy.internal_url = "http://" + nb_info["ip_address"] + ":9000/proxy/%d" % nb_url_proxy.id
def _spawn_submit_client(self, state: AlchemySession, execution: SparkSubmitExecution, cluster: Cluster, master: Container):
application = execution.application
......@@ -159,7 +162,7 @@ class PlatformManager:
nb_app_proxy = Proxy(service_name="Spark application web interface", container=container, cluster=cluster, internal_url=nb_app_url)
state.add(nb_app_proxy)
def terminate_execution(self, state: AlchemySession, execution: Execution):
def execution_terminate(self, state: AlchemySession, execution: Execution):
cluster = execution.cluster
logs = []
if cluster is not None:
......@@ -191,7 +194,6 @@ class PlatformManager:
return ret["running"]
def check_executions_health(self):
log.debug("Running check health task")
state = AlchemySession()
all_containers = state.query(Container).all()
for c in all_containers:
......@@ -204,16 +206,19 @@ class PlatformManager:
for e in execs:
c = e.find_container("spark-notebook")
if c is not None:
pr = state.query(Proxy).filter_by(container_id=c.id, service_name="Spark Notebook interface")
pr = state.query(Proxy).filter_by(container_id=c.id, service_name="Spark Notebook interface").one()
if datetime.now() - pr.last_access > timedelta(hours=conf["notebook_max_age_no_activity"]):
self.terminate_execution(state, e)
log.info("Killing spark notebook {} for inactivity".format(e.id))
self.execution_terminate(state, e)
if datetime.now() - pr.last_access > timedelta(hours=conf["notebook_max_age_no_activity"]) - timedelta(hours=conf["notebook_warning_age_no_activity"]):
log.info("Spark notebook {} is on notice for inactivity".format(e.id))
e.termination_notice = True
state.commit()
def _container_died(self, state: AlchemySession, container: Container):
    """
    React to a container that is no longer running.

    When the dead container is the "spark-submit" or "spark-master" one, the
    execution owning its cluster is terminated; for any other container the
    event is only logged.

    :param state: database session handed over to execution_terminate()
    :param container: the container found dead
    """
    if container.readable_name in ("spark-submit", "spark-master"):
        log.debug("found a dead spark-submit container, cleaning up")
        self.execution_terminate(state, container.cluster.execution)
    else:
        # Fill in the placeholders so the log tells which container died.
        log.warning("Container {} (ID: {}) died unexpectedly".format(container.readable_name, container.id))
......@@ -13,7 +13,6 @@ class PlatformStatus:
self.swarm = SwarmClient()
def update(self):
log.debug("Running platform status update task")
self.swarm_status = self.swarm.info()
def generate_report(self) -> PlatformStatusReport:
......
......@@ -20,7 +20,7 @@ ENTRY_TEMPLATE = """
ProxyHTMLExtended On
ProxyPass {{ proxy_url }} retry=1
ProxyPassReverse {{ proxy_url }}
{% if service_name != "notebook" %}
{% if service_name != "Spark Notebook interface" %}
ProxyHTMLURLMap ^/(.*)$ /proxy/{{ proxy_id }}/$1 RL
ProxyHTMLURLMap ^logPage(.*)$ /proxy/{{ proxy_id }}/logPage$1 RL
ProxyHTMLURLMap ^app(.*)$ /proxy/{{ proxy_id }}/app$1 RL
......@@ -29,7 +29,7 @@ ENTRY_TEMPLATE = """
{% endfor %}
{% endif %}
</Location>
{% if service_name == "notebook" %}
{% if service_name == "Spark Notebook interface" %}
<Location /proxy/{{ proxy_id }}/ws/>
ProxyPass ws://{{ netloc }}/proxy/{{ proxy_id }}/ws/
</Location>
......@@ -77,7 +77,6 @@ class ProxyManager:
self._commit_and_reload(output)
def update_proxy_access_timestamps(self):
log.debug("Running update proxy accesses task")
regex = re.compile('[0-9.]+ - - \[(.*)\] "GET /proxy/([0-9a-z\-]+)/')
logf = open(self.apache_access_log, 'r')
last_accesses = {}
......@@ -90,10 +89,10 @@ class ProxyManager:
state = AlchemySession()
for proxy in state.query(Proxy).all():
proxy_id = proxy['id']
if proxy_id in last_accesses:
proxy = state.query(Proxy).filter_by(id=proxy_id).one()
proxy.last_access = last_accesses[proxy_id]
if proxy.id in last_accesses:
log.debug("Updating access timestamp for proxy ID {}".format(proxy.id))
proxy = state.query(Proxy).filter_by(id=proxy.id).one()
proxy.last_access = last_accesses[proxy.id]
proxy.container.cluster.execution.termination_notice = False
state.commit()
......
......@@ -20,10 +20,10 @@ class ZoeSchedulerRPCService(rpyc.Service):
pl_status = self.sched.platform_status.generate_report()
return pl_status
def exposed_terminate_execution(self, execution_id: int) -> bool:
def exposed_execution_terminate(self, execution_id: int) -> bool:
state = AlchemySession()
execution = state.query(Execution).filter_by(id=execution_id).one()
self.sched.terminate_execution(state, execution)
self.sched.execution_terminate(state, execution)
state.commit()
return True
......
......@@ -75,23 +75,21 @@ class ZoeScheduler:
if not self.scheduler_policy.admission_control(execution.application.required_resources):
return False
self.scheduler_policy.insert(execution.id, execution.application.required_resources)
self._check_runnable()
return True
def _check_runnable(self): # called periodically, does not use state to keep database load low
execution_id, resources = self.scheduler_policy.runnable()
if execution_id is None:
return
log.debug("Found a runnable execution!")
if self.platform.start_execution(execution_id, resources):
self.scheduler_policy.started(execution_id, resources)
def schedule(self):
log.debug("Running schedule task")
self._check_runnable()
def terminate_execution(self, state, execution: Execution):
self.platform.terminate_execution(state, execution)
def execution_terminate(self, state, execution: Execution):
self.platform.execution_terminate(state, execution)
self.scheduler_policy.terminated(execution.id)
......
......@@ -61,6 +61,19 @@ def application_new():
return jsonify(status="ok")
@api_bp.route('/applications/delete/<app_id>', methods=['GET', 'POST'])
def application_delete(app_id):
    """
    API endpoint: delete the application identified by ``app_id``.

    Replies with a JSON object whose ``status`` is "ok" on success, or
    "error" (with an explanatory ``msg``) when the client refuses the
    removal by raising ApplicationStillRunning.
    """
    zoe = get_zoe_client()
    _api_check_user(zoe)
    try:
        zoe.application_remove(app_id)
    except ApplicationStillRunning:
        return jsonify(status="error", msg="The application has active executions and cannot be deleted")
    return jsonify(status="ok")
@api_bp.route('/executions/new', methods=['POST'])
def execution_new():
client = get_zoe_client()
......@@ -81,14 +94,11 @@ def execution_new():
return jsonify(status="error")
@api_bp.route('/applications/delete/<app_id>', methods=['GET', 'POST'])
def application_delete(app_id):
@api_bp.route('/executions/terminate/<exec_id>')
def execution_terminate(exec_id):
client = get_zoe_client()
_api_check_user(client)
try:
client.application_remove(app_id)
except ApplicationStillRunning:
return jsonify(status="error", msg="The application has active executions and cannot be deleted")
else:
return jsonify(status="ok")
client.execution_terminate(exec_id)
return jsonify(status="ok")
......@@ -93,7 +93,7 @@ def web_terminate(user_id, execution_id):
"execution_id": execution_id,
"user_id": user_id
}
return render_template('terminate.html', **template_vars)
return render_template('execution_terminate.html', **template_vars)
@app.route("/web/<int:user_id>/container/<int:container_id>/logs")
......
......@@ -25,10 +25,10 @@ def home():
past_executions = []
for r in reports:
for e in r.report['executions']:
if e['status'] == "running" or e['status'] == "scheduled":
active_executions.append((r, e))
if e['status'] == "running" or e['status'] == "scheduled" or e['status'] == "submitted":
active_executions.append((r.report, e, client.execution_get_proxy_path(e['id'])))
else:
past_executions.append((r, e))
past_executions.append((r.report, e))
template_vars['active_executions'] = active_executions
template_vars['past_executions'] = past_executions
return render_template('home.html', **template_vars)
......@@ -14,42 +14,21 @@ a:visited {
text-decoration: none;
}
div#activities {
padding-left: 3em;
}
a.activity {
width: 100px;
height: 80px;
border: 1px black solid;
border-radius: 15px;
float: left;
margin-right: 10px;
padding: 1em;
text-align: center;
display: block;
}
a.activity:hover {
transition: background-color 200ms linear;
background-color: khaki;
}
div.user_info {
font-size: smaller;
clear: both;
padding-top: 20px;
}
table#app_list {
table.app_list {
border-collapse: collapse;
}
table#app_list tr {
table.app_list tr {
border-top: 1px black solid;
}
table#app_list td {
table.app_list td {
padding-right: 1.5em;
}
......@@ -57,7 +36,7 @@ td.long-text {
font-size: xx-small;
}
div.caaas_status_line {
div.status_line {
float: left;
}
......
{% extends "base.html" %}
{% block footer %}
<p>Back to the <a href="/web/{{ user_id }}">user home page</a></p>
<p>Back to the <a href="{{ url_for('web.home') }}">user home page</a></p>
{{ super() }}
{% endblock %}
......@@ -33,7 +33,6 @@
You can monitor application status from the <a href="{{ url_for("web.home") }}">home page</a>.</p>
<p id="schedule_error" style="display: none">Your execution request has been denied. The system is not currently able to satisfy the
application resource requirements.</p>
You can monitor application status from the <a href="{{ url_for("web.home") }}">home page</a>.</p>
<p id="communication_error" style="display: none">Error: there is a communication problem with the Zoe web server. Is the network
still available?</p>
......
{% extends "base_user.html" %}
{% block title %}Terminate cluster {{ cluster_name }}{% endblock %}
{% block title %}Terminate execution {{ execution.name }}{% endblock %}
{% block content %}
<script type="application/javascript">
(function() {
$.getJSON("{{ url_for("api_terminate_cluster", user_id=user_id, cluster_id=cluster_id) }}")
$.getJSON("{{ url_for("api.execution_terminate", exec_id=execution.id) }}")
.done(function( data ) {
if (data.status == "ok") {
$("#result").empty().append("Cluster terminated succesfully");
......@@ -13,7 +13,7 @@
});
})();
</script>
<h2>Termination request for cluster {{ cluster_name }}</h2>
<h2>Termination request for execution "{{ execution.name }}"</h2>
<p id="result">Request sent...</p>
......
......@@ -9,7 +9,7 @@
<p>You have no applications defined. <a href="{{ url_for("web.application_new") }}">Click here</a> to create a new one!</p>
{% else %}
<table id="app_list">
<table id="app_list" class="app_list">
<thead>
<tr>
<th>Name</th>
......@@ -44,7 +44,7 @@
<p>You have no active executions at this time.</p>
{% else %}
<table id="exec_list">
<table id="exec_list" class="app_list">
<thead>
<tr>
<th>Application name</th>
......@@ -62,8 +62,13 @@
<td>{{ e[1]['name'] }}</td>
<td>{{ e[1]['status'] }}</td>
<td>{{ e[1]['scheduled_at']|format_timestamp }}</td>
<td>{{ e[1]['started_at']|replace('None', 'not yet')|format_timestamp }}</td>
{% if e[1]['started_at'] == 'None' %}
<td>not yet</td>
{% else %}
<td>{{ e[1]['started_at']|format_timestamp }}</td>
{% endif %}
<td>{{ e[1]['cluster']|count }}</td>
<td><a href="{{ e[2] }}">Open</a></td>
<td><a href="{{ url_for('web.execution_terminate', exec_id=e[1]['id']) }}">Terminate</a></td>
</tr>
{% endfor %}
......@@ -78,7 +83,7 @@
<p>Empty history.</p>
{% else %}
<table id="hist_list">
<table id="hist_list" class="app_list">
<thead>
<tr>
<th>Application name</th>
......@@ -90,15 +95,21 @@
</tr>
</thead>
<tbody>
{% for e in active_executions %}
{% for e in past_executions %}
<tr class="{{ loop.cycle('odd', 'even') }}">
<td>{{ e[0]['name'] }}</td>
<td>{{ e[1]['name'] }}</td>
<td>{{ e[1]['status'] }}</td>
<td>{{ e[1]['scheduled_at']|format_timestamp }}</td>
{% if e[1]['started_at'] == 'None' %}
<td>never</td>
{% else %}
<td>{{ e[1]['started_at']|format_timestamp }}</td>
{% endif %}
<td>{{ e[1]['finished_at']|format_timestamp }}</td>
{# A never-started execution appears to carry the string 'None' in started_at (the "never" cell of this row tests for it), so check for both the string and a real None before offering logs. #}
{% if e[1]['started_at'] not in (None, 'None') %}
<td><a href="">Logs</a></td>
{% endif %}
</tr>
{% endfor %}
</tbody>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment