Commit ce0e11b3 authored by Daniele Venzano

Make the sqlalchemy initialization explicit

It is now easier to write tests. Also checked where sessions are created and made sure they are correctly closed at the end.
parent 327f4a0d
......@@ -3,6 +3,7 @@ Flask
--allow-external mysql-connector-python
mysql-connector-python
python-dateutil
pytest
Sphinx
SQLAlchemy
tornado
......
__author__ = 'venzano'
import pytest
from zoe_scheduler.state import Base, _engine
@pytest.fixture(scope='function')
def sqlalchemy(request):
    """
    Function-scoped fixture that creates the tables declared on ``Base``.

    :param request: the pytest fixture request object (not used here)
    :return: None
    """
    # NOTE(review): ``_engine`` is imported by value at module import time;
    # confirm it has been initialised before this fixture runs.
    metadata = Base.metadata
    metadata.create_all(_engine)
\ No newline at end of file
from zoe_scheduler.object_storage import *
fake_data = "test" * 1024
def test_application_data_upload():
    """Check that ``init_history_paths`` reports success."""
    assert init_history_paths() == True
......@@ -3,7 +3,7 @@
from argparse import ArgumentParser, Namespace
import logging
from common.state import create_tables
from zoe_scheduler.state import create_tables
argparser = None
......
......@@ -3,17 +3,15 @@
import argparse
import logging
from zoe_scheduler.scheduler import zoe_sched
from zoe_scheduler.scheduler import ZoeScheduler
from zoe_scheduler.periodic_tasks import PeriodicTaskManager
from zoe_scheduler.ipc import ZoeIPCServer
from common.object_storage import init_history_paths
log = logging.getLogger('zoe')
from zoe_scheduler.object_storage import init_history_paths
from zoe_scheduler.state import init as state_init
from common.configuration import zoeconf
def sigint_handler():
log.warning('CTRL-C detected, terminating event loop...')
zoe_sched.stop_tasks()
log = logging.getLogger('zoe')
def process_arguments() -> argparse.Namespace:
......@@ -33,6 +31,10 @@ def main():
logging.getLogger('requests').setLevel(logging.WARNING)
state_init(zoeconf.db_url)
zoe_sched = ZoeScheduler()
ipc_server = ZoeIPCServer(zoe_sched, args.ipc_server_port)
if not init_history_paths():
......
......@@ -17,6 +17,9 @@ class ZoeIPCClient:
self.socket.send_json(message)
try:
answer = self.socket.recv_json()
except zmq.error.Again:
log.error("ZMQ is asking to try again, we drop the message")
return None
except zmq.ZMQError as e:
log.error("IPC server error: {}".format(e.msg))
return None
......
......@@ -4,7 +4,7 @@ import logging
from jinja2 import Template
from common.state.execution import SparkSubmitExecutionState, ExecutionState
from zoe_scheduler.state.execution import SparkSubmitExecutionState, ExecutionState
from zoe_scheduler.urls import generate_log_history_url, generate_notebook_url
from common.configuration import zoeconf
......
......@@ -6,16 +6,6 @@ class ZoeException(Exception):
return repr(self.value)
class UserIDDoesNotExist(ZoeException):
    """Error carrying a message that names the user ID that was not found."""

    def __init__(self, user_id):
        """
        :param user_id: the user ID to mention in the message
        """
        template = "The user ID {} does not exist"
        self.value = template.format(user_id)
class ApplicationStillRunning(ZoeException):
    """Error carrying a message that an application is in use and cannot be removed."""

    def __init__(self, application):
        """
        :param application: object whose ``id`` attribute is put in the message
        """
        template = "The application {} cannot be removed because it is in use"
        self.value = template.format(application.id)
class CannotCreateCluster(ZoeException):
    """Error carrying a message that a cluster could not be created for an application."""

    def __init__(self, application):
        """
        :param application: object whose ``id`` attribute is put in the message
        """
        template = "Cannot create a cluster for application {}"
        self.value = template.format(application.id)
......@@ -8,15 +8,14 @@ from sqlalchemy.orm.exc import NoResultFound
import zmq
from common.application_resources import SparkApplicationResources
from common.state import AlchemySession
from common.state.application import ApplicationState, SparkSubmitApplicationState, SparkNotebookApplicationState, SparkApplicationState
from common.state.container import ContainerState
from common.state.execution import ExecutionState, SparkSubmitExecutionState
from common.state.proxy import ProxyState
from common.state.user import UserState
import common.object_storage as storage
from zoe_scheduler.state import AlchemySession
from zoe_scheduler.state.application import ApplicationState, SparkSubmitApplicationState, SparkNotebookApplicationState, SparkApplicationState
from zoe_scheduler.state.container import ContainerState
from zoe_scheduler.state.execution import ExecutionState, SparkSubmitExecutionState
from zoe_scheduler.state.proxy import ProxyState
from zoe_scheduler.state.user import UserState
import zoe_scheduler.object_storage as storage
from common.configuration import zoeconf
from zoe_scheduler.scheduler import ZoeScheduler
log = logging.getLogger(__name__)
......
import os
import logging
from common.state import ApplicationState, ExecutionState
from zoe_scheduler.state.application import ApplicationState
from zoe_scheduler.state.execution import ExecutionState
from common.configuration import zoeconf
log = logging.getLogger(__name__)
......
......@@ -6,11 +6,16 @@ import zipfile
from zoe_scheduler.swarm_client import SwarmClient, ContainerOptions
from zoe_scheduler.proxy_manager import pm
from zoe_scheduler.emails import notify_execution_finished, notify_notebook_notice, notify_notebook_termination
from common.state import AlchemySession, ClusterState, ContainerState, SparkApplicationState, ProxyState, ExecutionState, SparkNotebookApplicationState, SparkSubmitApplicationState, SparkSubmitExecutionState
from zoe_scheduler.state import AlchemySession
from zoe_scheduler.state.application import SparkApplicationState, SparkNotebookApplicationState, SparkSubmitApplicationState
from zoe_scheduler.state.cluster import ClusterState
from zoe_scheduler.state.container import ContainerState
from zoe_scheduler.state.execution import ExecutionState, SparkSubmitExecutionState
from zoe_scheduler.state.proxy import ProxyState
from common.application_resources import ApplicationResources
from common.exceptions import CannotCreateCluster
from zoe_scheduler.exceptions import CannotCreateCluster
from common.configuration import zoeconf
from common.object_storage import logs_archive_upload
from zoe_scheduler.object_storage import logs_archive_upload
from zoe_scheduler.urls import generate_application_binary_url
log = logging.getLogger(__name__)
......@@ -217,6 +222,7 @@ class PlatformManager:
notify_notebook_notice(e)
state.commit()
state.close()
def _container_died(self, state: AlchemySession, container: ContainerState):
if container.readable_name == "spark-submit" or container.readable_name == "spark-master":
......
......@@ -7,7 +7,8 @@ import logging
from jinja2 import Template
from common.configuration import zoeconf
from common.state import AlchemySession, ProxyState
from zoe_scheduler.state import AlchemySession
from zoe_scheduler.state.proxy import ProxyState
log = logging.getLogger(__name__)
......
......@@ -7,7 +7,7 @@ from zoe_scheduler.platform_status import PlatformStatus
from zoe_scheduler.periodic_tasks import PeriodicTaskManager
from zoe_scheduler.proxy_manager import pm
from common.configuration import zoeconf
from common.state import ExecutionState
from zoe_scheduler.state.execution import ExecutionState
from common.application_resources import ApplicationResources
from zoe_scheduler.stats import SchedulerStats
......@@ -97,7 +97,12 @@ class ZoeScheduler:
else: # Some error happened
log.error('Execution ID {} cannot be started'.format(execution_id))
def loop(self): # FIXME the scheduler should wait on events, not sleep
def loop(self): # FIXME the scheduler should wait on events, not sleep
"""
This method is the scheduling task. It is the loop the main thread runs, started from the zoe-scheduler executable.
It does not use an sqlalchemy session.
:return: None
"""
while True:
self.schedule()
time.sleep(zoeconf.interval_scheduler_task)
......@@ -108,6 +113,3 @@ class ZoeScheduler:
def execution_terminate(self, state, execution: ExecutionState):
    """
    Terminate an execution through ``self.platform`` and then notify the scheduling policy.

    :param state: passed unchanged to ``self.platform.execution_terminate``
                  (presumably the SQLAlchemy session -- confirm with callers)
    :param execution: the execution to terminate
    :return: None
    """
    platform = self.platform
    platform.execution_terminate(state, execution)
    # The ID is read only after the platform call, as in the original ordering.
    policy = self.scheduler_policy
    policy.terminated(execution.id)
zoe_sched = ZoeScheduler()
......@@ -2,19 +2,17 @@ from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from common.configuration import zoeconf
Base = declarative_base()
AlchemySession = sessionmaker()
_engine = None
_engine = create_engine(zoeconf.db_url, echo=False)
AlchemySession = sessionmaker(bind=_engine)
from common.state.container import ContainerState
from common.state.cluster import ClusterState
from common.state.application import ApplicationState, SparkApplicationState, SparkNotebookApplicationState, SparkSubmitApplicationState
from common.state.user import UserState
from common.state.proxy import ProxyState
from common.state.execution import ExecutionState, SparkSubmitExecutionState
def init(db_url):
    """
    Create the module-level SQLAlchemy engine and bind the session factory to it.

    Only the first call has an effect: once ``_engine`` is set, later calls
    return without changing anything, whatever ``db_url`` they pass.

    :param db_url: database URL handed to ``create_engine``
    :return: None
    """
    global _engine, AlchemySession
    if _engine is not None:
        return
    _engine = create_engine(db_url, echo=False)
    AlchemySession.configure(bind=_engine)
def create_tables():
......
from sqlalchemy import Column, Integer, String, PickleType, ForeignKey
from sqlalchemy.orm import relationship
from common.state import Base
from zoe_scheduler.state import Base
class ApplicationState(Base):
......
from sqlalchemy import Column, Integer, ForeignKey
from sqlalchemy.orm import relationship
from common.state import Base
from zoe_scheduler.state import Base
class ClusterState(Base):
......
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from common.state import Base
from zoe_scheduler.state import Base
class ContainerState(Base):
......
......@@ -3,7 +3,7 @@ from datetime import datetime
from sqlalchemy import Column, Integer, String, PickleType, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from common.state import Base
from zoe_scheduler.state import Base
class ExecutionState(Base):
......
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime, func
from common.state import Base
from zoe_scheduler.state import Base
class ProxyState(Base):
......
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from common.state import Base
from zoe_scheduler.state import Base
class UserState(Base):
......@@ -12,9 +12,6 @@ class UserState(Base):
applications = relationship("ApplicationState", order_by="ApplicationState.id", backref="user")
def extract(self):
return User(self)
def to_dict(self):
return {
'id': self.id,
......
import time
from common.state import ApplicationState, ExecutionState, ContainerState, ProxyState
class Stats:
def __init__(self):
self.timestamp = None
......
from common.state.execution import ExecutionState
from common.state import ProxyState, AlchemySession, ApplicationState
from zoe_scheduler.state import AlchemySession
from zoe_scheduler.state.application import ApplicationState
from zoe_scheduler.state.execution import ExecutionState
from zoe_scheduler.state.proxy import ProxyState
from common.configuration import zoeconf
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment