Commit 8e5ac4d7 authored by Daniele Venzano

Start refactoring web interface for v2

parent 434ad2d2
@@ -110,4 +110,13 @@ class SparkSubmitApplication(SparkApplication):
 class PlainApplication:
-    pass
+    id = None
+    name = None
+    required_resources = None
+    user_id = None
+    executions = None
+    type = None
+    master_image = None
+    worker_image = None
+    notebook_image = None
+    submit_image = None
@@ -84,4 +84,16 @@ class SparkSubmitExecution(Execution):
 class PlainExecution:
-    pass
+    id = None
+    name = None
+    assigned_resources = None
+    application_id = None
+    time_started = None
+    time_scheduled = None
+    time_finished = None
+    status = None
+    termination_notice = None
+    cluster_id = None
+    type = None
+    commandline = None
+    spark_opts = None
@@ -24,4 +24,6 @@ class User(Base):
 class PlainUser:
-    pass
+    id = None
+    email = None
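
The Plain* classes above read as transport-friendly mirrors of the ORM models, presumably so objects can cross the client/server boundary without dragging a SQLAlchemy session along. A minimal sketch of how one might be filled (the to_plain helper is hypothetical, not part of this commit):

# Hypothetical helper, not in the diff: copy ORM columns onto the plain object.
def to_plain(user) -> PlainUser:
    p = PlainUser()
    p.id = user.id
    p.email = user.email
    return p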
@@ -140,7 +140,7 @@ class ZoeClient:
             return None
         return self.server.application_status(application.id)
 
-    def spark_application_list(self, user_id) -> [PlainApplication]:
+    def application_list(self, user_id) -> [PlainApplication]:
         try:
             self.state.query(User).filter_by(id=user_id).one()
         except NoResultFound:
......
 from flask import Flask
+from zoe_web.api import api
+from zoe_web.web import web
 
 app = Flask(__name__)
+app.register_blueprint(web, url_prefix='/web')
+app.register_blueprint(api, url_prefix='/web/api')
 
-import zoe_web.web
-import zoe_web.api
 
 app.secret_key = b"\xc3\xb0\xa7\xff\x8fH'\xf7m\x1c\xa2\x92F\x1d\xdcz\x05\xe6CJN5\x83!"

from flask import Blueprint, abort

api = Blueprint('api', __name__)


@api.route('/status/basic')
def basic_status():
    abort(404)
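
With the blueprint registered under '/web/api' above, this stub is already routable; a quick sanity check (a sketch, assuming the package imports cleanly outside the server process):

from zoe_web import app

with app.test_client() as client:
    rv = client.get('/web/api/status/basic')
    assert rv.status_code == 404  # basic_status() is still a stub that aborts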

-from flask import jsonify, request, send_file, abort
+from flask import jsonify, request, send_file, abort, Blueprint
 from zipfile import is_zipfile
 
 from zoe_web import app
 from zoe_web.sql import CAaaState
 from zoe_web.spark_app_execution import application_submitted, setup_volume, AppHistory
 from zoe_web.swarm_manager import sm
 
 STATS_CACHING_EXPIRATION = 1  # seconds
 
+zoeweb_api = Blueprint('zoeweb_api', __name__)
 
 @app.route("/api/<int:user_id>/cluster/<int:cluster_id>/terminate")
 def api_terminate_cluster(user_id, cluster_id):
......

from zipfile import ZipFile
import os
import shutil

from zoe_web.sql import CAaaState
from zoe_web.config_parser import config


class AppHistory:
    def __init__(self, user_id):
        self.base_path = config.history_storage_path
        self.per_user_max_count = int(config.history_per_user_count)
        self.user_id = str(user_id)

    def _app_path(self, app_id):
        return os.path.join(self.base_path, self.user_id, str(app_id))

    def _delete_app_history(self, app_id):
        app_path = self._app_path(app_id)
        shutil.rmtree(app_path)

    def cleanup(self):
        state = CAaaState()
        num_apps = state.count_apps_finished(self.user_id)
        if num_apps > self.per_user_max_count:
            app_id = state.remove_oldest_application(self.user_id)
            self._delete_app_history(app_id)

    def add_application_zip(self, app_id, file_data):
        app_path = self._app_path(app_id)
        if not os.path.exists(app_path):
            os.makedirs(app_path)
        file_data.save(os.path.join(app_path, "app.zip"))

    def save_log(self, app_id, logname, log):
        app_path = self._app_path(app_id)
        assert os.path.exists(app_path)
        zip_path = os.path.join(app_path, "logs.zip")
        z = ZipFile(zip_path, mode="a")
        z.writestr(logname + ".txt", log)
        z.close()

    def get_log_archive_path(self, app_id):
        app_path = self._app_path(app_id)
        zip_path = os.path.join(app_path, "logs.zip")
        if not os.path.exists(app_path):
            return None
        else:
            return zip_path

def application_submitted(user_id, execution_name, spark_options, commandline, file_data) -> int:
    ah = AppHistory(user_id)
    ah.cleanup()
    state = CAaaState()
    app_id = state.new_application(user_id, execution_name, spark_options, commandline)
    ah.add_application_zip(app_id, file_data)
    return app_id


def setup_volume(user_id, app_id, app_pkg):
    app_pkg = ZipFile(app_pkg)
    exec_path = config.docker_volume_path
    exec_path = os.path.join(exec_path, str(user_id), str(app_id))
    os.makedirs(exec_path)
    app_pkg.extractall(exec_path)
    state = CAaaState()
    state.application_ready(app_id)
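
The two module-level functions above are the submission pipeline: register the upload and archive it, then unpack it where the containers can see it. A sketch of the expected call sequence (values are illustrative; file_data is the uploaded zip as a file-like object, and in practice the caller would rewind or re-open it between the two calls):

app_id = application_submitted(user_id, "wordcount", "", "wordcount.py", file_data)
setup_volume(user_id, app_id, file_data)  # extracts the zip and marks the app 'ready'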

import mysql.connector
import mysql.connector.cursor
import mysql.connector.errors

from zoe_web.config_parser import config


class CAaaState:
    def __init__(self):
        self.cnx = None

    def _reconnect(self):
        if self.cnx is not None:
            self.cnx.disconnect()
        db_config = {
            'user': config.db_user,
            'password': config.db_pass,
            'host': config.db_server,
            'database': config.db_db,
            'buffered': True
        }
        self.cnx = mysql.connector.connect(**db_config)

    def _get_cursor(self, dictionary=False) -> mysql.connector.cursor.MySQLCursor:
        try:
            cursor = self.cnx.cursor(dictionary=dictionary)
        except (mysql.connector.errors.OperationalError, AttributeError):
            self._reconnect()
            cursor = self.cnx.cursor(dictionary=dictionary)
        return cursor

    def _close_cursor(self, cursor):
        self.cnx.commit()
        cursor.close()
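
    # Note on the helpers above: the connection is created lazily -- the first
    # _get_cursor() call hits AttributeError on the None connection and triggers
    # _reconnect() -- and _close_cursor() commits, so each public method below
    # is effectively its own transaction.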
    def new_user(self, email):
        cursor = self._get_cursor()
        q = "INSERT INTO users (email) VALUES (%s)"
        cursor.execute(q, (email,))
        user_id = cursor.lastrowid
        self._close_cursor(cursor)
        return user_id

    def check_user_id(self, user_id):
        cursor = self._get_cursor()
        q = "SELECT COUNT(*) FROM users WHERE id=%s"
        cursor.execute(q, (user_id,))
        count = cursor.fetchone()[0]
        self._close_cursor(cursor)
        return count == 1
    def get_user_id(self, email):
        cursor = self._get_cursor()
        q = "SELECT id FROM users WHERE email=%s"
        cursor.execute(q, (email,))
        if cursor.rowcount == 0:
            self._close_cursor(cursor)
            return None
        else:
            row = cursor.fetchone()[0]
            self._close_cursor(cursor)
            return row
    def get_user_email(self, user_id):
        cursor = self._get_cursor()
        q = "SELECT email FROM users WHERE id=%s"
        cursor.execute(q, (user_id,))
        row = cursor.fetchone()
        self._close_cursor(cursor)
        return row[0]

    def get_all_users(self):
        cursor = self._get_cursor()
        q = "SELECT id, email FROM users"
        user_list = []
        cursor.execute(q)
        for row in cursor:
            user_list.append(row)
        self._close_cursor(cursor)
        return user_list

    def count_apps_finished(self, user_id=None):
        cursor = self._get_cursor()
        if user_id is None:
            q = "SELECT COUNT(*) FROM applications WHERE time_finished IS NOT NULL"
            cursor.execute(q)
        else:
            q = "SELECT COUNT(*) FROM applications WHERE user_id=%s AND time_finished IS NOT NULL"
            cursor.execute(q, (user_id,))
        row = cursor.fetchone()
        self._close_cursor(cursor)
        return row[0]

    def count_clusters(self, user_id=None):
        cursor = self._get_cursor()
        if user_id is None:
            q = "SELECT COUNT(*) FROM clusters"
            cursor.execute(q)
        else:
            q = "SELECT COUNT(*) FROM clusters WHERE user_id=%s"
            cursor.execute(q, (user_id,))
        row = cursor.fetchone()
        self._close_cursor(cursor)
        return row[0]

    def count_containers(self, user_id=None, cluster_id=None):
        cursor = self._get_cursor()
        if user_id is None and cluster_id is None:
            q = "SELECT COUNT(*) FROM containers"
            cursor.execute(q)
        elif user_id is not None and cluster_id is None:
            q = "SELECT COUNT(*) FROM containers WHERE user_id=%s"
            cursor.execute(q, (user_id,))
        elif user_id is None and cluster_id is not None:
            q = "SELECT COUNT(*) FROM containers WHERE cluster_id=%s"
            cursor.execute(q, (cluster_id,))
        elif user_id is not None and cluster_id is not None:
            q = "SELECT COUNT(*) FROM containers WHERE user_id=%s AND cluster_id=%s"
            cursor.execute(q, (user_id, cluster_id))
        row = cursor.fetchone()
        self._close_cursor(cursor)
        return row[0]

    def get_notebook(self, user_id):
        cursor = self._get_cursor(dictionary=True)
        q = "SELECT id FROM clusters WHERE user_id=%s and name='notebook'"
        cursor.execute(q, (user_id,))
        if cursor.rowcount == 0:
            self._close_cursor(cursor)
            return None
        else:
            row = cursor.fetchone()
            self._close_cursor(cursor)
            return row["id"]

    def has_notebook(self, user_id):
        ret = self.get_notebook(user_id)
        return ret is not None

    def get_url_proxy(self, proxy_id):
        cursor = self._get_cursor()
        q = "SELECT internal_url FROM proxy WHERE id=%s"
        cursor.execute(q, (proxy_id,))
        if cursor.rowcount == 0:
            self._close_cursor(cursor)
            return None
        else:
            row = cursor.fetchone()
            self._close_cursor(cursor)
            return row[0]

    def get_proxy_for_service(self, cluster_id, service_name):
        cursor = self._get_cursor()
        q = "SELECT id FROM proxy WHERE cluster_id=%s AND service_name=%s"
        cursor.execute(q, (cluster_id, service_name))
        if cursor.rowcount == 0:
            self._close_cursor(cursor)
            return None
        else:
            row = cursor.fetchone()
            self._close_cursor(cursor)
            return row[0]

    def new_cluster(self, user_id, name):
        cursor = self._get_cursor()
        q = "INSERT INTO clusters (user_id, name) VALUES (%s, %s)"
        cursor.execute(q, (user_id, name))
        cluster_id = cursor.lastrowid
        self._close_cursor(cursor)
        return cluster_id
    def set_master_address(self, cluster_id, address):
        cursor = self._get_cursor()
        q = "UPDATE clusters SET master_address=%s WHERE id=%s"
        cursor.execute(q, (address, cluster_id))
        self._close_cursor(cursor)
    def new_container(self, cluster_id, user_id, docker_id, ip_address, contents):
        cursor = self._get_cursor()
        q = "INSERT INTO containers (user_id, cluster_id, docker_id, ip_address, contents) VALUES (%s, %s, %s, %s, %s)"
        cursor.execute(q, (user_id, cluster_id, docker_id, ip_address, contents))
        cont_id = cursor.lastrowid
        self._close_cursor(cursor)
        return cont_id

    def new_proxy_entry(self, proxy_id, cluster_id, address, service_name, container_id):
        cursor = self._get_cursor()
        q = "INSERT INTO proxy (id, internal_url, cluster_id, service_name, container_id) VALUES (%s, %s, %s, %s, %s)"
        cursor.execute(q, (proxy_id, address, cluster_id, service_name, container_id))
        self._close_cursor(cursor)
        return proxy_id

    def get_clusters(self, user_id=None):
        cursor = self._get_cursor(dictionary=True)
        res = {}
        if user_id is None:
            q = "SELECT id, user_id, master_address, name FROM clusters"
            cursor.execute(q)
        else:
            q = "SELECT id, user_id, master_address, name FROM clusters WHERE user_id=%s"
            cursor.execute(q, (user_id,))
        for row in cursor:
            res[str(row["id"])] = {  # FIXME: IDs should be int or str, no casting!
                "user_id": row["user_id"],
                "master_address": row["master_address"],
                "name": row["name"]
            }
        self._close_cursor(cursor)
        return res

    def get_cluster(self, cluster_id):
        cursor = self._get_cursor(dictionary=True)
        q = "SELECT * FROM clusters WHERE id=%s"
        cursor.execute(q, (cluster_id,))
        row = cursor.fetchone()
        res = dict(row)
        self._close_cursor(cursor)
        return res

    def get_containers(self, user_id=None, cluster_id=None):
        cursor = self._get_cursor(dictionary=True)
        res = {}
        if user_id is None and cluster_id is None:
            q = "SELECT id, docker_id, cluster_id, user_id, ip_address, contents FROM containers"
            cursor.execute(q)
        elif user_id is not None and cluster_id is None:
            q = "SELECT id, docker_id, cluster_id, user_id, ip_address, contents FROM containers WHERE user_id=%s"
            cursor.execute(q, (user_id,))
        elif user_id is None and cluster_id is not None:
            q = "SELECT id, docker_id, cluster_id, user_id, ip_address, contents FROM containers WHERE cluster_id=%s"
            cursor.execute(q, (cluster_id,))
        elif user_id is not None and cluster_id is not None:
            q = "SELECT id, docker_id, cluster_id, user_id, ip_address, contents FROM containers WHERE user_id=%s AND cluster_id=%s"
            cursor.execute(q, (user_id, cluster_id))
        for row in cursor:
            res[str(row["id"])] = {
                "docker_id": row["docker_id"],
                "cluster_id": row["cluster_id"],
                "user_id": row["user_id"],
                "ip_address": row["ip_address"],
                "contents": row["contents"],
            }
        self._close_cursor(cursor)
        return res

    def get_container(self, container_id):
        cursor = self._get_cursor(dictionary=True)
        res = {}
        q = "SELECT id, docker_id, cluster_id, user_id, ip_address, contents FROM containers WHERE id=%s"
        cursor.execute(q, (container_id,))
        for row in cursor:
            res = {
                "id": row["id"],
                "docker_id": row["docker_id"],
                "cluster_id": row["cluster_id"],
                "user_id": row["user_id"],
                "ip_address": row["ip_address"],
                "contents": row["contents"],
            }
        self._close_cursor(cursor)
        return res

    def get_submit_containers(self) -> (int, int):
        cursor = self._get_cursor(dictionary=True)
        res = []
        q = "SELECT id, cluster_id FROM containers WHERE contents='spark-submit'"
        cursor.execute(q)
        for row in cursor:
            res.append((row["id"], row["cluster_id"]))
        self._close_cursor(cursor)
        return res

    def get_proxies(self, cluster_id=None, container_id=None):
        cursor = self._get_cursor(dictionary=True)
        if cluster_id is None and container_id is None:
            q = "SELECT * FROM proxy"
            cursor.execute(q)
        elif container_id is not None:
            q = "SELECT * FROM proxy WHERE container_id=%s"
            cursor.execute(q, (container_id,))
        else:
            q = "SELECT * FROM proxy WHERE cluster_id=%s"
            cursor.execute(q, (cluster_id,))
        proxy_list = []
        for row in cursor:
            proxy_list.append(dict(row))
        self._close_cursor(cursor)
        return proxy_list

    def remove_proxy(self, container_id):
        cursor = self._get_cursor()
        q = "DELETE FROM proxy WHERE container_id=%s"
        cursor.execute(q, (container_id,))
        self._close_cursor(cursor)

    def remove_container(self, container_id):
        cursor = self._get_cursor()
        q = "DELETE FROM containers WHERE id=%s"
        cursor.execute(q, (container_id,))
        self._close_cursor(cursor)

    def remove_cluster(self, cluster_id):
        cursor = self._get_cursor()
        q = "DELETE FROM clusters WHERE id=%s"
        cursor.execute(q, (cluster_id,))
        self._close_cursor(cursor)

    def new_application(self, user_id: int, execution_name: str, spark_options: str, commandline: str) -> int:
        cursor = self._get_cursor()
        q = "INSERT INTO applications (execution_name, cmd, spark_options, user_id) VALUES (%s, %s, %s, %s)"
        cursor.execute(q, (execution_name, commandline, spark_options, user_id))
        app_id = cursor.lastrowid
        self._close_cursor(cursor)
        return app_id

    def remove_oldest_application(self, user_id) -> str:
        cursor = self._get_cursor()
        q = "SELECT id FROM applications WHERE user_id=%s AND time_finished IS NOT NULL ORDER BY time_finished ASC LIMIT 1"
        cursor.execute(q, (user_id,))
        app_id = cursor.fetchone()[0]
        q = "DELETE FROM applications WHERE id=%s"
        cursor.execute(q, (app_id,))
        self._close_cursor(cursor)
        return app_id

    def application_ready(self, app_id):
        cursor = self._get_cursor()
        q = "UPDATE applications SET status='ready' WHERE id=%s"
        cursor.execute(q, (app_id,))
        self._close_cursor(cursor)

    def application_started(self, app_id, cluster_id):
        cursor = self._get_cursor()
        q = "UPDATE applications SET cluster_id=%s, time_started=CURRENT_TIMESTAMP, status='running' WHERE id=%s"
        cursor.execute(q, (cluster_id, app_id))
        self._close_cursor(cursor)

    def application_killed(self, app_id):
        cursor = self._get_cursor()
        q = "UPDATE applications SET time_finished=CURRENT_TIMESTAMP, status='killed' WHERE id=%s"
        cursor.execute(q, (app_id,))
        self._close_cursor(cursor)

    def application_finished(self, app_id):
        cursor = self._get_cursor()
        q = "UPDATE applications SET time_finished=CURRENT_TIMESTAMP, status='finished' WHERE id=%s"
        cursor.execute(q, (app_id,))
        self._close_cursor(cursor)

    def get_applications(self, user_id=None) -> dict:
        cursor = self._get_cursor(dictionary=True)
        if user_id is None:
            q = "SELECT * FROM applications ORDER BY time_started DESC"
            cursor.execute(q)
        else:
            q = "SELECT * FROM applications WHERE user_id=%s ORDER BY time_started DESC"
            cursor.execute(q, (user_id,))
        res = []
        for row in cursor:
            res.append(dict(row))
        self._close_cursor(cursor)
        return res

    def get_application(self, app_id: int) -> dict:
        cursor = self._get_cursor(dictionary=True)
        q = "SELECT * FROM applications WHERE id=%s"
        cursor.execute(q, (app_id,))
        res = dict(cursor.fetchone())
        self._close_cursor(cursor)
        return res

    def find_app_for_cluster(self, cluster_id: int):
        cursor = self._get_cursor()
        q = "SELECT id FROM applications WHERE cluster_id=%s"
        cursor.execute(q, (cluster_id,))
        if cursor.rowcount > 0:
            res = cursor.fetchone()[0]
        else:
            res = None
        self._close_cursor(cursor)
        return res

    def update_proxy_access(self, proxy_id, access_ts):
        cursor = self._get_cursor()
        q = "UPDATE proxy SET last_access=%s WHERE id=%s"
        cursor.execute(q, (access_ts, proxy_id))
        self._close_cursor(cursor)

    def get_old_spark_notebooks(self, older_than):
        cursor = self._get_cursor()
        q = "SELECT cluster_id FROM proxy WHERE service_name='notebook' AND last_access < %s"
        cursor.execute(q, (older_than,))
        ret = [row[0] for row in cursor]
        self._close_cursor(cursor)
        return ret
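
Taken together, CAaaState is a thin MySQL-backed state store with one short transaction per public method. A sketch of a typical round trip through it (values are illustrative):

state = CAaaState()
user_id = state.new_user("user@example.com")
cluster_id = state.new_cluster(user_id, "notebook")
app_id = state.new_application(user_id, "my-exec", "", "wordcount.py")
state.application_started(app_id, cluster_id)
state.application_finished(app_id)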

import time
import os

from docker import Client
from docker import errors as docker_errors
from docker.utils import create_host_config

from zoe_web.cluster_description import SparkClusterDescription
from zoe_web.config_parser import config
from zoe_web.proxy_manager import get_notebook_address
from zoe_web.spark_app_execution import AppHistory
from zoe_web.sql import CAaaState
from zoe_web.utils import get_uuid

REGISTRY = "10.0.0.2:5000"
MASTER_IMAGE = REGISTRY + "/venza/spark-master:1.4.1"
WORKER_IMAGE = REGISTRY + "/venza/spark-worker:1.4.1"
SHELL_IMAGE = REGISTRY + "/venza/spark-shell:1.4.1"
SUBMIT_IMAGE = REGISTRY + "/venza/spark-submit:1.4.1"
NOTEBOOK_IMAGE = REGISTRY + "/venza/spark-notebook:1.4.1"


class SwarmStatus:
    def __init__(self):
        self.num_nodes = 0
        self.num_containers = 0


class ContainerOptions:
    def __init__(self):
        self.env = {}
        self.volume_binds = []
        self.volumes = []
        self.command = ""
        self.memory_limit = '2g'

    def add_env_variable(self, name, value):
        self.env[name] = value

    def get_environment(self):
        return self.env
    def add_volume_bind(self, path, mountpoint, readonly=False):
        self.volumes.append(mountpoint)
        # Bind format: host_path:container_path:mode
        self.volume_binds.append(path + ":" + mountpoint + ":" + ("ro" if readonly else "rw"))
    def get_volumes(self):
        return self.volumes

    def get_volume_binds(self):
        return self.volume_binds

    def set_command(self, cmd):
        self.command = cmd

    def get_command(self):
        return self.command

    def set_memory_limit(self, limit):
        self.memory_limit = limit

    def get_memory_limit(self):
        return self.memory_limit
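
# A sketch of how ContainerOptions is filled in before a container is spawned
# (mirrors _spawn_spark_worker() below; the values are illustrative):
#
#   opts = ContainerOptions()
#   opts.add_env_variable("SPARK_MASTER_IP", "10.0.0.3")
#   opts.add_volume_bind("/mnt/apps/1/2", "/app", readonly=True)
#   opts.set_memory_limit("8g")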

class SwarmManager:
    def __init__(self):
        self.status = SwarmStatus()
        self.cli = None
        self.last_update_timestamp = 0

    def connect(self):
        manager = config.docker_swarm_manager
        self.cli = Client(base_url=manager)

    def update_status(self):
        assert self.cli is not None
        info = self.cli.info()
        self.status.num_containers = info["Containers"]
        self.status.num_nodes = info["DriverStatus"][3][1]
        self.last_update_timestamp = time.time()

    def get_notebook(self, user_id):
        db = CAaaState()
        nb = db.get_notebook(user_id)
        if nb is None:
            self._start_cluster_with_notebook(user_id)
            nb = db.get_notebook(user_id)
        return get_notebook_address(nb)
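
    # Note: get_notebook() reuses the user's existing notebook cluster when one
    # is recorded in the state store, and only provisions a new one on a miss.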
    def spark_submit(self, user_id, app_id):
        cluster_id = self._start_cluster_for_app(user_id, app_id)
        return cluster_id

    def _start_cluster_with_notebook(self, user_id):
        cluster_descr = SparkClusterDescription()
        cluster_descr.for_spark_notebook()
        return self._create_new_spark_cluster(user_id, "notebook", cluster_descr, with_notebook=True)

    def _start_cluster_for_app(self, user_id: int, app_id: int):
        state = CAaaState()
        cluster_descr = SparkClusterDescription()
        cluster_descr.for_spark_app(app_id)
        cluster_id = self._create_new_spark_cluster(user_id, "spark-application", cluster_descr, app_id=app_id)
        state.application_started(app_id, cluster_id)
        return cluster_id

    def _create_new_spark_cluster(self, user_id, name, cluster_descr, with_notebook=False, app_id=None):
        db = CAaaState()
        try:
            cluster_id = db.new_cluster(user_id, name)
            master_info = self._spawn_spark_master(cluster_id, user_id, cluster_descr)
            db.set_master_address(cluster_id, master_info["spark_master_address"])
            for i in range(cluster_descr.num_workers):
                self._spawn_spark_worker(cluster_id, user_id, cluster_descr, master_info, i)
            if with_notebook:
                self._spawn_spark_notebook(cluster_id, user_id, cluster_descr, master_info)
            if app_id is not None:
                self._spawn_spark_submit(user_id, cluster_id, app_id, cluster_descr, master_info)
            return cluster_id
        except docker_errors.APIError as e:
            print("Error starting container: " + str(e.explanation))
            # FIXME: should rollback all changes to DB
            return None

    def _spawn_spark_master(self, cluster_id, user_id, cluster_descr):
        db = CAaaState()
        options = ContainerOptions()
        info = self._spawn_container(MASTER_IMAGE, options)
        info["spark_master_address"] = "http://" + info["docker_ip"] + ":8080"
        cont_id = db.new_container(cluster_id, user_id, info["docker_id"], info["docker_ip"], "spark-master")
        db.new_proxy_entry(get_uuid(), cluster_id, info["spark_master_address"], "web interface", cont_id)
        return info

    def _spawn_spark_worker(self, cluster_id, user_id, cluster_descr, master_info, count):
        db = CAaaState()
        options = ContainerOptions()
        options.add_env_variable("SPARK_MASTER_IP", master_info["docker_ip"])
        options.add_env_variable("SPARK_WORKER_RAM", cluster_descr.executor_ram_size)
        options.add_env_variable("SPARK_WORKER_CORES", cluster_descr.worker_cores)
        options.set_memory_limit(cluster_descr.executor_ram_size)
        info = self._spawn_container(WORKER_IMAGE, options)
        cont_id = db.new_container(cluster_id, user_id, info["docker_id"], info["docker_ip"], "spark-worker-" + str(count))
        db.new_proxy_entry(get_uuid(), cluster_id, "http://" + info["docker_ip"] + ":8081", "web interface", cont_id)
        return info

    def _spawn_spark_notebook(self, cluster_id, user_id, cluster_descr, master_info):
        db = CAaaState()
        proxy_id = get_uuid()
        options = ContainerOptions()
        options.add_env_variable("SPARK_MASTER_IP", master_info["docker_ip"])