Commit 88a8e3c4 authored by Daniele Venzano's avatar Daniele Venzano
Browse files

Move the object storage in its own daemon, Applications in the client

The scheduler only knows about executions. Each execution has an application ID that can be used by the scheduler to group executions coming from the same application. The client has its own database that manages users and applications.
parent eb8d2f71
......@@ -72,7 +72,8 @@ setup(
'python-dateutil>=2.4.2',
'SQLAlchemy>=1.0.8',
'tornado>=4.2.1',
'pyzmq>=14.0.1'
'pyzmq>=14.0.1',
'requests'
],
# List additional groups of dependencies here (e.g. development
......@@ -103,6 +104,7 @@ setup(
entry_points={
'console_scripts': [
'zoe-scheduler=zoe_scheduler.entrypoint:zoe_scheduler',
'zoe-storage=zoe_storage.entrypoint:object_server',
'zoe-web=zoe_web.entrypoint:zoe_web',
'zoe=zoe_client.entrypoint:zoe'
]
......
......@@ -3,7 +3,7 @@ import json
import pytest
from zoe_scheduler.state import init as state_init, Base, AlchemySession
from zoe_scheduler.state.application import ApplicationState
from zoe_client.state.application import ApplicationState
from zoe_scheduler.application_description import ZoeApplication
from zoe_scheduler.configuration import init as conf_init, scheduler_conf
......
......@@ -9,11 +9,13 @@ http_listen_address = 192.168.45.25
http_listen_port = 4390
ipc_listen_address = 127.0.0.1
ipc_listen_port = 8723
object_storage_url = http://localhost:4390
[zoe_client]
db_connect = mysql+mysqlconnector://root@localhost/zoe
scheduler_ipc_address = localhost
scheduler_ipc_port = 8723
object_storage_url = http://localhost:4390
[zoe_web]
smtp_password = pass
......
......@@ -9,11 +9,13 @@ http_listen_address = 192.168.45.25
http_listen_port = 4390
ipc_listen_address = 127.0.0.1
ipc_listen_port = 8723
object_storage_url = http://localhost:4390
[zoe_client]
db_connect = mysql+mysqlconnector://root@localhost/zoe
scheduler_ipc_address = localhost
scheduler_ipc_port = 8723
object_storage_url = http://localhost:4390
[zoe_web]
smtp_password = pass
......
from zoe_scheduler.object_storage import *
fake_data = b"test" * 1024
......
......@@ -2,6 +2,7 @@
db_connect = mysql+mysqlconnector://root@localhost/zoe
scheduler_ipc_address = localhost
scheduler_ipc_port = 8723
object_storage_url = http://localhost:4390
[zoe_web]
smtp_password = Daicu2Ze
......
[zoe_storage]
storage_path = /var/lib/zoe
http_listen_address = 192.168.45.25
http_listen_port = 4390
#!/usr/bin/env python3
# Development launcher: starts the Zoe object storage server straight from the
# source tree, so no pip install is needed while developing.
# NOTE(review): setup.py registers the console script as
# zoe_storage.entrypoint:object_server, while this script imports from
# zoe_storage_server -- confirm which package name is the right one.

from zoe_storage_server.entrypoint import object_server

if __name__ == '__main__':
    object_server()
import base64
import logging
from sqlalchemy.orm.exc import NoResultFound
from zoe_client.state import AlchemySession
from zoe_client.ipc import ZoeIPCClient
from zoe_client.entities import Execution, Application, User
from zoe_client.entities import Execution
from zoe_client.state import session
from zoe_client.state.application import ApplicationState
from zoe_client.state.user import UserState
import zoe_client.storage_helper as storage
log = logging.getLogger(__name__)
......@@ -14,49 +15,79 @@ log = logging.getLogger(__name__)
class ZoeClient:
def __init__(self, ipc_server='localhost', ipc_port=8723):
self.ipc_server = ZoeIPCClient(ipc_server, ipc_port)
self.state = AlchemySession()
# Applications
def application_binary_put(self, application_id: int, app_data: bytes) -> bool:
file_data = base64.b64encode(app_data)
answer = self.ipc_server.ask('application_binary_put', application_id=application_id, bin_data=file_data)
return answer is not None
def _check_application(self, application_id: int):
    """
    Tell whether the client database holds exactly one application with this ID.

    :param application_id: the application ID to look for
    :returns: True when exactly one matching row exists, False otherwise
    """
    matching = session().query(ApplicationState).filter_by(id=application_id)
    return matching.count() == 1
# Applications
def application_binary_get(self, application_id: int) -> bytes:
data = self.ipc_server.ask('application_binary_get', application_id=application_id)
app_data = base64.b64decode(data['zip_data'])
return app_data
if self._check_application(application_id):
return storage.download_application(application_id)
else:
return None
def application_list(self, user_id):
"""
Returns a list of all Applications belonging to user_id
:type user_id: int
:rtype : list[Application]
"""
answer = self.ipc_server.ask('application_list', user_id=user_id)
def application_binary_put(self, application_id: int, bin_data: bytes):
    """
    Upload the binary data of an application to the object storage.

    Nothing is uploaded (and an error is logged) when the application is not
    present in the client database.

    :param application_id: ID of the application the data belongs to
    :param bin_data: the raw bytes to store
    """
    if not self._check_application(application_id):
        log.error("Trying to upload application data for non-existent application")
        return
    storage.upload_application(application_id, bin_data)
def application_executions_get(self, application_id) -> list:
answer = self.ipc_server.ask("application_executions_get", application_id=application_id)
if answer is None:
return []
else:
return [Application(x) for x in answer['apps']]
return [Execution(e) for e in answer["executions"]]
def application_new(self, user_id: int, description: dict) -> int:
if not self.user_check(user_id):
def application_get(self, application_id):
state = session()
try:
application = state.query(ApplicationState).filter_by(id=application_id).one()
except NoResultFound:
return None
answer = self.ipc_server.ask('application_new', user_id=user_id, description=description)
if answer is not None:
return answer['application_id']
else:
return application
def application_remove(self, application_id: int, force: bool) -> bool:
answer = self.ipc_server.ask('application_remove', application_id=application_id, force=force)
return answer is not None
def application_list(self, user_id: int) -> list:
    """
    List the applications owned by a user.

    :param user_id: ID of the owning user
    :returns: a list of ApplicationState objects read from the client database
    """
    owned_by_user = session().query(ApplicationState).filter_by(user_id=user_id)
    return owned_by_user.all()
def application_start(self, application_id: int) -> int:
answer = self.ipc_server.ask('application_start', application_id=application_id)
if answer is not None:
return answer["execution_id"]
else:
def application_new(self, user_id: int, description: dict) -> ApplicationState:
    """
    Store a new application for a user in the client database.

    The description is first sent to the scheduler for validation; when the
    scheduler gives no answer an error is logged and nothing is stored.

    :param user_id: ID of the user that will own the application
    :param description: the application description
    :returns: the committed ApplicationState, or None when validation failed
    """
    validation = self.ipc_server.ask('application_validate', description=description)
    if validation is None:
        log.error("Application description failed the scheduler validation")
        return None

    db = session()
    new_application = ApplicationState(user_id=user_id, description=description)
    db.add(new_application)
    db.commit()
    return new_application
def application_remove(self, application_id: int):
    """
    Remove an application together with its executions and its stored binary.

    The scheduler is asked for the executions of the application and each one
    is deleted through execution_delete(); afterwards the binary is removed
    from the object storage and the application row is deleted and committed.
    The method returns early, changing nothing, when the application is not in
    the client database or when the scheduler does not answer.

    :param application_id: ID of the application to remove
    """
    if not self._check_application(application_id):
        log.error("Trying to remove a non-existent application")
        return

    reply = self.ipc_server.ask("application_executions_get", application_id=application_id)
    if reply is None:
        return

    # Build every Execution first, exactly as before, so a malformed entry
    # aborts the removal before any execution has been deleted.
    to_delete = [Execution(raw) for raw in reply["executions"]]
    for execution in to_delete:
        self.execution_delete(execution.id)

    db = session()
    doomed = db.query(ApplicationState).filter_by(id=application_id).one()
    storage.delete_application(application_id)
    db.delete(doomed)
    db.commit()
def application_validate(self, description: dict) -> bool:
    """
    Ask the scheduler to validate an application description.

    :param description: the application description to check
    :returns: True when the scheduler returned an answer, False when it returned None
    """
    return self.ipc_server.ask('application_validate', description=description) is not None
......@@ -70,25 +101,42 @@ class ZoeClient:
ret = self.ipc_server.ask('execution_delete', execution_id=execution_id)
return ret is not None
def execution_kill(self, execution_id: int) -> bool:
    """
    Ask the scheduler to kill an execution.

    The return annotation used to be ``None`` although the method has always
    returned a boolean, like execution_delete() does.

    :param execution_id: ID of the execution to kill
    :returns: True when the scheduler returned an answer, False when it returned None
    """
    ret = self.ipc_server.ask('execution_kill', execution_id=execution_id)
    return ret is not None
def execution_get(self, execution_id: int) -> Execution:
exec_dict = self.ipc_server.ask('execution_get', execution_id=execution_id)
if exec_dict is not None:
return Execution(exec_dict)
answer = self.ipc_server.ask('execution_get', execution_id=execution_id)
if answer is not None:
return Execution(answer["execution"])
def execution_terminate(self, execution_id: int) -> None:
ret = self.ipc_server.ask('execution_terminate', execution_id=execution_id)
return ret is not None
def execution_start(self, application_id: int) -> Execution:
    """
    Start a new execution of an application stored in the client database.

    The application description is validated by the scheduler first and then
    sent along with the start request.

    :param application_id: ID of the application to execute
    :returns: the Execution built from the scheduler answer, or None when the
              application does not exist, the validation fails or the
              scheduler gives no answer to the start request
    """
    db = session()
    try:
        application = db.query(ApplicationState).filter_by(id=application_id).one()
    except NoResultFound:
        log.error("No such application")
        return None

    if self.ipc_server.ask('application_validate', description=application.description) is None:
        log.error("Application description failed the scheduler validation")
        return None

    reply = self.ipc_server.ask('execution_start', application_id=application_id, description=application.description)
    if reply is None:
        return None
    return Execution(reply["execution"])
# Logs
def log_get(self, container_id: int) -> str:
clog = self.ipc_server.ask('log_get', container_id=container_id)
if clog is not None:
return clog['log']
answer = self.ipc_server.ask('log_get', container_id=container_id)
if answer is not None:
return answer['log']
def log_history_get(self, execution_id):
data = self.ipc_server.ask('log_history_get', execution_id=execution_id)
log_data = base64.b64decode(data['zip_data'])
return log_data
def log_history_get(self, execution_id: int) -> bytes:
    """
    Download the log archive of an execution from the object storage.

    :param execution_id: ID of the execution whose logs are wanted
    :returns: whatever storage.download_log_archive() returns for this execution
    """
    archive = storage.download_log_archive(execution_id)
    return archive
# Platform
def platform_stats(self) -> dict:
......@@ -97,26 +145,30 @@ class ZoeClient:
# Users
def user_check(self, user_id: int) -> bool:
num = self.state.query(UserState).filter_by(id=user_id).count()
state = session()
num = state.query(UserState).filter_by(id=user_id).count()
return num == 1
def user_new(self, email: str) -> User:
def user_new(self, email: str) -> UserState:
state = session()
user = UserState(email=email)
self.state.add(user)
self.state.commit()
return User(user.to_dict())
state.add(user)
state.commit()
return user
def user_get(self, user_id: int) -> User:
def user_get(self, user_id: int) -> UserState:
state = session()
try:
user = self.state.query(UserState).filter_by(id=user_id).one()
user = state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
return None
return User(user.to_dict())
return user
def user_get_by_email(self, email: str) -> User:
def user_get_by_email(self, email: str) -> UserState:
state = session()
try:
user = self.state.query(UserState).filter_by(email=email).one()
user = state.query(UserState).filter_by(email=email).one()
except NoResultFound:
return None
else:
return User(user.to_dict())
return user
......@@ -11,7 +11,8 @@ defaults = {
'zoe_client': {
'db_connect': 'mysql+mysqlconnector://zoe:pass@dbhost/zoe',
'scheduler_ipc_address': 'localhost',
'scheduler_ipc_port': 8723
'scheduler_ipc_port': 8723,
'object_storage_url': 'http://localhost:4390'
},
'zoe_web': {
'smtp_server': 'smtp.exmaple.com',
......@@ -47,6 +48,10 @@ class ZoeClientConfig(ConfigParser):
def ipc_port(self) -> int:
return self.getint('zoe_client', 'scheduler_ipc_port')
@property
def object_storage_url(self) -> str:
    """The 'object_storage_url' option of the [zoe_client] configuration section."""
    section, option = 'zoe_client', 'object_storage_url'
    return self.get(section, option)
@property
def web_server_name(self) -> str:
return self.get('zoe_web', 'web_server_name')
......
......@@ -18,13 +18,12 @@ class Execution:
def __init__(self, execution: dict):
self.id = execution['id']
self.name = execution['name']
self.assigned_resources = execution['assigned_resources']
self.app_description = execution['app_description']
self.application_id = execution['application_id']
self.time_started = deserialize_datetime(execution['time_started'])
self.time_scheduled = deserialize_datetime(execution['time_scheduled'])
self.time_finished = deserialize_datetime(execution['time_finished'])
self.status = execution['status']
self.termination_notice = execution['termination_notice']
self.cluster_id = execution['cluster_id']
self.containers = []
......@@ -40,21 +39,3 @@ class Container:
self.cluster_id = container['cluster_id']
self.ip_address = container['ip_address']
self.readable_name = container['readable_name']
class Application:
"""
:type id: int
:type user_id: int
:type description: dict
:type executions: list[Execution]
"""
def __init__(self, application: dict):
self.id = application['id']
self.description = application['description'].copy()
self.user_id = application['user_id']
self.executions = []
for e in application['executions']:
self.executions.append(Execution(e))
......@@ -35,8 +35,8 @@ def user_get_cmd(args):
def app_new_cmd(args):
client = get_zoe_client()
app_descr = json.load(args.jsonfile)
application_id = client.application_new(args.user_id, app_descr)
print("Application added with ID: {}".format(application_id))
application = client.application_new(args.user_id, app_descr)
print("Application added with ID: {}".format(application.id))
def app_bin_put_cmd(args):
......@@ -44,14 +44,14 @@ def app_bin_put_cmd(args):
if not is_zipfile(args.zipfile):
print("Error: application binary must be a zip file")
return
args.zipfile.rewind()
args.zipfile.seek(0)
zipdata = args.zipfile.read()
client.application_binary_put(args.app_id, zipdata)
def app_start_cmd(args):
client = get_zoe_client()
ret = client.application_start(args.id)
ret = client.execution_start(args.id)
if ret:
print("Application scheduled successfully, use the app-inspect command to check its status")
else:
......@@ -60,7 +60,7 @@ def app_start_cmd(args):
def app_rm_cmd(args):
client = get_zoe_client()
client.application_remove(args.id, args.force)
client.application_remove(args.id)
def app_inspect_cmd(args):
......@@ -69,16 +69,21 @@ def app_inspect_cmd(args):
if application is None:
print("Error: application {} does not exist".format(args.id))
return
print(application)
print("Application name: {}".format(application.description["name"]))
executions = client.application_executions_get(application_id=args.id)
for e in executions:
print(" - Execution {} {}".format(e.name, e.status))
for c in e.containers:
print(" -- Container {}, ID {}".format(c.readable_name, c.id))
def app_list_cmd(args):
client = get_zoe_client()
applications = client.application_list(args.id)
if len(applications) > 0:
print("{:4} {:20} {:25}".format("ID", "Name", "Type"))
print("{:4} {:20}".format("ID", "Name"))
for app in applications:
print("{:4} {:20} {:25}".format(app.id, app.name, app.type))
print("{:4} {:20}".format(app.id, app.description['name']))
def exec_kill_cmd(args):
......@@ -87,7 +92,7 @@ def exec_kill_cmd(args):
if execution is None:
print("Error: execution {} does not exist".format(args.id))
return
client.execution_terminate(execution.id)
client.execution_kill(execution.id)
def log_get_cmd(args):
......@@ -112,7 +117,7 @@ def process_arguments() -> Namespace:
parser = ArgumentParser(description="Zoe - Container Analytics as a Service command-line client")
parser.add_argument('-d', '--debug', action='store_true', default=False, help='Enable debug output')
parser.add_argument('--ipc-server', help='Address of the Zoe scheduler process')
parser.add_argument('--ipc-port', type=int, help='Port of the Zoe scheduler process')
parser.add_argument('--ipc-port', help='Port of the Zoe scheduler process')
parser.add_argument('--setup-db', action='store_true', help='Sets up the configured database for use with the Zoe client')
subparser = parser.add_subparsers(title='subcommands', description='valid subcommands')
......@@ -181,9 +186,9 @@ def zoe():
logging.basicConfig(level=logging.INFO)
conf_init()
if hasattr(args, "ipc_server"):
if args.ipc_server is not None:
client_conf().set('zoe_client', 'scheduler_ipc_address', args.ipc_server)
if hasattr(args, "ipc_port"):
if args.ipc_port is not None:
client_conf().set('zoe_client', 'scheduler_ipc_port', args.ipc_port)
db_engine = state_init(client_conf().db_url)
......@@ -196,3 +201,4 @@ def zoe():
return
args.func(args)
sys.exit(0)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
......@@ -18,3 +18,7 @@ def init(db_url):
def create_tables(engine):
    """
    Recreate all the tables declared on Base from scratch.

    Existing tables are dropped first, so any data they contain is lost.

    :param engine: the SQLAlchemy engine to operate on
    """
    metadata = Base.metadata
    metadata.drop_all(engine)
    metadata.create_all(engine)
_session_registry = None


def session():
    """
    Return the shared scoped session for the client database.

    The scoped_session registry is created on first use and reused afterwards.
    Previously every call built a brand-new registry, so no two calls ever
    shared a Session and the per-thread reuse that scoped_session provides
    never took place.

    :returns: the module-wide scoped_session wrapping AlchemySession
    """
    global _session_registry
    if _session_registry is None:
        # NOTE(review): presumably init() has configured AlchemySession by the
        # time the first session is actually used -- confirm against callers.
        _session_registry = scoped_session(AlchemySession)
    return _session_registry
from sqlalchemy import Column, Integer, PickleType
from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, PickleType, ForeignKey
from zoe_scheduler.state import Base
from zoe_client.state import Base
class ApplicationState(Base):
__tablename__ = 'applications'
id = Column(Integer, primary_key=True)
user_id = Column(Integer)
user_id = Column(Integer, ForeignKey('users.id'))
description = Column(PickleType())
executions = relationship("ExecutionState", order_by="ExecutionState.id", backref="application")
def executions_running(self):
ret = []
for e in self.executions:
if e.status == "running":
ret.append(e)
return ret
def to_dict(self) -> dict:
ret = {
'id': self.id,
'user_id': self.user_id,
'description': self.description.to_dict(),
'executions': [e.to_dict() for e in self.executions]
'description': self.description.copy(),
}
return ret
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from zoe_client.state import Base
......@@ -9,6 +10,8 @@ class UserState(Base):
id = Column(Integer, primary_key=True)
email = Column(String(128))
applications = relationship("ApplicationState", backref="user")
def to_dict(self):
return {
'id': self.id,
......
import logging
import requests
import requests.exceptions
from zoe_client.configuration import client_conf
log = logging.getLogger(__name__)
def generate_application_binary_url(application_id: int) -> str:
    """
    Build the object storage URL for the binary of an application.

    :param application_id: ID of the application
    :returns: the configured object storage URL followed by ``/apps/<application_id>``
    """
    base_url = client_conf().object_storage_url
    return base_url + '/apps/{}'.format(application_id)
def _upload(obj_id, kind, data: bytes):
    """
    POST a binary object to the object storage.

    Failures are logged and never raised: both a refused connection and an
    HTTP error status (4xx/5xx), which used to be silently ignored.

    :param obj_id: identifier of the object, used as the last URL component
    :param kind: object category, used as the first URL component (e.g. "apps")
    :param data: the bytes to upload as the ``file`` form field
    """
    url = client_conf().object_storage_url + '/{}/{}'.format(kind, obj_id)
    files = {'file': data}
    try:
        response = requests.post(url, files=files)
    except requests.exceptions.ConnectionError:
        log.error("Cannot connect to {} to POST the binary file".format(url))
        return
    if not response.ok:
        log.error("POST to {} failed with HTTP status {}".format(url, response.status_code))
def _download(obj_id, kind) -> bytes:
    """
    GET a binary object from the object storage.

    :param obj_id: identifier of the object, used as the last URL component
    :param kind: object category, used as the first URL component (e.g. "apps")
    :returns: the response body, or None when the server cannot be reached or
              answers with an HTTP error status
    """
    url = client_conf().object_storage_url + '/{}/{}'.format(kind, obj_id)
    try:
        response = requests.get(url)
    except requests.exceptions.ConnectionError:
        log.error("Cannot connect to {} to GET the binary file".format(url))
        return None
    if not response.ok:
        # Without this check the body of a 404/500 error page would be handed
        # back to the caller as if it were the requested binary.
        log.error("GET from {} failed with HTTP status {}".format(url, response.status_code))
        return None
    return response.content
def _delete(obj_id, kind):
    """
    DELETE a binary object from the object storage.

    Failures are logged and never raised: both a refused connection and an
    HTTP error status (4xx/5xx), which used to be silently ignored.

    :param obj_id: identifier of the object, used as the last URL component
    :param kind: object category, used as the first URL component (e.g. "apps")
    """
    url = client_conf().object_storage_url + '/{}/{}'.format(kind, obj_id)
    try:
        response = requests.delete(url)
    except requests.exceptions.ConnectionError:
        log.error("Cannot connect to {} to DELETE the binary file".format(url))
        return
    if not response.ok:
        log.error("DELETE on {} failed with HTTP status {}".format(url, response.status_code))
def upload_application(app_id, app_data: bytes):
    """
    Store the binary data of an application under the "apps" category.

    :param app_id: ID of the application
    :param app_data: the bytes to upload
    """
    _upload(obj_id=app_id, kind="apps", data=app_data)
def download_application(application_id) -> bytes:
    """
    Fetch the binary data of an application from the "apps" category.

    :param application_id: ID of the application
    :returns: whatever _download() returns for this object
    """
    app_data = _download(obj_id=application_id, kind="apps")
    return app_data
def download_log_archive(execution_id) -> bytes:
    """
    Fetch the log archive of an execution from the "logs" category.

    :param execution_id: ID of the execution
    :returns: whatever _download() returns for this object
    """
    archive = _download(obj_id=execution_id, kind="logs")
    return archive
def delete_application(application_id):
    """
    Remove the binary data of an application from the "apps" category.

    :param application_id: ID of the application
    """
    _delete(obj_id=application_id, kind="apps")
......@@ -14,11 +14,9 @@ defaults = {
'status_refresh_interval': 10,
'check_terminated_interval': 30,
'db_connect': 'mysql+mysqlconnector://zoe:pass@dbhost/zoe',
'storage_path': "/var/lib/zoe/history",
'http_listen_address': '127.0.0.1',
'http_listen_port': 4390,
'ipc_listen_address': '127.0.0.1',
'ipc_listen_port': 8723
'ipc_listen_port': 8723,
'object_storage_url': 'http://127.0.0.1:4390'