Commit bdb69a95 authored by Francesco Pace's avatar Francesco Pace
Browse files

Implement connection to Crystal

parent 3c129690
......@@ -70,6 +70,12 @@ def app_validate(data):
for service in data['services']:
_service_check(service)
if 'plugins' in data:
plugins = data['plugins']
if 'storage' in plugins:
for plugin in plugins['storage']:
_storage_plugin_check(plugin)
found_monitor = False
for service in data['services']:
if service['monitor']:
......@@ -172,3 +178,35 @@ def _port_check(data):
bool(data['is_main_endpoint'])
except ValueError:
raise InvalidApplicationDescription(msg="is_main_endpoint field should be a boolean")
def _storage_plugin_check(data):
required_keys = ['name', 'tenant', 'policy', 'channel']
for k in required_keys:
if k not in data:
raise InvalidApplicationDescription(msg="Missing required key: %s" % k)
supported_plugins = ['crystal']
if data['name'] not in supported_plugins:
raise InvalidApplicationDescription(msg="Storage plugin ({}) not supported. {}".format(data['name'], supported_plugins))
supported_policies = ['platinum, gold, silver, bronze']
if data['policy'] not in supported_policies:
raise InvalidApplicationDescription(msg="Policy ({}) not supported. {}".format(data['policy'], supported_policies))
_channel_check(data['channel'])
def _channel_check(data):
required_keys = ['protocol', 'host']
for k in required_keys:
if k not in data:
raise InvalidApplicationDescription(msg="Missing required key: %s" % k)
supported_protocols = ['rabbitmq']
if data['protocol'] not in supported_protocols:
raise InvalidApplicationDescription(msg="Protocol ({}) for storage plugin channel not supported. {}".format(data['protocol'], supported_protocols))
if data['protocol'] is 'rabbitmq':
if 'queue' not in data:
raise InvalidApplicationDescription(msg="Missing required key: queue")
# Copyright (c) 2017, Pace Francesco
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pika
class RabbitMQ:
def __init__(self, host):
self.host = host
self.channel = None
def open_connection(self):
connection = pika.BlockingConnection(pika.ConnectionParameters(self.host))
self.channel = connection.channel()
def send_message(self, queue, message):
self.channel.basic_publish(exchange='',
routing_key=queue,
body=message)
def close_connection(self):
self.channel.close()
# Copyright (c) 2017, Pace Francesco
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from zoe_master.plugins.connectors import RabbitMQ
class Crystal:
def __init__(self, channel):
self.channel = channel
def transmit_policy(self, tenant, policy):
message = "{}:{}".format(tenant, policy)
if self.channel['protocol'] is 'rabbitmq':
rmq = RabbitMQ(self.channel['host'])
rmq.open_connection()
rmq.send_message(self.channel['queue'], message)
rmq.close_connection()
......@@ -21,6 +21,7 @@ import threading
from zoe_lib.sql_manager import Execution
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException
from zoe_master.plugins.storage.crystal import Crystal
from zoe_master.zapp_to_docker import execution_to_containers, terminate_execution
from zoe_master.scheduler.base_scheduler import ZoeBaseScheduler
......@@ -106,6 +107,14 @@ class ZoeSimpleScheduler(ZoeBaseScheduler):
try:
execution_to_containers(e)
if 'plugins' in e.description:
plugins = e.description['plugins']
if 'storage' in plugins:
for plugin in plugins['storage']:
if plugin['name'] is 'crystal':
Crystal(plugin['channel']).transmit_policy(plugin['tenant'], plugin['policy'])
except ZoeStartExecutionRetryException as ex:
log.warning('Temporary failure starting execution {}: {}'.format(e.id, ex.message))
e.set_error_message(ex.message)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment