master_api.py 4.12 KB
Newer Older
Daniele Venzano's avatar
Daniele Venzano committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16 17
"""Master side of the ZeroMQ based API."""

Daniele Venzano's avatar
Daniele Venzano committed
18 19 20 21 22
import logging
import time

import zmq

23
import zoe_lib.config as config
24
from zoe_lib.metrics.base import BaseMetricSender
Daniele Venzano's avatar
Daniele Venzano committed
25
from zoe_lib.state import SQLManager
26

27
import zoe_master.preprocessing
28
from zoe_master.exceptions import ZoeException
29
from zoe_master.scheduler import ZoeBaseScheduler
Daniele Venzano's avatar
Daniele Venzano committed
30 31 32 33 34

log = logging.getLogger(__name__)


class APIManager:
    """The API Manager.

    Master side of the ZeroMQ API: a ``zmq.REP`` socket is bound to the
    configured ``api_listen_uri`` and :meth:`loop` answers the JSON requests
    received on it, one at a time.
    """

    def __init__(self, metrics: BaseMetricSender, scheduler: ZoeBaseScheduler, state: SQLManager) -> None:
        """Create the ZeroMQ context and bind the reply socket.

        :param metrics: object whose ``metric_api_call`` is invoked after each handled command
        :param scheduler: scheduler queried for stats and handed to the execution helpers
        :param state: state manager used to look up executions by ID
        """
        self.context = zmq.Context()
        self.zmq_s = self.context.socket(zmq.REP)
        self.listen_uri = config.get_conf().api_listen_uri
        self.zmq_s.bind(self.listen_uri)
        # Set by the _reply_* helpers; reset by loop() for every new request.
        self.debug_has_replied = False
        self.metrics = metrics
        self.scheduler = scheduler
        self.state = state

    def _reply_error(self, message: str) -> None:
        """Send an error reply carrying *message* and mark the request as answered."""
        self.zmq_s.send_json({'result': 'error', 'message': message})
        self.debug_has_replied = True

    def _reply_ok(self, data=None) -> None:
        """Send a success reply and mark the request as answered.

        :param data: optional payload; the ``data`` key is added to the reply only when this is not None
        """
        reply = {'result': 'ok'}
        if data is not None:
            reply['data'] = data
        self.zmq_s.send_json(reply)
        self.debug_has_replied = True

    def loop(self) -> None:
        """The API loop.

        Receives JSON requests forever and dispatches on their ``command``
        key (``execution_start``, ``execution_terminate``, ``execution_delete``,
        ``scheduler_stats``). Requests without a ``command`` key, or execution
        commands without an ``exec_id`` key, are answered with an error reply
        instead of aborting the loop.

        :raises ZoeException: if a command was handled without sending any reply (internal bug)
        """
        while True:
            message = self.zmq_s.recv_json()
            self.debug_has_replied = False
            start_time = time.time()

            # A request that is not a JSON object, or has no command, cannot be
            # dispatched: answer it and keep serving rather than dying on KeyError.
            if not isinstance(message, dict) or 'command' not in message:
                log.error('Malformed API message: {}'.format(message))
                self._reply_error('malformed message')
                continue

            command = message['command']
            needs_exec_id = command in ('execution_start', 'execution_terminate', 'execution_delete')

            if needs_exec_id and 'exec_id' not in message:
                log.error('Command {} received without exec_id'.format(command))
                self._reply_error('missing exec_id')
            elif command == 'execution_start':
                exec_id = message['exec_id']
                execution = self.state.execution_list(id=exec_id, only_one=True)
                if execution is None:
                    self._reply_error('Execution ID {} not found'.format(exec_id))
                else:
                    execution.set_scheduled()
                    # The reply is sent before the (possibly slower) submit call.
                    self._reply_ok()
                    zoe_master.preprocessing.execution_submit(self.state, self.scheduler, execution)
            elif command == 'execution_terminate':
                exec_id = message['exec_id']
                execution = self.state.execution_list(id=exec_id, only_one=True)
                if execution is None:
                    self._reply_error('Execution ID {} not found'.format(exec_id))
                else:
                    execution.set_cleaning_up()
                    # The reply is sent before the terminate call.
                    self._reply_ok()
                    zoe_master.preprocessing.execution_terminate(self.scheduler, execution)
            elif command == 'execution_delete':
                exec_id = message['exec_id']
                execution = self.state.execution_list(id=exec_id, only_one=True)
                # Deleting an unknown execution is not an error: reply ok either way.
                if execution is not None:
                    zoe_master.preprocessing.execution_delete(execution)
                self._reply_ok()
            elif command == 'scheduler_stats':
                try:
                    data = self.scheduler.stats()
                except ZoeException as e:
                    log.error(str(e))
                    self._reply_error(str(e))
                else:
                    self._reply_ok(data=data)
            else:
                log.error('Unknown command: {}'.format(command))
                self._reply_error('unknown command')

            # Safety net: every branch above must have answered the request.
            if not self.debug_has_replied:
                self._reply_error('bug')
                raise ZoeException('BUG: command {} does not fill a reply'.format(command))

            self.metrics.metric_api_call(start_time, command)

    def quit(self) -> None:
        """Cleanly close the ZMQ resources."""
        self.zmq_s.close()
        self.context.term()