Commit c1797d28 authored by Daniele Venzano

Spark submit applications now get executed

parent 15460708
conf = {
- 'docker_swarm_manager': 'tcp://m2:2380',
+ 'docker_swarm_manager': 'tcp://bf1.bigfoot.eurecom.fr:2380',
'status_refresh_interval': 10,
'scheduler_task_interval': 10,
'db_connection': 'mysql+mysqlconnector://zoe:6sz2tfPuzBcCLdEz@m1.bigfoot.eurecom.fr/zoe',
......
......@@ -82,6 +82,7 @@ class SparkSubmitExecution(Execution):
ret = super().extract()
ret.commandline = self.commandline
+ ret.spark_opts = self.spark_opts
return ret
class PlainExecution:
......
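The hunk above makes SparkSubmitExecution.extract() copy spark_opts onto the object returned by the parent class, so the Spark options are carried along with the command line when the execution is serialized. A minimal sketch of that parent/child extract() pattern, with the class names taken from the hunk and a hypothetical plain container standing in for the real return type:

class ExecutionReport:
    """Hypothetical plain container standing in for whatever extract() returns."""
    pass

class Execution:
    def __init__(self, name):
        self.name = name

    def extract(self):
        ret = ExecutionReport()
        ret.name = self.name
        return ret

class SparkSubmitExecution(Execution):
    def __init__(self, name, commandline, spark_opts):
        super().__init__(name)
        self.commandline = commandline
        self.spark_opts = spark_opts

    def extract(self):
        # Extend the parent's snapshot with the spark-submit specific fields.
        ret = super().extract()
        ret.commandline = self.commandline
        ret.spark_opts = self.spark_opts  # the field added by this commit
        return ret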
......@@ -30,6 +30,8 @@ class ZoeClient:
self.server_connection = None
def _connect(self):
+ if self.server is not None:
+     return  # already connected
if self.rpyc_server is None:
self.server_connection = rpyc.connect_by_service("ZoeSchedulerRPC")
else:
......@@ -114,7 +116,7 @@ class ZoeClient:
self.state.commit()
return app.id
- def spark_submit_application_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str, file: str) -> int:
+ def spark_submit_application_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str, file_data: bytes) -> int:
try:
self.state.query(User).filter_by(id=user_id).one()
except NoResultFound:
......@@ -133,7 +135,7 @@ class ZoeClient:
user_id=user_id)
self.state.add(app)
self.state.flush()
- storage.application_data_upload(app, open(file, "rb").read())
+ storage.application_data_upload(app, file_data)
self.state.commit()
return app.id
......
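With the hunk above, ZoeClient.spark_submit_application_new() takes the application archive as bytes (file_data) instead of a filesystem path, so the caller reads the file and the client only forwards the contents to storage. A rough sketch of how a caller would use the new signature; the import path, constructor call, and argument values are assumptions for illustration, not taken from this commit:

from zoe_client import ZoeClient  # assumed import path

client = ZoeClient()  # constructor arguments, if any, are not shown in this commit

with open("wordcount.zip", "rb") as f:  # hypothetical local archive
    file_data = f.read()                # the caller reads the bytes itself

app_id = client.spark_submit_application_new(
    user_id=1,                          # made-up values for illustration
    worker_count=2,
    executor_memory="4g",
    executor_cores=2,
    name="wordcount",
    file_data=file_data)                # bytes, not a path, as of this commit
print("new application id:", app_id)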
......@@ -55,7 +55,9 @@ def application_new():
file_data = request.files['file']
if not is_zipfile(file_data.stream):
return jsonify(status='error', msg='not a zip file')
- client.spark_submit_application_new(user.id, int(form_data["num_workers"]), form_data["ram"] + 'g', int(form_data["num_cores"]), form_data["app_name"], file_data)
+ file_data.stream.seek(0)
+ fcontents = file_data.stream.read()
+ client.spark_submit_application_new(user.id, int(form_data["num_workers"]), form_data["ram"] + 'g', int(form_data["num_cores"]), form_data["app_name"], fcontents)
else:
return jsonify(status="error", msg='unknown application type')
......
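In the web handler above, is_zipfile() is given the upload's stream to check that it really is a zip archive; that check moves the stream position, so the new code rewinds with seek(0) before reading the contents that get passed to the client. A stand-alone illustration of the same pattern outside Flask, using an in-memory stream and a hypothetical archive name:

from io import BytesIO
from zipfile import is_zipfile

# Stand-in for request.files['file'].stream in the handler above.
stream = BytesIO(open("wordcount.zip", "rb").read())

if not is_zipfile(stream):   # seeks around the stream to find the zip end-of-archive record
    raise ValueError("not a zip file")
stream.seek(0)               # rewind: read() would otherwise return only what is left
                             # after the position where is_zipfile() stopped
fcontents = stream.read()    # full archive bytes, ready for spark_submit_application_new()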
......@@ -20,12 +20,15 @@ def home():
"email": user.email,
'apps': apps,
}
- reports = [client.application_status(app.id) for app in apps]
+ reports = []
+ for app in apps:
+     r = client.application_status(app.id)
+     reports.append(r)
active_executions = []
past_executions = []
for r in reports:
for e in r.report['executions']:
if e['status'] == "running" or e['status'] == "scheduled" or e['status'] == "submitted":
if e['status'] == "running": # or e['status'] == "scheduled" or e['status'] == "submitted":
active_executions.append((r.report, e, client.execution_get_proxy_path(e['id'])))
else:
past_executions.append((r.report, e))
......
......@@ -5,6 +5,7 @@
<meta charset="UTF-8">
<title>{% block title %}{% endblock %} - Zoe</title>
<script src="/static/jquery-2.1.4.min.js" type="application/javascript"></script>
<script src="/static/sorttable.js" type="application/javascript"></script>
<link rel="stylesheet" href="/static/zoe.css" type="text/css">
{% endblock %}
</head>
......
......@@ -17,15 +17,15 @@
<input type="hidden" name="app_id" value="{{ app.id }}">
<label>Set a name to identify this execution:
<input type="text" autofocus autocomplete="on" required pattern="[a-z0-9_\-]+" name="exec_name" id="exec_name" placeholder="wordcount-twitter-2">
- </label>
+ </label><br/>
<label>Command line:
<input type="text" autofocus autocomplete="on" required name="cmdline" id="cmdline" placeholder="wordcount.py hdfs://192.168.45.157/datasets/gutenberg_big_2x.txt hdfs://192.168.45.157/tmp/cntwdc1">
</label>
<input type="text" autocomplete="on" required name="commandline" id="commandline" size="100" placeholder="wordcount.py hdfs://192.168.45.157/datasets/gutenberg_big_2x.txt hdfs://192.168.45.157/tmp/cntwdc1">
</label><br/>
<label>Spark options (optional):
<input type="text" autofocus autocomplete="on" name="spark_opts" id="spark_opts" placeholder="">
</label>
<input type="text" autocomplete="on" name="spark_opts" id="spark_opts" size="100"><br/>
</label><br/>
<input type="submit" value="Start" id="submit">
</form>
{% endif %}
......@@ -46,7 +46,7 @@
return false;
}
- function errorHandler(e) {
+ function errorHandler() {
$("#communication_error").show();
}
......
......@@ -9,7 +9,7 @@
<p>You have no applications defined. <a href="{{ url_for("web.application_new") }}">Click here</a> to create a new one!</p>
{% else %}
<table id="app_list" class="app_list">
<table id="app_list" class="app_list sortable">
<thead>
<tr>
<th>Name</th>
......@@ -44,7 +44,7 @@
<p>You have no active executions at this time.</p>
{% else %}
<table id="exec_list" class="app_list">
<table id="exec_list" class="app_list sortable">
<thead>
<tr>
<th>Application name</th>
......@@ -83,7 +83,7 @@
<p>Empty history.</p>
{% else %}
<table id="hist_list" class="app_list">
<table id="hist_list" class="app_list sortable">
<thead>
<tr>
<th>Application name</th>
......
......@@ -49,8 +49,9 @@ def spark_submit_new_cmd(args):
if not is_zipfile(args.file):
print("Error: the file specified is not a zip archive")
return
+ fcontents = open(args.file, "rb").read()
client = get_zoe_client()
- application_id = client.spark_submit_application_new(args.user_id, args.worker_count, args.executor_memory, args.executor_cores, args.name, args.file)
+ application_id = client.spark_submit_application_new(args.user_id, args.worker_count, args.executor_memory, args.executor_cores, args.name, fcontents)
print("Spark application added with ID: {}".format(application_id))
......
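The command-line client gets the matching change: it validates the zip, reads the bytes locally (the new fcontents line), and hands them to spark_submit_application_new() instead of the path, presumably so that the CLI and the web frontend (which only ever has an uploaded stream, not a path) use the same client call. A condensed sketch of the post-commit CLI flow, assembled from the fragments in the hunk above; the only change is closing the file explicitly:

from zipfile import is_zipfile

def spark_submit_new_cmd(args):
    # Refuse anything that is not a zip archive before contacting the scheduler.
    if not is_zipfile(args.file):
        print("Error: the file specified is not a zip archive")
        return
    with open(args.file, "rb") as f:
        fcontents = f.read()            # read the archive on the caller's machine
    client = get_zoe_client()           # helper shown in the hunk above
    application_id = client.spark_submit_application_new(
        args.user_id, args.worker_count, args.executor_memory,
        args.executor_cores, args.name, fcontents)
    print("Spark application added with ID: {}".format(application_id))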