Commit 0b5b1343 authored by Daniele Venzano

Make the codebase testable

Remove module-level initializations so that test cases can be built more easily.
Add one test case for the object_storage module, just to make sure the boilerplate is right.
parent ce0e11b3
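The recurring pattern in this commit (seen below in common.configuration, zoe_scheduler.state and zoe_scheduler.proxy_manager) replaces a module-level singleton built at import time with an explicit init() function plus an accessor. A minimal, self-contained sketch of that pattern, with illustrative names rather than the exact Zoe code:

from configparser import ConfigParser

_config = None                      # no longer created at import time

def init(config_file=None) -> ConfigParser:
    """Build the singleton explicitly; tests pass their own config file."""
    global _config
    _config = ConfigParser()
    if config_file is None:
        _config.read(["zoe.conf"])  # assumption: simplified default search path
    else:
        _config.read(config_file)
    return _config

def config() -> ConfigParser:
    """Accessor callers use instead of importing a pre-built module global."""
    return _config

Because nothing is constructed at import time, a test fixture can call init() with a test configuration before any other module touches the singleton.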
......@@ -5,8 +5,7 @@ install:
- pip install -r requirements.txt
- pip install pytest
before_script:
- cp .travis/zoe-travis.sample ./zoe.conf
- bash .travis/create_db.sh
- bash tests/resources/create_db.sh
script:
- py.test
- PYTHONPATH=. py.test --test-environment travis
- sphinx-build -nW -b html -d docs/_build/doctrees docs/ docs/_build/html
......@@ -45,6 +45,8 @@ defaults = {
}
}
_zoeconf = None
class ZoeConfig(ConfigParser):
def __init__(self):
......@@ -127,5 +129,16 @@ class ZoeConfig(ConfigParser):
def docker_private_registry(self) -> str:
return self.get('docker', 'private_registry')
zoeconf = ZoeConfig()
zoeconf.read(config_paths)
def init(config_file=None) -> ZoeConfig:
global _zoeconf
_zoeconf = ZoeConfig()
if config_file is None:
_zoeconf.read(config_paths)
else:
_zoeconf.read_file(open(config_file))
return _zoeconf
def zoeconf() -> ZoeConfig:
return _zoeconf
......@@ -10,4 +10,5 @@ API documentation
.. toctree::
:maxdepth: 2
developer/introduction
developer/client
General design decisions
========================
SQLAlchemy sessions
-------------------
SQLAlchemy provides a lot of flexibility for managing sessions, through the ``sessionmaker()`` and ``scoped_session()`` functions.
In Zoe we decided to have a small number of "entry points" where a session is created and closed. These entry points correspond to the thread loops:
* The scheduler main loop
* The tasks
* The IPC server (for now it is not multi-threaded, but each request creates a new session and closes it at the end)
Sessions should never be created anywhere else, outside of these entry points.
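A minimal sketch of one such entry point (illustrative names and a throwaway SQLite engine, not the exact Zoe scheduler code): the loop opens a session, does one pass of work, and closes the session before sleeping.

import time
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite://")      # assumption: any SQLAlchemy URL works here
AlchemySession = sessionmaker(bind=engine)

def scheduler_loop(iterations=1, interval=0):
    for _ in range(iterations):
        state = AlchemySession()         # session created at the entry point...
        try:
            # ...one scheduling pass would use `state` here...
            state.commit()
        except Exception:
            state.rollback()
            raise
        finally:
            state.close()                # ...and closed before the next iteration
        time.sleep(interval)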
import pytest
from zoe_scheduler.state import Base, _engine
@pytest.fixture(scope='function')
def sqlalchemy(request):
Base.metadata.create_all(_engine)
\ No newline at end of file
import pytest
from common.application_resources import SparkApplicationResources
from zoe_scheduler.state import init as state_init, Base, AlchemySession
from zoe_scheduler.state.application import SparkSubmitApplicationState
from zoe_scheduler.state import UserState
from common.configuration import init as conf_init, zoeconf
def pytest_addoption(parser):
parser.addoption("--test-environment", default="local", help="Test environment: 'local' or 'travis'")
@pytest.fixture(scope='session')
def environment(request):
return request.config.getoption("--test-environment")
@pytest.fixture(scope='session')
def configuration(environment):
if environment == 'local':
conf_init('tests/resources/zoe-local.conf')
else:
conf_init('tests/resources/zoe-travis.conf')
@pytest.fixture(scope='session')
def state_connection(request, configuration):
engine = state_init(zoeconf().db_url)
connection = engine.connect()
trans = connection.begin()
Base.metadata.create_all(connection)
def fin():
trans.rollback()
connection.close()
engine.dispose()
request.addfinalizer(fin)
return connection
@pytest.fixture(scope='function')
def state_session(state_connection, request):
inner_trans = state_connection.begin_nested()
session = AlchemySession(bind=state_connection)
def fin():
session.close()
inner_trans.rollback()
request.addfinalizer(fin)
return session
@pytest.fixture(scope='function')
def application(state_session):
user = UserState()
user.email = 'a@b.c'
app = SparkSubmitApplicationState()
app.submit_image = "test"
app.worker_image = "test"
app.master_image = "test"
app.name = "testapp"
app.user = user
app.required_resources = SparkApplicationResources()
state_session.add(app)
state_session.flush()
return app
......@@ -10,7 +10,7 @@ proxy_update_accesses = 10
status_refresh = 10
[filesystem]
history_path = /var/lib/zoe/history
history_path = /tmp/history
[flask]
secret_key = b"\xc4\xb0\xa7\xff\x8fH'\xf7m\x1c\xa2\x92F\x1d\xdcz\x05\xe6CJN5\x83!"
......
[docker]
swarm_manager_url = tcp://example.com:2380
[intervals]
check_health = 30
notebook_max_age_no_activity = 24
scheduler_task = 10
notebook_warning_age_no_activity = 2
proxy_update_accesses = 10
status_refresh = 10
[filesystem]
history_path = /tmp/history
[flask]
secret_key = b"\xc4\xb0\xa7\xff\x8fH'\xf7m\x1c\xa2\x92F\x1d\xdcz\x05\xe6CJN5\x83!"
[smtp]
password = none
user = none
server = none
[apache]
web_server_name = www.example.com
access_log = /var/log/apache2/access.log
proxy_config_file = /tmp/zoe-proxy.conf
proxy_path_prefix = /proxy
[db]
url = mysql+mysqlconnector://root@localhost/zoe
from zoe_scheduler.object_storage import *
fake_data = "test" * 1024
fake_data = b"test" * 1024
def test_application_data_upload():
def test_application_data_upload(application):
ret = init_history_paths()
assert ret == True
assert ret is True
application_data_upload(application, fake_data)
data = application_data_download(application)
assert data == fake_data
application_data_delete(application)
......@@ -3,13 +3,15 @@
from argparse import ArgumentParser, Namespace
import logging
from zoe_scheduler.state import create_tables
from zoe_scheduler.state import create_tables, init as state_init
from common.configuration import init as conf_init, zoeconf
argparser = None
db_engine = None
def setup_db_cmd(_):
create_tables()
create_tables(db_engine)
def process_arguments() -> Namespace:
......@@ -25,12 +27,17 @@ def process_arguments() -> Namespace:
def main():
global db_engine
args = process_arguments()
if args.debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
conf_init()
db_engine = state_init(zoeconf().db_url)
try:
args.func(args)
except AttributeError:
......
......@@ -8,8 +8,9 @@ from zoe_scheduler.periodic_tasks import PeriodicTaskManager
from zoe_scheduler.ipc import ZoeIPCServer
from zoe_scheduler.object_storage import init_history_paths
from zoe_scheduler.state import init as state_init
from zoe_scheduler.proxy_manager import init as proxy_init
from common.configuration import zoeconf
from common.configuration import init as conf_init
log = logging.getLogger('zoe')
......@@ -31,7 +32,9 @@ def main():
logging.getLogger('requests').setLevel(logging.WARNING)
zoeconf = conf_init()
state_init(zoeconf.db_url)
proxy_init()
zoe_sched = ZoeScheduler()
......
......@@ -8,7 +8,7 @@ from tornado.ioloop import IOLoop
from zoe_web import app
from common.configuration import ipcconf
from common.configuration import ipcconf, init as conf_init
log = logging.getLogger("zoe_web")
......@@ -34,8 +34,11 @@ def main():
ipcconf['server'] = args.ipc_server
ipcconf['port'] = args.ipc_port
zoeconf = conf_init()
log.info("Starting HTTP server...")
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
app.secret_key = zoeconf.cookies_secret_key
http_server = HTTPServer(WSGIContainer(app))
http_server.listen(5000, "0.0.0.0")
......
......@@ -6,7 +6,7 @@ from zipfile import is_zipfile
from pprint import pprint
from zoe_client import ZoeClient
from common.configuration import zoeconf
from common.configuration import init as conf_init, zoeconf
argparser = None
......@@ -122,7 +122,7 @@ def log_get_cmd(args):
def gen_config_cmd(args):
zoeconf.write(open(args.output_file, "w"))
zoeconf().write(open(args.output_file, "w"))
def container_stats_cmd(args):
......@@ -221,6 +221,8 @@ def main():
else:
logging.basicConfig(level=logging.INFO)
conf_init()
try:
args.func(args)
except AttributeError:
......
......@@ -7,16 +7,16 @@ from zoe_client.entities import User, Execution, Application
log = logging.getLogger(__name__)
REGISTRY = zoeconf.docker_private_registry
MASTER_IMAGE = REGISTRY + "/zoerepo/spark-master"
WORKER_IMAGE = REGISTRY + "/zoerepo/spark-worker"
SUBMIT_IMAGE = REGISTRY + "/zoerepo/spark-submit"
NOTEBOOK_IMAGE = REGISTRY + "/zoerepo/spark-notebook"
MASTER_IMAGE = "/zoerepo/spark-master"
WORKER_IMAGE = "/zoerepo/spark-worker"
SUBMIT_IMAGE = "/zoerepo/spark-submit"
NOTEBOOK_IMAGE = "/zoerepo/spark-notebook"
class ZoeClient:
def __init__(self, ipc_server='localhost', ipc_port=8723):
self.ipc_server = ZoeIPCClient(ipc_server, ipc_port)
self.image_registry = zoeconf().docker_private_registry
# Applications
def application_get(self, application_id: int) -> Application:
......@@ -52,8 +52,8 @@ class ZoeClient:
executor_memory=executor_memory,
executor_cores=executor_cores,
name=name,
master_image=MASTER_IMAGE,
worker_image=WORKER_IMAGE)
master_image=self.image_registry + MASTER_IMAGE,
worker_image=self.image_registry + WORKER_IMAGE)
if answer is not None:
return answer['app_id']
......@@ -64,9 +64,9 @@ class ZoeClient:
executor_memory=executor_memory,
executor_cores=executor_cores,
name=name,
master_image=MASTER_IMAGE,
worker_image=WORKER_IMAGE,
notebook_image=NOTEBOOK_IMAGE)
master_image=self.image_registry + MASTER_IMAGE,
worker_image=self.image_registry + WORKER_IMAGE,
notebook_image=self.image_registry + NOTEBOOK_IMAGE)
if answer is not None:
return answer['app_id']
......@@ -79,9 +79,9 @@ class ZoeClient:
executor_cores=executor_cores,
name=name,
file_data=file_data,
master_image=MASTER_IMAGE,
worker_image=WORKER_IMAGE,
submit_image=SUBMIT_IMAGE)
master_image=self.image_registry + MASTER_IMAGE,
worker_image=self.image_registry + WORKER_IMAGE,
submit_image=self.image_registry + SUBMIT_IMAGE)
if answer is not None:
return answer['app_id']
......
......@@ -70,8 +70,8 @@ def notify_notebook_notice(execution: ExecutionState):
subject = "[Zoe] Notebook termination warning"
template_vars = {
'grace_time': zoeconf.notebook_max_age_no_activity - zoeconf.notebook_warning_age_no_activity,
'wrn_age': zoeconf.notebook_warning_age_no_activity,
'grace_time': zoeconf().notebook_max_age_no_activity - zoeconf().notebook_warning_age_no_activity,
'wrn_age': zoeconf().notebook_warning_age_no_activity,
'nb_url': generate_notebook_url(execution)
}
send_email(email, subject, NOTEBOOK_WARNING_EMAIL_TEMPLATE, template_vars)
......@@ -82,7 +82,7 @@ def notify_notebook_termination(execution: ExecutionState):
email = app.user.email
subject = "[Zoe] Notebook terminated"
template_vars = {'max_age': zoeconf.notebook_max_age_no_activity}
template_vars = {'max_age': zoeconf().notebook_max_age_no_activity}
send_email(email, subject, NOTEBOOK_KILLED_EMAIL_TEMPLATE, template_vars)
......@@ -93,9 +93,9 @@ def send_email(address, subject, template, template_vars):
msg['Subject'] = subject
msg['From'] = 'noreply@bigfoot.eurecom.fr'
msg['To'] = address
s = smtplib.SMTP(zoeconf.smtp_server)
s = smtplib.SMTP(zoeconf().smtp_server)
s.ehlo()
s.starttls()
s.login(zoeconf.smtp_user, zoeconf.smtp_password)
s.login(zoeconf().smtp_user, zoeconf().smtp_password)
s.send_message(msg)
s.quit()
......@@ -230,12 +230,12 @@ class ZoeIPCServer:
if isinstance(execution.application, SparkNotebookApplicationState):
c = execution.find_container("spark-notebook")
pr = self.state.query(ProxyState).filter_by(container_id=c.id, service_name="Spark Notebook interface").one()
path = zoeconf.proxy_path_url_prefix + '/{}'.format(pr.id)
path = zoeconf().proxy_path_url_prefix + '/{}'.format(pr.id)
return self._reply_ok(path=path)
elif isinstance(execution.application, SparkSubmitApplicationState):
c = execution.find_container("spark-submit")
pr = self.state.query(ProxyState).filter_by(container_id=c.id, service_name="Spark application web interface").one()
path = zoeconf.proxy_path_url_prefix + '/{}'.format(pr.id)
path = zoeconf().proxy_path_url_prefix + '/{}'.format(pr.id)
return self._reply_ok(path=path)
else:
return self._reply_error('unknown application type')
......
......@@ -9,30 +9,30 @@ log = logging.getLogger(__name__)
def init_history_paths() -> bool:
if not os.path.exists(zoeconf.history_path):
if not os.path.exists(zoeconf().history_path):
try:
os.makedirs(zoeconf.history_path)
os.makedirs(zoeconf().history_path)
except OSError:
log.error("Cannot create history directory in {}".format(zoeconf.history_path))
log.error("Cannot create history directory in {}".format(zoeconf().history_path))
return False
os.makedirs(os.path.join(zoeconf.history_path, 'apps'))
os.makedirs(os.path.join(zoeconf.history_path, 'logs'))
os.makedirs(os.path.join(zoeconf().history_path, 'apps'))
os.makedirs(os.path.join(zoeconf().history_path, 'logs'))
return True
def application_data_upload(application: ApplicationState, data: bytes) -> bool:
fpath = os.path.join(zoeconf.history_path, 'apps', 'app-{}.zip'.format(application.id))
fpath = os.path.join(zoeconf().history_path, 'apps', 'app-{}.zip'.format(application.id))
open(fpath, "wb").write(data)
def application_data_download(application: ApplicationState) -> bytes:
fpath = os.path.join(zoeconf.history_path, 'apps', 'app-{}.zip'.format(application.id))
fpath = os.path.join(zoeconf().history_path, 'apps', 'app-{}.zip'.format(application.id))
data = open(fpath, "rb").read()
return data
def application_data_delete(application: ApplicationState):
fpath = os.path.join(zoeconf.history_path, 'apps', 'app-{}.zip'.format(application.id))
fpath = os.path.join(zoeconf().history_path, 'apps', 'app-{}.zip'.format(application.id))
try:
os.unlink(fpath)
except OSError:
......@@ -40,18 +40,18 @@ def application_data_delete(application: ApplicationState):
def logs_archive_upload(execution: ExecutionState, data: bytes) -> bool:
fpath = os.path.join(zoeconf.history_path, 'logs', 'log-{}.zip'.format(execution.id))
fpath = os.path.join(zoeconf().history_path, 'logs', 'log-{}.zip'.format(execution.id))
open(fpath, "wb").write(data)
def logs_archive_download(execution: ExecutionState) -> bytes:
fpath = os.path.join(zoeconf.history_path, 'logs', 'log-{}.zip'.format(execution.id))
fpath = os.path.join(zoeconf().history_path, 'logs', 'log-{}.zip'.format(execution.id))
data = open(fpath, "rb").read()
return data
def logs_archive_delete(execution: ExecutionState):
fpath = os.path.join(zoeconf.history_path, 'logs', 'log-{}.zip'.format(execution.id))
fpath = os.path.join(zoeconf().history_path, 'logs', 'log-{}.zip'.format(execution.id))
try:
os.unlink(fpath)
except OSError:
......
......@@ -4,7 +4,7 @@ from io import BytesIO
import zipfile
from zoe_scheduler.swarm_client import SwarmClient, ContainerOptions
from zoe_scheduler.proxy_manager import pm
from zoe_scheduler.proxy_manager import proxy_manager
from zoe_scheduler.emails import notify_execution_finished, notify_notebook_notice, notify_notebook_termination
from zoe_scheduler.state import AlchemySession
from zoe_scheduler.state.application import SparkApplicationState, SparkNotebookApplicationState, SparkSubmitApplicationState
......@@ -24,7 +24,6 @@ log = logging.getLogger(__name__)
class PlatformManager:
def __init__(self):
self.swarm = SwarmClient()
pm.update_proxy()
def start_execution(self, execution_id: int, resources: ApplicationResources) -> bool:
state = AlchemySession()
......@@ -36,7 +35,7 @@ class PlatformManager:
return False
execution.set_started()
state.commit()
pm.update_proxy()
proxy_manager().update_proxy()
return True
def _application_to_containers(self, state: AlchemySession, execution: ExecutionState):
......@@ -180,7 +179,7 @@ class PlatformManager:
state.delete(cluster)
execution.set_terminated()
self._archive_execution_logs(execution, logs)
pm.update_proxy()
proxy_manager().update_proxy()
def log_get(self, container: ContainerState) -> str:
return self.swarm.log_get(container.docker_id)
......@@ -211,11 +210,11 @@ class PlatformManager:
c = e.find_container("spark-notebook")
if c is not None:
pr = state.query(ProxyState).filter_by(container_id=c.id, service_name="Spark Notebook interface").one()
if datetime.now() - pr.last_access > timedelta(hours=zoeconf.notebook_max_age_no_activity):
if datetime.now() - pr.last_access > timedelta(hours=zoeconf().notebook_max_age_no_activity):
log.info("Killing spark notebook {} for inactivity".format(e.id))
self.execution_terminate(state, e)
notify_notebook_termination(e)
if datetime.now() - pr.last_access > timedelta(hours=zoeconf.notebook_max_age_no_activity) - timedelta(hours=zoeconf.notebook_warning_age_no_activity):
if datetime.now() - pr.last_access > timedelta(hours=zoeconf().notebook_max_age_no_activity) - timedelta(hours=zoeconf().notebook_warning_age_no_activity):
if not e.termination_notice:
log.info("Spark notebook {} is on notice for inactivity".format(e.id))
e.termination_notice = True
......
......@@ -41,8 +41,8 @@ ENTRY_TEMPLATE = """
class ProxyManager:
def __init__(self):
self.apache_conf_filepath = zoeconf.apache_proxy_config_file
self.apache_access_log = zoeconf.apache_log_file
self.apache_conf_filepath = zoeconf().apache_proxy_config_file
self.apache_access_log = zoeconf().apache_log_file
def _get_proxy_entries(self):
state = AlchemySession()
......@@ -105,4 +105,14 @@ class ProxyManager:
state.commit()
state.close()
pm = ProxyManager()
_pm = None
def init():
global _pm
_pm = ProxyManager()
_pm.update_proxy()
def proxy_manager() -> ProxyManager:
return _pm
......@@ -5,7 +5,7 @@ import time
from zoe_scheduler.platform import PlatformManager
from zoe_scheduler.platform_status import PlatformStatus
from zoe_scheduler.periodic_tasks import PeriodicTaskManager
from zoe_scheduler.proxy_manager import pm
from zoe_scheduler.proxy_manager import proxy_manager
from common.configuration import zoeconf
from zoe_scheduler.state.execution import ExecutionState
from common.application_resources import ApplicationResources
......@@ -76,9 +76,9 @@ class ZoeScheduler:
def init_tasks(self, tm: PeriodicTaskManager) -> Barrier:
barrier = Barrier(4) # number of tasks + main thread
tm.add_task("platform status updater", self.platform_status.update, zoeconf.interval_status_refresh, barrier)
tm.add_task("proxy access timestamp updater", pm.update_proxy_access_timestamps, zoeconf.interval_proxy_update_accesses, barrier)
tm.add_task("execution health checker", self.platform.check_executions_health, zoeconf.interval_check_health, barrier)
tm.add_task("platform status updater", self.platform_status.update, zoeconf().interval_status_refresh, barrier)
tm.add_task("proxy access timestamp updater", proxy_manager().update_proxy_access_timestamps, zoeconf().interval_proxy_update_accesses, barrier)
tm.add_task("execution health checker", self.platform.check_executions_health, zoeconf().interval_check_health, barrier)
return barrier
def incoming(self, execution: ExecutionState) -> bool:
......@@ -105,7 +105,7 @@ class ZoeScheduler:
"""
while True:
self.schedule()
time.sleep(zoeconf.interval_scheduler_task)
time.sleep(zoeconf().interval_scheduler_task)
def schedule(self):
self._check_runnable()
......
......@@ -5,16 +5,21 @@ from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
AlchemySession = sessionmaker()
_engine = None
from zoe_scheduler.state.application import ApplicationState, SparkSubmitApplicationState, SparkApplicationState, SparkNotebookApplicationState
from zoe_scheduler.state.cluster import ClusterState
from zoe_scheduler.state.container import ContainerState
from zoe_scheduler.state.execution import ExecutionState, SparkSubmitExecutionState
from zoe_scheduler.state.proxy import ProxyState
from zoe_scheduler.state.user import UserState
def init(db_url):
global _engine, AlchemySession
if _engine is None:
_engine = create_engine(db_url, echo=False)
AlchemySession.configure(bind=_engine)
global AlchemySession
engine = create_engine(db_url, echo=False)
AlchemySession.configure(bind=engine)
return engine
def create_tables():
Base.metadata.drop_all(_engine)
Base.metadata.create_all(_engine)
def create_tables(engine):
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
......@@ -13,7 +13,7 @@ log = logging.getLogger(__name__)
class SwarmClient:
def __init__(self):
manager = zoeconf.docker_swarm_manager
manager = zoeconf().docker_swarm_manager
self.cli = docker.Client(base_url=manager)
def info(self) -> SwarmStats:
......
......@@ -7,15 +7,15 @@ from common.configuration import zoeconf
def generate_log_history_url(execution: ExecutionState) -> str:
zoe_web_log_history_path = '/api/history/logs/'
return 'http://' + zoeconf.web_server_name + zoe_web_log_history_path + str(execution.id)
return 'http://' + zoeconf().web_server_name + zoe_web_log_history_path + str(execution.id)
def generate_notebook_url(execution: ExecutionState) -> str:
state = AlchemySession()
c = execution.find_container("spark-notebook")
pr = state.query(ProxyState).filter_by(container_id=c.id, service_name="Spark Notebook interface").one()
return 'http://' + zoeconf.web_server_name + zoeconf.proxy_path_url_prefix + '/{}'.format(pr.id)
return 'http://' + zoeconf().web_server_name + zoeconf().proxy_path_url_prefix + '/{}'.format(pr.id)
def generate_application_binary_url(application: ApplicationState) -> str:
return 'http://' + zoeconf.web_server_name + '/api/applications/download/{}'.format(application.id)
return 'http://' + zoeconf().web_server_name + '/api/applications/download/{}'.format(application.id)
......@@ -2,10 +2,8 @@ from flask import Flask
from zoe_web.api import api_bp
from zoe_web.web import web_bp
from common.configuration import zoeconf
app = Flask(__name__, static_url_path='/does-not-exist')
app.register_blueprint(web_bp, url_prefix='')
app.register_blueprint(api_bp, url_prefix='/api')
app.secret_key = zoeconf.cookies_secret_key