# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The client side of the ZeroMQ API."""

import logging

import zmq

import zoe_lib.config as config

# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)


class APIManager:
    """Main class for the API: a ZeroMQ request/reply client for the master.

    A single REQ socket is connected to the ``master_url`` read from the
    configuration. Every public method sends one JSON command and returns a
    ``(success, payload)`` tuple: ``(True, data)`` when the master answers
    with ``result == 'ok'`` (``data`` is ``''`` when the reply carries none)
    and ``(False, message)`` otherwise.
    """

    # Time to wait for a reply, handed to ``Poller.poll`` (milliseconds).
    REQUEST_TIMEOUT = 2500
    # Number of send attempts made before a request is abandoned.
    REQUEST_RETRIES = 1

    # Message logged and returned when every attempt timed out.
    _UNREACHABLE = 'Master is unreachable, abandoning API request'

    def __init__(self):
        """Create the ZeroMQ context and poller, then connect to the master."""
        self.context = zmq.Context(1)
        self.zmq_s = None
        self.poll = zmq.Poller()
        self.master_uri = config.get_conf().master_url
        self._connect()

    def _connect(self):
        """Open the REQ socket and register it; no-op if already connected."""
        if self.zmq_s is not None:
            return
        self.zmq_s = self.context.socket(zmq.REQ)
        self.zmq_s.connect(self.master_uri)
        self.poll.register(self.zmq_s, zmq.POLLIN)

    def _disconnect(self):
        """Discard the current socket; no-op if there is none.

        The socket is removed from the poller before it is closed, and
        ``LINGER`` is set to 0 so unsent messages are dropped on close.
        """
        sock = self.zmq_s
        if sock is None:
            return
        # Forget the socket first so the instance is left in the
        # "disconnected" state even if one of the calls below raises.
        self.zmq_s = None
        self.poll.unregister(sock)
        sock.setsockopt(zmq.LINGER, 0)
        sock.close()

    def _request_reply(self, message):
        """Send ``message`` and wait for the reply (Lazy Pirate Pattern).

        The message is sent as JSON. If no reply arrives within
        ``REQUEST_TIMEOUT`` the socket is discarded and, while attempts
        remain, a fresh one is opened and the message is sent again.

        Returns:
            ``(True, data)`` when the reply has ``result == 'ok'``,
            ``(False, message)`` when the master reports a failure or when
            all ``REQUEST_RETRIES`` attempts timed out. A tuple is returned
            on every path, including ``REQUEST_RETRIES <= 0``.
        """
        attempts_left = self.REQUEST_RETRIES
        # ``> 0`` (rather than plain truthiness) keeps a negative setting
        # from looping forever.
        while attempts_left > 0:
            self._connect()  # Make sure we are connected
            self.zmq_s.send_json(message)
            socks = dict(self.poll.poll(self.REQUEST_TIMEOUT))
            if socks.get(self.zmq_s) == zmq.POLLIN:  # We have a reply
                reply = self.zmq_s.recv_json()
                if reply.get('result') == 'ok':
                    return True, reply.get('data', '')
                return False, reply.get('message', 'Master returned an error without a message')

            # Timeout: a REQ socket that missed its reply cannot send
            # again, so it is thrown away and re-created on the next pass.
            attempts_left -= 1
            log.warning('Timeout waiting for master reply')
            self._disconnect()
            if attempts_left > 0:
                log.warning('Reconnecting and retrying request...')

        log.error(self._UNREACHABLE)
        return False, self._UNREACHABLE

    def _execution_command(self, command, exec_id):
        """Send ``command`` for execution ``exec_id``; return the result tuple."""
        return self._request_reply({'command': command, 'exec_id': exec_id})

    def execution_start(self, exec_id):
        """Start an execution."""
        return self._execution_command('execution_start', exec_id)

    def execution_terminate(self, exec_id):
        """Terminate an execution."""
        return self._execution_command('execution_terminate', exec_id)

    def execution_delete(self, exec_id):
        """Delete an execution."""
        return self._execution_command('execution_delete', exec_id)