Commit dc68cf0d authored by Daniele Venzano

Remove rpyc and convert more client methods to new rpc

parent ba2bb2ba
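Summary: this commit drops the RPyC ThreadedServer and the ZoeSchedulerRPCService class and routes the remaining client calls through the JSON-over-ZeroMQ IPC server listening on port 8723. As a rough sketch (not part of the commit) of the request/reply exchange implied by the diff below; the 'command' key name is an assumption, since only the server-side dispatch and the _reply_ok()/_reply_error() envelopes appear in this commit:

import zmq

# Hypothetical raw exchange with the Zoe scheduler's IPC server.
socket = zmq.Context.instance().socket(zmq.REQ)
socket.connect("tcp://localhost:8723")  # default --ipc-server-port in this commit
socket.send_json({'command': 'container_stats', 'args': {'container_id': 42}})  # 'command' key is assumed
reply = socket.recv_json()
if reply['status'] == 'ok':
    print(reply['answer'])          # e.g. the container statistics dictionary
else:
    print("IPC error:", reply['answer'])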
 from configparser import ConfigParser
-rpycconf = {
-    'client_rpyc_autodiscovery': True,
-    'client_rpyc_server': None,
-    'client_rpyc_port': None,
+ipcconf = {
+    'server': None,
+    'port': None,
 }
 config_paths = [
......
@@ -3,41 +3,28 @@
 import argparse
 import logging
-from rpyc.utils.server import ThreadedServer
-from zoe_scheduler.rpyc_service import ZoeSchedulerRPCService
 from zoe_scheduler.scheduler import zoe_sched
 from zoe_scheduler.periodic_tasks import PeriodicTaskManager
 from zoe_scheduler.ipc import ZoeIPCServer
 from common.object_storage import init_history_paths

 log = logging.getLogger('zoe')

-loop = None
-rpyc_server = None

 def sigint_handler():
     log.warning('CTRL-C detected, terminating event loop...')
-    loop.stop()
     zoe_sched.stop_tasks()
-    rpyc_server.stop()
-    try:
-        loop.run_forever()
-    except RuntimeError:
-        pass

 def process_arguments() -> argparse.Namespace:
     argparser = argparse.ArgumentParser(description="Zoe Scheduler - Container Analytics as a Service scheduling component")
     argparser.add_argument('-d', '--debug', action='store_true', help='Enable debug output')
-    argparser.add_argument('--rpyc-no-auto-register', action='store_true', help='Do not register automatically in the RPyC registry')
     argparser.add_argument('--ipc-server-port', type=int, default=8723, help='Port the IPC server should bind to')
     return argparser.parse_args()

 def main():
-    global loop, rpyc_server
     args = process_arguments()
     if args.debug:
         logging.basicConfig(level=logging.DEBUG)
@@ -45,8 +32,6 @@ def main():
         logging.basicConfig(level=logging.INFO)
     logging.getLogger('requests').setLevel(logging.WARNING)
-    rpyc_logger = logging.getLogger('rpyc')
-    rpyc_logger.setLevel(logging.WARNING)

     ipc_server = ZoeIPCServer(zoe_sched, args.ipc_server_port)
@@ -55,16 +40,11 @@ def main():
     tm = PeriodicTaskManager()

-    rpyc_server = ThreadedServer(ZoeSchedulerRPCService, '0.0.0.0', port=4000,
-                                 auto_register=not args.rpyc_no_auto_register,
-                                 protocol_config={"allow_public_attrs": True},
-                                 logger=rpyc_logger)

     zoe_sched.init_tasks(tm)
-    ipc_server.start_loop()
-    rpyc_server.start()
+    ipc_server.start_thread()
+    zoe_sched.loop()

     tm.stop_all()
......
@@ -8,7 +8,7 @@ from tornado.ioloop import IOLoop
 from zoe_web import app
-from common.configuration import rpycconf
+from common.configuration import ipcconf

 log = logging.getLogger("zoe_web")
@@ -16,8 +16,8 @@ log = logging.getLogger("zoe_web")
 def process_arguments() -> argparse.Namespace:
     argparser = argparse.ArgumentParser(description="Zoe Web - Container Analytics as a Service web client")
     argparser.add_argument('-d', '--debug', action='store_true', default=False, help='Enable debug output')
-    argparser.add_argument('--rpyc-server', default=None, help='Specify an RPyC server instead of using autodiscovery')
-    argparser.add_argument('--rpyc-port', default=4000, type=int, help='Specify an RPyC server port, default is 4000')
+    argparser.add_argument('--ipc-server', default='localhost', help='Address of the Zoe scheduler process')
+    argparser.add_argument('--ipc-port', default=8723, type=int, help='Port of the Zoe scheduler process')
     return argparser.parse_args()
@@ -31,12 +31,8 @@ def main():
     logging.getLogger("requests").setLevel(logging.WARNING)
     logging.getLogger("tornado").setLevel(logging.WARNING)

-    if args.rpyc_server is None:
-        rpycconf['client_rpyc_autodiscovery'] = True
-    else:
-        rpycconf['client_rpyc_autodiscovery'] = False
-        rpycconf['client_rpyc_server'] = args.rpyc_server
-        rpycconf['client_rpyc_port'] = args.rpyc_port
+    ipcconf['server'] = args.ipc_server
+    ipcconf['port'] = args.ipc_port

     log.info("Starting HTTP server...")
     app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
......
-from zoe_client.client import ZoeClient, get_zoe_client
+from zoe_client.client import ZoeClient
+import base64
 import logging
-import rpyc

 from sqlalchemy.orm.exc import NoResultFound

 from zoe_client.ipc import ZoeIPCClient
 from common.state import AlchemySession
 from common.state.application import ApplicationState, SparkNotebookApplicationState, SparkSubmitApplicationState, SparkApplicationState, Application
-from common.state.container import ContainerState
-from common.state.execution import ExecutionState, SparkSubmitExecutionState, Execution
+from common.state.execution import ExecutionState, Execution
 from common.state.proxy import ProxyState
 from common.state.user import UserState
 from common.application_resources import SparkApplicationResources
 from common.exceptions import UserIDDoesNotExist, ApplicationStillRunning
 import common.object_storage as storage
-from common.configuration import zoeconf, rpycconf
+from common.configuration import zoeconf
 from zoe_client.entities import User

 log = logging.getLogger(__name__)
@@ -26,16 +25,9 @@ NOTEBOOK_IMAGE = REGISTRY + "/zoerepo/spark-notebook"
 class ZoeClient:
-    def __init__(self, rpyc_server=None, rpyc_port=4000):
-        self.ipc_server = ZoeIPCClient("localhost")
-        self.rpyc_server = rpyc_server
-        self.rpyc_port = rpyc_port
+    def __init__(self, ipc_server='localhost', ipc_port=8723):
+        self.ipc_server = ZoeIPCClient(ipc_server, ipc_port)
         self.state = AlchemySession()
-        if self.rpyc_server is None:
-            self.server_connection = rpyc.connect_by_service("ZoeSchedulerRPC")
-        else:
-            self.server_connection = rpyc.connect(self.rpyc_server, self.rpyc_port)
-        self.server = self.server_connection.root

     # Applications
     def application_get(self, application_id: int) -> Application:
@@ -149,11 +141,7 @@ class ZoeClient:
     # Containers
     def container_stats(self, container_id):
-        try:
-            self.state.query(ContainerState).filter_by(id=container_id).one()
-        except NoResultFound:
-            return None
-        return self.server.container_stats(container_id)
+        return self.ipc_server.ask('container_stats', container_id=container_id)

     # Executions
     def execution_delete(self, execution_id: int) -> None:
@@ -193,51 +181,23 @@ class ZoeClient:
         return None

     def execution_spark_new(self, application_id: int, name, commandline=None, spark_options=None) -> bool:
-        try:
-            application = self.state.query(ApplicationState).filter_by(id=application_id).one()
-        except NoResultFound:
-            return None
-        if type(application) is SparkSubmitApplicationState:
-            if commandline is None:
-                raise ValueError("Spark submit application requires a commandline")
-            execution = SparkSubmitExecutionState(name=name,
-                                                  application_id=application.id,
-                                                  status="submitted",
-                                                  commandline=commandline,
-                                                  spark_opts=spark_options)
-        else:
-            execution = ExecutionState(name=name,
-                                       application_id=application.id,
-                                       status="submitted")
-        self.state.add(execution)
-        self.state.commit()
-        ret = self.server.execution_schedule(execution.id)
-        return ret
+        ret = self.ipc_server.ask('execution_spark_new', application_id=application_id, name=name, commandline=commandline, spark_options=spark_options)
+        return ret is not None

     def execution_terminate(self, execution_id: int) -> None:
-        try:
-            self.state.query(ExecutionState).filter_by(id=execution_id).one()
-        except NoResultFound:
-            pass
-        self.server.execution_terminate(execution_id)
+        ret = self.ipc_server.ask('execution_terminate', execution_id=execution_id)
+        return ret is not None

     # Logs
     def log_get(self, container_id: int) -> str:
-        try:
-            self.state.query(ContainerState).filter_by(id=container_id).one()
-        except NoResultFound:
-            return None
-        else:
-            ret = self.server.log_get(container_id)
-            return ret
+        clog = self.ipc_server.ask('log_get', container_id=container_id)
+        if clog is not None:
+            return clog['log']

     def log_history_get(self, execution_id):
-        try:
-            execution = self.state.query(ExecutionState).filter_by(id=execution_id).one()
-        except NoResultFound:
-            return None
-        return storage.logs_archive_download(execution)
+        data = self.ipc_server.ask('log_history_get', execution_id=execution_id)
+        log_data = base64.b64decode(data['zip_data'])
+        return log_data

     # Platform
     def platform_stats(self) -> dict:
@@ -263,10 +223,3 @@ class ZoeClient:
         user_dict = self.ipc_server.ask('user_get_by_email', user_email=email)
         if user_dict is not None:
             return User(user_dict)
-
-def get_zoe_client() -> ZoeClient:
-    if rpycconf['client_rpyc_autodiscovery']:
-        return ZoeClient()
-    else:
-        return ZoeClient(rpycconf['client_rpyc_server'], rpycconf['client_rpyc_port'])
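All of the converted ZoeClient methods above funnel through ZoeIPCClient.ask(). That class is not part of this diff, so the following is only a sketch of what such a helper could look like against the REP socket the scheduler binds; the constructor signature matches the calls above, the 'command' key is an assumption, and the None-on-error behaviour is inferred from how the callers check the result:

import zmq

class ZoeIPCClientSketch:
    # Hypothetical stand-in for zoe_client.ipc.ZoeIPCClient, for illustration only.
    def __init__(self, server='localhost', port=8723):
        self.socket = zmq.Context.instance().socket(zmq.REQ)
        self.socket.connect("tcp://{}:{}".format(server, port))

    def ask(self, command, **args):
        # One JSON request per call; unwrap the {'status': ..., 'answer': ...} envelope.
        self.socket.send_json({'command': command, 'args': args})
        reply = self.socket.recv_json()
        if reply['status'] == 'ok':
            return reply['answer']
        return None  # callers such as execution_terminate() check "ret is not None"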
+import base64
 import logging
 import threading
@@ -5,7 +6,11 @@ from sqlalchemy.orm.exc import NoResultFound
 import zmq

 from common.state import AlchemySession
+from common.state.application import ApplicationState, SparkSubmitApplicationState
+from common.state.container import ContainerState
+from common.state.execution import ExecutionState, SparkSubmitExecutionState
 from common.state.user import UserState
+import common.object_storage as storage
 from zoe_scheduler.scheduler import ZoeScheduler
@@ -18,18 +23,23 @@ class ZoeIPCServer:
         self.socket = self.context.socket(zmq.REP)
         self.socket.bind("tcp://*:%s" % port)
         self.th = None
-        self.state = AlchemySession()
+        self.state = None
         self.sched = scheduler

-    def start_loop(self):
+    def start_thread(self):
         self.th = threading.Thread(target=self._loop, name="IPC server", daemon=True)
         self.th.start()

     def _loop(self):
+        self.state = AlchemySession()  # thread-local session
         log.debug("IPC server thread started")
         while True:
             message = self.socket.recv_json()
-            reply = self._dispatch(message)
+            try:
+                reply = self._dispatch(message)
+            except:
+                log.exception("Uncaught exception in IPC server thread")
+                reply = self._reply_error('exception')
             self.socket.send_json(reply)

     def _dispatch(self, message: dict) -> dict:
@@ -49,16 +59,80 @@ class ZoeIPCServer:
         return func(**message["args"])

-    def _reply_ok(self, reply: dict) -> dict:
+    def _reply_ok(self, **reply) -> dict:
         return {'status': 'ok', 'answer': reply}

     def _reply_error(self, error_msg: str) -> dict:
         return {'status': 'error', 'answer': error_msg}
+
+    # ############# Exposed methods below ################
+    # Containers
+    def container_stats(self, container_id: int) -> dict:
+        ret = self.sched.platform.container_stats(container_id).to_dict()
+        return self._reply_ok(**ret)
+
+    # Executions
+    def execution_spark_new(self, application_id: int, name: str, commandline=None, spark_options=None) -> dict:
+        try:
+            application = self.state.query(ApplicationState).filter_by(id=application_id).one()
+        except NoResultFound:
+            return self._reply_error('no such application')
+        if type(application) is SparkSubmitApplicationState:
+            if commandline is None:
+                raise ValueError("Spark submit application requires a commandline")
+            execution = SparkSubmitExecutionState(name=name,
+                                                  application_id=application.id,
+                                                  status="submitted",
+                                                  commandline=commandline,
+                                                  spark_opts=spark_options)
+        else:
+            execution = ExecutionState(name=name,
+                                       application_id=application.id,
+                                       status="submitted")
+        self.state.add(execution)
+        ret = self.sched.incoming(execution)
+        if ret:
+            execution.set_scheduled()
+            self.state.commit()
+        else:
+            self._reply_error('admission control refused this application execution')
+            self.state.rollback()
+        return self._reply_ok()
+
+    def execution_terminate(self, execution_id: int) -> dict:
+        state = AlchemySession()
+        execution = state.query(ExecutionState).filter_by(id=execution_id).one()
+        self.sched.execution_terminate(state, execution)
+        state.commit()
+        state.close()
+        return self._reply_ok()
+
+    # Logs
+    def log_get(self, container_id: int) -> dict:
+        try:
+            container = self.state.query(ContainerState).filter_by(id=container_id).one()
+        except NoResultFound:
+            return self._reply_error('no such container')
+        else:
+            ret = self.sched.platform.log_get(container)
+            return self._reply_ok(log=ret)
+
+    def log_history_get(self, execution_id) -> dict:
+        try:
+            execution = self.state.query(ExecutionState).filter_by(id=execution_id).one()
+        except NoResultFound:
+            return self._reply_error('no such execution')
+        log_data = storage.logs_archive_download(execution)
+        log_data = base64.b64encode(log_data)
+        return self._reply_ok(zip_data=log_data.decode('ascii'))

     # Platform
-    def platform_stats(self):
+    def platform_stats(self) -> dict:
         ret = self.sched.platform_status.stats()
-        return self._reply_ok(ret.to_dict())
+        return self._reply_ok(**ret.to_dict())

     # Users
     def user_get(self, user_id) -> dict:
@@ -67,7 +141,7 @@ class ZoeIPCServer:
         except NoResultFound:
             return self._reply_error('no such user')
         else:
-            return self._reply_ok(user.to_dict())
+            return self._reply_ok(**user.to_dict())

     def user_get_by_email(self, user_email) -> dict:
         try:
@@ -75,10 +149,10 @@ class ZoeIPCServer:
         except NoResultFound:
             return self._reply_error('no such user')
         else:
-            return self._reply_ok(user.to_dict())
+            return self._reply_ok(**user.to_dict())

     def user_new(self, email: str) -> dict:
         user = UserState(email=email)
         self.state.add(user)
         self.state.commit()
-        return self._reply_ok(user.to_dict())
+        return self._reply_ok(**user.to_dict())
@@ -45,7 +45,9 @@ class ProxyManager:
     def _get_proxy_entries(self):
         state = AlchemySession()
-        return state.query(ProxyState).all()
+        ret = state.query(ProxyState).all()
+        state.close()
+        return ret

     def _generate_file(self, proxy_entries):
         output = ""
@@ -100,5 +102,6 @@ class ProxyManager:
                 proxy.container.cluster.execution.termination_notice = False
         if something_to_commit:
             state.commit()
+        state.close()

 pm = ProxyManager()
-import rpyc
-from sqlalchemy.orm.exc import NoResultFound
-
-from zoe_scheduler.scheduler import zoe_sched
-from zoe_scheduler.stats import PlatformStats, ContainerStats
-
-from common.state import AlchemySession, ContainerState
-from common.state.execution import ExecutionState
-
-
-class ZoeSchedulerRPCService(rpyc.Service):
-    sched = zoe_sched
-
-    def on_connect(self):
-        pass
-
-    def on_disconnect(self):
-        pass
-
-    def exposed_container_stats(self, container_id: int) -> ContainerStats:
-        return self.sched.platform.container_stats(container_id)
-
-    def exposed_execution_schedule(self, execution_id: int) -> bool:
-        state = AlchemySession()
-        execution = state.query(ExecutionState).filter_by(id=execution_id).one()
-        ret = self.sched.incoming(execution)
-        if ret:
-            execution.set_scheduled()
-            state.commit()
-        return ret
-
-    def exposed_execution_terminate(self, execution_id: int) -> bool:
-        state = AlchemySession()
-        execution = state.query(ExecutionState).filter_by(id=execution_id).one()
-        self.sched.execution_terminate(state, execution)
-        state.commit()
-        return True
-
-    def exposed_log_get(self, container_id: int) -> str:
-        state = AlchemySession()
-        try:
-            container = state.query(ContainerState).filter_by(id=container_id).one()
-        except NoResultFound:
-            return None
-        return self.sched.platform.log_get(container)
-
-    def exposed_platform_stats(self) -> PlatformStats:
-        return self.sched.platform_status.stats()
@@ -75,7 +75,6 @@ class ZoeScheduler:
     def init_tasks(self, tm: PeriodicTaskManager):
-        tm