Commit 45abd953 authored by Daniele Venzano

Use an application description object instead of hardcoding Spark into the scheduler

 This commit also completely removes the reverse proxy code, as the same result can be achieved with a SOCKS proxy and much less complexity in Zoe's code.
  Everything that is extraneous to the scheduler is being moved out of it.
  In this commit zoe_web is broken.
parent c606dded
Planned features for Zoe
========================
Extract Spark from the code
---------------------------
Zoe should be independent of Spark and support many data analytics frameworks. Currently Spark is pretty much hardcoded, but we should move all application-specific
details into an "application description". This description is fed to the Zoe Scheduler, which then becomes a generic application scheduler.
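As a minimal, purely illustrative sketch (the field names are taken from the test descriptions added in this commit, and ZoeApplication.from_dict is used as the new tests do; the schema is not final), such a description could look like this::

    # Illustrative only: field names mirror tests/resources/spark-notebook-test.json;
    # the description schema is still evolving.
    from zoe_scheduler.application_description import ZoeApplication

    description = {
        "name": "my-analytics-app",
        "version": 0,
        "will_end": True,
        "priority": 512,
        "requires_binary": False,
        "processes": [
            {
                "name": "spark-master",
                "version": 0,
                "docker_image": "/zoerepo/spark-master",
                "monitor": False,
                "required_resources": {"memory": 2 * 1024 ** 3},
                "ports": [["8080", "Spark master web interface", False]],
            },
        ],
    }

    # The scheduler only ever sees this generic object, never Spark-specific code.
    app = ZoeApplication.from_dict(description)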
Monitoring
----------
Integrate a monitoring solution: Zoe has access to a lot of valuable data that should be recorded and used for feedback and study. The data that can be gathered is of two kinds:

1. Events (a user starts an execution, a cluster finishes, etc.)
2. Statistics: timeseries data gathered from `docker stats` and from the Docker hosts (collectd? InfluxDB?)
Data should be visible to the users. The difficulty of using Grafana for visualization is that it does not handle graphs from different
time intervals well, for example when comparing the executions of two Spark jobs.
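As a purely illustrative sketch (Zoe has no monitoring code yet), per-container samples could be pulled through the Docker remote API, here assuming the docker-py client::

    # Illustrative only: assumes the docker-py client; Zoe has no monitoring code yet.
    from itertools import islice
    from docker import Client

    def sample_container_stats(swarm_url, container_id, samples=3):
        """Collect a few `docker stats` snapshots for one container."""
        client = Client(base_url=swarm_url)
        stream = client.stats(container_id, decode=True)  # generator of stats dicts
        collected = []
        for stats in islice(stream, samples):
            collected.append({
                'memory_usage': stats['memory_stats'].get('usage'),
                'cpu_total': stats['cpu_stats']['cpu_usage']['total_usage'],
            })
        return collected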
Storage
-------
Zoe should support creating, listing and selecting inputs and outputs for applications. In particular users should be able to create new HDFS clusters or re-use existing
ones, created by them or by other users. They should be able to list the contents of these storage clusters and select inputs and outputs.
The Zoe Scheduler should try to place containers so as to satisfy data-locality constraints, keeping data containers and compute containers "near" each other.
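A purely illustrative placement heuristic, none of which exists in Zoe yet: filter candidate hosts by free memory, then prefer those already running the execution's storage containers::

    # Illustrative only: no data-locality logic exists in Zoe yet.
    def pick_host(hosts, storage_hosts, needed_memory):
        """hosts: list of dicts with 'name' and 'free_memory'; storage_hosts: set of host names."""
        candidates = [h for h in hosts if h['free_memory'] >= needed_memory]
        local = [h for h in candidates if h['name'] in storage_hosts]
        pool = local or candidates
        return max(pool, key=lambda h: h['free_memory']) if pool else None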
......
class ApplicationResources:
    def core_count(self):
        return 0

    def to_dict(self) -> dict:
        return {}


# For now resources are dictionaries and Platform recognizes:
# - memory_limit
# - worker_cores
class SparkApplicationResources(ApplicationResources):
    def __init__(self):
        self.master_resources = {}
        self.worker_resources = {}
        self.notebook_resources = {}
        self.client_resources = {}
        self.worker_count = 0
        self.container_count = 0

    def core_count(self) -> int:
        if "cores" in self.worker_resources:
            return self.worker_count * self.worker_resources["cores"]
        else:
            return 0

    def to_dict(self) -> dict:
        ret = super().to_dict()
        ret['master_resources'] = self.master_resources
        ret['worker_resources'] = self.worker_resources
        ret['notebook_resources'] = self.notebook_resources
        ret['client_resources'] = self.client_resources
        ret['worker_count'] = self.worker_count
        ret['container_count'] = self.container_count
        return ret
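A small usage sketch of the class above (the values are arbitrary; note that core_count() looks for a "cores" key in worker_resources):

# Usage sketch; values are arbitrary.
res = SparkApplicationResources()
res.worker_count = 3
res.worker_resources = {"memory_limit": 4 * 1024 ** 3, "cores": 4}
res.master_resources = {"memory_limit": 2 * 1024 ** 3}

assert res.core_count() == 12      # worker_count * worker_resources["cores"]
serialized = res.to_dict()         # plain dict, ready for serialization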
@@ -18,7 +18,6 @@ defaults = {
    'intervals': {
        'status_refresh': 10,
        'scheduler_task': 10,
        'proxy_update_accesses': 300,
        'check_health': 30,
        'notebook_max_age_no_activity': 24,
        'notebook_warning_age_no_activity': 2
@@ -26,12 +25,6 @@ defaults = {
    'db': {
        'url': 'mysql+mysqlconnector://zoe:pass@dbhost/zoe'
    },
    'apache': {
        'proxy_config_file': '/tmp/zoe-proxy.conf',
        'access_log': '/var/log/apache2/access.log',
        'web_server_name': 'bigfoot-m2.eurecom.fr',
        'proxy_path_prefix': '/proxy'
    },
    'smtp': {
        'server': 'smtp.exmaple.com',
        'user': 'zoe@exmaple.com',
@@ -42,6 +35,10 @@ defaults = {
    },
    'flask': {
        'secret_key': b"\xc3\xb0\xa7\xff\x8fH'\xf7m\x1c\xa2\x92F\x1d\xdcz\x05\xe6CJN5\x83!"
    },
    'network': {
        'scheduler_internal_server_port': 4390,
        'scheduler_internal_hostname': '127.0.0.1'
    }
}
@@ -65,10 +62,6 @@ class ZoeConfig(ConfigParser):
    def web_server_name(self) -> str:
        return self.get('apache', 'web_server_name')

    @property
    def proxy_path_url_prefix(self) -> str:
        return self.get('apache', 'proxy_path_prefix')

    @property
    def smtp_server(self) -> str:
        return self.get('smtp', 'server')

@@ -93,18 +86,6 @@ class ZoeConfig(ConfigParser):
    def interval_check_health(self) -> int:
        return self.getint('intervals', 'check_health')

    @property
    def interval_proxy_update_accesses(self) -> int:
        return self.getint('intervals', 'proxy_update_accesses')

    @property
    def apache_log_file(self) -> str:
        return self.get('apache', 'access_log')

    @property
    def apache_proxy_config_file(self) -> str:
        return self.get('apache', 'proxy_config_file')

    @property
    def db_url(self) -> str:
        return self.get('db', 'url')

@@ -129,6 +110,14 @@ class ZoeConfig(ConfigParser):
    def docker_private_registry(self) -> str:
        return self.get('docker', 'private_registry')

    @property
    def scheduler_internal_server_port(self) -> int:
        return self.getint('network', 'scheduler_internal_server_port')

    @property
    def scheduler_internal_hostname(self) -> str:
        return self.get('network', 'scheduler_internal_hostname')
def init(config_file=None) -> ZoeConfig:
    global _zoeconf
......
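A minimal sketch of reading the new [network] options through the properties added above, assuming zoeconf() returns the parsed ZoeConfig singleton (as the imports in the tests suggest):

# Minimal sketch: read the new [network] options added by this commit.
from common.configuration import init, zoeconf

init('zoe.conf')                                  # parse the configuration file
host = zoeconf().scheduler_internal_hostname      # e.g. '127.0.0.1'
port = zoeconf().scheduler_internal_server_port   # e.g. 4390
print("scheduler internal endpoint: {}:{}".format(host, port))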
__version__ = '0.8.2'
__version__ = '0.8.90'
@@ -6,8 +6,7 @@ Requirements
* MySQL to keep all the state
* Docker Swarm
* A Docker registry containing Spark images
* Apache Web Server to act as a reverse proxy
* A Docker registry containing Zoe images
How to install
--------------
@@ -69,22 +68,3 @@ bypassing the Hub.
The images are quite standard and can also be used without Zoe. Examples of how to do that are available in the ``scripts/start_cluster.sh`` script.
Set the registry address:port in the ``[docker]`` section of ``zoe.conf``. If you use the Docker Hub, set the option to an empty string.
Apache Web Server configuration
-------------------------------
Install the Apache web server.
A sample virtual host file containing the directives required by Zoe is available in ``scripts/apache-sample.conf``.
This configuration will also proxy zoe-web, which starts on port 5000 by default.
Please note that putting the generated config file in /tmp can be a serious security problem, depending on your setup.
Zoe dynamically generates proxy entries to let users access the various web interfaces contained in the Spark containers.
To do this, it needs to be able to reload Apache and to write to a configuration file included in the VirtualHost directive.
Zoe executes ``sudo service apache2 reload`` whenever needed, so make sure the user that runs Zoe is able to run that command
successfully.
Change the options ``web_server_name``, ``access_log`` and ``proxy_config_file`` in the ``[apache]`` section of ``zoe.conf`` as needed.
@@ -2,6 +2,7 @@
P3=`which python3`
$P3 ./zoectl.py user-new venzano@eurecom.fr
$P3 ./zoectl.py spark-notebook-new --user-id 1 --name "small notebook" --worker-count 2 --executor-memory 2g --executor-cores 2
$P3 ./zoectl.py spark-app-new --user-id 1 --name "wordcount medium" --worker-count 8 --executor-memory 8g --executor-cores 8 --file ../wordcount.zip
$P3 ./zoe.py user-new venzano@eurecom.fr
$P3 ./zoe.py app-new --user-id 1 tests/resources/spark-notebook-test.json
$P3 ./zoe.py start 1
<VirtualHost *:80>
ServerAdmin webmaster@localhost
DocumentRoot /var/www/html
ErrorLog ${APACHE_LOG_DIR}/error.log
CustomLog ${APACHE_LOG_DIR}/access.log combined
ProxyHTMLLinks a href
ProxyHTMLLinks area href
ProxyHTMLLinks link href
ProxyHTMLLinks img src longdesc usemap
ProxyHTMLLinks object classid codebase data usemap
ProxyHTMLLinks q cite
ProxyHTMLLinks blockquote cite
ProxyHTMLLinks ins cite
ProxyHTMLLinks del cite
ProxyHTMLLinks form action
ProxyHTMLLinks input src usemap
ProxyHTMLLinks head profile
ProxyHTMLLinks base href
ProxyHTMLLinks script src for
ProxyHTMLEvents onclick ondblclick onmousedown onmouseup \
onmouseover onmousemove onmouseout onkeypress \
onkeydown onkeyup onfocus onblur onload \
onunload onsubmit onreset onselect onchange
ProxyRequests Off
<Location />
ProxyHtmlEnable On
ProxyHTMLExtended On
ProxyPass http://127.0.0.1:5000/ retry=0
ProxyPassReverse http://127.0.0.1:5000/
</Location>
IncludeOptional /tmp/zoe-proxy.conf
</VirtualHost>
#!/bin/sh
SWARM_MANAGER=10.0.0.2:2380
REGISTRY=10.0.0.2:5000
MASTER_IMAGE=$REGISTRY/venza/spark-master:1.4.1
WORKER_IMAGE=$REGISTRY/venza/spark-worker:1.4.1
SHELL_IMAGE=$REGISTRY/venza/spark-shell:1.4.1
SUBMIT_IMAGE=$REGISTRY/venza/spark-submit:1.4.1
WORKER_COUNT=3
WORKER_RAM=8g
WORKER_CORES=4
MASTER_ID=`docker -H $SWARM_MANAGER run -d $MASTER_IMAGE`
MASTER_IP=`docker -H $SWARM_MANAGER inspect --format '{{ .NetworkSettings.IPAddress }}' $MASTER_ID`
echo "Spark master is at $MASTER_IP"
for w in `seq $WORKER_COUNT`; do
docker -H $SWARM_MANAGER run -e SPARK_MASTER_IP=$MASTER_IP -e SPARK_WORKER_RAM=$WORKER_RAM -e SPARK_WORKER_CORES=$WORKER_CORES -d $WORKER_IMAGE
done
if [ "$1" == "--shell" ]; then
docker -H $SWARM_MANAGER run -i -t -e SPARK_MASTER_IP=$MASTER_IP -e SPARK_EXECUTOR_RAM=$WORKER_RAM $SHELL_IMAGE
fi
if [ "$1" == "--submit" ]; then
docker -H $SWARM_MANAGER run --rm -i -t -e SPARK_MASTER_IP=$MASTER_IP -e SPARK_EXECUTOR_RAM=$WORKER_RAM -v /mnt/cephfs/temp/spark-apps:/apps $SUBMIT_IMAGE /opt/submit.sh /apps/wordcount.py hdfs://192.168.45.157/datasets/gutenberg_big_2x.txt hdfs://192.168.45.157/tmp/cntwdc1
fi
import json
import pytest
from common.application_resources import SparkApplicationResources
from zoe_scheduler.state import init as state_init, Base, AlchemySession
from zoe_scheduler.state.application import SparkSubmitApplicationState
from zoe_scheduler.state import UserState
from zoe_scheduler.state.application import ApplicationState
from zoe_scheduler.application_description import ZoeApplication
from common.configuration import init as conf_init, zoeconf
@@ -57,18 +56,18 @@ def state_session(state_connection, request):
@pytest.fixture(scope='function')
def application(state_session):
    user = UserState()
    user.email = 'a@b.c'
    app = SparkSubmitApplicationState()
    app.submit_image = "test"
    app.worker_image = "test"
    app.master_image = "test"
    app.name = "testapp"
    app.user = user
    app.required_resources = SparkApplicationResources()
def application(state_session, notebook_test):
    app = ApplicationState()
    app.user_id = 1
    app.description = ZoeApplication.from_dict(notebook_test)
    state_session.add(app)
    state_session.flush()
    return app


@pytest.fixture(scope='session')
def notebook_test():
    jsondata = open("tests/resources/spark-notebook-test.json", "r")
    dictdata = json.load(jsondata)
    return dictdata
{
    "name": "Spark notebook test",
    "version": 0,
    "will_end": false,
    "priority": 512,
    "requires_binary": false,
    "processes": [
        {
            "name": "spark-master",
            "version": 0,
            "docker_image": "/zoerepo/spark-master",
            "monitor": false,
            "required_resources": {
                "memory": 2147483648
            },
            "ports": [
                [
                    "8080",
                    "Spark master web interface",
                    false
                ]
            ]
        },
        {
            "name": "spark-worker-1",
            "version": 0,
            "docker_image": "/zoerepo/spark-worker",
            "monitor": false,
            "required_resources": {
                "memory": 4294967296
            },
            "ports": [
                [
                    "8081",
                    "Spark worker web interface",
                    false
                ]
            ],
            "environment": [
                ["SPARK_WORKER_CORES", "4"],
                ["SPARK_WORKER_RAM", "4g"],
                ["SPARK_MASTER_IP", "{cluster[spark-master][ip_address]}"]
            ]
        },
        {
            "name": "spark-worker-2",
            "version": 0,
            "docker_image": "/zoerepo/spark-worker",
            "monitor": false,
            "required_resources": {
                "memory": 4294967296
            },
            "ports": [
                [
                    "8081",
                    "Spark worker web interface",
                    false
                ]
            ],
            "environment": [
                ["SPARK_WORKER_CORES", "4"],
                ["SPARK_WORKER_RAM", "4g"],
                ["SPARK_MASTER_IP", "{cluster[spark-master][ip_address]}"]
            ]
        },
        {
            "name": "spark-notebook",
            "version": 0,
            "docker_image": "/zoerepo/spark-scala-notebook",
            "monitor": true,
            "required_resources": {
                "memory": 4294967296
            },
            "ports": [
                [
                    "4040",
                    "Spark application web interface",
                    false
                ],
                [
                    "9000",
                    "Spark Notebook interface",
                    true
                ]
            ],
            "environment": [
                ["SPARK_MASTER_IP", "{cluster[spark-master][ip_address]}"],
                ["SPARK_OPTIONS", ""],
                ["SPARK_EXECUTOR_RAM", "4g"]
            ]
        }
    ]
}
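This is the file loaded by the notebook_test fixture above; a short sketch of how it might be turned into the internal description object used by the new test:

# Sketch mirroring the notebook_test/application fixtures above.
import json
from zoe_scheduler.application_description import ZoeApplication

with open("tests/resources/spark-notebook-test.json") as f:
    notebook_description = json.load(f)

app_descr = ZoeApplication.from_dict(notebook_description)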
{
    "name": "Spark wordcount test",
    "version": 0,
    "will_end": true,
    "priority": 512,
    "requires_binary": true,
    "processes": [
        {
            "name": "spark-master",
            "version": 0,
            "docker_image": "/zoerepo/spark-master",
            "monitor": false,
            "required_resources": {
                "memory": 2147483648
            },
            "ports": [
                [
                    "8080",
                    "Spark master web interface",
                    false
                ]
            ]
        },
        {
            "name": "spark-worker-1",
            "version": 0,
            "docker_image": "/zoerepo/spark-worker",
            "monitor": false,
            "required_resources": {
                "memory": 4294967296
            },
            "ports": [
                [
                    "8081",
                    "Spark worker web interface",
                    false
                ]
            ],
            "environment": [
                ["SPARK_WORKER_CORES", "4"],
                ["SPARK_WORKER_RAM", "4g"],
                ["SPARK_MASTER_IP", "{cluster[spark-master][ip_address]}"]
            ]
        },
        {
            "name": "spark-worker-2",
            "version": 0,
            "docker_image": "/zoerepo/spark-worker",
            "monitor": false,
            "required_resources": {
                "memory": 4294967296
            },
            "ports": [
                [
                    "8081",
                    "Spark worker web interface",
                    false
                ]
            ],
            "environment": [
                ["SPARK_WORKER_CORES", "4"],
                ["SPARK_WORKER_RAM", "4g"],
                ["SPARK_MASTER_IP", "{cluster[spark-master][ip_address]}"]
            ]
        },
        {
            "name": "spark-submit",
            "version": 0,
            "docker_image": "/zoerepo/spark-submit",
            "monitor": true,
            "required_resources": {
                "memory": 4294967296
            },
            "ports": [
                [
                    "4040",
                    "Spark application web interface",
                    true
                ]
            ],
            "environment": [
                ["SPARK_MASTER_IP", "{cluster[spark-master][ip_address]}"],
                ["SPARK_OPTIONS", ""],
                ["SPARK_EXECUTOR_RAM", "4g"],
                ["APPLICATION_URL", "{application_binary_url}"]
            ],
            "command": "/opt/submit.sh wordcount.py hdfs://192.168.45.157/datasets/gutenberg_big_2x.txt hdfs://192.168.45.157/tmp/cntwdc1"
        }
    ]
}
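The {cluster[spark-master][ip_address]} and {application_binary_url} entries in the environment lists look like standard Python str.format fields; a hedged sketch of how such a substitution could work once the cluster is running (Zoe's actual mechanism may differ):

# Illustration of the placeholder syntax; Zoe's actual substitution code may differ.
cluster = {"spark-master": {"ip_address": "10.0.0.3"}}

template = "{cluster[spark-master][ip_address]}"
print(template.format(cluster=cluster))  # -> 10.0.0.3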
[docker]
swarm_manager_url = tcp://example.com:2380
private_registry = 10.1.0.1:5000
[intervals]
check_health = 30
notebook_max_age_no_activity = 24
scheduler_task = 10
notebook_warning_age_no_activity = 2
proxy_update_accesses = 10
status_refresh = 10
[filesystem]
@@ -20,11 +20,9 @@ password = none
user = none
server = none
[apache]
web_server_name = www.example.com
access_log = /var/log/apache2/access.log
proxy_config_file = /tmp/zoe-proxy.conf
proxy_path_prefix = /proxy
[db]
url = mysql+mysqlconnector://root@localhost/zoe
[network]
scheduler_internal_server_port = 4390
scheduler_internal_hostname = 192.168.45.25
@@ -6,7 +6,6 @@ check_health = 30
notebook_max_age_no_activity = 24
scheduler_task = 10
notebook_warning_age_no_activity = 2
proxy_update_accesses = 10
status_refresh = 10
[filesystem]
@@ -20,11 +19,5 @@ password = none
user = none
server = none
[apache]
web_server_name = www.example.com
access_log = /var/log/apache2/access.log
proxy_config_file = /tmp/zoe-proxy.conf
proxy_path_prefix = /proxy
[db]
url = mysql+mysqlconnector://root@localhost/zoe
import base64
import logging

from sqlalchemy.orm.exc import NoResultFound

from zoe_client.state import AlchemySession
from zoe_client.ipc import ZoeIPCClient
from common.configuration import zoeconf
from zoe_client.entities import User, Execution, Application
from zoe_client.entities import Execution, Application, User
from zoe_client.state.user import UserState

log = logging.getLogger(__name__)

MASTER_IMAGE = "/zoerepo/spark-master"
WORKER_IMAGE = "/zoerepo/spark-worker"
SUBMIT_IMAGE = "/zoerepo/spark-submit"
NOTEBOOK_IMAGE = "/zoerepo/spark-notebook"


class ZoeClient:
    def __init__(self, ipc_server='localhost', ipc_port=8723):
        self.ipc_server = ZoeIPCClient(ipc_server, ipc_port)
        self.image_registry = zoeconf().docker_private_registry
        self.state = AlchemySession()

    # Applications
    def application_get(self, application_id: int) -> Application:
        answer = self.ipc_server.ask('application_get', application_id=application_id)
        if answer is not None:
            return Application(answer['app'])

    def application_binary_put(self, application_id: int, app_data: bytes) -> bool:
        file_data = base64.b64encode(app_data)
        answer = self.ipc_server.ask('application_binary_put', application_id=application_id, bin_data=file_data)
        return answer is not None

    def application_get_binary(self, application_id: int) -> bytes:
        data = self.ipc_server.ask('application_get_binary', application_id=application_id)
    def application_binary_get(self, application_id: int) -> bytes:
        data = self.ipc_server.ask('application_binary_get', application_id=application_id)
        app_data = base64.b64decode(data['zip_data'])
        return app_data
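A rough usage sketch of the new description-based client API (the zoe_client.client module path is an assumption, and the IPC server must be running):

# Rough sketch; the zoe_client.client module path is assumed.
import json
from zoe_client.client import ZoeClient

client = ZoeClient(ipc_server='localhost', ipc_port=8723)

with open("tests/resources/spark-notebook-test.json") as f:
    description = json.load(f)

app_id = client.application_new(user_id=1, description=description)
if app_id is not None:
    print("created application", app_id)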
@@ -41,49 +41,27 @@ class ZoeClient:
        else:
            return [Application(x) for x in answer['apps']]

    def application_new(self, user_id: int, description: dict) -> int:
        if not self.user_check(user_id):
            return None
        answer = self.ipc_server.ask('application_new', user_id=user_id, description=description)
        if answer is not None:
            return answer['application_id']

    def application_remove(self, application_id: int, force: bool) -> bool:
        answer = self.ipc_server.ask('application_remove', application_id=application_id, force=force)
        return answer is not None

    def application_spark_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str) -> int:
        answer = self.ipc_server.ask('application_spark_new',
                                     user_id=user_id,
                                     worker_count=worker_count,
                                     executor_memory=executor_memory,
                                     executor_cores=executor_cores,
                                     name=name,
                                     master_image=self.image_registry + MASTER_IMAGE,
                                     worker_image=self.image_registry + WORKER_IMAGE)
        if answer is not None:
            return answer['app_id']

    def application_spark_notebook_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str) -> int:
        answer = self.ipc_server.ask('application_spark_notebook_new',
                                     user_id=user_id,
                                     worker_count=worker_count,
                                     executor_memory=executor_memory,
                                     executor_cores=executor_cores,
                                     name=name,