# ipc.py
import base64
import logging
import threading

from sqlalchemy.orm.exc import NoResultFound
import zmq

from common.state import AlchemySession
from common.state.application import ApplicationState, SparkSubmitApplicationState
from common.state.container import ContainerState
from common.state.execution import ExecutionState, SparkSubmitExecutionState
from common.state.user import UserState
import common.object_storage as storage

from zoe_scheduler.scheduler import ZoeScheduler

log = logging.getLogger(__name__)


class ZoeIPCServer:
    """ZeroMQ request/reply server exposing scheduler and state operations.

    Requests are JSON objects of the form ``{"command": <str>, "args": <dict>}``.
    ``command`` must be one of ``_EXPOSED_COMMANDS``; ``args`` is passed to that
    method as keyword arguments. Every request gets exactly one JSON reply,
    either ``{"status": "ok", "answer": {...}}`` or
    ``{"status": "error", "answer": <str>}``.
    """

    # Names of the methods remote clients may invoke. _dispatch() refuses anything
    # else, so internals (_loop, _dispatch, start_thread, ...) cannot be triggered
    # over IPC. Register new exposed methods here as well as defining them below.
    _EXPOSED_COMMANDS = frozenset({
        'container_stats',
        'execution_spark_new',
        'execution_terminate',
        'log_get',
        'log_history_get',
        'platform_stats',
        'user_get',
        'user_get_by_email',
        'user_new',
    })

    def __init__(self, scheduler: ZoeScheduler, port=8723):
        """Create the REP socket and bind it on *port*.

        No requests are served until start_thread() is called.
        """
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REP)
        # NOTE(review): binds on all interfaces and this class performs no
        # authentication, so any host that can reach the port can issue every
        # exposed command.
        self.socket.bind("tcp://*:%s" % port)
        self.th = None  # server thread, created by start_thread()
        self.state = None  # database session, created by _loop() in the server thread
        self.sched = scheduler

    def start_thread(self):
        """Start serving requests in a background daemon thread."""
        self.th = threading.Thread(target=self._loop, name="IPC server", daemon=True)
        self.th.start()

    def _loop(self):
        """Serve requests forever; runs in the thread created by start_thread().

        A REP socket must answer each request before it can receive the next one,
        so every path below sends exactly one reply per received message.
        """
        # Created here so the session belongs to the server thread rather than to
        # the thread that constructed this object.
        self.state = AlchemySession()
        log.debug("IPC server thread started")
        while True:
            try:
                message = self.socket.recv_json()
            except ValueError:
                # The payload was consumed but is not valid JSON/UTF-8. The socket
                # still owes the client a reply, and letting this propagate would
                # kill the server thread.
                log.error("Ignoring message that is not valid JSON")
                reply = self._reply_error('malformed')
            else:
                try:
                    reply = self._dispatch(message)
                except Exception:
                    log.exception("Uncaught exception in IPC server thread")
                    # Discard whatever the failed handler left pending in the shared
                    # session, so a later, unrelated commit does not persist it.
                    try:
                        self.state.rollback()
                    except Exception:
                        log.exception("Failed to roll back the IPC database session")
                    reply = self._reply_error('exception')
            self._send_reply(reply)

    def _send_reply(self, reply: dict):
        """Send *reply*, falling back to an error reply if it cannot be JSON-encoded."""
        try:
            self.socket.send_json(reply)
        except (TypeError, ValueError):
            # send_json serializes before sending, so nothing has gone out yet and
            # the client is still waiting for an answer.
            log.exception("Cannot serialize IPC reply")
            self.socket.send_json(self._reply_error('exception'))

    def _dispatch(self, message: dict) -> dict:
        """Validate *message* and invoke the exposed method it names.

        Returns the handler's reply, or an error reply when the message is
        malformed or names a command that is not exposed. Exceptions raised by
        the handler itself propagate to the caller.
        """
        if not isinstance(message, dict) or "command" not in message or "args" not in message:
            log.error("Ignoring malformed message: %s", message)
            return self._reply_error('malformed')

        if not isinstance(message['args'], dict):
            log.error("Ignoring malformed message: %s", message)
            return self._reply_error('malformed')

        command = message["command"]
        # Allowlist check: a bare getattr() on client input would also reach
        # private methods and start_thread().
        if not isinstance(command, str) or command not in self._EXPOSED_COMMANDS:
            log.error("Ignoring unknown command: %s", command)
            return self._reply_error('unknown command')

        return getattr(self, command)(**message["args"])

    def _reply_ok(self, **reply) -> dict:
        """Build a success reply whose answer is the given keyword arguments."""
        return {'status': 'ok', 'answer': reply}

    def _reply_error(self, error_msg: str) -> dict:
        """Build an error reply carrying *error_msg* as the answer."""
        return {'status': 'error', 'answer': error_msg}

    # ############# Exposed methods below ################
    # Containers
    def container_stats(self, container_id: int) -> dict:
        """Return the platform's statistics for a container."""
        ret = self.sched.platform.container_stats(container_id).to_dict()
        return self._reply_ok(**ret)

    # Executions
    def execution_spark_new(self, application_id: int, name: str, commandline=None, spark_options=None) -> dict:
        """Create an execution of an application and submit it to the scheduler.

        Spark-submit applications require *commandline*; *spark_options* is
        stored alongside it. The new execution is committed only if the
        scheduler accepts it; otherwise it is rolled back and an error reply is
        returned.
        """
        try:
            application = self.state.query(ApplicationState).filter_by(id=application_id).one()
        except NoResultFound:
            return self._reply_error('no such application')

        if type(application) is SparkSubmitApplicationState:
            if commandline is None:
                # Client input error: report it instead of raising a generic exception.
                return self._reply_error('Spark submit application requires a commandline')
            execution = SparkSubmitExecutionState(name=name,
                                                  application_id=application.id,
                                                  status="submitted",
                                                  commandline=commandline,
                                                  spark_opts=spark_options)
        else:
            execution = ExecutionState(name=name,
                                       application_id=application.id,
                                       status="submitted")
        self.state.add(execution)

        if not self.sched.incoming(execution):
            # Admission control refused: drop the pending execution and tell the client.
            self.state.rollback()
            return self._reply_error('admission control refused this application execution')

        execution.set_scheduled()
        self.state.commit()
        return self._reply_ok()

    def execution_terminate(self, execution_id: int) -> dict:
        """Ask the scheduler to terminate an execution and commit the result.

        Uses a dedicated session rather than self.state (NOTE(review): the reason
        is not visible in this file; confirm before unifying). Returns an error
        reply if no execution has the given id.
        """
        state = AlchemySession()
        try:
            try:
                execution = state.query(ExecutionState).filter_by(id=execution_id).one()
            except NoResultFound:
                return self._reply_error('no such execution')
            self.sched.execution_terminate(state, execution)
            state.commit()
        finally:
            # Always release the dedicated session, including when the scheduler raises.
            state.close()
        return self._reply_ok()

    # Logs
    def log_get(self, container_id: int) -> dict:
        """Return the platform's log output for a container under ``log``."""
        try:
            container = self.state.query(ContainerState).filter_by(id=container_id).one()
        except NoResultFound:
            return self._reply_error('no such container')
        else:
            ret = self.sched.platform.log_get(container)
            return self._reply_ok(log=ret)

    def log_history_get(self, execution_id) -> dict:
        """Return an execution's log archive from object storage, base64-encoded, under ``zip_data``."""
        try:
            execution = self.state.query(ExecutionState).filter_by(id=execution_id).one()
        except NoResultFound:
            return self._reply_error('no such execution')
        log_data = storage.logs_archive_download(execution)
        # Base64 so the binary payload can travel inside a JSON string.
        encoded = base64.b64encode(log_data)
        return self._reply_ok(zip_data=encoded.decode('ascii'))

    # Platform
    def platform_stats(self) -> dict:
        """Return the scheduler's platform statistics."""
        ret = self.sched.platform_status.stats()
        return self._reply_ok(**ret.to_dict())

    # Users
    def user_get(self, user_id) -> dict:
        """Return the user with the given id, or an error reply if there is none."""
        try:
            user = self.state.query(UserState).filter_by(id=user_id).one()
        except NoResultFound:
            return self._reply_error('no such user')
        else:
            return self._reply_ok(**user.to_dict())

    def user_get_by_email(self, user_email) -> dict:
        """Return the user with the given email, or an error reply if there is none."""
        try:
            user = self.state.query(UserState).filter_by(email=user_email).one()
        except NoResultFound:
            return self._reply_error('no such user')
        else:
            return self._reply_ok(**user.to_dict())

    def user_new(self, email: str) -> dict:
        """Create and commit a new user with *email* and return it."""
        user = UserState(email=email)
        self.state.add(user)
        self.state.commit()
        return self._reply_ok(**user.to_dict())