Commit 4cb2592c authored by Daniele Venzano

Fix most of the web interface after refactoring everything else

Creating new applications via the web interface is still broken
parent a30a95a0
@@ -59,5 +59,5 @@ docs/_build/
# PyBuilder
target/
.idea/
zoe-scheduler.conf
zoe.conf
zoe-client.conf
@@ -51,6 +51,15 @@ class ZoeApplication:
for p in data['processes']:
ret.processes.append(ZoeApplicationProcess.from_dict(p))
found_monitor = False
for p in ret.processes:
if p.monitor:
found_monitor = True
break
if not found_monitor:
raise InvalidApplicationDescription(msg="at least one process should have monitor set to True")
return ret
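Side note: the loop-and-flag scan added above is equivalent to a single any() over the parsed processes. A compact alternative, not part of this commit (`processes` stands in for ret.processes):

    from common.exceptions import InvalidApplicationDescription

    def ensure_one_monitor(processes):
        # Same rule as the new check: at least one process must be the monitor.
        if not any(p.monitor for p in processes):
            raise InvalidApplicationDescription(
                msg="at least one process should have monitor set to True")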
def to_dict(self) -> dict:
@@ -59,6 +68,7 @@ class ZoeApplication:
'version': self.version,
'will_end': self.will_end,
'priority': self.priority,
'requires_binary': self.requires_binary,
'processes': []
}
for p in self.processes:
@@ -71,6 +81,55 @@ class ZoeApplication:
memory += p.required_resources['memory']
return memory
def container_count(self) -> int:
return len(self.processes)
class ZoeProcessEndpoint:
def __init__(self):
self.name = ''
self.protocol = ''
self.port_number = 0
self.path = ''
self.is_main_endpoint = False
def to_dict(self) -> dict:
return {
'name': self.name,
'protocol': self.protocol,
'port_number': self.port_number,
'path': self.path,
'is_main_endpoint': self.is_main_endpoint
}
@classmethod
def from_dict(cls, data):
ret = cls()
required_keys = ['name', 'protocol', 'port_number', 'is_main_endpoint']
for k in required_keys:
try:
setattr(ret, k, data[k])
except KeyError:
raise InvalidApplicationDescription(msg="Missing required key: %s" % k)
try:
ret.port_number = int(ret.port_number)
except ValueError:
raise InvalidApplicationDescription(msg="port_number field should be an integer")
try:
ret.is_main_endpoint = bool(ret.is_main_endpoint)
except ValueError:
raise InvalidApplicationDescription(msg="is_main_endpoint field should be a boolean")
if 'path' in data:
ret.path = data['path']
return ret
def get_url(self, address):
return self.protocol + "://" + address + ":{}".format(self.port_number) + self.path
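For reference, a small standalone sketch of the new endpoint class (assuming it lives in common.application_description next to ZoeApplication; the endpoint values are made up):

    from common.application_description import ZoeProcessEndpoint

    # Hypothetical endpoint dict, using the keys from_dict() requires.
    ep = ZoeProcessEndpoint.from_dict({
        'name': 'Spark Notebook interface',
        'protocol': 'http',
        'port_number': 9000,
        'path': '/',
        'is_main_endpoint': True,
    })
    print(ep.get_url('10.0.0.5'))  # -> http://10.0.0.5:9000/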
class ZoeApplicationProcess:
def __init__(self):
@@ -78,7 +137,7 @@ class ZoeApplicationProcess:
self.version = 0
self.docker_image = ''
self.monitor = False # if this process dies, the whole application is considered as complete and the execution is terminated
self.ports = []
self.ports = [] # A list of ZoeProcessEndpoint
self.required_resources = {}
self.environment = [] # Environment variables to pass to Docker
self.command = None # Commandline to pass to the Docker container
@@ -89,9 +148,10 @@ class ZoeApplicationProcess:
'version': self.version,
'docker_image': self.docker_image,
'monitor': self.monitor,
'ports': self.ports.copy(),
'ports': [p.to_dict() for p in self.ports],
'required_resources': self.required_resources.copy(),
'environment': self.environment.copy()
'environment': self.environment.copy(),
'command': self.command
}
return ret
@@ -122,9 +182,7 @@ class ZoeApplicationProcess:
if not hasattr(data['ports'], '__iter__'):
raise InvalidApplicationDescription(msg='ports should be an iterable')
for pp in data['ports']:
if len(pp) != 3:
raise InvalidApplicationDescription(msg="ports entries must contain exactly three elements (port_number, name, is_main_port)")
ret.ports = data['ports'].copy()
ret.ports.append(ZoeProcessEndpoint.from_dict(pp))
if 'required_resources' not in data:
raise InvalidApplicationDescription(msg="Missing required key: required_resources")
@@ -152,3 +210,10 @@ class ZoeApplicationProcess:
if 'command' in data:
ret.command = data['command']
return ret
def exposed_endpoint(self) -> ZoeProcessEndpoint:
for p in self.ports:
assert isinstance(p, ZoeProcessEndpoint)
if p.is_main_endpoint:
return p
return None
@@ -114,10 +114,6 @@ class ZoeConfig(ConfigParser):
def ipc_listen_address(self) -> str:
return self.get('zoe_scheduler', 'ipc_listen_address')
@property
def object_storage_url(self) -> str:
return self.get('zoe_scheduler', 'object_storage_url')
@property
def storage_path(self) -> str:
return self.get('zoe_storage', 'storage_path')
@@ -10,11 +10,11 @@ from common.configuration import zoe_conf
log = logging.getLogger(__name__)
def generate_application_binary_url(application_id: int) -> str:
return zoe_conf().object_storage_url + '/apps/{}'.format(application_id)
def generate_storage_url(obj_id: int, kind: str) -> str:
return zoe_conf().object_storage_url + '/{}/{}'.format(kind, obj_id)
def _upload(obj_id, kind, data: bytes):
def put(obj_id, kind, data: bytes):
url = zoe_conf().object_storage_url + '/{}/{}'.format(kind, obj_id)
files = {'file': data}
try:
@@ -23,7 +23,7 @@ def _upload(obj_id, kind, data: bytes):
log.error("Cannot connect to {} to POST the binary file".format(url))
def _download(obj_id, kind) -> bytes:
def get(obj_id, kind) -> bytes:
url = zoe_conf().object_storage_url + '/{}/{}'.format(kind, obj_id)
try:
r = requests.get(url)
@@ -34,28 +34,22 @@ def _download(obj_id, kind) -> bytes:
return r.content
def _delete(obj_id, kind):
def check(obj_id, kind) -> bool:
url = zoe_conf().object_storage_url + '/{}/{}'.format(kind, obj_id)
try:
requests.delete(url)
r = requests.head(url)
except requests.exceptions.ConnectionError:
log.error("Cannot connect to {} to DELETE the binary file".format(url))
def upload_application(app_id, app_data: bytes):
_upload(app_id, "apps", app_data)
def download_application(application_id) -> bytes:
return _download(application_id, "apps")
def download_log_url(execution_id) -> bytes:
return zoe_conf().object_storage_url + '/logs/{}'.format(execution_id)
return False
else:
return r.status_code == 200
def delete_application(application_id):
_delete(application_id, "apps")
def delete(obj_id, kind):
url = zoe_conf().object_storage_url + '/{}/{}'.format(kind, obj_id)
try:
requests.delete(url)
except requests.exceptions.ConnectionError:
log.error("Cannot connect to {} to DELETE the binary file".format(url))
def logs_archive_create(execution_id: int, logs: list):
@@ -64,4 +58,4 @@ def logs_archive_create(execution_id: int, logs: list):
for c in logs:
fname = c[0] + "-" + c[1] + ".txt"
logzip.writestr(fname, c[2])
_upload(execution_id, "logs", zipdata)
put(execution_id, "logs", zipdata.getvalue())
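With this change the storage client exposes a generic put/get/check/delete API keyed by object kind instead of per-type wrappers. A usage sketch, assuming the configuration has already been loaded (the ID and payload are illustrative):

    import common.zoe_storage_client as storage

    storage.put(42, "apps", b"<zip bytes>")   # upload an application binary
    if storage.check(42, "apps"):             # HEAD request, True on HTTP 200
        data = storage.get(42, "apps")        # download it back
    storage.delete(42, "apps")                # remove it from the object store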
@@ -14,11 +14,13 @@
"memory": 2147483648
},
"ports": [
[
"8080",
"Spark master web interface",
false
]
{
"name": "Spark master web interface",
"protocol": "http",
"port_number": 8080,
"path": "/",
"is_main_endpoint": false
}
]
},
{
@@ -30,11 +32,13 @@
"memory": 4294967296
},
"ports": [
[
"8081",
"Spark worker web interface",
false
]
{
"name": "Spark worker web interface",
"protocol": "http",
"port_number": 8081,
"path": "/",
"is_main_endpoint": false
}
],
"environment": [
["SPARK_WORKER_CORES", "4"],
@@ -51,11 +55,13 @@
"memory": 4294967296
},
"ports": [
[
"8081",
"Spark worker web interface",
false
]
{
"name": "Spark worker web interface",
"protocol": "http",
"port_number": 8081,
"path": "/",
"is_main_endpoint": false
}
],
"environment": [
["SPARK_WORKER_CORES", "4"],
@@ -72,16 +78,20 @@
"memory": 4294967296
},
"ports": [
[
"4040",
"Spark application web interface",
false
],
[
"9000",
"Spark Notebook interface",
true
]
{
"name": "Spark application web interface",
"protocol": "http",
"port_number": 4040,
"path": "/",
"is_main_endpoint": false
},
{
"name": "Spark Notebook interface",
"protocol": "http",
"port_number": 9000,
"path": "/",
"is_main_endpoint": true
}
],
"environment": [
["SPARK_MASTER_IP", "{cluster[spark-master][ip_address]}"],
@@ -14,11 +14,13 @@
"memory": 2147483648
},
"ports": [
[
"8080",
"Spark master web interface",
false
]
{
"name": "Spark master web interface",
"protocol": "http",
"port_number": 8080,
"path": "/",
"is_main_endpoint": false
}
]
},
{
@@ -30,11 +32,13 @@
"memory": 4294967296
},
"ports": [
[
"8081",
"Spark worker web interface",
false
]
{
"name": "Spark worker web interface",
"protocol": "http",
"port_number": 8081,
"path": "/",
"is_main_endpoint": false
}
],
"environment": [
["SPARK_WORKER_CORES", "4"],
@@ -51,11 +55,13 @@
"memory": 4294967296
},
"ports": [
[
"8081",
"Spark worker web interface",
false
]
{
"name": "Spark worker web interface",
"protocol": "http",
"port_number": 8081,
"path": "/",
"is_main_endpoint": false
}
],
"environment": [
["SPARK_WORKER_CORES", "4"],
@@ -72,11 +78,13 @@
"memory": 4294967296
},
"ports": [
[
"4040",
"Spark application web interface",
true
]
{
"name": "Spark application web interface",
"protocol": "http",
"port_number": 4040,
"path": "/",
"is_main_endpoint": true
}
],
"environment": [
["SPARK_MASTER_IP", "{cluster[spark-master][ip_address]}"],
[zoe_client]
db_connect = mysql+mysqlconnector://root@localhost/zoe
scheduler_ipc_address = localhost
scheduler_ipc_port = 8723
object_storage_url = http://localhost:4390
[zoe_web]
smtp_password = Daicu2Ze
smtp_user = bigfoot.data@gmail.com
smtp_server = smtp.gmail.com
cookie_secret = \xc3\xb0\xa7\xff\x8fH'\xf7m\x1c\xa2\x92F\x1d\xdcz\x05\xe6CJN5\x83!
web_server_name = 192.168.45.25
[zoe_storage]
storage_path = /var/lib/zoe
http_listen_address = 192.168.45.25
http_listen_port = 4390
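The per-component configuration files are merged into a single zoe.conf with one section per component; values are read through the ZoeConfig properties. A sketch of reading it (conf_init()'s exact signature is not shown in this diff, so the call below is an assumption):

    from common.configuration import conf_init, zoe_conf

    conf_init()                        # hypothetical: load the default zoe.conf
    print(zoe_conf().storage_path)     # -> /var/lib/zoe, from [zoe_storage]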
from zoe_client.applications import ZoeClient
@@ -6,8 +6,11 @@ from zoe_client.lib.ipc import ZoeIPCClient
from zoe_client.scheduler_classes.execution import Execution
from zoe_client.state import session
from zoe_client.state.application import ApplicationState
from zoe_client.users import user_check
from zoe_client.executions import execution_delete
from common.application_description import ZoeApplication
from common.exceptions import InvalidApplicationDescription
import common.zoe_storage_client as storage
log = logging.getLogger(__name__)
@@ -21,7 +24,7 @@ def _check_application(state, application_id: int):
def application_binary_get(application_id: int) -> bytes:
state = session()
if _check_application(state, application_id):
return storage.download_application(application_id)
return storage.get(application_id, "apps")
else:
return None
@@ -29,7 +32,7 @@ def application_binary_get(application_id: int) -> bytes:
def application_binary_put(application_id: int, bin_data: bytes):
state = session()
if _check_application(state, application_id):
storage.upload_application(application_id, bin_data)
storage.put(application_id, "apps", bin_data)
else:
log.error("Trying to upload application data for non-existent application")
@@ -58,19 +61,25 @@ def application_list(user_id: int) -> list:
:param user_id: the user
:returns a list of ApplicationState objects
"""
if not user_check(user_id):
log.error("no such user")
return None
state = session()
return state.query(ApplicationState).filter_by(user_id=user_id).all()
def application_new(user_id: int, description: dict) -> ApplicationState:
ipc_client = ZoeIPCClient()
answer = ipc_client.ask('application_validate', description=description)
if answer is None:
log.error("Application description failed the scheduler validation")
if not user_check(user_id):
log.error("no such user")
return None
try:
app = ZoeApplication.from_dict(description)
except InvalidApplicationDescription as e:
log.error("invalid application description: %s" % e.value)
return None
state = session()
application = ApplicationState(user_id=user_id, description=description)
application = ApplicationState(user_id=user_id, description=app)
state.add(application)
state.commit()
return application
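application_new now validates the description client-side with ZoeApplication.from_dict instead of asking the scheduler over IPC, and stores the parsed object in ApplicationState. A usage sketch (the user ID and file name are hypothetical):

    import json
    import zoe_client.applications as apps

    with open("spark-notebook.json") as f:   # hypothetical description file
        descr = json.load(f)
    app = apps.application_new(1, descr)     # None if the user or description is invalid
    if app is not None:
        print("Application added with ID: {}".format(app.id))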
@@ -91,12 +100,6 @@ def application_remove(application_id: int):
execution_delete(e.id)
application = state.query(ApplicationState).filter_by(id=application_id).one()
storage.delete_application(application_id)
storage.delete(application_id, "apps")
state.delete(application)
state.commit()
def application_validate(description: dict) -> bool:
ipc_client = ZoeIPCClient()
answer = ipc_client.ask('application_validate', description=description)
return answer is not None
from zoe_client.lib.ipc import ZoeIPCClient
from zoe_client.scheduler_classes.execution import Execution
from zoe_client.scheduler_classes.container import Container
# Logs
@@ -20,3 +22,11 @@ def platform_stats() -> dict:
def container_stats(container_id):
ipc_client = ZoeIPCClient()
return ipc_client.ask('container_stats', container_id=container_id)
def execution_exposed_url(execution: Execution):
for c in execution.containers:
assert isinstance(c, Container)
port = c.description.exposed_endpoint()
if port is not None:
return port.get_url(c.ip_address)
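execution_exposed_url picks the container whose description declares a main endpoint and builds a URL from that container's IP address, which is what the web interface can link to. A sketch, assuming the function lives in zoe_client.diagnostics next to platform_stats and that the execution comes from the executions module:

    import zoe_client.executions as execs
    from zoe_client.diagnostics import execution_exposed_url

    execution = execs.execution_get(10)      # hypothetical execution ID
    url = execution_exposed_url(execution)   # None when no main endpoint is exposed
    if url is not None:
        print(url)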
@@ -5,19 +5,18 @@ from zipfile import is_zipfile
from pprint import pprint
import sys
from zoe_client import ZoeClient
from common.configuration import conf_init, client_conf
from common.configuration import conf_init, zoe_conf
import zoe_client.applications as apps
import zoe_client.diagnostics as diags
import zoe_client.executions as execs
from zoe_client.state import init as state_init, create_tables
from zoe_client.state.application import ApplicationState
import zoe_client.users as users
def get_zoe_client() -> ZoeClient:
return ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
def stats_cmd(_):
client = get_zoe_client()
stats = client.platform_stats()
stats = diags.platform_stats()
pprint(stats)
@@ -32,25 +31,23 @@ def user_get_cmd(args):
def app_new_cmd(args):
client = get_zoe_client()
app_descr = json.load(args.jsonfile)
application = client.application_new(args.user_id, app_descr)
print("Application added with ID: {}".format(application.id))
application = apps.application_new(args.user_id, app_descr)
if application is not None:
print("Application added with ID: {}".format(application.id))
def app_bin_put_cmd(args):
client = get_zoe_client()
if not is_zipfile(args.zipfile):
print("Error: application binary must be a zip file")
return
args.zipfile.seek(0)
zipdata = args.zipfile.read()
client.application_binary_put(args.app_id, zipdata)
apps.application_binary_put(args.app_id, zipdata)
def app_start_cmd(args):
client = get_zoe_client()
ret = client.execution_start(args.id)
ret = execs.execution_start(args.id)
if ret:
print("Application scheduled successfully, use the app-inspect command to check its status")
else:
@@ -58,18 +55,16 @@ def app_start_cmd(args):
def app_rm_cmd(args):
client = get_zoe_client()
client.application_remove(args.id)
apps.application_remove(args.id)
def app_inspect_cmd(args):
client = get_zoe_client()
application = client.application_get(args.id)
application = apps.application_get(args.id)
if application is None:
print("Error: application {} does not exist".format(args.id))
return
print("Application name: {}".format(application.description["name"]))
executions = client.application_executions_get(application_id=args.id)
executions = apps.application_executions_get(application_id=args.id)
for e in executions:
print(" - Execution {} (ID: {}) {}".format(e.name, e.id, e.status))
for c in e.containers:
@@ -77,38 +72,35 @@ def app_inspect_cmd(args):
def app_list_cmd(args):
client = get_zoe_client()
applications = client.application_list(args.id)
applications = apps.application_list(args.id)
if len(applications) > 0:
print("{:4} {:20}".format("ID", "Name"))
for app in applications:
print("{:4} {:20}".format(app.id, app.description['name']))
assert isinstance(app, ApplicationState)
print("{:4} {:20}".format(app.id, app.description.name))
def exec_kill_cmd(args):
client = get_zoe_client()
execution = client.execution_get(args.id)
execution = execs.execution_get(args.id)
if execution is None:
print("Error: execution {} does not exist".format(args.id))
return
client.execution_kill(execution.id)
execs.execution_kill(execution.id)
def log_get_cmd(args):
client = get_zoe_client()
log = client.log_get(args.id)
log = diags.log_get(args.id)
if log is None:
print("Error: No log found for container ID {}".format(args.id))
print(log)
def gen_config_cmd(args):
client_conf().write(open(args.output_file, "w"))
zoe_conf().write(open(args.output_file, "w"))
def container_stats_cmd(args):
client = get_zoe_client()
stats = client.container_stats(args.container_id)
stats = diags.container_stats(args.container_id)
print(stats)
@@ -183,14 +175,15 @@ def zoe():
logging.basicConfig(level=logging.DEBUG)
else:
logging.