Commit 434ad2d2 authored by Daniele Venzano

Do not pass SqlAlchemy objects through the client API

parent 18071e80
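The change replaces SQLAlchemy model instances in the `ZoeClient` API with plain, session-free objects built by new `extract()` methods. A minimal standalone sketch (assumed model, not from this repo) of the failure mode this avoids: once a mapped instance is detached from its Session, accessing an expired attribute raises `DetachedInstanceError`.

```python
# Minimal sketch (assumed model, not from this repo) of why session-bound
# SQLAlchemy objects are unsafe to hand across an API boundary.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String(128))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)  # expire_on_commit=True by default

session = Session()
user = User(email="test@example.com")
session.add(user)
session.commit()  # attributes are now expired
session.close()   # instance is now detached from any session

try:
    print(user.email)  # refresh is impossible: DetachedInstanceError
except Exception as exc:
    print(type(exc).__name__)  # -> DetachedInstanceError
```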
@@ -11,5 +11,6 @@ conf = {
'proxy_update_accesses': 300,
'check_health': 30,
'notebook_max_age_no_activity': 24,
'notebook_warning_age_no_activity': 2
'notebook_warning_age_no_activity': 2,
'email_task_interval': 300
}
from sqlalchemy import Column, Integer, String, PickleType, Text, ForeignKey
from sqlalchemy import Column, Integer, String, PickleType, ForeignKey
from sqlalchemy.orm import relationship
from common.state import Base
@@ -41,6 +41,16 @@ class Application(Base):
ret.append(e)
return ret
def extract(self):
ret = PlainApplication()
ret.id = self.id
ret.name = self.name
ret.required_resources = self.required_resources
ret.user_id = self.user_id
ret.executions = [x.id for x in self.executions]
ret.type = self.type
return ret
class SparkApplication(Application):
master_image = Column(String(256))
@@ -56,6 +66,12 @@ class SparkApplication(Application):
ret["worker_image"] = self.worker_image
return ret
def extract(self):
ret = super().extract()
ret.master_image = self.master_image
ret.worker_image = self.worker_image
return ret
class SparkNotebookApplication(SparkApplication):
notebook_image = Column(String(256))
@@ -69,6 +85,11 @@ class SparkNotebookApplication(SparkApplication):
ret["notebook_image"] = self.notebook_image
return ret
def extract(self):
ret = super().extract()
ret.notebook_image = self.notebook_image
return ret
class SparkSubmitApplication(SparkApplication):
submit_image = Column(String(256))
@@ -81,3 +102,12 @@ class SparkSubmitApplication(SparkApplication):
ret = super().to_dict()
ret["submit_image"] = self.submit_image
return ret
def extract(self):
ret = super().extract()
ret.submit_image = self.submit_image
return ret
class PlainApplication:
pass
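The `Plain*` classes are deliberately empty attribute containers; `extract()` copies column values, and ids of related rows rather than related objects, into them. A hedged usage sketch (the session handling and the id value are assumptions):

```python
# Hedged sketch: extract() yields a plain object that stays usable
# after the session is gone.
state = AlchemySession()
app = state.query(Application).filter_by(id=4).one()
plain = app.extract()
state.close()

print(plain.name)        # plain Python attribute, no lazy loading
print(plain.executions)  # list of execution ids, not Execution objects
```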
@@ -53,6 +53,21 @@ class Execution(Base):
return "<Execution(name='%s', id='%s', assigned_resourced='%s', application_id='%s', )>" % (
self.name, self.id, self.assigned_resources, self.application_id)
def extract(self):
ret = PlainExecution()
ret.id = self.id
ret.name = self.name
ret.assigned_resources = self.assigned_resources
ret.application_id = self.application_id
ret.time_started = self.time_started
ret.time_scheduled = self.time_scheduled
ret.time_finished = self.time_finished
ret.status = self.status
ret.termination_notice = self.termination_notice
        ret.cluster_id = self.cluster.id if self.cluster is not None else None
ret.type = self.type
return ret
class SparkSubmitExecution(Execution):
commandline = Column(String(1024))
@@ -61,3 +76,12 @@ class SparkSubmitExecution(Execution):
__mapper_args__ = {
'polymorphic_identity': 'spark-submit-application'
}
    def extract(self):
        ret = super().extract()
        ret.commandline = self.commandline
        ret.spark_opts = self.spark_opts
        return ret
class PlainExecution:
pass
@@ -15,3 +15,13 @@ class User(Base):
def __repr__(self):
return "<User(id='%s', email='%s')>" % (
self.id, self.app_id)
def extract(self):
ret = PlainUser()
ret.id = self.id
ret.email = self.email
return ret
class PlainUser:
pass
@@ -4,8 +4,9 @@ from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop, PeriodicCallback
from zoe_web import app
from zoe_web.cleanup_thread import cleanup_task
from zoe_web.config_parser import config
from zoe_web.emails import email_task
from common.configuration import conf
DEBUG = True
log = logging.getLogger("zoe_web")
@@ -23,7 +24,7 @@ def main():
http_server = HTTPServer(WSGIContainer(app))
http_server.listen(4000, "0.0.0.0")
ioloop = IOLoop.instance()
PeriodicCallback(cleanup_task, int(config.cleanup_thread_interval) * 1000).start()
PeriodicCallback(email_task, int(conf["email_task_interval"]) * 1000).start()
ioloop.start()
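Tornado's `PeriodicCallback` takes its interval in milliseconds, hence the `* 1000` on the configured value, which is expressed in seconds (`email_task_interval: 300`). A minimal sketch of the same scheduling pattern:

```python
# Minimal sketch of the scheduling pattern used above: interval in ms.
from tornado.ioloop import IOLoop, PeriodicCallback

def tick():
    print("periodic task ran")

PeriodicCallback(tick, 300 * 1000).start()  # run every 300 seconds
IOLoop.instance().start()
```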
import rpyc
from sqlalchemy.orm.exc import NoResultFound
from common.state import AlchemySession, SparkApplication, User, Application, Cluster, SparkSubmitExecution, Execution, SparkNotebookApplication, SparkSubmitApplication
from common.state import AlchemySession
from common.state.application import Application, SparkNotebookApplication, SparkSubmitApplication, SparkApplication, PlainApplication
from common.state.container import Container
from common.state.execution import Execution, SparkSubmitExecution, PlainExecution
from common.state.user import User, PlainUser
from common.application_resources import SparkApplicationResources
from common.status import PlatformStatusReport
from common.status import PlatformStatusReport, ApplicationStatusReport
from common.exceptions import UserIDDoesNotExist, ApplicationStillRunning
import common.object_storage as storage
@@ -21,20 +25,27 @@ class ZoeClient:
self.server = self.server_connection.root
self.state = AlchemySession()
def user_new(self, email) -> int:
# Users
def user_new(self, email: str) -> PlainUser:
user = User(email=email)
self.state.add(user)
self.state.commit()
return user.id
return user.extract()
def user_get(self, email) -> int:
def user_get(self, email: str) -> PlainUser:
user = self.state.query(User).filter_by(email=email).one()
return user.id
return user.extract()
    def user_check(self, user_id: int) -> bool:
        try:
            self.state.query(User).filter_by(id=user_id).one()
        except NoResultFound:
            return False
        return True
# Platform
def platform_status(self) -> PlatformStatusReport:
return self.server.get_platform_status()
def spark_application_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str) -> SparkApplication:
# Applications
def spark_application_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str) -> int:
try:
self.state.query(User).filter_by(id=user_id).one()
except NoResultFound:
@@ -52,9 +63,9 @@ class ZoeClient:
user_id=user_id)
self.state.add(app)
self.state.commit()
return app
return app.id
def spark_notebook_application_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str):
def spark_notebook_application_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str) -> int:
try:
self.state.query(User).filter_by(id=user_id).one()
except NoResultFound:
@@ -73,9 +84,9 @@ class ZoeClient:
user_id=user_id)
self.state.add(app)
self.state.commit()
return app
return app.id
def spark_submit_application_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str, file: str) -> SparkSubmitApplication:
def spark_submit_application_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str, file: str) -> int:
try:
self.state.query(User).filter_by(id=user_id).one()
except NoResultFound:
@@ -97,32 +108,20 @@ class ZoeClient:
storage.application_data_upload(app, open(file, "rb").read())
self.state.commit()
return app
return app.id
def spark_application_get(self, application_id) -> Application:
def application_get(self, application_id) -> PlainApplication:
try:
return self.state.query(SparkApplication).filter_by(id=application_id).one()
ret = self.state.query(Application).filter_by(id=application_id).one()
return ret.extract()
except NoResultFound:
return None
def execution_spark_new(self, application: Application, name, commandline=None, spark_options=None):
if type(application) is SparkSubmitApplication:
if commandline is None:
raise ValueError("Spark submit application requires a commandline")
execution = SparkSubmitExecution(name=name,
application_id=application.id,
status="submitted",
commandline=commandline,
spark_opts=spark_options)
else:
execution = Execution(name=name,
application_id=application.id,
status="submitted")
self.state.add(execution)
self.state.commit()
return self.server.execution_schedule(execution.id)
def application_remove(self, application: Application):
def application_remove(self, application_id: int):
try:
application = self.state.query(Application).filter_by(id=application_id).one()
except NoResultFound:
return
running = self.state.query(Execution).filter_by(application_id=application.id, time_finished=None).count()
if running > 0:
raise ApplicationStillRunning(application)
@@ -134,28 +133,75 @@ class ZoeClient:
self.state.delete(application)
self.state.commit()
def application_status(self, application: Application):
def application_status(self, application_id: int) -> ApplicationStatusReport:
try:
application = self.state.query(Application).filter_by(id=application_id).one()
except NoResultFound:
return None
return self.server.application_status(application.id)
def spark_application_list(self, user_id):
def spark_application_list(self, user_id) -> [PlainApplication]:
try:
self.state.query(User).filter_by(id=user_id).one()
except NoResultFound:
raise UserIDDoesNotExist(user_id)
return self.state.query(Application).filter_by(user_id=user_id).all()
apps = self.state.query(Application).filter_by(user_id=user_id).all()
return [x.extract() for x in apps]
# Executions
def execution_spark_new(self, application_id: int, name, commandline=None, spark_options=None) -> bool:
try:
application = self.state.query(Application).filter_by(id=application_id).one()
except NoResultFound:
return None
if type(application) is SparkSubmitApplication:
if commandline is None:
raise ValueError("Spark submit application requires a commandline")
execution = SparkSubmitExecution(name=name,
application_id=application.id,
status="submitted",
commandline=commandline,
spark_opts=spark_options)
else:
execution = Execution(name=name,
application_id=application.id,
status="submitted")
self.state.add(execution)
self.state.commit()
return self.server.execution_schedule(execution.id)
def execution_get(self, execution_id: int) -> Execution:
return self.state.query(Execution).filter_by(id=execution_id).one()
def execution_get(self, execution_id: int) -> PlainExecution:
try:
ret = self.state.query(Execution).filter_by(id=execution_id).one()
except NoResultFound:
return None
return ret.extract()
def execution_terminate(self, execution: Execution):
self.server.terminate_execution(execution.id)
    def execution_terminate(self, execution_id: int):
        try:
            self.state.query(Execution).filter_by(id=execution_id).one()
        except NoResultFound:
            return
        self.server.terminate_execution(execution_id)
    def execution_delete(self, execution: Execution):
    def execution_delete(self, execution_id: int):
        try:
            execution = self.state.query(Execution).filter_by(id=execution_id).one()
        except NoResultFound:
            return
if execution.status == "running":
raise ApplicationStillRunning(execution.application)
storage.logs_archive_delete(execution)
        self.state.delete(execution)
        self.state.commit()
# Logs
def log_get(self, container_id: int) -> str:
return self.server.log_get(container_id)
try:
self.state.query(Container).filter_by(id=container_id).one()
except NoResultFound:
return None
else:
return self.server.log_get(container_id)
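With these changes only scalar ids and `Plain*` objects cross the client boundary. A hypothetical end-to-end use of the reworked API (the no-argument `ZoeClient()` constructor and all values are assumptions):

```python
# Hypothetical end-to-end usage of the reworked client API.
client = ZoeClient()

user = client.user_new("alice@example.com")             # PlainUser
app_id = client.spark_application_new(user.id, 2, "2g", 2, "wordcount")
client.execution_spark_new(app_id, "run-1")             # schedules by id

app = client.application_get(app_id)                    # PlainApplication
for eid in app.executions:                              # ids, not objects
    print(client.execution_get(eid).status)             # PlainExecution
```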
from flask import jsonify, request, send_file, abort
import time
from zipfile import is_zipfile
from zoe_web import app
@@ -10,26 +9,6 @@ from zoe_web.swarm_manager import sm
STATS_CACHING_EXPIRATION = 1 # seconds
@app.route("/api/login/<email>")
def api_login(email):
state = CAaaState()
user_id = state.get_user_id(email)
if user_id is None:
user_id = state.new_user(email)
return jsonify(user_id=user_id)
@app.route("/api/status")
def api_status():
if time.time() - sm.last_update_timestamp > STATS_CACHING_EXPIRATION:
sm.update_status()
data = {
'num_containers': int(sm.status.num_containers),
'num_nodes': int(sm.status.num_nodes)
}
return jsonify(**data)
@app.route("/api/<int:user_id>/cluster/<int:cluster_id>/terminate")
def api_terminate_cluster(user_id, cluster_id):
db = CAaaState()
from configparser import ConfigParser
import os
MAIN_PATH = os.path.split(os.path.abspath(os.path.join(__file__, "..")))[0]
class CAaasConfig:
def __init__(self, conf_file):
parser = ConfigParser()
found = parser.read(conf_file)
if not found:
raise ValueError('Configuration file not found')
self.__dict__.update(parser.items('zoe_web'))
config = CAaasConfig(os.path.join(MAIN_PATH, 'zoe_web.ini'))
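`CAaasConfig` flattens every key of the `[zoe_web]` INI section into instance attributes via `__dict__.update()`; values stay strings, which is why callers such as `int(config.cleanup_thread_interval)` cast explicitly. A small sketch of the mechanism (the key name is an example):

```python
# Sketch of what CAaasConfig.__init__ does with parser.items().
from configparser import ConfigParser
from io import StringIO

parser = ConfigParser()
parser.read_file(StringIO("[zoe_web]\ncleanup_thread_interval = 60\n"))

class Cfg:
    pass

cfg = Cfg()
cfg.__dict__.update(parser.items("zoe_web"))  # (key, value) pairs -> attributes
print(cfg.cleanup_thread_interval)  # '60' -- still a string
```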
@@ -8,10 +8,7 @@ log = logging.getLogger(__name__)
from jinja2 import Template
from zoe_web.config_parser import config
from zoe_web.proxy_manager import get_notebook_address
from zoe_web.sql import CAaaState
from zoe_web.swarm_manager import sm
APP_FINISH_EMAIL_TEMPLATE = """Application {{ name }} has finished executing after {{ runtime }}.
@@ -52,7 +49,7 @@ def do_duration(seconds):
return template.format(d=d, h=h, m=m, s=s)
def cleanup_task():
def email_task():
ts = time.time()
# noinspection PyBroadException
try:
import swiftclient
from zoe_web.config_parser import config
class SwiftObjectStore:
def __init__(self):
self.log_container = config.swift_log_container_name
self.app_container = config.swift_app_container_name
self.username = config.swift_username
self.password = config.swift_password
self.tenant_name = config.swift_tenant_name
self.auth_url = config.swift_keystone_auth_url
def _connect(self) -> swiftclient.Connection:
return swiftclient.client.Connection(auth_version='2',
user=self.username,
key=self.password,
tenant_name=self.tenant_name,
authurl=self.auth_url)
def put_log(self, execution_id, log_data):
swift = self._connect()
swift.put_object(self.log_container, execution_id + ".zip", log_data)
swift.close()
def get_log(self, execution_id):
swift = self._connect()
log_data = swift.get_object(self.log_container, execution_id + ".zip")
swift.close()
return log_data
def delete_log(self, execution_id):
swift = self._connect()
swift.delete_object(self.log_container, execution_id + ".zip")
swift.close()
def put_app(self, app_id, app_data):
swift = self._connect()
swift.put_object(self.app_container, app_id + ".zip", app_data)
swift.close()
def get_app(self, app_id):
swift = self._connect()
app_data = swift.get_object(self.app_container, app_id + ".zip")
swift.close()
return app_data
def delete_app(self, app_id):
swift = self._connect()
swift.delete_object(self.app_container, app_id + ".zip")
swift.close()
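A hedged usage sketch for `SwiftObjectStore` (credentials come from the config; the ids are placeholders). One caveat worth noting: python-swiftclient's `get_object()` returns a `(headers, body)` tuple, which `get_log()` and `get_app()` pass through unchanged:

```python
# Hedged usage sketch; ids are placeholders, config supplies credentials.
store = SwiftObjectStore()
with open("exec-42-logs.zip", "rb") as f:
    store.put_log("42", f.read())

headers, log_zip = store.get_log("42")  # swiftclient returns (headers, body)
store.delete_log("42")
```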
from flask import render_template, redirect, url_for, abort
from zoe_web import app
from zoe_web.config_parser import config
from zoe_web.proxy_manager import get_container_addresses, get_notebook_address
from zoe_web.sql import CAaaState
from zoe_web.swarm_manager import sm
from zoe_client import ZoeClient
@app.route("/web/")
@@ -14,45 +11,42 @@ def index():
@app.route("/web/status")
def web_status():
status = sm.swarm_status()
return render_template('status.html', **status)
client = ZoeClient()
status = client.platform_status()
return render_template('status.html', status=status)
@app.route("/web/login/<email>")
def web_login(email):
state = CAaaState()
user_id = state.get_user_id(email)
client = ZoeClient()
user_id = client.user_get(email)
if user_id is None:
user_id = state.new_user(email)
user_id = client.user_new(email)
return redirect(url_for("web_index", user_id=user_id))
@app.route("/web/<int:user_id>")
def web_index(user_id):
state = CAaaState()
if not state.check_user_id(user_id):
client = ZoeClient()
if not client.user_check(user_id):
return redirect(url_for('index'))
template_vars = {
"user_id": user_id,
"email": state.get_user_email(user_id)
"email": client.user_get_email(user_id)
}
return render_template('home.html', **template_vars)
@app.route("/web/<int:user_id>/apps")
def web_user_apps(user_id):
state = CAaaState()
if not state.check_user_id(user_id):
client = ZoeClient()
if not client.user_check(user_id):
return redirect(url_for('index'))
apps = state.get_applications(user_id)
nb_id = state.get_notebook(user_id)
apps = client.spark_application_list(user_id)
template_vars = {
"user_id": user_id,
"apps": apps,
"has_notebook": state.has_notebook(user_id),
"notebook_address": get_notebook_address(nb_id),
"notebook_cluster_id": nb_id
"apps": apps
}
return render_template('apps.html', **template_vars)
@@ -18,26 +18,26 @@ def setup_db_cmd(_):
def user_new_cmd(args):
client = ZoeClient()
clid = client.user_new(args.email)
print("New user ID: {}".format(clid))
user = client.user_new(args.email)
print("New user ID: {}".format(user.id))
def user_get_cmd(args):
client = ZoeClient()
clid = client.user_get(args.email)
print("User ID: {}".format(clid))
user = client.user_get(args.email)
print("User ID: {}".format(user.email))
def spark_cluster_new_cmd(args):
client = ZoeClient()
application = client.spark_application_new(args.user_id, args.worker_count, args.executor_memory, args.executor_cores, args.name)
print("Spark application added with ID: {}".format(application.id))
application_id = client.spark_application_new(args.user_id, args.worker_count, args.executor_memory, args.executor_cores, args.name)
print("Spark application added with ID: {}".format(application_id))
def spark_notebook_new_cmd(args):
client = ZoeClient()
application = client.spark_notebook_application_new(args.user_id, args.worker_count, args.executor_memory, args.executor_cores, args.name)
print("Spark application added with ID: {}".format(application.id))
application_id = client.spark_notebook_application_new(args.user_id, args.worker_count, args.executor_memory, args.executor_cores, args.name)
print("Spark application added with ID: {}".format(application_id))
def spark_submit_new_cmd(args):
@@ -45,17 +45,17 @@ def spark_submit_new_cmd(args):
print("Error: the file specified is not a zip archive")
return
client = ZoeClient()
application = client.spark_submit_application_new(args.user_id, args.worker_count, args.executor_memory, args.executor_cores, args.name, args.file)
print("Spark application added with ID: {}".format(application.id))
application_id = client.spark_submit_application_new(args.user_id, args.worker_count, args.executor_memory, args.executor_cores, args.name, args.file)
print("Spark application added with ID: {}".format(application_id))
def run_spark_cmd(args):
client = ZoeClient()
application = client.spark_application_get(args.id)
application = client.application_get(args.id)
if application is None:
print("Error: application {} does not exist".format(args.id))
return
ret = client.execution_spark_new(application, args.name, args.cmd, args.spark_opts)
ret = client.execution_spark_new(application.id, args.name, args.cmd, args.spark_opts)
if ret:
print("Application scheduled successfully, use the app-inspect command to check its status")
@@ -65,28 +65,28 @@ def run_spark_cmd(args):
def app_rm_cmd(args):
client = ZoeClient()
application = client.spark_application_get(args.id)
application = client.application_get(args.id)
if application is None:
print("Error: application {} does not exist".format(args.id))
return
if args.force:
a = client.application_status(application)
for eid in a["executions"]:
a = client.application_get(application.id)
for eid in a.executions:
e = client.execution_get(eid)
if e.status == "running":
print("Terminating execution {}".format(e.name))
                client.execution_terminate(e.id)
client.application_remove(application)
client.application_remove(application.id)
def app_inspect_cmd(args):
client = ZoeClient()
application = client.spark_application_get(args.id)
application = client.application_get(args.id)
if application is None:
print("Error: application {} does not exist".format(args.id))
return
app_report = client.application_status(application)
app_report = client.application_status(application.id)
print(app_report)
@@ -105,7 +105,7 @@ def exec_kill_cmd(args):
if execution is None:
print("Error: execution {} does not exist".format(args.id))
return
client.execution_terminate(execution)
client.execution_terminate(execution.id)
def log_get_cmd(args):