Commit bfbc8dbe authored by Daniele Venzano's avatar Daniele Venzano
Browse files

Start refactor of ZoeClient

The functionality provided by the zoe_client package no longer fits a single ZoeClient class.
parent 0f2464c3
import logging
from zoe_scheduler.exceptions import InvalidApplicationDescription
from common.exceptions import InvalidApplicationDescription
log = logging.getLogger(__name__)
......
......@@ -2,12 +2,11 @@ import logging
from sqlalchemy.orm.exc import NoResultFound
from zoe_client.ipc import ZoeIPCClient
from zoe_client.entities import Execution
from zoe_client.lib.ipc import ZoeIPCClient
from zoe_client.scheduler_classes.execution import Execution
from zoe_client.state import session
from zoe_client.state.application import ApplicationState
from zoe_client.state.user import UserState
import zoe_client.storage_helper as storage
import zoe_client.zoe_storage_client as storage
log = logging.getLogger(__name__)
......@@ -15,10 +14,10 @@ log = logging.getLogger(__name__)
class ZoeClient:
def __init__(self, ipc_server='localhost', ipc_port=8723):
self.ipc_server = ZoeIPCClient(ipc_server, ipc_port)
self.state = session()
def _check_application(self, application_id: int):
state = session()
app_count = state.query(ApplicationState).filter_by(id=application_id).count()
app_count = self.state.query(ApplicationState).filter_by(id=application_id).count()
return app_count == 1
# Applications
......@@ -41,9 +40,8 @@ class ZoeClient:
return [Execution(e) for e in answer["executions"]]
def application_get(self, application_id):
state = session()
try:
application = state.query(ApplicationState).filter_by(id=application_id).one()
application = self.state.query(ApplicationState).filter_by(id=application_id).one()
except NoResultFound:
return None
else:
......@@ -55,8 +53,7 @@ class ZoeClient:
:param user_id: the user
:returns a list of ApplicationState objects
"""
state = session()
return state.query(ApplicationState).filter_by(user_id=user_id).all()
return self.state.query(ApplicationState).filter_by(user_id=user_id).all()
def application_new(self, user_id: int, description: dict) -> ApplicationState:
answer = self.ipc_server.ask('application_validate', description=description)
......@@ -64,10 +61,9 @@ class ZoeClient:
log.error("Application description failed the scheduler validation")
return None
state = session()
application = ApplicationState(user_id=user_id, description=description)
state.add(application)
state.commit()
self.state.add(application)
self.state.commit()
return application
def application_remove(self, application_id: int):
......@@ -82,11 +78,10 @@ class ZoeClient:
for e in executions:
self.execution_delete(e.id)
state = session()
application = state.query(ApplicationState).filter_by(id=application_id).one()
application = self.state.query(ApplicationState).filter_by(id=application_id).one()
storage.delete_application(application_id)
state.delete(application)
state.commit()
self.state.delete(application)
self.state.commit()
def application_validate(self, description: dict) -> bool:
answer = self.ipc_server.ask('application_validate', description=description)
......@@ -111,9 +106,8 @@ class ZoeClient:
return Execution(answer["execution"])
def execution_start(self, application_id: int) -> Execution:
state = session()
try:
application = state.query(ApplicationState).filter_by(id=application_id).one()
application = self.state.query(ApplicationState).filter_by(id=application_id).one()
except NoResultFound:
log.error("No such application")
return None
......@@ -135,40 +129,7 @@ class ZoeClient:
if answer is not None:
return answer['log']
def log_history_get(self, execution_id: int) -> bytes:
return storage.download_log_archive(execution_id)
# Platform
def platform_stats(self) -> dict:
stats = self.ipc_server.ask('platform_stats')
return stats
# Users
def user_check(self, user_id: int) -> bool:
state = session()
num = state.query(UserState).filter_by(id=user_id).count()
return num == 1
def user_new(self, email: str) -> UserState:
state = session()
user = UserState(email=email)
state.add(user)
state.commit()
return user
def user_get(self, user_id: int) -> UserState:
state = session()
try:
user = state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
return None
return user
def user_get_by_email(self, email: str) -> UserState:
state = session()
try:
user = state.query(UserState).filter_by(email=email).one()
except NoResultFound:
return None
else:
return user
......@@ -8,6 +8,7 @@ import sys
from zoe_client import ZoeClient
from zoe_client.configuration import conf_init, client_conf
from zoe_client.state import init as state_init, create_tables
import zoe_client.users as users
def get_zoe_client() -> ZoeClient:
......@@ -21,14 +22,12 @@ def stats_cmd(_):
def user_new_cmd(args):
client = get_zoe_client()
user = client.user_new(args.email)
user = users.user_new(args.email)
print("New user ID: {}".format(user.id))
def user_get_cmd(args):
client = get_zoe_client()
user = client.user_get_by_email(args.email)
user = users.user_get_by_email(args.email)
print("User ID: {}".format(user.id))
......
import logging
import zmq
log = logging.getLogger(__name__)
class ZoeIPCClient:
def __init__(self, server, port=8723):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.RCVTIMEO = 2000
self.socket.connect("tcp://%s:%d" % (server, port))
log.debug("ZMQ socket connected")
def _ask(self, message: dict) -> dict:
self.socket.send_json(message)
try:
answer = self.socket.recv_json()
except zmq.error.Again:
log.error("ZMQ is asking to try again, we drop the message")
return None
except zmq.ZMQError as e:
log.error("IPC server error: {}".format(e.msg))
return None
if self._is_error(answer):
log.info("IPC error: {}".format(self._error(answer)))
return None
else:
return self._answer(answer)
def ask(self, command: str, **kwargs) -> dict:
q = {
'command': command,
'args': kwargs
}
return self._ask(q)
def _is_error(self, message: dict) -> bool:
return message['status'] == 'error'
def _error(self, message: dict) -> str:
return message['answer']
def _answer(self, message: dict) -> dict:
return message['answer']
"""
Scheduler classes
This package contains class definitions for entities managed by the scheduler and passed to the client via IPC.
"""
\ No newline at end of file
class Container:
def __init__(self, container: dict):
self.id = container['id']
self.docker_id = container['docker_id']
self.cluster_id = container['cluster_id']
self.ip_address = container['ip_address']
self.readable_name = container['readable_name']
\ No newline at end of file
import dateutil.parser
from zoe_client.scheduler_classes.container import Container
def deserialize_datetime(isoformat):
if isoformat is None:
......@@ -8,12 +10,6 @@ def deserialize_datetime(isoformat):
return dateutil.parser.parse(isoformat)
class User:
def __init__(self, user: dict):
self.id = user['id']
self.email = user['email']
class Execution:
def __init__(self, execution: dict):
self.id = execution['id']
......@@ -30,12 +26,3 @@ class Execution:
for c in execution['containers']:
self.containers.append(Container(c))
class Container:
def __init__(self, container: dict):
self.id = container['id']
self.docker_id = container['docker_id']
self.cluster_id = container['cluster_id']
self.ip_address = container['ip_address']
self.readable_name = container['readable_name']
from sqlalchemy.orm.exc import NoResultFound
from zoe_client.state import session
from zoe_client.state.user import UserState
def user_check(user_id: int) -> bool:
state = session()
num = state.query(UserState).filter_by(id=user_id).count()
return num == 1
def user_new(email: str) -> UserState:
state = session()
user = UserState(email=email)
state.add(user)
state.commit()
return user
def user_get(user_id: int) -> UserState:
state = session()
try:
user = state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
return None
return user
def user_get_by_email(email: str) -> UserState:
state = session()
try:
user = state.query(UserState).filter_by(email=email).one()
except NoResultFound:
return None
else:
return user
......@@ -48,8 +48,8 @@ def download_application(application_id) -> bytes:
return _download(application_id, "apps")
def download_log_archive(execution_id) -> bytes:
return _download(execution_id, "logs")
def download_log_url(execution_id) -> bytes:
return client_conf().object_storage_url + '/logs/{}'.format(execution_id)
def delete_application(application_id):
......
......@@ -9,9 +9,9 @@ import zmq
from zoe_scheduler.state import AlchemySession
from zoe_scheduler.state.container import ContainerState
from zoe_scheduler.state.execution import ExecutionState
from zoe_scheduler.application_description import ZoeApplication
from common.application_description import ZoeApplication
from zoe_scheduler.scheduler import ZoeScheduler
from zoe_scheduler.exceptions import InvalidApplicationDescription
from common.exceptions import InvalidApplicationDescription
from zoe_scheduler.configuration import scheduler_conf
log = logging.getLogger(__name__)
......
import logging
from zoe_scheduler.application_description import ZoeApplication, ZoeApplicationProcess
from zoe_scheduler.exceptions import CannotCreateCluster
from zoe_scheduler.storage_helper import generate_application_binary_url, logs_archive_create
from common.application_description import ZoeApplication, ZoeApplicationProcess
from common.exceptions import CannotCreateCluster
from zoe_scheduler.zoe_storage_client import generate_application_binary_url, logs_archive_create
from zoe_scheduler.platform_status import PlatformStatus
from zoe_scheduler.state import AlchemySession
from zoe_scheduler.state.cluster import ClusterState
......
import logging
import queue
from zoe_scheduler.application_description import ZoeApplication
from common.application_description import ZoeApplication
from zoe_scheduler.state import AlchemySession
from zoe_scheduler.state.execution import ExecutionState
from zoe_scheduler.scheduler_policies.base import BaseSchedulerPolicy
......
from zoe_scheduler.application_description import ZoeApplication
from common.application_description import ZoeApplication
from zoe_scheduler.stats import SchedulerStats
......
......@@ -3,7 +3,7 @@ import time
from zoe_scheduler.platform_status import PlatformStatus
from zoe_scheduler.stats import SchedulerStats
from zoe_scheduler.scheduler_policies.base import BaseSchedulerPolicy
from zoe_scheduler.application_description import ZoeApplication
from common.application_description import ZoeApplication
class SimpleSchedulerPolicy(BaseSchedulerPolicy):
......
......@@ -3,7 +3,13 @@ from flask import Flask
from zoe_web.api import api_bp
from zoe_web.web import web_bp
from zoe_client.state import session
app = Flask(__name__, static_url_path='/does-not-exist')
app.register_blueprint(web_bp, url_prefix='')
app.register_blueprint(api_bp, url_prefix='/api')
@app.teardown_appcontext
def shutdown_session(exception=None):
session().remove()
......@@ -4,7 +4,7 @@ from zipfile import is_zipfile
from flask import Blueprint, jsonify, request, session, abort, send_file
from zoe_client import ZoeClient
from zoe_scheduler.configuration import ipcconf
from zoe_client.configuration import client_conf
api_bp = Blueprint('api', __name__)
......@@ -21,7 +21,7 @@ def _api_check_user(zoe_client):
@api_bp.route('/status/basic')
def status_basic():
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
platform_stats = client.platform_stats()
ret = {
'num_nodes': len(platform_stats['swarm']['nodes']),
......@@ -34,7 +34,7 @@ def status_basic():
def login():
form_data = request.form
email = form_data["email"]
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
user = client.user_get_by_email(email)
if user is None:
user = client.user_new(email)
......@@ -44,7 +44,7 @@ def login():
@api_bp.route('/applications/new', methods=['POST'])
def application_new():
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
user = _api_check_user(client)
form_data = request.form
......@@ -66,7 +66,7 @@ def application_new():
@api_bp.route('/applications/delete/<app_id>', methods=['GET', 'POST'])
def application_delete(app_id):
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
_api_check_user(client)
if client.application_remove(app_id, False):
......@@ -77,7 +77,7 @@ def application_delete(app_id):
@api_bp.route('/applications/download/<int:app_id>')
def application_binary_download(app_id: int):
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
_api_check_user(client)
data = client.application_get_binary(app_id)
......@@ -89,7 +89,7 @@ def application_binary_download(app_id: int):
@api_bp.route('/executions/new', methods=['POST'])
def execution_new():
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
_api_check_user(client)
form_data = request.form
......@@ -109,7 +109,7 @@ def execution_new():
@api_bp.route('/executions/logs/container/<int:container_id>')
def execution_logs(container_id: int):
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
_api_check_user(client)
log = client.log_get(container_id)
......@@ -121,7 +121,7 @@ def execution_logs(container_id: int):
@api_bp.route('/executions/stats/container/<int:container_id>')
def container_stats(container_id: int):
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
_api_check_user(client)
stats = client.container_stats(container_id)
......@@ -133,7 +133,7 @@ def container_stats(container_id: int):
@api_bp.route('/executions/terminate/<int:exec_id>')
def execution_terminate(exec_id: int):
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
_api_check_user(client)
client.execution_terminate(exec_id)
......@@ -143,7 +143,7 @@ def execution_terminate(exec_id: int):
@api_bp.route('/history/logs/<int:execution_id>')
def history_logs_get(execution_id: int):
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
_api_check_user(client)
logs = client.log_history_get(execution_id)
......
......@@ -6,7 +6,8 @@ from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from zoe_web import app
from zoe_scheduler.configuration import ipcconf, init as conf_init
from zoe_client.configuration import conf_init, client_conf
from zoe_client.state import init as state_init
log = logging.getLogger("zoe_web")
......@@ -14,8 +15,8 @@ log = logging.getLogger("zoe_web")
def process_arguments() -> argparse.Namespace:
argparser = argparse.ArgumentParser(description="Zoe Web - Container Analytics as a Service web client")
argparser.add_argument('-d', '--debug', action='store_true', default=False, help='Enable debug output')
argparser.add_argument('--ipc-server', default='localhost', help='Address of the Zoe scheduler process')
argparser.add_argument('--ipc-port', default=8723, type=int, help='Port of the Zoe scheduler process')
argparser.add_argument('--ipc-server', help='Address of the Zoe scheduler process')
argparser.add_argument('--ipc-port', help='Port of the Zoe scheduler process')
return argparser.parse_args()
......@@ -33,14 +34,17 @@ def zoe_web() -> int:
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("tornado").setLevel(logging.WARNING)
ipcconf['server'] = args.ipc_server
ipcconf['port'] = args.ipc_port
conf_init()
if args.ipc_server is not None:
client_conf().set('zoe_client', 'scheduler_ipc_address', args.ipc_server)
if args.ipc_port is not None:
client_conf().set('zoe_client', 'scheduler_ipc_port', args.ipc_port)
zoeconf = conf_init()
state_init(client_conf().db_url)
log.info("Starting HTTP server...")
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
app.secret_key = zoeconf.cookies_secret_key
app.secret_key = client_conf().cookies_secret_key
http_server = HTTPServer(WSGIContainer(app))
http_server.listen(5000, "0.0.0.0")
......
from flask import render_template, url_for, redirect
from zoe_client import ZoeClient
from zoe_scheduler.configuration import ipcconf
from zoe_client.configuration import client_conf
from zoe_web.web import web_bp
import zoe_web.utils as web_utils
@web_bp.route('/apps/new')
def application_new():
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
user = web_utils.check_user(client)
if user is None:
return redirect(url_for('web.index'))
......@@ -22,7 +22,7 @@ def application_new():
@web_bp.route('/executions/new/<app_id>')
def execution_new(app_id):
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
user = web_utils.check_user(client)
if user is None:
return redirect(url_for('web.index'))
......@@ -38,7 +38,7 @@ def execution_new(app_id):
@web_bp.route('/executions/terminate/<exec_id>')
def execution_terminate(exec_id):
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
user = web_utils.check_user(client)
if user is None:
return redirect(url_for('web.index'))
......@@ -54,7 +54,7 @@ def execution_terminate(exec_id):
@web_bp.route('/apps/delete/<app_id>')
def application_delete(app_id):
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
user = web_utils.check_user(client)
if user is None:
return redirect(url_for('web.index'))
......@@ -70,7 +70,7 @@ def application_delete(app_id):
@web_bp.route('/executions/inspect/<execution_id>')
def execution_inspect(execution_id):
client = ZoeClient(ipcconf['server'], ipcconf['port'])
client = ZoeClient(client_conf().ipc_server, client_conf().ipc_port)
user = web_utils.check_user(client)
if user is None:
return redirect(url_for('web.index'))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment