Commit e988554e authored by Daniele Venzano's avatar Daniele Venzano

Fix a few bugs and add some css to the apps page

parent f3097356
This diff is collapsed.
from flask import jsonify, request, send_file from flask import jsonify, request, send_file, abort
import time import time
from zipfile import is_zipfile from zipfile import is_zipfile
...@@ -38,7 +38,10 @@ def api_terminate_cluster(user_id, cluster_id): ...@@ -38,7 +38,10 @@ def api_terminate_cluster(user_id, cluster_id):
ret["status"] = "unauthorized" ret["status"] = "unauthorized"
return jsonify(**ret) return jsonify(**ret)
cluster_id = str(cluster_id)
cluster_list = db.get_clusters(user_id) cluster_list = db.get_clusters(user_id)
if cluster_id not in cluster_list:
return abort(404)
if cluster_list[cluster_id]["user_id"] != user_id: if cluster_list[cluster_id]["user_id"] != user_id:
ret["status"] = "unauthorized" ret["status"] = "unauthorized"
return jsonify(**ret) return jsonify(**ret)
......
from datetime import datetime, timedelta from datetime import datetime, timedelta
import threading
import time import time
from traceback import print_exc from traceback import print_exc
import smtplib import smtplib
from email.mime.text import MIMEText from email.mime.text import MIMEText
import logging
log = logging.getLogger(__name__)
from jinja2 import Template from jinja2 import Template
...@@ -51,21 +52,15 @@ def do_duration(seconds): ...@@ -51,21 +52,15 @@ def do_duration(seconds):
return template.format(d=d, h=h, m=m, s=s) return template.format(d=d, h=h, m=m, s=s)
def start_cleanup_thread(): def cleanup_task():
th = threading.Thread(target=_loop) ts = time.time()
th.daemon = True # noinspection PyBroadException
th.start() try:
clean_completed_apps()
check_notebooks()
def _loop(): except:
while True: print_exc()
# noinspection PyBroadException log.debug("Cleanup task completed in {:.3}s".format(time.time() - ts))
try:
clean_completed_apps()
check_notebooks()
time.sleep(int(config.cleanup_thread_interval))
except:
print_exc()
def check_notebooks(): def check_notebooks():
...@@ -101,15 +96,15 @@ def app_cleanup(app_id, cluster_id): ...@@ -101,15 +96,15 @@ def app_cleanup(app_id, cluster_id):
state = CAaaState() state = CAaaState()
app = state.get_application(app_id) app = state.get_application(app_id)
username = state.get_user_email(app["user_id"]) email = state.get_user_email(app["user_id"])
template_vars = { template_vars = {
'cmdline': app["cmd"], 'cmdline': app["cmd"],
'runtime': do_duration((app["time_finished"] - app["time_started"]).total_seconds()), 'runtime': do_duration((app["time_finished"] - app["time_started"]).total_seconds()),
'name': app["execution_name"], 'name': app["execution_name"],
'log_url': config.flask_base_url + "/api/{}/history/{}/logs".format(username, app_id) 'log_url': config.flask_base_url + "/api/{}/history/{}/logs".format(app["user_id"], app_id)
} }
subject = '[CAaaS] Spark execution {} finished'.format(app["execution_name"]) subject = '[CAaaS] Spark execution {} finished'.format(app["execution_name"])
send_email(username, subject, APP_FINISH_EMAIL_TEMPLATE, template_vars) send_email(email, subject, APP_FINISH_EMAIL_TEMPLATE, template_vars)
sm.terminate_cluster(cluster_id) sm.terminate_cluster(cluster_id)
...@@ -134,7 +129,7 @@ def clean_completed_apps(): ...@@ -134,7 +129,7 @@ def clean_completed_apps():
cont_ids = state.get_submit_containers() cont_ids = state.get_submit_containers()
for cont_id, cluster_id in cont_ids: for cont_id, cluster_id in cont_ids:
if not sm.check_container_alive(cont_id): if not sm.check_container_alive(cont_id):
print("Found an app to cleanup")
app_id = state.find_app_for_cluster(cluster_id) app_id = state.find_app_for_cluster(cluster_id)
log.info("App {} needs to be cleaned up".format(app_id))
state.application_finished(app_id) state.application_finished(app_id)
app_cleanup(app_id, cluster_id) app_cleanup(app_id, cluster_id)
...@@ -3,7 +3,10 @@ from caaas.config_parser import config ...@@ -3,7 +3,10 @@ from caaas.config_parser import config
def _generate_proxied_url(proxy_id): def _generate_proxied_url(proxy_id):
return config.proxy_base_url + "/" + proxy_id if proxy_id is not None:
return config.proxy_base_url + "/" + proxy_id
else:
return None
def get_container_addresses(container_id): def get_container_addresses(container_id):
......
...@@ -202,7 +202,7 @@ class CAaaState: ...@@ -202,7 +202,7 @@ class CAaaState:
q = "SELECT id, user_id, master_address, name FROM clusters WHERE user_id=%s" q = "SELECT id, user_id, master_address, name FROM clusters WHERE user_id=%s"
cursor.execute(q, (user_id,)) cursor.execute(q, (user_id,))
for row in cursor: for row in cursor:
res[str(row["id"])] = { res[str(row["id"])] = { # FIXME: IDs should be int or str, no casting!
"user_id": row["user_id"], "user_id": row["user_id"],
"master_address": row["master_address"], "master_address": row["master_address"],
"name": row["name"] "name": row["name"]
...@@ -354,10 +354,10 @@ class CAaaState: ...@@ -354,10 +354,10 @@ class CAaaState:
def get_applications(self, user_id=None) -> dict: def get_applications(self, user_id=None) -> dict:
cursor = self._get_cursor(dictionary=True) cursor = self._get_cursor(dictionary=True)
if user_id is None: if user_id is None:
q = "SELECT * FROM applications" q = "SELECT * FROM applications ORDER BY time_started DESC"
cursor.execute(q) cursor.execute(q)
else: else:
q = "SELECT * FROM applications WHERE user_id=%s" q = "SELECT * FROM applications WHERE user_id=%s ORDER BY time_started DESC"
cursor.execute(q, (user_id,)) cursor.execute(q, (user_id,))
res = [] res = []
for row in cursor: for row in cursor:
......
...@@ -41,6 +41,22 @@ div.user_info { ...@@ -41,6 +41,22 @@ div.user_info {
padding-top: 20px; padding-top: 20px;
} }
table#app_list {
border-collapse: collapse;
}
table#app_list tr {
border-top: 1px black solid;
}
table#app_list td {
padding-right: 1.5em;
}
td.long-text {
font-size: xx-small;
}
div.caaas_status_line { div.caaas_status_line {
float: left; float: left;
} }
......
...@@ -26,11 +26,12 @@ ...@@ -26,11 +26,12 @@
<th>Logs</th> <th>Logs</th>
</tr> </tr>
</thead> </thead>
<tbody>
{% for a in apps %} {% for a in apps %}
<tr> <tr class="{{ loop.cycle('odd', 'even') }}">
<td>{{ a["execution_name"] }}</td> <td>{{ a["execution_name"] }}</td>
<td>{{ a["cmd"] }}</td> <td class="long-text">{{ a["cmd"] }}</td>
<td>{{ a["spark_options"] }}</td> <td class="long-text">{{ a["spark_options"] }}</td>
<td>{{ a["time_started"] }}</td> <td>{{ a["time_started"] }}</td>
<td>{{ a["time_finished"] }}</td> <td>{{ a["time_finished"] }}</td>
<td>{{ a["status"] }}</td> <td>{{ a["status"] }}</td>
...@@ -39,16 +40,17 @@ ...@@ -39,16 +40,17 @@
<a href="{{ url_for("api_history_log_archive", user_id=user_id, app_id=a["id"]) }}">Zip</a> <a href="{{ url_for("api_history_log_archive", user_id=user_id, app_id=a["id"]) }}">Zip</a>
{% endif %} {% endif %}
{% if a["status"] == "running" %} {% if a["status"] == "running" %}
<a href="{{ url_for("web_inspect", user_id=user_id, app_id=a["cluster_id"]) }}">Inspect</a> <a href="{{ url_for("web_inspect", user_id=user_id, cluster_id=a["cluster_id"]) }}">Inspect</a>
{% endif %} {% endif %}
</td> </td>
<td> <td>
{% if a["status"] == "running" %} {% if a["status"] == "running" %}
{{ url_for("web_terminate", user_id=user_id, cluster_id=a["cluster_id"]) }} <a href="{{ url_for("web_terminate", user_id=user_id, cluster_id=a["cluster_id"]) }}">Terminate</a>
{% endif %} {% endif %}
</td> </td>
</tr> </tr>
<!-- FIXME: add a re-run link --> <!-- FIXME: add a re-run link -->
{% endfor %} {% endfor %}
</tbody>
</table> </table>
{% endblock %} {% endblock %}
\ No newline at end of file
...@@ -29,7 +29,8 @@ ...@@ -29,7 +29,8 @@
} }
function completeHandler(e) { function completeHandler(e) {
$("#progress").hide(); location.href = "http://" + window.location.hostname + "/web/{{ user_id }}/apps";
return false;
} }
function errorHandler(e) { function errorHandler(e) {
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
{% block content %} {% block content %}
<script type="application/javascript"> <script type="application/javascript">
(function() { (function() {
$.getJSON("{{ url_for("api_terminate_cluster", user_id, cluster_id) }}") $.getJSON("{{ url_for("api_terminate_cluster", user_id=user_id, cluster_id=cluster_id) }}")
.done(function( data ) { .done(function( data ) {
if (data.status == "ok") { if (data.status == "ok") {
$("#result").empty().append("Cluster terminated succesfully"); $("#result").empty().append("Cluster terminated succesfully");
......
...@@ -2,7 +2,7 @@ from flask import render_template, redirect, url_for, abort ...@@ -2,7 +2,7 @@ from flask import render_template, redirect, url_for, abort
from caaas import app from caaas import app
from caaas.config_parser import config from caaas.config_parser import config
from caaas.proxy_manager import get_container_addresses from caaas.proxy_manager import get_container_addresses, get_notebook_address
from caaas.sql import CAaaState from caaas.sql import CAaaState
from caaas.swarm_manager import sm from caaas.swarm_manager import sm
...@@ -46,12 +46,13 @@ def web_user_apps(user_id): ...@@ -46,12 +46,13 @@ def web_user_apps(user_id):
return redirect(url_for('index')) return redirect(url_for('index'))
apps = state.get_applications(user_id) apps = state.get_applications(user_id)
nb_id = state.get_notebook(user_id)
template_vars = { template_vars = {
"user_id": user_id, "user_id": user_id,
"apps": apps, "apps": apps,
"has_notebook": state.has_notebook(user_id), "has_notebook": state.has_notebook(user_id),
"notebook_address": sm.get_notebook(user_id), "notebook_address": get_notebook_address(nb_id),
"notebook_cluster_id": state.get_notebook(user_id) "notebook_cluster_id": nb_id
} }
return render_template('apps.html', **template_vars) return render_template('apps.html', **template_vars)
......
import logging
from tornado.wsgi import WSGIContainer
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop, PeriodicCallback
from caaas import app from caaas import app
from caaas.cleanup_thread import start_cleanup_thread from caaas.cleanup_thread import cleanup_task
from caaas.config_parser import config
DEBUG = True
log = logging.getLogger("caaas")
def main(): def main():
start_cleanup_thread() if DEBUG:
app.debug = True logging.basicConfig(level=logging.DEBUG)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("tornado").setLevel(logging.WARNING)
print("Starting app...")
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
app.run(host="0.0.0.0")
http_server = HTTPServer(WSGIContainer(app))
http_server.listen(5000, "0.0.0.0")
ioloop = IOLoop.instance()
PeriodicCallback(cleanup_task, int(config.cleanup_thread_interval) * 1000).start()
ioloop.start()
if __name__ == "__main__": if __name__ == "__main__":
main() main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment