Commit d63151fd authored by Daniele Venzano's avatar Daniele Venzano
Browse files

Fix line terminations

Implement emails
Other fixes
parent b526067b
......@@ -59,3 +59,4 @@ docs/_build/
# PyBuilder
......@@ -13,12 +13,15 @@ It is composed of three components:
* MySQL to keep all the state
* Docker Swarm
* A Docker registry containing Spark images
* Redis
* Apache to act as a reverse proxy
## Configuration
Zoe configuration is kept, for now, in a Python file: `common/configuration.py`
The cookie secret key is defined in `zoe_web/`.
### Swarm/Docker
For testing you can use also a single Docker instance, just set its endpoint in the configuration file mentioned above.
......@@ -32,6 +35,11 @@ Use the scripts in the [zoe-docker-images](
and populate a private registry with Spark images. The images are quite standard and can be used also without Zoe, for examples
on how to do that, see the `scripts/` script.
### Redis
Redis is used for storing Spark applications and logs, in zip archives. It is not the best use of Redis, but it provides a
very simple-to-use interface. We are looking for a different solution, and this requirement will likely disappear soon.
### Apache configuration
Zoe generates dynamically proxy entries to let users access to the various web interfaces contained in the Spark containers.
......@@ -16,5 +16,9 @@ conf = {
'client_rpyc_autodiscovery': True,
'client_rpyc_server': None,
'client_rpyc_port': None,
'proxy_path_prefix': '/proxy'
'proxy_path_prefix': '/proxy',
'smtp_server': '',
'smtp_user': '',
'smtp_pass': open('smtp_pass.txt', 'r').read().strip(),
'web_server_name': ''
from common.state.execution import Execution
from common.state import Proxy, AlchemySession
from common.configuration import conf
def generate_log_history_url(execution: Execution) -> str:
zoe_web_log_history_path = '/api/history/logs/'
return 'http://' + conf['web_server_name'] + zoe_web_log_history_path + str(
def generate_notebook_url(execution: Execution) -> str:
state = AlchemySession()
c = execution.find_container("spark-notebook")
pr = state.query(Proxy).filter_by(, service_name="Spark Notebook interface").one()
return 'http://' + conf['web_server_name'] + conf['proxy_path_prefix'] + '/{}'.format(
......@@ -140,13 +140,20 @@ class ZoeClient:
def application_get(self, application_id) -> PlainApplication:
def application_get(self, application_id: int) -> PlainApplication:
ret = self.state.query(Application).filter_by(id=application_id).one()
return ret.extract()
except NoResultFound:
return None
def application_get_binary(self, application_id: int) -> bytes:
application = self.state.query(Application).filter_by(id=application_id).one()
except NoResultFound:
return None
return storage.application_data_download(application)
def application_remove(self, application_id: int):
application = self.state.query(Application).filter_by(id=application_id).one()
from datetime import datetime, timedelta
import time
from traceback import print_exc
import smtplib
from email.mime.text import MIMEText
import logging
log = logging.getLogger(__name__)
from jinja2 import Template
from zoe_web.proxy_manager import get_notebook_address
from common.status import SparkSubmitExecution, Execution
from common.urls import generate_log_history_url, generate_notebook_url
from common.configuration import conf
log = logging.getLogger(__name__)
APP_FINISH_EMAIL_TEMPLATE = """Application {{ name }} has finished executing after {{ runtime }}.
......@@ -49,61 +50,41 @@ def do_duration(seconds):
return template.format(d=d, h=h, m=m, s=s)
def email_task():
ts = time.time()
# noinspection PyBroadException
log.debug("Cleanup task completed in {:.3}s".format(time.time() - ts))
def check_notebooks():
state = CAaaState()
td = timedelta(hours=int(config.cleanup_notebooks_older_than))
old_age = - td
nb_too_old = state.get_old_spark_notebooks(old_age)
for cluster_id in nb_too_old:
user_id = state.get_cluster(cluster_id)["user_id"]
subject = "[CAaas] Notebook terminated"
user_email = state.get_user_email(user_id)
template_vars = {'max_age': config.cleanup_notebooks_older_than}
send_email(user_email, subject, NOTEBOOK_KILLED_EMAIL_TEMPLATE, template_vars)
td = timedelta(hours=int(config.cleanup_notebooks_warning))
wrn_age = - td
nb_wrn = state.get_old_spark_notebooks(wrn_age)
for cluster_id in nb_wrn:
user_id = state.get_cluster(cluster_id)["user_id"]
subject = "[CAaas] Notebook termination advance warning"
user_email = state.get_user_email(user_id)
template_vars = {
'grace_time': int(config.cleanup_notebooks_older_than) - int(config.cleanup_notebooks_warning),
'wrn_age': config.cleanup_notebooks_warning,
'nb_url': get_notebook_address(cluster_id)
send_email(user_email, subject, NOTEBOOK_WARNING_EMAIL_TEMPLATE, template_vars)
def app_cleanup(app_id, cluster_id):
sm.save_logs(app_id, cluster_id)
state = CAaaState()
app = state.get_application(app_id)
email = state.get_user_email(app["user_id"])
def notify_execution_finished(execution: SparkSubmitExecution):
app = execution.application
email =
template_vars = {
'cmdline': app["cmd"],
'runtime': do_duration((app["time_finished"] - app["time_started"]).total_seconds()),
'name': app["execution_name"],
'log_url': config.flask_base_url + "/api/{}/history/{}/logs".format(app["user_id"], app_id)
'cmdline': execution.commandline,
'runtime': do_duration((execution.time_finished - execution.time_started).total_seconds()),
'log_url': generate_log_history_url(execution)
subject = '[CAaaS] Spark execution {} finished'.format(app["execution_name"])
subject = '[Zoe] Spark execution {} finished'.format(
send_email(email, subject, APP_FINISH_EMAIL_TEMPLATE, template_vars)
def notify_notebook_notice(execution: Execution):
app = execution.application
email =
subject = "[Zoe] Notebook termination warning"
template_vars = {
'grace_time': conf['notebook_max_age_no_activity'] - conf['notebook_warning_age_no_activity'],
'wrn_age': conf['notebook_warning_age_no_activity'],
'nb_url': generate_notebook_url(execution)
send_email(email, subject, NOTEBOOK_WARNING_EMAIL_TEMPLATE, template_vars)
def notify_notebook_termination(execution: Execution):
app = execution.application
email =
subject = "[Zoe] Notebook terminated"
template_vars = {'max_age': conf['notebook_max_age_no_activity']}
send_email(email, subject, NOTEBOOK_KILLED_EMAIL_TEMPLATE, template_vars)
def send_email(address, subject, template, template_vars):
......@@ -113,20 +94,9 @@ def send_email(address, subject, template, template_vars):
msg['Subject'] = subject
msg['From'] = ''
msg['To'] = address
s = smtplib.SMTP(config.smtp_server)
s = smtplib.SMTP(conf['smtp_server'])
s.login(config.smtp_user, config.smtp_pass)
s.login(conf['smtp_user'], conf['smtp_pass'])
def clean_completed_apps():
state = CAaaState()
cont_ids = state.get_submit_containers()
for cont_id, cluster_id in cont_ids:
if not sm.check_container_alive(cont_id):
app_id = state.find_app_for_cluster(cluster_id)"App {} needs to be cleaned up".format(app_id))
app_cleanup(app_id, cluster_id)
......@@ -6,6 +6,7 @@ import zipfile
from zoe_scheduler.swarm_client import SwarmClient, ContainerOptions
from zoe_scheduler.proxy_manager import pm
from zoe_scheduler.emails import notify_execution_finished, notify_notebook_notice, notify_notebook_termination
from common.state import AlchemySession, Cluster, Container, SparkApplication, Proxy, Execution, SparkNotebookApplication, SparkSubmitApplication, SparkSubmitExecution
from common.application_resources import ApplicationResources
......@@ -210,9 +211,11 @@ class PlatformManager:
if - pr.last_access > timedelta(hours=conf["notebook_max_age_no_activity"]):"Killing spark notebook {} for inactivity".format(
self.execution_terminate(state, e)
if - pr.last_access > timedelta(hours=conf["notebook_max_age_no_activity"]) - timedelta(hours=conf["notebook_warning_age_no_activity"]):"Spark notebook {} is on notice for inactivity".format(
e.termination_notice = True
......@@ -220,5 +223,6 @@ class PlatformManager:
if container.readable_name == "spark-submit" or container.readable_name == "spark-master":
log.debug("found a dead spark-submit container, cleaning up")
self.execution_terminate(state, container.cluster.execution)
log.warning("Container {} (ID: {}) died unexpectedly")
......@@ -10,24 +10,3 @@ app.register_blueprint(api_bp, url_prefix='/api')
app.secret_key = b"\xc3\xb0\xa7\xff\x8fH'\xf7m\x1c\xa2\x92F\x1d\xdcz\x05\xe6CJN5\x83!"
def debug_list_routes():
output = []
for rule in app.url_map.iter_rules():
options = {}
for arg in rule.arguments:
options[arg] = "[{0}]".format(arg)
methods = ','.join(rule.methods)
url = url_for(rule.endpoint, **options)
line = "{:50s} {:20s} {}".format(rule.endpoint, methods, url)
for line in sorted(output):
def page_not_found(_):
......@@ -77,6 +77,18 @@ def application_delete(app_id):
return jsonify(status="ok")
def application_binary_download(app_id: int):
    """Serve the stored binary of application *app_id* as a zip download.

    Responds with a JSON error payload when the client holds no binary
    for the given id; otherwise streams the bytes back as an attachment
    named ``app-<id>.zip``.
    """
    zoe = get_zoe_client()
    blob = zoe.application_get_binary(app_id)
    if blob is None:
        return jsonify(status="error")
    attachment_name = "app-{}.zip".format(app_id)
    return send_file(BytesIO(blob), mimetype="application/zip", as_attachment=True, attachment_filename=attachment_name)
@api_bp.route('/executions/new', methods=['POST'])
def execution_new():
client = get_zoe_client()
......@@ -128,4 +140,4 @@ def history_logs_get(execution_id: int):
if logs is None:
return abort(404)
return send_file(BytesIO(logs), mimetype="application/zip")
return send_file(BytesIO(logs), mimetype="application/zip", as_attachment=True, attachment_filename="logs-{}.zip".format(execution_id))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment