Commit 9f010958 authored by Daniele Venzano

Decouple more methods so the client does not access the database directly

parent dc68cf0d
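
The pattern repeated throughout this commit: the client stops opening SQLAlchemy sessions and instead asks the scheduler's IPC server, which runs the query, flattens the result with to_dict(), and ships a plain dict back; the client then rebuilds an inert entity object from that dict. A minimal sketch of the round trip (the ask()/answer convention mirrors the code below; transport details are assumed, and the names here are illustrative, not Zoe's):

# Sketch only: the client/server split this commit introduces.

# Client side: no database imports, just an IPC question.
def execution_get(ipc_client, execution_id: int):
    exec_dict = ipc_client.ask('execution_get', execution_id=execution_id)
    if exec_dict is not None:          # None means the server replied with an error
        return Execution(exec_dict)    # plain entity rebuilt from the dict

# Server side: the only place that touches SQLAlchemy.
def execution_get_handler(state, execution_id: int) -> dict:
    execution = state.query(ExecutionState).filter_by(id=execution_id).one()
    return execution.to_dict()         # a plain dict is safe to serialize and send
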
@@ -15,23 +15,15 @@ class ContainerState(Base):
proxies = relationship("ProxyState", order_by="ProxyState.id", backref="container")
def extract(self):
"""
Generates a normal object not attached to SQLAlchemy
:rtype : Container
"""
return Container(self)
class Container:
def __init__(self, container: ContainerState):
self.id = container.id
self.docker_id = container.docker_id
self.cluster_id = container.cluster_id
self.ip_address = container.ip_address
self.readable_name = container.readable_name
self.proxies = []
for p in container.proxies:
self.proxies.append(p.extract())
def to_dict(self) -> dict:
ret = {
'id': self.id,
'docker_id': self.docker_id,
'cluster_id': self.cluster_id,
'ip_address': self.ip_address,
'readable_name': self.readable_name,
'proxies': []
}
        for p in self.proxies:
            ret['proxies'].append(p.to_dict())
        return ret
@@ -49,8 +49,28 @@ class ExecutionState(Base):
if c.readable_name == name:
return c
def extract(self):
return Execution(self)
def to_dict(self) -> dict:
ret = {
'id': self.id,
'name': self.name,
'application_id': self.application_id,
'time_scheduled': self.time_scheduled,
'time_started': self.time_started,
'time_finished': self.time_finished,
'status': self.status,
'termination_notice': self.termination_notice,
'type': self.type,
'assigned_resources': self.assigned_resources.to_dict()
}
if self.cluster is not None:
ret['cluster_id'] = self.cluster.id
ret['containers'] = [c.to_dict() for c in self.cluster.containers]
else:
ret['cluster_id'] = None
ret['containers'] = []
return ret
class SparkSubmitExecutionState(ExecutionState):
@@ -61,33 +81,8 @@ class SparkSubmitExecutionState(ExecutionState):
'polymorphic_identity': 'spark-submit-application'
}
def extract(self):
return Execution(self)
class Execution:
def __init__(self, execution: ExecutionState):
self.id = execution.id
self.name = execution.name
self.assigned_resources = execution.assigned_resources
self.application_id = execution.application_id
self.time_started = execution.time_started
self.time_scheduled = execution.time_scheduled
self.time_finished = execution.time_finished
self.status = execution.status
self.termination_notice = execution.termination_notice
if execution.cluster is not None:
self.cluster_id = execution.cluster.id
else:
self.cluster_id = None
self.type = execution.type
if isinstance(execution, SparkSubmitExecutionState):
self.commandline = execution.commandline
self.spark_opts = execution.spark_opts
self.containers = []
if execution.cluster is not None:
for c in execution.cluster.containers:
self.containers.append(c.extract())
def to_dict(self) -> dict:
ret = super().to_dict()
ret['commandline'] = self.commandline
ret['spark_opts'] = self.spark_opts
return ret
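
Serialization stays polymorphic the same way extract() used to be: the subclass extends super().to_dict() with its own fields, and the inherited 'type' discriminator travels inside the dict so the client knows which extra keys to expect. A stripped-down illustration of the pattern (class and field names here are generic stand-ins, not Zoe's):

class BaseExec:
    def __init__(self):
        self.type = 'base'

    def to_dict(self) -> dict:
        return {'type': self.type}

class SparkSubmitExec(BaseExec):
    def __init__(self):
        super().__init__()
        self.type = 'spark-submit-application'
        self.commandline = 'wordcount.py'

    def to_dict(self) -> dict:
        ret = super().to_dict()               # start from the base fields
        ret['commandline'] = self.commandline  # then add subclass-specific ones
        return ret

d = SparkSubmitExec().to_dict()
assert d['type'] == 'spark-submit-application'  # the client dispatches on this key
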
@@ -13,15 +13,13 @@ class ProxyState(Base):
service_name = Column(String(32))
last_access = Column(DateTime, default=func.now())
def extract(self):
return Proxy(self)
class Proxy:
def __init__(self, proxy: ProxyState):
self.id = proxy.id
self.internal_url = proxy.internal_url
self.cluster_id = proxy.cluster_id
self.container_id = proxy.container_id
self.service_name = proxy.service_name
self.last_access = proxy.last_access
def to_dict(self) -> dict:
ret = {
'id': self.id,
'internal_url': self.internal_url,
'cluster_id': self.cluster_id,
'container_id': self.container_id,
'service_name': self.service_name,
'last_access': self.last_access
}
return ret
@@ -5,15 +5,13 @@ from sqlalchemy.orm.exc import NoResultFound
from zoe_client.ipc import ZoeIPCClient
from common.state import AlchemySession
from common.state.application import ApplicationState, SparkNotebookApplicationState, SparkSubmitApplicationState, SparkApplicationState, Application
from common.state.execution import ExecutionState, Execution
from common.state.proxy import ProxyState
from common.state.application import ApplicationState, Application
from common.state.execution import ExecutionState
from common.state.user import UserState
from common.application_resources import SparkApplicationResources
from common.exceptions import UserIDDoesNotExist, ApplicationStillRunning
import common.object_storage as storage
from common.configuration import zoeconf
from zoe_client.entities import User
from zoe_client.entities import User, Execution
log = logging.getLogger(__name__)
@@ -75,69 +73,44 @@ class ZoeClient:
self.state.commit()
def application_spark_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str) -> int:
try:
self.state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
raise UserIDDoesNotExist(user_id)
resources = SparkApplicationResources()
resources.worker_count = worker_count
resources.container_count = worker_count + 1
resources.worker_resources["memory_limit"] = executor_memory
resources.worker_resources["cores"] = executor_cores
app = SparkApplicationState(master_image=MASTER_IMAGE,
worker_image=WORKER_IMAGE,
name=name,
required_resources=resources,
user_id=user_id)
self.state.add(app)
self.state.commit()
return app.id
answer = self.ipc_server.ask('application_spark_new',
user_id=user_id,
worker_count=worker_count,
executor_memory=executor_memory,
executor_cores=executor_cores,
name=name,
master_image=MASTER_IMAGE,
worker_image=WORKER_IMAGE)
if answer is not None:
return answer['app_id']
def application_spark_notebook_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str) -> int:
try:
self.state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
raise UserIDDoesNotExist(user_id)
resources = SparkApplicationResources()
resources.worker_count = worker_count
resources.container_count = worker_count + 2
resources.worker_resources["memory_limit"] = executor_memory
resources.worker_resources["cores"] = executor_cores
app = SparkNotebookApplicationState(master_image=MASTER_IMAGE,
worker_image=WORKER_IMAGE,
notebook_image=NOTEBOOK_IMAGE,
name=name,
required_resources=resources,
user_id=user_id)
self.state.add(app)
self.state.commit()
return app.id
answer = self.ipc_server.ask('application_spark_notebook_new',
user_id=user_id,
worker_count=worker_count,
executor_memory=executor_memory,
executor_cores=executor_cores,
name=name,
master_image=MASTER_IMAGE,
worker_image=WORKER_IMAGE,
notebook_image=NOTEBOOK_IMAGE)
if answer is not None:
return answer['app_id']
def application_spark_submit_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str, file_data: bytes) -> int:
try:
self.state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
raise UserIDDoesNotExist(user_id)
resources = SparkApplicationResources()
resources.worker_count = worker_count
resources.container_count = worker_count + 2
resources.worker_resources["memory_limit"] = executor_memory
resources.worker_resources["cores"] = executor_cores
app = SparkSubmitApplicationState(master_image=MASTER_IMAGE,
worker_image=WORKER_IMAGE,
submit_image=SUBMIT_IMAGE,
name=name,
required_resources=resources,
user_id=user_id)
self.state.add(app)
self.state.flush()
storage.application_data_upload(app, file_data)
self.state.commit()
return app.id
file_data = base64.b64encode(file_data).decode('ascii')
answer = self.ipc_server.ask('application_spark_submit_new',
user_id=user_id,
worker_count=worker_count,
executor_memory=executor_memory,
executor_cores=executor_cores,
name=name,
file_data=file_data,
master_image=MASTER_IMAGE,
worker_image=WORKER_IMAGE,
submit_image=SUBMIT_IMAGE)
if answer is not None:
return answer['app_id']
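
The base64 step exists because file_data is raw bytes and the IPC messages can only carry text safely; note this means client.py needs an import base64 somewhere above the hunk shown. The server handler further down still passes file_data straight to storage.application_data_upload(), so the decode presumably happens there or in code not visible in this diff. The round trip itself is just:

import base64

file_data = b'\x00\x01 binary jar payload'           # stand-in for the uploaded file
wire = base64.b64encode(file_data).decode('ascii')   # client: bytes -> ASCII-safe str
restored = base64.b64decode(wire)                    # server: str -> original bytes
assert restored == file_data
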
# Containers
def container_stats(self, container_id):
@@ -145,40 +118,18 @@ class ZoeClient:
# Executions
    def execution_delete(self, execution_id: int) -> bool:
try:
execution = self.state.query(ExecutionState).filter_by(id=execution_id).one()
except NoResultFound:
return
if execution.status == "running":
raise ApplicationStillRunning(execution.application)
storage.logs_archive_delete(execution)
self.state.delete(execution)
ret = self.ipc_server.ask('execution_delete', execution_id=execution_id)
return ret is not None
def execution_get(self, execution_id: int) -> Execution:
try:
execution = self.state.query(ExecutionState).filter_by(id=execution_id).one()
except NoResultFound:
return None
return execution.extract()
exec_dict = self.ipc_server.ask('execution_get', execution_id=execution_id)
if exec_dict is not None:
return Execution(exec_dict)
def execution_get_proxy_path(self, execution_id):
try:
execution = self.state.query(ExecutionState).filter_by(id=execution_id).one()
except NoResultFound:
return None
if execution is None:
return None
if isinstance(execution.application, SparkNotebookApplicationState):
c = execution.find_container("spark-notebook")
pr = self.state.query(ProxyState).filter_by(container_id=c.id, service_name="Spark Notebook interface").one()
return zoeconf.proxy_path_url_prefix + '/{}'.format(pr.id)
elif isinstance(execution.application, SparkSubmitApplicationState):
c = execution.find_container("spark-submit")
pr = self.state.query(ProxyState).filter_by(container_id=c.id, service_name="Spark application web interface").one()
return zoeconf.proxy_path_url_prefix + '/{}'.format(pr.id)
else:
return None
answer = self.ipc_server.ask('execution_get_proxy_path', execution_id=execution_id)
if answer is not None:
return answer['path']
def execution_spark_new(self, application_id: int, name, commandline=None, spark_options=None) -> bool:
ret = self.ipc_server.ask('execution_spark_new', application_id=application_id, name=name, commandline=commandline, spark_options=spark_options)
......
@@ -2,3 +2,51 @@ class User:
def __init__(self, user: dict):
self.id = user['id']
self.email = user['email']
class Execution:
def __init__(self, execution: dict):
self.id = execution['id']
self.name = execution['name']
self.assigned_resources = execution['assigned_resources']
self.application_id = execution['application_id']
self.time_started = execution['time_started']
self.time_scheduled = execution['time_scheduled']
self.time_finished = execution['time_finished']
self.status = execution['status']
self.termination_notice = execution['termination_notice']
self.cluster_id = execution['cluster_id']
self.type = execution['type']
if self.type == 'spark-submit-application':
self.commandline = execution['commandline']
self.spark_opts = execution['spark_opts']
self.containers = []
for c in execution['containers']:
self.containers.append(Container(c))
class Container:
def __init__(self, container: dict):
self.id = container['id']
self.docker_id = container['docker_id']
self.cluster_id = container['cluster_id']
self.ip_address = container['ip_address']
self.readable_name = container['readable_name']
self.proxies = []
for p in container['proxies']:
self.proxies.append(Proxy(p))
class Proxy:
def __init__(self, proxy: dict):
self.id = proxy['id']
self.internal_url = proxy['internal_url']
self.cluster_id = proxy['cluster_id']
self.container_id = proxy['container_id']
self.service_name = proxy['service_name']
self.last_access = proxy['last_access']
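
These entity classes are deliberately inert: they copy whatever the server serialized and offer attribute access, nothing more. A quick construction example with made-up values shows how the 'type' discriminator drives the optional spark-submit fields:

exec_dict = {
    'id': 1, 'name': 'wordcount', 'assigned_resources': {},
    'application_id': 3, 'time_scheduled': None, 'time_started': None,
    'time_finished': None, 'status': 'scheduled', 'termination_notice': False,
    'cluster_id': None, 'type': 'spark-submit-application',
    'commandline': 'wordcount.py', 'spark_opts': '',
    'containers': [],
}
e = Execution(exec_dict)
assert e.commandline == 'wordcount.py'  # set only for spark-submit executions
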
@@ -5,12 +5,15 @@ import threading
from sqlalchemy.orm.exc import NoResultFound
import zmq
from common.application_resources import SparkApplicationResources
from common.state import AlchemySession
from common.state.application import ApplicationState, SparkSubmitApplicationState
from common.state.application import ApplicationState, SparkSubmitApplicationState, SparkNotebookApplicationState, SparkApplicationState
from common.state.container import ContainerState
from common.state.execution import ExecutionState, SparkSubmitExecutionState
from common.state.proxy import ProxyState
from common.state.user import UserState
import common.object_storage as storage
from common.configuration import zoeconf
from zoe_scheduler.scheduler import ZoeScheduler
@@ -66,12 +69,122 @@ class ZoeIPCServer:
return {'status': 'error', 'answer': error_msg}
# ############# Exposed methods below ################
# Applications
def application_spark_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str,
                              master_image: str, worker_image: str) -> dict:
try:
self.state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
return self._reply_error('no such user')
resources = SparkApplicationResources()
resources.worker_count = worker_count
resources.container_count = worker_count + 1
resources.worker_resources["memory_limit"] = executor_memory
resources.worker_resources["cores"] = executor_cores
app = SparkApplicationState(master_image=master_image,
worker_image=worker_image,
name=name,
required_resources=resources,
user_id=user_id)
self.state.add(app)
self.state.commit()
return self._reply_ok(app_id=app.id)
def application_spark_notebook_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str,
                                       master_image: str, worker_image: str, notebook_image: str) -> dict:
try:
self.state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
return self._reply_error('no such user')
resources = SparkApplicationResources()
resources.worker_count = worker_count
resources.container_count = worker_count + 2
resources.worker_resources["memory_limit"] = executor_memory
resources.worker_resources["cores"] = executor_cores
app = SparkNotebookApplicationState(master_image=master_image,
worker_image=worker_image,
notebook_image=notebook_image,
name=name,
required_resources=resources,
user_id=user_id)
self.state.add(app)
self.state.commit()
return self._reply_ok(app_id=app.id)
def application_spark_submit_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str, file_data: bytes,
master_image: str, worker_image: str, submit_image: str) -> dict:
try:
self.state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
return self._reply_error('no such user')
resources = SparkApplicationResources()
resources.worker_count = worker_count
resources.container_count = worker_count + 2
resources.worker_resources["memory_limit"] = executor_memory
resources.worker_resources["cores"] = executor_cores
app = SparkSubmitApplicationState(master_image=master_image,
worker_image=worker_image,
submit_image=submit_image,
name=name,
required_resources=resources,
user_id=user_id)
self.state.add(app)
self.state.flush()
storage.application_data_upload(app, file_data)
self.state.commit()
return self._reply_ok(app_id=app.id)
# Containers
def container_stats(self, container_id: int) -> dict:
ret = self.sched.platform.container_stats(container_id).to_dict()
return self._reply_ok(**ret)
# Executions
def execution_delete(self, execution_id: int) -> dict:
try:
execution = self.state.query(ExecutionState).filter_by(id=execution_id).one()
except NoResultFound:
return self._reply_error('no such execution')
if execution.status == "running":
self.sched.execution_terminate(self.state, execution)
# FIXME remove it also from the scheduler, check for scheduled state
storage.logs_archive_delete(execution)
self.state.delete(execution)
self.state.commit()
return self._reply_ok()
def execution_get(self, execution_id: int) -> dict:
try:
execution = self.state.query(ExecutionState).filter_by(id=execution_id).one()
except NoResultFound:
return self._reply_error('no such execution')
return self._reply_ok(**execution.to_dict())
def execution_get_proxy_path(self, execution_id: int) -> dict:
try:
execution = self.state.query(ExecutionState).filter_by(id=execution_id).one()
except NoResultFound:
return self._reply_error('no such execution')
if isinstance(execution.application, SparkNotebookApplicationState):
c = execution.find_container("spark-notebook")
pr = self.state.query(ProxyState).filter_by(container_id=c.id, service_name="Spark Notebook interface").one()
path = zoeconf.proxy_path_url_prefix + '/{}'.format(pr.id)
return self._reply_ok(path=path)
elif isinstance(execution.application, SparkSubmitApplicationState):
c = execution.find_container("spark-submit")
pr = self.state.query(ProxyState).filter_by(container_id=c.id, service_name="Spark application web interface").one()
path = zoeconf.proxy_path_url_prefix + '/{}'.format(pr.id)
return self._reply_ok(path=path)
else:
return self._reply_error('unknown application type')
def execution_spark_new(self, application_id: int, name: str, commandline=None, spark_options=None) -> dict:
try:
application = self.state.query(ApplicationState).filter_by(id=application_id).one()
@@ -100,14 +213,12 @@ class ZoeIPCServer:
            self.state.rollback()
            return self._reply_error('admission control refused this application execution')
return self._reply_ok()
return self._reply_ok(execution_id=execution.id)
def execution_terminate(self, execution_id: int) -> dict:
state = AlchemySession()
execution = state.query(ExecutionState).filter_by(id=execution_id).one()
self.sched.execution_terminate(state, execution)
state.commit()
state.close()
execution = self.state.query(ExecutionState).filter_by(id=execution_id).one()
self.sched.execution_terminate(self.state, execution)
self.state.commit()
return self._reply_ok()
# Logs
......
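
For reference, the reply envelope implied by these handlers: the context line returning {'status': 'error', 'answer': error_msg} shows _reply_error's shape, and _reply_ok presumably mirrors it with its keyword arguments as the payload; the client's ask() would then unwrap 'answer' on success and return None on error, which is why every client method guards with "if answer is not None". A sketch under those assumptions:

# Assumed helper shapes, inferred from the 'status'/'answer' context line above.
def _reply_ok(self, **kwargs) -> dict:
    return {'status': 'ok', 'answer': kwargs}

def _reply_error(self, error_msg: str) -> dict:
    return {'status': 'error', 'answer': error_msg}
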