Commit 9b1dcd74 authored by Daniele Venzano's avatar Daniele Venzano
Browse files

Finish moving all client methods to IPC calls

parent 9f010958
......@@ -2,6 +2,9 @@ class ApplicationResources:
def core_count(self):
return 0
def to_dict(self) -> dict:
return {}
# For now resources are dictionaries and Platform recognizes:
# - memory_limit
......@@ -20,3 +23,13 @@ class SparkApplicationResources(ApplicationResources):
return self.worker_count * self.worker_resources["cores"]
else:
return 0
def to_dict(self) -> dict:
ret = super().to_dict()
ret['master_resources'] = self.master_resources
ret['worker_resources'] = self.worker_resources
ret['notebook_resources'] = self.notebook_resources
ret['client_resources'] = self.client_resources
ret['worker_count'] = self.worker_count
ret['container_count'] = self.container_count
return ret
......@@ -28,8 +28,16 @@ class ApplicationState(Base):
ret.append(e)
return ret
def extract(self):
return Application(self)
def to_dict(self) -> dict:
ret = {
'id': self.id,
'name': self.name,
'user_id': self.user_id,
'type': self.type,
'required_resources': self.required_resources.to_dict(),
'executions': [e.to_dict() for e in self.executions]
}
return ret
class SparkApplicationState(ApplicationState):
......@@ -40,8 +48,11 @@ class SparkApplicationState(ApplicationState):
'polymorphic_identity': 'spark-application'
}
def extract(self):
return Application(self)
def to_dict(self) -> dict:
ret = super().to_dict()
ret['master_image'] = self.master_image
ret['worker_image'] = self.worker_image
return ret
class SparkNotebookApplicationState(SparkApplicationState):
......@@ -51,8 +62,10 @@ class SparkNotebookApplicationState(SparkApplicationState):
'polymorphic_identity': 'spark-notebook'
}
def extract(self):
return Application(self)
def to_dict(self) -> dict:
ret = super().to_dict()
ret['notebook_image'] = self.notebook_image
return ret
class SparkSubmitApplicationState(SparkApplicationState):
......@@ -62,45 +75,7 @@ class SparkSubmitApplicationState(SparkApplicationState):
'polymorphic_identity': 'spark-submit'
}
def extract(self):
return Application(self)
class Application:
"""
:type id: int
:type name: str
:type required_resources: ApplicationResources
:type user_id: int
:type type: str
:type master_image: str
:type worker_image: str
:type notebook_image: str
:type submit_image: str
:type executions: list[Execution]
"""
def __init__(self, application: ApplicationState) -> None:
self.id = application.id
self.name = application.name
self.required_resources = application.required_resources
self.user_id = application.user_id
self.type = application.type
if isinstance(application, SparkApplicationState):
self.master_image = application.master_image
self.worker_image = application.worker_image
if isinstance(application, SparkNotebookApplicationState):
self.notebook_image = application.notebook_image
if isinstance(application, SparkSubmitApplicationState):
self.submit_image = application.submit_image
self.executions = []
for e in application.executions:
self.executions.append(e.extract())
def __str__(self):
s = "Application"
s += " - Name: {}".format(self.name)
s += " - Type: {}".format(self.type)
# FIXME add missing fields
return s
def to_dict(self) -> dict:
ret = super().to_dict()
ret['submit_image'] = self.submit_image
return ret
import base64
import logging
from sqlalchemy.orm.exc import NoResultFound
from zoe_client.ipc import ZoeIPCClient
from common.state import AlchemySession
from common.state.application import ApplicationState, Application
from common.state.execution import ExecutionState
from common.state.user import UserState
from common.exceptions import UserIDDoesNotExist, ApplicationStillRunning
import common.object_storage as storage
from common.configuration import zoeconf
from zoe_client.entities import User, Execution
from zoe_client.entities import User, Execution, Application
log = logging.getLogger(__name__)
......@@ -25,22 +17,17 @@ NOTEBOOK_IMAGE = REGISTRY + "/zoerepo/spark-notebook"
class ZoeClient:
def __init__(self, ipc_server='localhost', ipc_port=8723):
self.ipc_server = ZoeIPCClient(ipc_server, ipc_port)
self.state = AlchemySession()
# Applications
def application_get(self, application_id: int) -> Application:
try:
application = self.state.query(ApplicationState).filter_by(id=application_id).one()
except NoResultFound:
return None
return application.extract()
answer = self.ipc_server.ask('application_get', application_id=application_id)
if answer is not None:
return Application(answer['app'])
def application_get_binary(self, application_id: int) -> bytes:
try:
application = self.state.query(ApplicationState).filter_by(id=application_id).one()
except NoResultFound:
return None
return storage.application_data_download(application)
data = self.ipc_server.ask('application_get_binary', application_id=application_id)
app_data = base64.b64decode(data['zip_data'])
return app_data
def application_list(self, user_id):
"""
......@@ -48,29 +35,15 @@ class ZoeClient:
:type user_id: int
:rtype : list[Application]
"""
try:
self.state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
raise UserIDDoesNotExist(user_id)
apps = self.state.query(ApplicationState).filter_by(user_id=user_id).all()
return [x.extract() for x in apps]
def application_remove(self, application_id: int):
try:
application = self.state.query(ApplicationState).filter_by(id=application_id).one()
except NoResultFound:
return
running = self.state.query(ExecutionState).filter_by(application_id=application.id, time_finished=None).count()
if running > 0:
raise ApplicationStillRunning(application)
storage.application_data_delete(application)
for e in application.executions:
self.execution_delete(e.id)
self.state.delete(application)
self.state.commit()
answer = self.ipc_server.ask('application_list', user_id=user_id)
if answer is None:
return []
else:
return [Application(x) for x in answer['apps']]
def application_remove(self, application_id: int, force: bool) -> bool:
answer = self.ipc_server.ask('application_remove', application_id=application_id, force=force)
return answer is not None
def application_spark_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str) -> int:
answer = self.ipc_server.ask('application_spark_new',
......
......@@ -50,3 +50,36 @@ class Proxy:
self.container_id = proxy['container_id']
self.service_name = proxy['service_name']
self.last_access = proxy['last_access']
class Application:
"""
:type id: int
:type name: str
:type required_resources: ApplicationResources
:type user_id: int
:type type: str
:type master_image: str
:type worker_image: str
:type notebook_image: str
:type submit_image: str
:type executions: list[Execution]
"""
def __init__(self, application: dict):
self.id = application['id']
self.name = application['name']
self.required_resources = application['required_resources']
self.user_id = application['user_id']
self.type = application['type']
if 'spark' in self.type:
self.master_image = application['master_image']
self.worker_image = application['worker_image']
if self.type == 'spark-notebook':
self.notebook_image = application['notebook_image']
if self.type == 'spark-submit':
self.submit_image = application['submit_image']
self.executions = []
for e in application['executions']:
self.executions.append(Execution(e))
......@@ -70,8 +70,51 @@ class ZoeIPCServer:
# ############# Exposed methods below ################
# Applications
def application_get(self, application_id: int) -> dict:
try:
application = self.state.query(ApplicationState).filter_by(id=application_id).one()
except NoResultFound:
return self._reply_error('no such application')
return self._reply_ok(app=application.to_dict())
def application_get_binary(self, application_id: int) -> dict:
try:
application = self.state.query(ApplicationState).filter_by(id=application_id).one()
except NoResultFound:
return self._reply_error('no such application')
else:
app_data = storage.application_data_download(application)
app_data = base64.b64encode(app_data)
return self._reply_ok(zip_data=app_data.decode('ascii'))
def application_list(self, user_id: int) -> dict:
try:
self.state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
return self._reply_error('no such user')
apps = self.state.query(ApplicationState).filter_by(user_id=user_id).all()
return self._reply_ok(apps=[x.to_dict() for x in apps])
def application_remove(self, application_id: int, force=False) -> dict:
try:
application = self.state.query(ApplicationState).filter_by(id=application_id).one()
except NoResultFound:
return self._reply_error('no such application')
running = self.state.query(ExecutionState).filter_by(application_id=application.id, time_finished=None).all()
if not force and len(running) > 0:
return self._reply_error('there are active execution, cannot delete')
storage.application_data_delete(application)
for e in application.executions:
self.execution_delete(e.id)
self.state.delete(application)
self.state.commit()
return self._reply_ok()
def application_spark_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str,
master_image: str, worker_image: str) -> int:
master_image: str, worker_image: str) -> dict:
try:
self.state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
......@@ -92,7 +135,7 @@ class ZoeIPCServer:
return self._reply_ok(app_id=app.id)
def application_spark_notebook_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int, name: str,
master_image: str, worker_image: str, notebook_image: str) -> int:
master_image: str, worker_image: str, notebook_image: str) -> dict:
try:
self.state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment