Commit 7aa1b732 authored by Daniele Venzano

Cleanups and refactoring

parent dd9226c8
@@ -43,6 +43,7 @@
<column name="cluster_id" sqlType="INT" precision="10" scale="0" nullable="false" jdbcType="4"/>
<column name="service_name" sqlType="VARCHAR" precision="64" scale="0" nullable="true" jdbcType="12"/>
<column name="container_id" sqlType="INT" precision="10" scale="0" nullable="false" jdbcType="4"/>
+<column name="last_access" sqlType="TIMESTAMP" precision="19" scale="0" nullable="true" jdbcType="93"/>
<primary-key name="PRIMARY" columns="id"/>
</table>
<table name="users" schema="" catalog="caaas" type="TABLE">

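The IDE schema snapshot above records a new nullable last_access TIMESTAMP column; judging from the other columns and the queries later in this commit, the table is proxy. The commit itself does not ship a migration, so the following is only a hypothetical sketch of the corresponding DDL (credentials and host are placeholders):

    # Hypothetical migration sketch for the new column recorded in the schema
    # snapshot above; not part of the commit. Connection parameters are placeholders.
    import mysql.connector

    cnx = mysql.connector.connect(user='caaas', password='changeme',
                                  host='localhost', database='caaas')
    cur = cnx.cursor()
    cur.execute("ALTER TABLE proxy ADD COLUMN last_access TIMESTAMP NULL DEFAULT NULL")
    cnx.commit()
    cur.close()
    cnx.close()
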
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
-<component name="DataSourceManagerImpl" format="xml" hash="730622038">
+<component name="DataSourceManagerImpl" format="xml" hash="779648770">
<data-source source="LOCAL" name="MySQL - @m1" uuid="a32fd6de-3ffa-40c0-9ec8-8953a89c53e0">
<driver-ref>mysql</driver-ref>
<synchronize>true</synchronize>

<component name="InspectionProjectProfileManager"> <component name="InspectionProjectProfileManager">
<profile version="1.0"> <profile version="1.0">
<option name="myName" value="Project Default" /> <option name="myName" value="Project Default" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="1">
<item index="0" class="java.lang.String" itemvalue="mysql" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="SpellCheckingInspection" enabled="true" level="TYPO" enabled_by_default="true"> <inspection_tool class="SpellCheckingInspection" enabled="true" level="TYPO" enabled_by_default="true">
<option name="processCode" value="false" /> <option name="processCode" value="false" />
<option name="processLiterals" value="true" /> <option name="processLiterals" value="true" />
......
This diff is collapsed.
-from jinja2 import Template
from hashlib import md5
from os import system
from time import sleep
from urllib.parse import urlparse
+import re
+from datetime import datetime
+from jinja2 import Template

-from caaas import CAaaState
+from caaas.sql import CAaaState
+from caaas.config_parser import config

-OUTFILE = "/tmp/caaas-proxy.conf"
+LOOP_INTERVAL = 1  # seconds
+ACCESS_TIME_REFRESH_INTERVAL = 60  # seconds

ENTRY_TEMPLATE = """
# CAaaS proxy entry for service {{ service_name }}

@@ -63,7 +68,7 @@ def check_difference(generated_file):
    m_new.update(generated_file.encode('ascii'))
    m_old = md5()
    try:
-        m_old.update(open(OUTFILE).read().encode('ascii'))
+        m_old.update(open(config.proxy_apache_config).read().encode('ascii'))
    except FileNotFoundError:
        return True
    return m_new.digest() != m_old.digest()

@@ -71,7 +76,7 @@ def check_difference(generated_file):
def commit_and_reload(generated_file):
    print("Apache config requires an update, committing and reloading")
-    open(OUTFILE, "w").write(generated_file)
+    open(config.proxy_apache_config, "w").write(generated_file)
    system("sudo service apache2 reload")

@@ -82,10 +87,33 @@ def update_proxy():
        commit_and_reload(output)

+def update_proxy_access_timestamps():
+    regex = re.compile('[0-9.]+ - - \[(.*)\] "GET /proxy/([0-9a-z\-]+)/')
+    log = open(config.proxy_apache_access_log, 'r')
+    last_accesses = {}
+    for line in log:
+        match = re.match(regex, line)
+        if match is not None:
+            proxy_id = match.group(2)
+            timestamp = datetime.strptime(match.group(1), "%d/%b/%Y:%H:%M:%S %z")
+            last_accesses[proxy_id] = timestamp
+    state = CAaaState()
+    for proxy in state.get_proxies():
+        proxy_id = proxy['proxy_id']
+        if proxy_id in last_accesses:
+            state.update_proxy_access(proxy_id, last_accesses[proxy_id])

if __name__ == "__main__":
-    print("CAaaS Apache proxy synchronizer starting")
+    print("CAaaS Apache proxy synchronization starting")
+    access_time_refresh_delay = ACCESS_TIME_REFRESH_INTERVAL
    while True:
        # print("Checking proxy entries...")
        update_proxy()
        # print("Checking for completed applications to clean up")
-        sleep(1)
+        sleep(LOOP_INTERVAL)
+        access_time_refresh_delay -= LOOP_INTERVAL
+        if access_time_refresh_delay <= 0:
+            update_proxy_access_timestamps()
+            access_time_refresh_delay = ACCESS_TIME_REFRESH_INTERVAL

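A quick illustration of what update_proxy_access_timestamps() extracts from the Apache access log. The regex and the timestamp format are the ones added above; the sample log line and the proxy UUID in it are invented for illustration:

    # Standalone sketch of the log parsing added above; the sample line is made up.
    import re
    from datetime import datetime

    regex = re.compile(r'[0-9.]+ - - \[(.*)\] "GET /proxy/([0-9a-z\-]+)/')
    sample = '192.168.45.10 - - [21/Aug/2015:14:03:27 +0200] ' \
             '"GET /proxy/0b1c2d3e-4f5a-6789-abcd-ef0123456789/ HTTP/1.1" 200 1234'
    match = regex.match(sample)
    if match is not None:
        timestamp = datetime.strptime(match.group(1), "%d/%b/%Y:%H:%M:%S %z")
        proxy_id = match.group(2)
        print(proxy_id, timestamp)  # the proxy entry and its most recent access time
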
@@ -10,12 +10,16 @@ db = caaas
[proxy]
base_url = http://some-host/some-path
+apache_config = /tmp/caaas-proxy.conf
+apache_access_log = /var/log/apache2/access.log

[caaas]
history_per_user_count = 20
history_path = /var/cache/caaas/app_history
cleanup_thread_interval = 5
base_flask_url = http://bigfoot-m2.eurecom.fr
+cleanup_notebooks_older_than = 24
+cleanup_notebooks_warning = 22

[smtp]
user = user@gmail.com

-from caaas.sql import CAaaState
-from caaas.cluster_description import SparkClusterDescription
-from caaas.spark_app_execution import application_submitted, setup_volume, AppHistory
from flask import Flask

app = Flask(__name__)

-from caaas.swarm_manager import SwarmManager
-sm = SwarmManager()
-sm.connect()

import caaas.web
import caaas.api
-from caaas.cleanup_thread import start_cleanup_thread

@@ -2,7 +2,10 @@ from flask import jsonify, request, send_file
import time
from zipfile import is_zipfile

-from caaas import app, sm, CAaaState, application_submitted, setup_volume, AppHistory
+from caaas import app
+from caaas.sql import CAaaState
+from caaas.spark_app_execution import application_submitted, setup_volume, AppHistory
+from caaas.swarm_manager import sm

STATS_CACHING_EXPIRATION = 1  # seconds

+from datetime import datetime, timedelta
import threading
import time
+from traceback import print_exc
import smtplib
from email.mime.text import MIMEText
from jinja2 import Template

-from caaas import CAaaState, sm
-from utils import config
+from caaas.config_parser import config
+from caaas.proxy_manager import get_notebook_address
+from caaas.sql import CAaaState
+from caaas.swarm_manager import sm

-EMAIL_TEMPLATE = """Application {{ name }} has finished executing after {{ runtime }}.
+APP_FINISH_EMAIL_TEMPLATE = """Application {{ name }} has finished executing after {{ runtime }}.
At this URL you can download the execution logs: {{ log_url }}
"""

+NOTEBOOK_KILLED_EMAIL_TEMPLATE = """Your Spark notebook has not been used in the past {{ max_age }} hours and has been terminated."""

+NOTEBOOK_WARNING_EMAIL_TEMPLATE = """Your Spark notebook has not been used in the past {{ wrn_age }} hours
+and will be terminated unless you access it in the next {{ grace_time }} hours.
+Notebook URL: {{ nb_url }}
+"""

def do_duration(seconds):
    m, s = divmod(seconds, 60)

@@ -46,36 +59,72 @@ def start_cleanup_thread():
def _loop():
    while True:
-        clean_completed_apps()
-        time.sleep(config.get_cleanup_interval())
+        # noinspection PyBroadException
+        try:
+            clean_completed_apps()
+            check_notebooks()
+            time.sleep(int(config.cleanup_thread_interval))
+        except:
+            print_exc()

+def check_notebooks():
+    state = CAaaState()
+    td = timedelta(hours=int(config.cleanup_notebooks_older_than))
+    old_age = datetime.now() - td
+    nb_too_old = state.get_old_spark_notebooks(old_age)
+    for cluster_id in nb_too_old:
+        user_id = state.get_cluster(cluster_id)["user_id"]
+        sm.terminate_cluster(cluster_id)
+        subject = "[CAaas] Notebook terminated"
+        user_email = state.get_user_email(user_id)
+        template_vars = {'max_age': config.cleanup_notebooks_older_than}
+        send_email(user_email, subject, NOTEBOOK_KILLED_EMAIL_TEMPLATE, template_vars)
+    td = timedelta(hours=int(config.cleanup_notebooks_warning))
+    wrn_age = datetime.now() - td
+    nb_wrn = state.get_old_spark_notebooks(wrn_age)
+    for cluster_id in nb_wrn:
+        user_id = state.get_cluster(cluster_id)["user_id"]
+        subject = "[CAaas] Notebook termination advance warning"
+        user_email = state.get_user_email(user_id)
+        template_vars = {
+            'grace_time': int(config.cleanup_notebooks_older_than) - int(config.cleanup_notebooks_warning),
+            'wrn_age': config.cleanup_notebooks_warning,
+            'nb_url': get_notebook_address(cluster_id)
+        }
+        send_email(user_email, subject, NOTEBOOK_WARNING_EMAIL_TEMPLATE, template_vars)

def app_cleanup(app_id, cluster_id):
    sm.save_logs(app_id, cluster_id)
-    send_email(app_id)
-    sm.terminate_cluster(cluster_id)

-def send_email(app_id):
    state = CAaaState()
    app = state.get_application(app_id)
    username = state.get_user_email(app["user_id"])
-    jinja_template = Template(EMAIL_TEMPLATE)
-    body = jinja_template.render({
+    template_vars = {
        'cmdline': app["cmd"],
        'runtime': do_duration((app["time_finished"] - app["time_started"]).total_seconds()),
        'name': app["execution_name"],
-        'log_url': config.get_flask_server_url() + '/api/' + username + "/history/" + str(app_id) + "/logs"
+        'log_url': config.flask_base_url + "/api/{}/history/{}/logs".format(username, app_id)
-    })
+    }
+    subject = '[CAaaS] Spark execution {} finished'.format(app["execution_name"])
+    send_email(username, subject, APP_FINISH_EMAIL_TEMPLATE, template_vars)
+    sm.terminate_cluster(cluster_id)

+def send_email(address, subject, template, template_vars):
+    jinja_template = Template(template)
+    body = jinja_template.render(template_vars)
    msg = MIMEText(body)
-    msg['Subject'] = '[CAaaS] Spark execution {} finished'.format(app["execution_name"])
+    msg['Subject'] = subject
    msg['From'] = 'noreply@bigfoot.eurecom.fr'
-    msg['To'] = state.get_user_email(app["user_id"])
+    msg['To'] = address
-    mail_server = config.get_smtp_info()
-    s = smtplib.SMTP(mail_server["server"])
+    s = smtplib.SMTP(config.smtp_server)
    s.ehlo()
    s.starttls()
-    s.login(mail_server["user"], mail_server["pass"])
+    s.login(config.smtp_user, config.smtp_pass)
    s.send_message(msg)
    s.quit()

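With the sample caaas.ini values shown earlier (cleanup_notebooks_older_than = 24, cleanup_notebooks_warning = 22), check_notebooks() works with two cut-offs against last_access; a minimal sketch of the arithmetic, assuming those defaults:

    # Minimal sketch of the two cut-offs used by check_notebooks(), assuming the
    # sample caaas.ini values (cleanup_notebooks_older_than = 24, warning = 22).
    from datetime import datetime, timedelta

    older_than = 24   # hours without access before a notebook is terminated
    warning = 22      # hours without access before a warning email goes out

    now = datetime.now()
    kill_cutoff = now - timedelta(hours=older_than)  # last_access before this -> terminate
    warn_cutoff = now - timedelta(hours=warning)     # last_access before this -> warn
    grace_time = older_than - warning                # 2 hours, rendered into the warning email
    print(kill_cutoff, warn_cutoff, grace_time)
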
+from configparser import ConfigParser
+import os

+MAIN_PATH = os.path.split(os.path.abspath(os.path.join(__file__, "..")))[0]

+class CAaasConfig:
+    def __init__(self, conf_file):
+        parser = ConfigParser()
+        found = parser.read(conf_file)
+        if not found:
+            raise ValueError('Configuration file not found')
+        self.__dict__.update(parser.items('caaas'))

+config = CAaasConfig(os.path.join(MAIN_PATH, 'caaas.ini'))
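CAaasConfig merges the parsed options straight into the instance dictionary, so ini keys become attributes on the module-level config singleton; note that ConfigParser returns every value as a string, which is why callers elsewhere in this commit wrap numeric options in int(). A small usage sketch, assuming the package is importable and caaas.ini sits next to it as MAIN_PATH expects:

    # Usage sketch; assumes caaas.ini is present next to the package.
    from caaas.config_parser import config

    interval = int(config.cleanup_thread_interval)  # ini values arrive as strings
    print(config.history_per_user_count, interval)
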
-from caaas import CAaaState
+from caaas.sql import CAaaState
-from utils.config import get_proxy_base
+from caaas.config_parser import config

def _generate_proxied_url(proxy_id):
-    return get_proxy_base() + "/" + proxy_id
+    return config.proxy_base_url + "/" + proxy_id

def get_container_addresses(container_id):

@@ -2,14 +2,14 @@ from zipfile import ZipFile
import os
import shutil

-from caaas import CAaaState
+from caaas.sql import CAaaState
-from utils import config
+from caaas.config_parser import config

class AppHistory:
    def __init__(self, user_id):
-        self.base_path = config.get_app_history_path()
+        self.base_path = config.history_storage_path
-        self.per_user_max_count = config.get_app_history_count()
+        self.per_user_max_count = int(config.history_per_user_count)
        self.user_id = str(user_id)

    def _app_path(self, app_id):

@@ -60,7 +60,7 @@ def application_submitted(user_id, execution_name, spark_options, commandline, f
def setup_volume(user_id, app_id, app_pkg):
    app_pkg = ZipFile(app_pkg)
-    exec_path = config.volume_path()
+    exec_path = config.docker_volume_path
    exec_path = os.path.join(exec_path, str(user_id), str(app_id))
    os.makedirs(exec_path)
    app_pkg.extractall(exec_path)

@@ -2,7 +2,7 @@ import mysql.connector
import mysql.connector.cursor
import mysql.connector.errors
-from utils.config import get_database_config
+from caaas.config_parser import config

class CAaaState:

@@ -12,7 +12,13 @@ class CAaaState:
    def _reconnect(self):
        if self.cnx is not None:
            self.cnx.disconnect()
-        db_config = get_database_config()
+        db_config = {
+            'user': config.db_user,
+            'password': config.db_pass,
+            'host': config.db_server,
+            'database': config.db_db,
+            'buffered': True
+        }
        self.cnx = mysql.connector.connect(**db_config)

    def _get_cursor(self, dictionary=False) -> mysql.connector.cursor.MySQLCursor:

@@ -163,8 +169,8 @@ class CAaaState:
    def set_master_address(self, cluster_id, address):
        cursor = self._get_cursor()
-        q = "UPDATE clusters SET master_address=%s WHERE clusters.id=%s"
-        print(address, cluster_id)
+        q = "UPDATE clusters SET master_address=%s WHERE id=%s"
+        cursor.execute(q, (address, cluster_id))
        self._close_cursor(cursor)
        cursor.close()

@@ -203,15 +209,10 @@ class CAaaState:
    def get_cluster(self, cluster_id):
        cursor = self._get_cursor(dictionary=True)
-        q = "SELECT user_id, master_address, name, time_created FROM clusters WHERE id=%s"
+        q = "SELECT * FROM clusters WHERE id=%s"
        cursor.execute(q, (cluster_id,))
        row = cursor.fetchone()
-        res = {
-            "user_id": row["user_id"],
-            "master_address": row["master_address"],
-            "name": row["name"],
-            "time_created": row["time_created"]
-        }
+        res = dict(row)
        self._close_cursor(cursor)
        return res

@@ -271,24 +272,19 @@ class CAaaState:
        return res

    def get_proxies(self, cluster_id=None, container_id=None):
-        cursor = self._get_cursor()
+        cursor = self._get_cursor(dictionary=True)
        if cluster_id is None and container_id is None:
-            q = "SELECT proxy_id, internal_url, service_name, container_id FROM proxy"
+            q = "SELECT * FROM proxy"
            cursor.execute(q)
        elif container_id is not None:
-            q = "SELECT proxy_id, internal_url, service_name, container_id FROM proxy WHERE container_id=%s"
+            q = "SELECT * FROM proxy WHERE container_id=%s"
            cursor.execute(q, (container_id,))
        else:
-            q = "SELECT proxy_id, internal_url, service_name, container_id FROM proxy WHERE cluster_id=%s"
+            q = "SELECT * FROM proxy WHERE cluster_id=%s"
            cursor.execute(q, (cluster_id,))
        proxy_list = []
-        for proxy_id, url, service_name, container_id in cursor:
-            proxy_list.append({
-                'proxy_id': proxy_id,
-                'internal_url': url,
-                'service_name': service_name,
-                'container_id': container_id
-            })
+        for row in cursor:
+            proxy_list.append(dict(row))
        self._close_cursor(cursor)
        return proxy_list

@@ -385,3 +381,17 @@ class CAaaState:
            res = None
        self._close_cursor(cursor)
        return res

+    def update_proxy_access(self, proxy_id, access_ts):
+        cursor = self._get_cursor()
+        q = "UPDATE proxy SET last_access=%s WHERE proxy_id=%s"
+        cursor.execute(q, (access_ts, proxy_id))
+        self._close_cursor(cursor)

+    def get_old_spark_notebooks(self, older_than):
+        cursor = self._get_cursor()
+        q = "SELECT cluster_id FROM proxy WHERE service_name='notebook' AND last_access < %s"
+        cursor.execute(q, (older_than,))
+        ret = [row[0] for row in cursor]
+        self._close_cursor(cursor)
+        return ret
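These two helpers are the database half of the notebook-expiry feature: the proxy synchronizer writes last_access through update_proxy_access(), and the cleanup thread reads stale notebooks back with get_old_spark_notebooks(). A rough driver, where the proxy_id is a placeholder and a reachable caaas database is assumed:

    # Rough sketch tying the two new helpers together; the proxy_id below is a
    # placeholder and a reachable caaas database is assumed.
    from datetime import datetime, timedelta
    from caaas.sql import CAaaState

    state = CAaaState()
    state.update_proxy_access("00000000-0000-0000-0000-000000000000", datetime.now())
    cutoff = datetime.now() - timedelta(hours=24)
    for cluster_id in state.get_old_spark_notebooks(cutoff):
        print("notebook cluster not accessed for 24h:", cluster_id)
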
+import time
+import os
from docker import Client
from docker import errors as docker_errors
from docker.utils import create_host_config
-import time
-import os

-from caaas import CAaaState, SparkClusterDescription, AppHistory
+from caaas.cluster_description import SparkClusterDescription
+from caaas.config_parser import config
from caaas.proxy_manager import get_notebook_address
-from utils import config, get_uuid
+from caaas.spark_app_execution import AppHistory
+from caaas.sql import CAaaState
+from caaas.utils import get_uuid

REGISTRY = "10.0.0.2:5000"
MASTER_IMAGE = REGISTRY + "/venza/spark-master:1.4.1"

@@ -66,7 +70,7 @@ class SwarmManager:
        self.last_update_timestamp = 0

    def connect(self):
-        manager = config.get_swarm_manager_address()
+        manager = config.docker_swarm_manager
        self.cli = Client(base_url=manager)

    def update_status(self):

@@ -157,7 +161,7 @@ class SwarmManager:
    def _spawn_spark_submit(self, user_id, cluster_id, app_id, cluster_descr, master_info):
        state = CAaaState()
        app = state.get_application(app_id)
-        app["path"] = os.path.join(config.volume_path(), str(user_id), str(app_id))
+        app["path"] = os.path.join(config.docker_volume_path, str(user_id), str(app_id))
        options = ContainerOptions()
        options.add_env_variable("SPARK_MASTER_IP", master_info["docker_ip"])
        options.add_env_variable("SPARK_EXECUTOR_RAM", cluster_descr.executor_ram_size)

@@ -233,3 +237,7 @@ class SwarmManager:
    def swarm_status(self):
        return self.cli.info()

+sm = SwarmManager()
+sm.connect()
@@ -5,4 +5,7 @@
<p>Your notebook is available at: <a href="{{ notebook_address }}">{{ notebook_address }}</a></p>
+<p>Please note that notebooks are automatically terminated {{ max_age }} hours after the last time they are accessed.
+A warning email will be sent {{ wrn_time }} hours in advance.</p>
{% endblock %}
\ No newline at end of file

@@ -2,13 +2,14 @@
{% block title %}Submit Spark application{% endblock %}
{% block content %}
<h2>Submit Spark application</h2>
-<p>Use the form below to submit a Spark application.</p>
+<p>Use the form below to submit a Spark application.<br>
+The application should be packaged as a zip archive containing all the files it needs.</p>
<form enctype="multipart/form-data" id="app_upload">
<label for="exec_name">Execution name:</label>
-<input type="text" autofocus autocomplete="on" required pattern="[a-z0-9_\-]+" name="exec_name" id="exec_name"><br/>
+<input type="text" autofocus autocomplete="on" required pattern="[a-z0-9_\-]+" name="exec_name" id="exec_name" placeholder="myapp-run-25"><br/>
<label for="cmd_line">Command line:</label>
-<input type="text" autocomplete="on" required name="cmd_line" id="cmd_line" size="150"><br/>
+<input type="text" autocomplete="on" required name="cmd_line" id="cmd_line" size="150" placeholder="wordcount.py hdfs://192.168.45.157/datasets/gutenberg_big_2x.txt hdfs://192.168.45.157/tmp/cntwdc1"><br/>
<label for="spark_options">Spark options:</label>
<input type="text" autocomplete="on" name="spark_options" id="spark_options" size="100"><br/>

@@ -2,4 +2,4 @@ from uuid import uuid1 as uuid
def get_uuid():
    return str(uuid())
\ No newline at end of file

from flask import render_template

-from caaas import app, sm, CAaaState
+from caaas import app
+from caaas.config_parser import config
from caaas.proxy_manager import get_container_addresses
+from caaas.sql import CAaaState
+from caaas.swarm_manager import sm

@app.route("/web/")

@@ -46,7 +49,9 @@ def web_notebook(username):
    user_id = state.get_user_id(username)
    template_vars = {
        "user": username,
-        "notebook_address": sm.get_notebook(user_id)
+        "notebook_address": sm.get_notebook(user_id),
+        "max_age": config.cleanup_notebooks_older_than,
+        "wrn_time": int(config.cleanup_notebooks_older_than) - int(config.cleanup_notebooks_warning)
    }
    return render_template('notebook.html', **template_vars)

+#!/bin/sh
+SPARK_VER=1.4.1
+for d in master worker shell submit notebook; do
+    docker -H 10.0.0.2:2380 pull 10.0.0.2:5000/venza/spark-$d:$SPARK_VER
+done
-from caaas import app, start_cleanup_thread
+from caaas import app
+from caaas.cleanup_thread import start_cleanup_thread

def main():

from configparser import ConfigParser
import os
MAIN_PATH = os.path.split(os.path.abspath(os.path.join(__file__, "..")))[0]