Commit ba2bb2ba authored by Daniele Venzano's avatar Daniele Venzano

New RPC system based on zeroMQ

Add new server and client classes
Convert user ops and platform statistics to use new RPC
parent 071a3949
......@@ -15,8 +15,8 @@ class UserState(Base):
def extract(self):
    """Build a plain User value object copied from this database row.

    The returned object carries no SQLAlchemy session state, so it is
    presumably safe to hand to code running outside the DB session --
    TODO confirm against callers.
    """
    return User(self)
class User:
    """Detached value object mirroring the id/email of a UserState row."""

    def __init__(self, user: UserState):
        # Copy only the scalar columns; no reference to the ORM row is kept.
        self.id = user.id
        self.email = user.email

    def to_dict(self):
        """Return the user as a JSON-serializable dictionary."""
        return dict(id=self.id, email=self.email)
"""A setuptools based setup module.
See:
https://packaging.python.org/en/latest/distributing.html
https://github.com/pypa/sampleproject
"""
# Always prefer setuptools over distutils
from setuptools import setup, find_packages
# To use a consistent encoding
from codecs import open
from os import path
import sys
print("For now this does not work")
sys.exit(1)
here = path.abspath(path.dirname(__file__))
# Get the long description from the relevant file
with open(path.join(here, 'DESCRIPTION.rst'), encoding='utf-8') as f:
long_description = f.read()
setup(
name='zoe',
# Versions should comply with PEP440. For a discussion on single-sourcing
# the version across setup.py and the project code, see
# https://packaging.python.org/en/latest/single_source_version.html
version='0.8.0',
description='Zoe - Analytics on demand',
long_description=long_description,
# The project's main homepage.
url='https://github.com/DistributedSystemsGroup/zoe',
# Author details
author='Daniele Venzano',
author_email='venza@brownhat.org',
# Choose your license
license='Apache 2.0',
# See https://pypi.python.org/pypi?%3Aaction=list_classifiers
classifiers=[
# How mature is this project? Common values are
# 3 - Alpha
# 4 - Beta
# 5 - Production/Stable
'Development Status :: 3 - Alpha',
'Environment :: Web Environment',
'Framework :: IPython',
# Indicate who your project is intended for
'Intended Audience :: Developers',
'Intended Audience :: Science/Research',
'Topic :: Education',
'Operating System :: POSIX :: Linux',
'Topic :: Software Development',
'Topic :: System :: Distributed Computing',
# Pick your license as you wish (should match "license" above)
'License :: OSI Approved :: Apache Software License',
# Specify the Python versions you support here. In particular, ensure
# that you indicate whether you support Python 2, Python 3 or both.
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.4',
],
# What does your project relate to?
keywords='spark analytics docker swarm containers notebook',
# You can just specify the packages manually here if your project is
# simple. Or you can use find_packages().
packages=find_packages(exclude=['scripts']),
# List run-time dependencies here. These will be installed by pip when
# your project is installed. For an analysis of "install_requires" vs pip's
# requirements files see:
# https://packaging.python.org/en/latest/requirements.html
install_requires=['peppercorn'],
# List additional groups of dependencies here (e.g. development
# dependencies). You can install these using the following syntax,
# for example:
# $ pip install -e .[dev,test]
extras_require={
'dev': ['check-manifest'],
'test': ['coverage'],
},
# If there are data files included in your packages that need to be
# installed, specify them here. If using Python 2.6 or less, then these
# have to be included in MANIFEST.in as well.
package_data={
'sample': ['package_data.dat'],
},
# Although 'package_data' is the preferred approach, in some case you may
# need to place data files outside of your packages. See:
# http://docs.python.org/3.4/distutils/setupscript.html#installing-additional-files # noqa
# In this case, 'data_file' will be installed into '<sys.prefix>/my_data'
data_files=[('my_data', ['data/data_file'])],
# To provide executable scripts, use entry points in preference to the
# "scripts" keyword. Entry points provide cross-platform support and allow
# pip to create the appropriate form of executable for the target platform.
entry_points={
'console_scripts': [
'sample=sample:main',
],
},
)
......@@ -8,6 +8,7 @@ from rpyc.utils.server import ThreadedServer
from zoe_scheduler.rpyc_service import ZoeSchedulerRPCService
from zoe_scheduler.scheduler import zoe_sched
from zoe_scheduler.periodic_tasks import PeriodicTaskManager
from zoe_scheduler.ipc import ZoeIPCServer
from common.object_storage import init_history_paths
log = logging.getLogger('zoe')
......@@ -30,6 +31,7 @@ def process_arguments() -> argparse.Namespace:
argparser = argparse.ArgumentParser(description="Zoe Scheduler - Container Analytics as a Service scheduling component")
argparser.add_argument('-d', '--debug', action='store_true', help='Enable debug output')
argparser.add_argument('--rpyc-no-auto-register', action='store_true', help='Do not register automatically in the RPyC registry')
argparser.add_argument('--ipc-server-port', type=int, default=8723, help='Port the IPC server should bind to')
return argparser.parse_args()
......@@ -46,6 +48,8 @@ def main():
rpyc_logger = logging.getLogger('rpyc')
rpyc_logger.setLevel(logging.WARNING)
ipc_server = ZoeIPCServer(zoe_sched, args.ipc_server_port)
if not init_history_paths():
return
......@@ -58,6 +62,8 @@ def main():
zoe_sched.init_tasks(tm)
ipc_server.start_loop()
rpyc_server.start()
tm.stop_all()
......
import logging
import rpyc
from sqlalchemy.orm.exc import NoResultFound
from zoe_client.ipc import ZoeIPCClient
from common.state import AlchemySession
from common.state.application import ApplicationState, SparkNotebookApplicationState, SparkSubmitApplicationState, SparkApplicationState, Application
from common.state.container import ContainerState
from common.state.execution import ExecutionState, SparkSubmitExecutionState, Execution
from common.state.proxy import ProxyState
from common.state.user import UserState, User
from common.state.user import UserState
from common.application_resources import SparkApplicationResources
from common.stats import PlatformStats
from common.exceptions import UserIDDoesNotExist, ApplicationStillRunning
import common.object_storage as storage
from common.configuration import zoeconf, rpycconf
from zoe_client.entities import User
log = logging.getLogger(__name__)
REGISTRY = zoeconf.docker_private_registry
MASTER_IMAGE = REGISTRY + "/zoerepo/spark-master"
......@@ -22,6 +27,7 @@ NOTEBOOK_IMAGE = REGISTRY + "/zoerepo/spark-notebook"
class ZoeClient:
def __init__(self, rpyc_server=None, rpyc_port=4000):
self.ipc_server = ZoeIPCClient("localhost")
self.rpyc_server = rpyc_server
self.rpyc_port = rpyc_port
self.state = AlchemySession()
......@@ -234,36 +240,29 @@ class ZoeClient:
return storage.logs_archive_download(execution)
# Platform
def platform_stats(self) -> PlatformStats:
ret = self.server.platform_stats()
return ret
def platform_stats(self) -> dict:
stats = self.ipc_server.ask('platform_stats')
return stats
# Users
def user_check(self, user_id: int) -> bool:
user = self.state.query(UserState).filter_by(id=user_id).count()
return user == 1
user = self.user_get(user_id)
return user is not None
def user_new(self, email: str) -> User:
user = UserState(email=email)
self.state.add(user)
self.state.commit()
return user.extract()
user_dict = self.ipc_server.ask('user_new', email=email)
if user_dict is not None:
return User(user_dict)
def user_get(self, user_id: int) -> User:
try:
user = self.state.query(UserState).filter_by(id=user_id).one()
except NoResultFound:
return None
else:
return user.extract()
user_dict = self.ipc_server.ask('user_get', user_id=user_id)
if user_dict is not None:
return User(user_dict)
def user_get_by_email(self, email: str) -> User:
try:
user = self.state.query(UserState).filter_by(email=email).one()
except NoResultFound:
return None
else:
return user.extract()
user_dict = self.ipc_server.ask('user_get_by_email', user_email=email)
if user_dict is not None:
return User(user_dict)
def get_zoe_client() -> ZoeClient:
......
class User:
    """Client-side user entity, rebuilt from an IPC answer dictionary."""

    def __init__(self, user: dict):
        # The IPC layer serializes users as {'id': ..., 'email': ...}.
        self.id = user['id']
        self.email = user['email']
import logging
import zmq
log = logging.getLogger(__name__)
class ZoeIPCClient:
    """JSON request/reply client for the Zoe IPC server over a ZeroMQ REQ socket."""

    def __init__(self, server, port=8723):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REQ)
        self.socket.connect("tcp://%s:%d" % (server, port))
        log.debug("ZMQ socket connected")

    def ask(self, command: str, **kwargs) -> dict:
        """Send `command` with keyword arguments and return the server's answer.

        Returns None (after logging) when the server replies with an error.
        """
        return self._ask({'command': command, 'args': kwargs})

    def _ask(self, message: dict) -> dict:
        # One strict send/recv pair, as required by the REQ socket type.
        self.socket.send_json(message)
        answer = self.socket.recv_json()
        if not self._is_error(answer):
            return self._answer(answer)
        log.info("IPC error: {}".format(self._error(answer)))
        return None

    # --- reply-envelope helpers -------------------------------------------
    def _is_error(self, message: dict) -> bool:
        return message['status'] == 'error'

    def _error(self, message: dict) -> str:
        return message['answer']

    def _answer(self, message: dict) -> dict:
        return message['answer']
......@@ -5,7 +5,7 @@ import logging
from jinja2 import Template
from common.state.execution import SparkSubmitExecutionState, ExecutionState
from common.urls import generate_log_history_url, generate_notebook_url
from zoe_scheduler.urls import generate_log_history_url, generate_notebook_url
from common.configuration import zoeconf
log = logging.getLogger(__name__)
......
import logging
import threading
from sqlalchemy.orm.exc import NoResultFound
import zmq
from common.state import AlchemySession
from common.state.user import UserState
from zoe_scheduler.scheduler import ZoeScheduler
log = logging.getLogger(__name__)
class ZoeIPCServer:
    """ZeroMQ REP server answering JSON IPC requests from Zoe clients.

    Requests have the shape {'command': <name>, 'args': {<kwargs>}}; the
    command is resolved to a public handler method on this object and every
    reply is wrapped in a {'status': 'ok'|'error', 'answer': ...} envelope.
    """

    def __init__(self, scheduler: "ZoeScheduler", port=8723):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REP)
        self.socket.bind("tcp://*:%s" % port)
        self.th = None  # serving thread, created by start_loop()
        self.state = AlchemySession()
        self.sched = scheduler

    def start_loop(self):
        """Start serving requests in a daemon thread and return immediately."""
        self.th = threading.Thread(target=self._loop, name="IPC server", daemon=True)
        self.th.start()

    def _loop(self):
        log.debug("IPC server thread started")
        while True:
            message = self.socket.recv_json()
            reply = self._dispatch(message)
            self.socket.send_json(reply)

    def _dispatch(self, message: dict) -> dict:
        """Validate a request and route it to the matching handler method.

        Always returns an error envelope instead of raising, so the serve
        loop has a JSON reply to send back for every request.
        """
        if "command" not in message or "args" not in message:
            log.error("Ignoring malformed message: {}".format(message))
            return self._reply_error('malformed')
        if not isinstance(message['args'], dict):
            log.error("Ignoring malformed message: {}".format(message))
            return self._reply_error('malformed')
        command = message["command"]
        # Security: the command name comes from the network. Never expose
        # private internals (e.g. _reply_ok, _loop) or non-callable
        # attributes (e.g. self.socket) as remotely invocable commands.
        # NOTE(review): an explicit whitelist of handler names would be
        # stricter still -- consider it if more handlers are added.
        if command.startswith('_'):
            log.error("Ignoring unkown command: {}".format(command))
            return self._reply_error('unknown command')
        try:
            func = getattr(self, command)
        except AttributeError:
            log.error("Ignoring unkown command: {}".format(command))
            return self._reply_error('unknown command')
        if not callable(func):
            log.error("Ignoring unkown command: {}".format(command))
            return self._reply_error('unknown command')
        return func(**message["args"])

    def _reply_ok(self, reply: dict) -> dict:
        """Wrap a successful answer in the reply envelope."""
        return {'status': 'ok', 'answer': reply}

    def _reply_error(self, error_msg: str) -> dict:
        """Wrap an error string in the reply envelope."""
        return {'status': 'error', 'answer': error_msg}

    # Platform
    def platform_stats(self):
        """Return current platform statistics serialized as a plain dict."""
        ret = self.sched.platform_status.stats()
        return self._reply_ok(ret.to_dict())

    # Users
    def user_get(self, user_id) -> dict:
        """Look a user up by primary key; error reply if it does not exist."""
        try:
            user = self.state.query(UserState).filter_by(id=user_id).one()
        except NoResultFound:
            return self._reply_error('no such user')
        else:
            return self._reply_ok(user.to_dict())

    def user_get_by_email(self, user_email) -> dict:
        """Look a user up by email address; error reply if it does not exist."""
        try:
            user = self.state.query(UserState).filter_by(email=user_email).one()
        except NoResultFound:
            return self._reply_error('no such user')
        else:
            return self._reply_ok(user.to_dict())

    def user_new(self, email: str) -> dict:
        """Create and persist a new user, returning its serialized form."""
        user = UserState(email=email)
        self.state.add(user)
        self.state.commit()
        return self._reply_ok(user.to_dict())
from datetime import datetime, timedelta
import logging
from io import BytesIO
import zipfile
from zoe_scheduler.swarm_client import SwarmClient, ContainerOptions
from zoe_scheduler.proxy_manager import pm
from zoe_scheduler.emails import notify_execution_finished, notify_notebook_notice, notify_notebook_termination
from common.state import AlchemySession, ClusterState, ContainerState, SparkApplicationState, ProxyState, ExecutionState, SparkNotebookApplicationState, SparkSubmitApplicationState, SparkSubmitExecutionState
from common.application_resources import ApplicationResources
from common.exceptions import CannotCreateCluster
from common.configuration import zoeconf
from common.object_storage import logs_archive_upload
from common.urls import generate_application_binary_url
from zoe_scheduler.urls import generate_application_binary_url
log = logging.getLogger(__name__)
......
import logging
from common.stats import PlatformStats
from zoe_scheduler.stats import PlatformStats
from zoe_scheduler.swarm_client import SwarmClient
log = logging.getLogger(__name__)
......
......@@ -2,8 +2,7 @@ import rpyc
from sqlalchemy.orm.exc import NoResultFound
from zoe_scheduler.scheduler import zoe_sched
from common.stats import PlatformStats, ContainerStats
from zoe_scheduler.stats import PlatformStats, ContainerStats
from common.state import AlchemySession, ContainerState
from common.state.execution import ExecutionState
......
......@@ -5,11 +5,10 @@ from zoe_scheduler.platform import PlatformManager
from zoe_scheduler.platform_status import PlatformStatus
from zoe_scheduler.periodic_tasks import PeriodicTaskManager
from zoe_scheduler.proxy_manager import pm
from common.configuration import zoeconf
from common.state import ExecutionState
from common.application_resources import ApplicationResources
from common.stats import SchedulerStats
from zoe_scheduler.stats import SchedulerStats
log = logging.getLogger(__name__)
......
......@@ -7,6 +7,11 @@ class Stats:
def __init__(self):
self.timestamp = None
def to_dict(self) -> dict:
ret = {}
ret.update(vars(self))
return ret
class SwarmNodeStats(Stats):
def __init__(self, name):
......@@ -20,17 +25,6 @@ class SwarmNodeStats(Stats):
self.memory_reserved = 0
self.labels = {}
def __str__(self):
    """Render the node statistics as an indented multi-line report."""
    parts = [
        " -- Node {}\n".format(self.name),
        " -- Docker endpoint: {}\n".format(self.docker_endpoint),
        " -- Container count: {}\n".format(self.container_count),
        " -- Memory total: {}\n".format(self.memory_total),
        " -- Memory reserved: {}\n".format(self.memory_reserved),
        " -- Cores total: {}\n".format(self.cores_total),
        " -- Cores reserved: {}\n".format(self.cores_reserved),
        " -- Labels: {}\n".format(self.labels),
    ]
    return "".join(parts)
class SwarmStats(Stats):
def __init__(self):
......@@ -43,16 +37,20 @@ class SwarmStats(Stats):
self.active_filters = []
self.nodes = []
def __str__(self):
s = " - Container count: {}\n".format(self.container_count)
s += " - Image count: {}\n".format(self.image_count)
s += " - Memory total: {}\n".format(self.memory_total)
s += " - Cores total: {}\n".format(self.cores_total)
s += " - Placement strategy: {}\n".format(self.placement_strategy)
s += " - Active filters: {}\n".format(self.active_filters)
def to_dict(self) -> dict:
ret = {
'container_count': self.container_count,
'image_count': self.image_count,
'memory_total': self.memory_total,
'cores_total': self.cores_total,
'placement_strategy': self.placement_strategy,
'active_filters': self.active_filters,
'nodes': []
}
for node in self.nodes:
s += str(node)
return s
ret['nodes'].append(node.to_dict())
return ret
class SchedulerStats(Stats):
......@@ -61,9 +59,6 @@ class SchedulerStats(Stats):
self.count_running = 0
self.count_waiting = 0
def __str__(self):
return " - Apps running: {}\n - Apps waiting: {}\n".format(self.count_running, self.count_waiting)
class PlatformStats(Stats):
def __init__(self):
......@@ -71,8 +66,11 @@ class PlatformStats(Stats):
self.swarm = SwarmStats()
self.scheduler = SchedulerStats()
def __str__(self):
return "Swarm:\n{}\nScheduler:\n{}\n".format(self.swarm, self.scheduler)
def to_dict(self) -> dict:
    """Serialize the platform stats tree into nested plain dicts."""
    swarm_dict = self.swarm.to_dict()
    scheduler_dict = self.scheduler.to_dict()
    return {
        'swarm': swarm_dict,
        'scheduler': scheduler_dict
    }
class ContainerStats(Stats):
......@@ -97,11 +95,6 @@ class ContainerStats(Stats):
self.net_bytes_rx = docker_stats['network']['rx_bytes']
self.net_bytes_tx = docker_stats['network']['tx_bytes']
def to_dict(self) -> dict:
ret = {}
ret.update(vars(self))
return ret
documentation_sample = {
'blkio_stats': {
......
......@@ -6,7 +6,7 @@ import docker.utils
import docker.errors
from common.configuration import zoeconf
from common.stats import SwarmStats, SwarmNodeStats, ContainerStats
from zoe_scheduler.stats import SwarmStats, SwarmNodeStats, ContainerStats
log = logging.getLogger(__name__)
......
......@@ -175,7 +175,7 @@ sorttable = {
// check for a date: dd/mm/yyyy or dd/mm/yy
// can have / or . or - as separator
// can be mm/dd as well
possdate = text.match(sorttable.DATE_RE)
possdate = text.match(sorttable.DATE_RE);
if (possdate) {
// looks like a date
first = parseInt(possdate[1]);
......@@ -331,7 +331,7 @@ sorttable = {
} // while(swap)
}
}
};
/* ******************************************************************
Supporting functions: bundled here to avoid depending on a library
......@@ -395,7 +395,7 @@ function dean_addEvent(element, type, handler) {
// assign a global event handler to do all the work
element["on" + type] = handleEvent;
}
};
}
// a counter used to create unique IDs
dean_addEvent.guid = 1;
......@@ -408,8 +408,7 @@ function removeEvent(element, type, handler) {
delete element.events[type][handler.$$guid];
}
}
};
}
function handleEvent(event) {
var returnValue = true;
// grab the event object (IE uses a global event object)
......@@ -424,20 +423,19 @@ function handleEvent(event) {
}
}
return returnValue;
};
}
function fixEvent(event) {
// add W3C standard event methods
event.preventDefault = fixEvent.preventDefault;
event.stopPropagation = fixEvent.stopPropagation;
return event;
};
}
fixEvent.preventDefault = function() {
this.returnValue = false;
};
fixEvent.stopPropagation = function() {
this.cancelBubble = true;
}
};
// Dean's forEach: http://dean.edwards.name/base/forEach.js
/*
......
......@@ -32,10 +32,6 @@ table.app_list td {
padding-right: 1.5em;
}
td.long-text {
font-size: xx-small;
}
div.status_line {
float: left;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment