Commit 06319b14 authored by Daniele Venzano's avatar Daniele Venzano
Browse files

ZeroMQ API

parent a0be1a34
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import zmq
from zoe_lib.exceptions import ZoeException
from zoe_master.config import get_conf, singletons
log = logging.getLogger(__name__)
class APIManager:
def __init__(self):
context = zmq.Context()
self.zmq_s = context.socket(zmq.REP)
self.listen_uri = get_conf().api_listen_uri
self.zmq_s.bind(self.listen_uri)
self.debug_has_replied = False
def _reply_error(self, message):
self.zmq_s.send_json({'result': 'error', 'message': message})
self.debug_has_replied = True
def _reply_ok(self):
self.zmq_s.send_json({'result': 'ok'})
self.debug_has_replied = True
def loop(self):
while True:
message = self.zmq_s.recv_json()
self.debug_has_replied = False
start_time = time.time()
if message['command'] == 'execution_start':
exec_id = message['exec_id']
execution = singletons['sql_manager'].execution_get(id=exec_id)
if execution is None:
self._reply_error('Execution ID {} not found'.format(message['exec_id']))
else:
self._reply_ok()
singletons['platform_manager'].execution_submit(execution)
else:
log.error('Unknown command: {}'.format(message['command']))
self._reply_error('unknown command')
if not self.debug_has_replied:
self._reply_error('bug')
raise ZoeException('BUG: command {} does not fill a reply')
singletons['metric'].metric_api_call(start_time, message['command'])
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import zmq
import zoe_web.config as config
log = logging.getLogger(__name__)
class APIManager:
REQUEST_TIMEOUT = 2500
REQUEST_RETRIES = 3
def __init__(self):
self.context = zmq.Context(1)
self.zmq_s = None
self.poll = zmq.Poller()
self.master_uri = config.get_conf().master_url
self._connect()
def _connect(self):
if self.zmq_s is not None:
return
self.zmq_s = self.context.socket(zmq.REQ)
self.zmq_s.connect(self.master_uri)
self.poll.register(self.zmq_s, zmq.POLLIN)
def _disconnect(self):
self.zmq_s.setsockopt(zmq.LINGER, 0)
self.zmq_s.close()
self.poll.unregister(self.zmq_s)
self.zmq_s = None
def _request_reply(self, message):
"""
Implements the Lazy Pirate Pattern for a reliable client communication.
"""
self._connect() # Make sure we are connected
retries_left = self.REQUEST_RETRIES
while retries_left:
self.zmq_s.send_json(message) # send the message
socks = dict(self.poll.poll(self.REQUEST_TIMEOUT))
if socks.get(self.zmq_s) == zmq.POLLIN: # We have a reply
reply = self.zmq_s.recv_json()
if reply['result'] == 'ok':
return True, ''
else:
return False, reply['message']
else: # Timeout
retries_left -= 1
log.warning('Timeout waiting for master reply')
self._disconnect()
if retries_left == 0:
log.error('Master is unreachable, abandoning request')
return False, 'Master is unreachable, abandoning request'
log.warning('Reconnecting and retrying request...')
self._connect()
def execution_start(self, exec_id):
msg = {
'command': 'execution_start',
'exec_id': exec_id
}
return self._request_reply(msg)
def execution_terminate(self, exec_id):
msg = {
'command': 'execution_terminate',
'exec_id': exec_id
}
return self._request_reply(msg)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment