Commit c177a3fc authored by Daniele Venzano

Submit Spark applications

parent 9cfc35dc
@@ -2,7 +2,11 @@
<project version="4">
<component name="ProjectCodeStyleSettingsManager">
<option name="PER_PROJECT_SETTINGS">
<value />
<value>
<XML>
<option name="XML_LEGACY_SETTINGS_IMPORTED" value="true" />
</XML>
</value>
</option>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default (1)" />
</component>
@@ -7,6 +7,18 @@
</database-info>
<case-sensitivity plain-identifiers="exact" quoted-identifiers="exact"/>
<schema name="" catalog="caaas"/>
<table name="applications" schema="" catalog="caaas" type="TABLE">
<column name="id" sqlType="INT" precision="10" scale="0" nullable="false" jdbcType="4" autoIncrement="true"/>
<column name="execution_name" sqlType="VARCHAR" precision="512" scale="0" nullable="false" jdbcType="12"/>
<column name="cmd" sqlType="VARCHAR" precision="1024" scale="0" nullable="false" jdbcType="12"/>
<column name="spark_options" sqlType="VARCHAR" precision="1024" scale="0" nullable="false" jdbcType="12"/>
<column name="user_id" sqlType="INT" precision="10" scale="0" nullable="false" jdbcType="4"/>
<column name="time_started" sqlType="TIMESTAMP" precision="19" scale="0" nullable="true" jdbcType="93"/>
<column name="time_finished" sqlType="TIMESTAMP" precision="19" scale="0" nullable="true" jdbcType="93"/>
<column name="cluster_id" sqlType="INT" precision="10" scale="0" nullable="true" jdbcType="4"/>
<column name="status" sqlType="VARCHAR" precision="16" scale="0" nullable="false" jdbcType="12" def="J3NldHVwJw=="/>
<primary-key name="PRIMARY" columns="id"/>
</table>
<table name="clusters" schema="" catalog="caaas" type="TABLE">
<column name="id" sqlType="INT" precision="10" scale="0" nullable="false" jdbcType="4" autoIncrement="true"/>
<column name="user_id" sqlType="INT" precision="10" scale="0" nullable="false" jdbcType="4"/>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="DataSourceManagerImpl" format="xml" hash="3069628127">
<component name="DataSourceManagerImpl" format="xml" hash="730622038">
<data-source source="LOCAL" name="MySQL - @m1" uuid="a32fd6de-3ffa-40c0-9ec8-8953a89c53e0">
<driver-ref>mysql</driver-ref>
<synchronize>true</synchronize>
@@ -75,14 +75,17 @@ def commit_and_reload(generated_file):
system("sudo service apache2 reload")
def update_proxy():
entries = get_proxy_entries()
output = generate_file(entries)
if check_difference(output):
commit_and_reload(output)
if __name__ == "__main__":
print("CAaaS Apache proxy synchronizer starting")
while True:
print("Looping...")
entries = get_proxy_entries()
print(entries)
output = generate_file(entries)
if check_difference(output):
commit_and_reload(output)
# print("Checking proxy entries...")
update_proxy()
# print("Checking for completed applications to clean up")
sleep(1)
[docker]
swarm-manager = tcp://127.0.0.1:2380
volume-path = /mnt/cephfs/caaas
[db]
user = caaas
@@ -9,3 +10,14 @@ db = caaas
[proxy]
base_url = http://some-host/some-path
[caaas]
history_per_user_count = 20
history_path = /var/cache/caaas/app_history
cleanup_thread_interval = 5
base_flask_url = http://bigfoot-m2.eurecom.fr
[smtp]
user = user@gmail.com
pass = pass
server = smtp.gmail.com:25
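The new [caaas] and [smtp] keys above are read through the utils.config module referenced later in this commit; that module is not part of the diff, so the following is only a minimal sketch of what the accessors could look like, assuming configparser and the key names shown above.

# Hypothetical sketch of the utils/config.py accessors used in this commit;
# the real module is not shown in the diff, configparser usage and the
# "caaas.ini" file name are assumptions.
from configparser import ConfigParser

_conf = ConfigParser()
_conf.read("caaas.ini")

def get_app_history_path():
    return _conf.get("caaas", "history_path")

def get_app_history_count():
    return _conf.getint("caaas", "history_per_user_count")

def get_cleanup_interval():
    return _conf.getint("caaas", "cleanup_thread_interval")

def get_flask_server_url():
    return _conf.get("caaas", "base_flask_url")

def get_smtp_info():
    return {k: _conf.get("smtp", k) for k in ("user", "pass", "server")}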
from caaas.sql import CAaaState
from caaas.cluster_description import SparkClusterDescription
from caaas.spark_app_execution import application_submitted, setup_volume, AppHistory
from flask import Flask
app = Flask(__name__)
@@ -11,3 +12,5 @@ sm.connect()
import caaas.web
import caaas.api
from caaas.cleanup_thread import start_cleanup_thread
from flask import jsonify
from flask import jsonify, request, send_file
import time
from zipfile import is_zipfile
from caaas import app, sm, CAaaState
from caaas import app, sm, CAaaState, application_submitted, setup_volume, AppHistory
STATS_CACHING_EXPIRATION = 1 # seconds
@@ -55,3 +56,54 @@ def api_terminate_cluster(username, cluster_id):
else:
ret["status"] = "error"
return jsonify(**ret)
@app.route("/api/<username>/container/<container_id>/logs")
def api_container_logs(username, container_id):
db = CAaaState()
user_id = db.get_user_id(username)
# FIXME: check user_id
logs = sm.get_log(container_id)
if logs is None:
ret = {
"status": "no such container",
"logs": ""
}
else:
logs = logs.decode("ascii").split("\n")
ret = {
"status": "ok",
"logs": logs
}
return jsonify(**ret)
@app.route("/api/<username>/spark-submit", methods=['POST'])
def api_spark_submit(username):
file_data = request.files['file']
form_data = request.form
state = CAaaState()
user_id = state.get_user_id(username)
# FIXME: check user_id
if not is_zipfile(file_data.stream):
ret = {
"status": "not a zip file"
}
return jsonify(**ret)
app_id = application_submitted(user_id, form_data["exec_name"], form_data["spark_options"], form_data["cmd_line"], file_data)
setup_volume(user_id, app_id, file_data.stream)
sm.spark_submit(user_id, app_id)
ret = {
"status": "ok"
}
return jsonify(**ret)
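A minimal client-side sketch of driving the new spark-submit endpoint; the host, username, zip file and Spark arguments are placeholders, only the form field names (exec_name, spark_options, cmd_line) come from the handler above.

# Hypothetical client call; host, user and payload values are illustrative.
import requests

with open("wordcount.zip", "rb") as pkg:
    r = requests.post(
        "http://bigfoot-m2.eurecom.fr/api/venza/spark-submit",
        files={"file": pkg},
        data={
            "exec_name": "wordcount",
            "spark_options": "--conf spark.ui.showConsoleProgress=false",
            "cmd_line": "wordcount.py hdfs://input hdfs://output",
        },
    )
print(r.json())  # {"status": "ok"} on success, {"status": "not a zip file"} otherwise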
@app.route("/api/<username>/history/<app_id>/logs")
def api_history_log_archive(username, app_id):
state = CAaaState()
user_id = state.get_user_id(username)
# FIXME: check user_id
ah = AppHistory(user_id)
path = ah.get_log_archive_path(app_id)
return send_file(path, mimetype="application/zip")
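Once the cleanup thread has archived the logs, they can be fetched back through the history endpoint above; a sketch, with host, username and application id as placeholders.

# Hypothetical download of the logs.zip archived for application 42.
import requests

resp = requests.get("http://bigfoot-m2.eurecom.fr/api/venza/history/42/logs")
with open("logs.zip", "wb") as out:
    out.write(resp.content)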
import threading
import time
import smtplib
from email.mime.text import MIMEText
from jinja2 import Template
from caaas import CAaaState, sm
from utils import config
EMAIL_TEMPLATE = """Application {{ name }} has finished executing after {{ runtime }}.
At this URL you can download the execution logs: {{ log_url }}
"""
def do_duration(seconds):
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
d, h = divmod(h, 24)
tokens = []
if d > 1:
tokens.append('{d:.0f} days')
elif d:
tokens.append('{d:.0f} day')
if h > 1:
tokens.append('{h:.0f} hours')
elif h:
tokens.append('{h:.0f} hour')
if m > 1:
tokens.append('{m:.0f} minutes')
elif m:
tokens.append('{m:.0f} minute')
if s > 1:
tokens.append('{s:.0f} seconds')
elif s:
tokens.append('{s:.0f} second')
template = ', '.join(tokens)
return template.format(d=d, h=h, m=m, s=s)
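For illustration, a quick check of do_duration on a hand-picked value:

# 93605 seconds = 1 day + 2 hours + 5 seconds
print(do_duration(93605))  # "1 day, 2 hours, 5 seconds"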
def start_cleanup_thread():
th = threading.Thread(target=_loop)
th.daemon = True
th.start()
def _loop():
while True:
clean_completed_apps()
time.sleep(config.get_cleanup_interval())
def app_cleanup(app_id, cluster_id):
sm.save_logs(app_id, cluster_id)
send_email(app_id)
sm.terminate_cluster(cluster_id)
def send_email(app_id):
state = CAaaState()
app = state.get_application(app_id)
username = state.get_user_email(app["user_id"])
jinja_template = Template(EMAIL_TEMPLATE)
body = jinja_template.render({
'cmdline': app["cmd"],
'runtime': do_duration((app["time_finished"] - app["time_started"]).total_seconds()),
'name': app["execution_name"],
'log_url': config.get_flask_server_url() + '/api/' + username + "/history/" + str(app_id) + "/logs"
})
msg = MIMEText(body)
msg['Subject'] = '[CAaaS] Spark execution {} finished'.format(app["execution_name"])
msg['From'] = 'noreply@bigfoot.eurecom.fr'
msg['To'] = state.get_user_email(app["user_id"])
mail_server = config.get_smtp_info()
s = smtplib.SMTP(mail_server["server"])
s.ehlo()
s.starttls()
s.login(mail_server["user"], mail_server["pass"])
s.send_message(msg)
s.quit()
def clean_completed_apps():
state = CAaaState()
cont_ids = state.get_submit_containers()
for cont_id, cluster_id in cont_ids:
if not sm.check_container_alive(cont_id):
print("Found an app to cleanup")
app_id = state.find_app_for_cluster(cluster_id)
state.application_finished(app_id)
app_cleanup(app_id, cluster_id)
@@ -8,3 +8,8 @@ class SparkClusterDescription:
self.num_workers = 2
self.worker_cores = "2"
self.executor_ram_size = "4g"
def for_spark_app(self, app_id):
self.num_workers = 4
self.worker_cores = "5"
self.executor_ram_size = "8g"
from zipfile import ZipFile
import os
import shutil
from caaas import CAaaState
from utils import config
class AppHistory:
def __init__(self, user_id):
self.base_path = config.get_app_history_path()
self.per_user_max_count = config.get_app_history_count()
self.user_id = str(user_id)
def _app_path(self, app_id):
return os.path.join(self.base_path, self.user_id, str(app_id))
def _delete_app_history(self, app_id):
app_path = self._app_path(app_id)
shutil.rmtree(app_path)
def cleanup(self):
state = CAaaState()
num_apps = state.count_apps_finished(self.user_id)
if num_apps > self.per_user_max_count:
app_id = state.remove_oldest_application(self.user_id)
self._delete_app_history(app_id)
def add_application_zip(self, app_id, file_data):
app_path = self._app_path(app_id)
if not os.path.exists(app_path):
os.makedirs(app_path)
file_data.save(os.path.join(app_path, "app.zip"))
def save_log(self, app_id, logname, log):
app_path = self._app_path(app_id)
assert os.path.exists(app_path)
zip_path = os.path.join(app_path, "logs.zip")
z = ZipFile(zip_path, mode="a")
z.writestr(logname + ".txt", log)
z.close()
def get_log_archive_path(self, app_id):
app_path = self._app_path(app_id)
zip_path = os.path.join(app_path, "logs.zip")
if not os.path.exists(app_path):
return None
else:
return zip_path
def application_submitted(user_id, execution_name, spark_options, commandline, file_data) -> int:
ah = AppHistory(user_id)
ah.cleanup()
state = CAaaState()
app_id = state.new_application(user_id, execution_name, spark_options, commandline)
ah.add_application_zip(app_id, file_data)
return app_id
def setup_volume(user_id, app_id, app_pkg):
app_pkg = ZipFile(app_pkg)
exec_path = config.volume_path()
exec_path = os.path.join(exec_path, str(user_id), str(app_id))
os.makedirs(exec_path)
app_pkg.extractall(exec_path)
state = CAaaState()
state.application_ready(app_id)
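Putting AppHistory and setup_volume together, the on-disk layout implied by the code above would look as follows; the paths come from the sample configuration earlier in this commit, the user and application ids are illustrative.

# Layout implied by AppHistory._app_path() and setup_volume(); user 1, application 42.
# /var/cache/caaas/app_history/1/42/app.zip   <- uploaded package (AppHistory.add_application_zip)
# /var/cache/caaas/app_history/1/42/logs.zip  <- logs archived at cleanup (AppHistory.save_log)
# /mnt/cephfs/caaas/1/42/...                  <- package extracted for the cluster (setup_volume)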
import mysql.connector
import mysql.connector.cursor
import mysql.connector.errors
from utils.config import get_database_config
@@ -14,7 +15,7 @@ class CAaaState:
db_config = get_database_config()
self.cnx = mysql.connector.connect(**db_config)
def _get_cursor(self, dictionary=False):
def _get_cursor(self, dictionary=False) -> mysql.connector.cursor.MySQLCursor:
try:
cursor = self.cnx.cursor(dictionary=dictionary)
except (mysql.connector.errors.OperationalError, AttributeError):
@@ -50,6 +51,14 @@ class CAaaState:
def get_user_id(self, username):
return self._check_user(username)
def get_user_email(self, user_id):
cursor = self._get_cursor()
q = "SELECT username FROM users WHERE id=%s"
cursor.execute(q, (user_id,))
row = cursor.fetchone()
self._close_cursor(cursor)
return row[0]
def get_all_users(self):
cursor = self._get_cursor()
q = "SELECT id, username FROM users"
@@ -61,6 +70,18 @@ class CAaaState:
self._close_cursor(cursor)
return user_list
def count_apps_finished(self, user_id=None):
cursor = self._get_cursor()
if user_id is None:
q = "SELECT COUNT(*) FROM applications WHERE time_finished IS NOT NULL"
cursor.execute(q)
else:
q = "SELECT COUNT(*) FROM applications WHERE user_id=%s AND time_finished IS NOT NULL"
cursor.execute(q, (user_id,))
row = cursor.fetchone()
self._close_cursor(cursor)
return row[0]
def count_clusters(self, user_id=None):
cursor = self._get_cursor()
if user_id is None:
@@ -221,6 +242,34 @@ class CAaaState:
self._close_cursor(cursor)
return res
def get_container(self, container_id):
cursor = self._get_cursor(dictionary=True)
res = {}
q = "SELECT id, docker_id, cluster_id, user_id, ip_address, contents FROM containers WHERE id=%s"
cursor.execute(q, (container_id,))
for row in cursor:
res = {
"id": row["id"],
"docker_id": row["docker_id"],
"cluster_id": row["cluster_id"],
"user_id": row["user_id"],
"ip_address": row["ip_address"],
"contents": row["contents"],
}
self._close_cursor(cursor)
return res
def get_submit_containers(self) -> (int, int):
cursor = self._get_cursor(dictionary=True)
res = []
q = "SELECT id, cluster_id FROM containers WHERE contents='spark-submit'"
cursor.execute(q)
for row in cursor:
res.append((row["id"], row["cluster_id"]))
self._close_cursor(cursor)
return res
def get_proxies(self, cluster_id=None, container_id=None):
cursor = self._get_cursor()
if cluster_id is None and container_id is None:
@@ -260,3 +309,65 @@ class CAaaState:
q = "DELETE FROM clusters WHERE id=%s"
cursor.execute(q, (cluster_id,))
self._close_cursor(cursor)
def new_application(self, user_id: int, execution_name: str, spark_options: str, commandline: str) -> int:
cursor = self._get_cursor()
q = "INSERT INTO applications (execution_name, cmd, spark_options, user_id) VALUES (%s, %s, %s, %s)"
cursor.execute(q, (execution_name, commandline, spark_options, user_id))
app_id = cursor.lastrowid
self._close_cursor(cursor)
return app_id
def remove_oldest_application(self, user_id) -> int:
cursor = self._get_cursor()
q = "SELECT id FROM applications WHERE user_id=%s AND time_finished IS NOT NULL ORDER BY time_finished ASC LIMIT 1"
cursor.execute(q, (user_id,))
app_id = cursor.fetchone()[0]
q = "DELETE FROM applications WHERE id=%s"
cursor.execute(q, (app_id,))
self._close_cursor(cursor)
return app_id
def application_ready(self, app_id):
cursor = self._get_cursor()
q = "UPDATE applications SET status='ready' WHERE id=%s"
cursor.execute(q, (app_id,))
self._close_cursor(cursor)
def application_started(self, app_id, cluster_id):
cursor = self._get_cursor()
q = "UPDATE applications SET cluster_id=%s, time_started=CURRENT_TIMESTAMP, status='running' WHERE id=%s"
cursor.execute(q, (cluster_id, app_id))
self._close_cursor(cursor)
def application_killed(self, app_id):
cursor = self._get_cursor()
q = "UPDATE applications SET time_finished=CURRENT_TIMESTAMP, status='killed' WHERE id=%s"
cursor.execute(q, (app_id,))
self._close_cursor(cursor)
def application_finished(self, app_id):
cursor = self._get_cursor()
q = "UPDATE applications SET time_finished=CURRENT_TIMESTAMP, status='finished' WHERE id=%s"
cursor.execute(q, (app_id,))
self._close_cursor(cursor)
def get_application(self, app_id: int) -> dict:
cursor = self._get_cursor(dictionary=True)
q = "SELECT * FROM applications WHERE id=%s"
cursor.execute(q, (app_id,))
res = dict(cursor.fetchone())
self._close_cursor(cursor)
return res
def find_app_for_cluster(self, cluster_id: int):
cursor = self._get_cursor()
q = "SELECT id FROM applications WHERE cluster_id=%s"
cursor.execute(q, (cluster_id,))
rows = cursor.fetchall()  # rowcount is unreliable before rows are fetched on unbuffered cursors
res = rows[0][0] if len(rows) > 0 else None
self._close_cursor(cursor)
return res
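The application lifecycle methods added above are driven from caaas/api.py, caaas/spark_app_execution.py and the cleanup thread; a sketch of the expected call order, with illustrative ids and arguments.

# Illustrative call order for the new CAaaState application methods.
state = CAaaState()
app_id = state.new_application(1, "wordcount", "--num-executors 4", "wordcount.py")  # row created with status 'setup'
state.application_ready(app_id)        # called by setup_volume() once the zip is unpacked
state.application_started(app_id, 7)   # called when the SwarmManager starts cluster 7 for the app
state.application_finished(app_id)     # called by the cleanup thread when spark-submit exits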
@@ -2,23 +2,18 @@ from docker import Client
from docker import errors as docker_errors
from docker.utils import create_host_config
import time
from uuid import uuid4 as uuid
import os
from caaas import CAaaState
from caaas import SparkClusterDescription
from caaas import CAaaState, SparkClusterDescription, AppHistory
from caaas.proxy_manager import get_notebook_address
from utils import config
from utils import config, get_uuid
REGISTRY = "10.0.0.2:5000"
MASTER_IMAGE = REGISTRY + "/venza/spark-master:1.4.1"
WORKER_IMAGE = REGISTRY + "/venza/spark-worker:1.4.1"
SHELL_IMAGE = REGISTRY + "/venza/spark-shell:1.4.1"
SUBMIT_IMAGE = REGISTRY + "/venza/spark-submit:1.4.1"
CONTAINER_IMAGE = REGISTRY + "/venza/spark-notebook:1.4.1"
def get_uuid():
return str(uuid())
NOTEBOOK_IMAGE = REGISTRY + "/venza/spark-notebook:1.4.1"
class SwarmStatus:
@@ -27,6 +22,36 @@ class SwarmStatus:
self.num_containers = 0
class ContainerOptions:
def __init__(self):
self.env = {}
self.volume_binds = []
self.volumes = []
self.command = ""
def add_env_variable(self, name, value):
self.env[name] = value
def get_environment(self):
return self.env
def add_volume_bind(self, path, mountpoint, readonly=False):
self.volumes.append(mountpoint)
self.volume_binds.append(path + ":" + mountpoint + ":" + ("ro" if readonly else "rw"))  # bind format: host_path:mount_point:mode
def get_volumes(self):
return self.volumes
def get_volume_binds(self):
return self.volume_binds
def set_command(self, cmd):
self.command = cmd
def get_command(self):
return self.command
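A short usage sketch of the new ContainerOptions helper; the values are placeholders, the call pattern mirrors how the _spawn_* methods below assemble container settings.

# Illustrative use of ContainerOptions; paths, addresses and command are made up.
opts = ContainerOptions()
opts.add_env_variable("SPARK_MASTER_IP", "10.1.0.3")
opts.add_volume_bind("/mnt/cephfs/caaas/1/42", "/app", readonly=True)
opts.set_command("spark-submit /app/wordcount.py")
print(opts.get_environment())   # {'SPARK_MASTER_IP': '10.1.0.3'}
print(opts.get_volume_binds())  # ['/mnt/cephfs/caaas/1/42:/app:ro']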
class SwarmManager:
def __init__(self):
self.status = SwarmStatus()
@@ -48,77 +73,102 @@ class SwarmManager:
db = CAaaState()
nb = db.get_notebook(user_id)
if nb is None:
self.start_cluster_with_notebook(user_id)
self._start_cluster_with_notebook(user_id)
nb = db.get_notebook(user_id)
return get_notebook_address(nb)
def start_cluster_with_notebook(self, user_id):
def spark_submit(self, user_id, app_id):
cluster_id = self._start_cluster_for_app(user_id, app_id)
return cluster_id
def _start_cluster_with_notebook(self, user_id):
cluster_descr = SparkClusterDescription()
cluster_descr.for_spark_notebook()
self._create_new_spark_cluster(user_id, "notebook", cluster_descr, with_notebook=True)
return self._create_new_spark_cluster(user_id, "notebook", cluster_descr, with_notebook=True)
def _start_cluster_for_app(self, user_id: int, app_id: int):
state = CAaaState()
cluster_descr = SparkClusterDescription()
cluster_descr.for_spark_app(app_id)
cluster_id = self._create_new_spark_cluster(user_id, "spark-application", cluster_descr, app_id=app_id)
state.application_started(app_id, cluster_id)
return cluster_id
def _create_new_spark_cluster(self, user_id, name, cluster_descr, with_notebook):
def _create_new_spark_cluster(self, user_id, name, cluster_descr, with_notebook=False, app_id=None):
db = CAaaState()
try:
cluster_id = db.new_cluster(user_id, name)
master_info = self._spawn_spark_master(cluster_id, user_id, cluster_descr)
db.set_master_address(cluster_id, master_info["spark_master_address"])
for i in range(cluster_descr.num_workers):
self._spawn_spark_worker(cluster_id, user_id, cluster_descr, master_info)
self._spawn_spark_worker(cluster_id, user_id, cluster_descr, master_info, i)
if with_notebook:
self._spawn_spark_notebook(cluster_id, user_id, cluster_descr, master_info)
if app_id is not None:
self._spawn_spark_submit(user_id, cluster_id, app_id, cluster_descr, master_info)
return cluster_id
except docker_errors.APIError as e:
print("Error starting container: " + str(e.explanation))
# FIXME: should rollback all changes to DB
return None
def _spawn_spark_master(self, cluster_id, user_id, cluster_descr):
db = CAaaState()
options = {
"environment": {},
}
options = ContainerOptions()
info = self._spawn_container(MASTER_IMAGE, options)
info["spark_master_address"] = "http://" + info["docker_ip"] + ":8080"
cont_id = db.new_container(cluster_id, user_id, info["docker_id"