Commit 51327582 authored by Daniele Venzano

Generalize spark core and memory settings

Replace the hard-coded worker RAM and core counts with a SparkClusterDescription
object that is threaded through cluster creation, and propagate the executor
memory setting into the notebook's Spark configuration.

parent 01729f15
caaas/cluster_description.py (new file)
+class SparkClusterDescription:
+    def __init__(self):
+        self.num_workers = 2
+        self.executor_ram_size = "4g"
+        self.worker_cores = "2"
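For orientation (not part of the commit): the description object is a plain value holder, so a caller can tune a cluster before requesting it. A minimal sketch; the `swarm` handle, the `user_id` value, and the direct call to the private helper are illustrative only:

    from caaas.cluster_description import SparkClusterDescription

    descr = SparkClusterDescription()  # defaults: 2 workers, "4g" RAM, "2" cores
    descr.num_workers = 4              # four worker containers
    descr.worker_cores = "8"           # cores each worker advertises
    descr.executor_ram_size = "16g"    # RAM per worker and per executor
    swarm._create_new_spark_cluster(user_id, "batch", descr, with_notebook=False)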
@@ -6,6 +6,7 @@ import time
 from uuid import uuid4 as uuid
 from caaas import get_db
+from caaas.cluster_description import SparkClusterDescription

 REGISTRY = "10.0.0.2:5000"
@@ -62,24 +63,27 @@ class Swarm:
         return nb["address"]

     def start_cluster_with_notebook(self, user_id):
-        num_workers = 2
-        self._create_new_spark_cluster(user_id, "notebook", num_workers, with_notebook=True)
+        cluster_descr = SparkClusterDescription()
+        cluster_descr.num_workers = 2
+        cluster_descr.worker_cores = "2"
+        cluster_descr.executor_ram_size = "4g"
+        self._create_new_spark_cluster(user_id, "notebook", cluster_descr, with_notebook=True)

-    def _create_new_spark_cluster(self, user_id, name, num_workers, with_notebook):
+    def _create_new_spark_cluster(self, user_id, name, cluster_descr, with_notebook):
         db = get_db()
         try:
             cluster_id = db.new_cluster(user_id, name)
-            master_info = self._spawn_spark_master(cluster_id, user_id)
+            master_info = self._spawn_spark_master(cluster_id, user_id, cluster_descr)
             db.set_master_address(cluster_id, master_info["spark_master_address"])
-            for i in range(num_workers):
-                self._spawn_spark_worker(cluster_id, user_id, master_info)
+            for i in range(cluster_descr.num_workers):
+                self._spawn_spark_worker(cluster_id, user_id, cluster_descr, master_info)
             if with_notebook:
-                self._spawn_spark_notebook(cluster_id, user_id, master_info)
+                self._spawn_spark_notebook(cluster_id, user_id, cluster_descr, master_info)
         except docker_errors.APIError as e:
             print("Error starting container: " + str(e.explanation))
             # FIXME: should rollback all changes to DB

-    def _spawn_spark_master(self, cluster_id, user_id):
+    def _spawn_spark_master(self, cluster_id, user_id, cluster_descr):
         db = get_db()
         options = {
             "environment": {},
@@ -90,13 +94,13 @@ class Swarm:
         db.new_proxy_entry(get_uuid(), cluster_id, info["spark_master_address"], "spark-master", cont_id)
         return info

-    def _spawn_spark_worker(self, cluster_id, user_id, master_info):
+    def _spawn_spark_worker(self, cluster_id, user_id, cluster_descr, master_info):
         db = get_db()
         options = {
             "environment": {
                 "SPARK_MASTER_IP": master_info["docker_ip"],
-                "SPARK_WORKER_RAM": "4g",
-                "SPARK_WORKER_CORES": "2"
+                "SPARK_WORKER_RAM": cluster_descr.executor_ram_size,
+                "SPARK_WORKER_CORES": cluster_descr.worker_cores
             },
         }
         info = self._spawn_container(WORKER_IMAGE, options)
@@ -104,12 +108,13 @@ class Swarm:
         db.new_proxy_entry(get_uuid(), cluster_id, "http://" + info["docker_ip"] + ":8081", "spark-worker", cont_id)
         return info

-    def _spawn_spark_notebook(self, cluster_id, user_id, master_info):
+    def _spawn_spark_notebook(self, cluster_id, user_id, cluster_descr, master_info):
         db = get_db()
         proxy_id = get_uuid()
         options = {
             "environment": {
                 "SPARK_MASTER_IP": master_info["docker_ip"],
+                "SPARK_EXECUTOR_RAM": cluster_descr.executor_ram_size,
                 "PROXY_ID": proxy_id
             },
         }
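To make the data flow concrete (illustrative, not part of the commit): with the defaults set in start_cluster_with_notebook, each worker container is spawned with an environment like the following; the IP address is a made-up example standing in for master_info["docker_ip"]:

    options = {
        "environment": {
            "SPARK_MASTER_IP": "10.0.0.3",  # master_info["docker_ip"], example value
            "SPARK_WORKER_RAM": "4g",       # cluster_descr.executor_ram_size
            "SPARK_WORKER_CORES": "2"       # cluster_descr.worker_cores
        },
    }

Note that the same executor_ram_size field feeds both SPARK_WORKER_RAM on the workers and SPARK_EXECUTOR_RAM on the notebook, which keeps the executor memory request within what each worker offers.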
...
@@ -11,6 +11,7 @@ manager {
   custom {
     sparkConf = {
       spark.master: "spark://SPARK_MASTER_IP:7077"
+      spark.executor.memory: SPARK_EXEC_RAM
     }
   }
 }
...
@@ -2,6 +2,6 @@
 cd /opt/spark-notebook
-cat ../application.conf | sed -e "s/SPARK_MASTER_IP/$SPARK_MASTER_IP/" > conf/application.conf
+cat ../application.conf | sed -e "s/SPARK_MASTER_IP/$SPARK_MASTER_IP/" -e "s/SPARK_EXEC_RAM/$SPARK_EXECUTOR_RAM/" > conf/application.conf
 ./bin/spark-notebook -Dconfig.file=./conf/application.conf -Dapplication.context=/proxy/$PROXY_ID
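End-to-end effect (illustrative, not part of the commit): at notebook start-up the script substitutes the template placeholders with the environment injected by the swarm manager, so with the default "4g" executor size and an example master IP the rendered conf/application.conf would contain:

    sparkConf = {
      spark.master: "spark://10.0.0.3:7077"
      spark.executor.memory: 4g
    }

Note the template token is SPARK_EXEC_RAM while the injected variable is SPARK_EXECUTOR_RAM; the second sed expression bridges the two names.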