Commit 2e5d80aa authored by Daniele Venzano

Rationalize state classes

Refactor the state classes to differentiate between State classes, backed by SQLAlchemy; "normal" classes, which are moved around and passed to client applications; and statistics classes, which are generated on the fly, when required.
parent e165b498
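The pattern this commit introduces, distilled into a minimal sketch (only the User pair from the diff is shown; the real declarative Base lives in common.state and session wiring is elided):

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class UserState(Base):
    # State class: SQLAlchemy-backed, tied to a database session.
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    email = Column(String(128))

    def extract(self):
        # Generate a plain object detached from SQLAlchemy.
        return User(self)

class User:
    # Plain class: a session-independent snapshot, safe to pass to clients.
    def __init__(self, user: UserState):
        self.id = user.id
        self.email = user.email

A caller can thus run session.query(UserState).one().extract() and close the session before handing the plain User to a client; statistics classes (see common/stats.py below) are the third kind, built on the fly when requested.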
import os
import logging
from common.state import Application, Execution
from common.state import ApplicationState, ExecutionState
from common.configuration import zoeconf
log = logging.getLogger(__name__)
@@ -19,18 +19,18 @@ def init_history_paths() -> bool:
return True
def application_data_upload(application: Application, data: bytes) -> bool:
def application_data_upload(application: ApplicationState, data: bytes) -> bool:
fpath = os.path.join(zoeconf.history_path, 'apps', 'app-{}.zip'.format(application.id))
open(fpath, "wb").write(data)
def application_data_download(application: Application) -> bytes:
def application_data_download(application: ApplicationState) -> bytes:
fpath = os.path.join(zoeconf.history_path, 'apps', 'app-{}.zip'.format(application.id))
data = open(fpath, "rb").read()
return data
def application_data_delete(application: Application):
def application_data_delete(application: ApplicationState):
fpath = os.path.join(zoeconf.history_path, 'apps', 'app-{}.zip'.format(application.id))
try:
os.unlink(fpath)
@@ -38,18 +38,18 @@ def application_data_delete(application: Application):
log.warning("Binary data for application {} not found, cannot delete".format(application.id))
def logs_archive_upload(execution: Execution, data: bytes) -> bool:
def logs_archive_upload(execution: ExecutionState, data: bytes) -> bool:
fpath = os.path.join(zoeconf.history_path, 'logs', 'log-{}.zip'.format(execution.id))
open(fpath, "wb").write(data)
def logs_archive_download(execution: Execution) -> bytes:
def logs_archive_download(execution: ExecutionState) -> bytes:
fpath = os.path.join(zoeconf.history_path, 'logs', 'log-{}.zip'.format(execution.id))
data = open(fpath, "rb").read()
return data
def logs_archive_delete(execution: Execution):
def logs_archive_delete(execution: ExecutionState):
fpath = os.path.join(zoeconf.history_path, 'logs', 'log-{}.zip'.format(execution.id))
try:
os.unlink(fpath)
@@ -9,12 +9,12 @@ Base = declarative_base()
_engine = create_engine(zoeconf.db_url, echo=False)
AlchemySession = sessionmaker(bind=_engine)
from common.state.container import Container
from common.state.cluster import Cluster
from common.state.application import Application, SparkApplication, SparkNotebookApplication, SparkSubmitApplication
from common.state.user import User
from common.state.proxy import Proxy
from common.state.execution import Execution, SparkSubmitExecution
from common.state.container import ContainerState
from common.state.cluster import ClusterState
from common.state.application import ApplicationState, SparkApplicationState, SparkNotebookApplicationState, SparkSubmitApplicationState
from common.state.user import UserState
from common.state.proxy import ProxyState
from common.state.execution import ExecutionState, SparkSubmitExecutionState
def create_tables():
@@ -4,7 +4,7 @@ from sqlalchemy.orm import relationship
from common.state import Base
class Application(Base):
class ApplicationState(Base):
__tablename__ = 'applications'
id = Column(Integer, primary_key=True)
@@ -12,7 +12,7 @@ class Application(Base):
required_resources = Column(PickleType()) # JSON resource description
user_id = Column(Integer, ForeignKey('users.id'))
executions = relationship("Execution", order_by="Execution.id", backref="application")
executions = relationship("ExecutionState", order_by="ExecutionState.id", backref="application")
type = Column(String(20)) # Needed by sqlalchemy to manage class inheritance
@@ -21,19 +21,6 @@ class Application(Base):
'polymorphic_identity': 'application'
}
def to_dict(self) -> dict:
ret = {
'id': self.id,
'name': self.name,
'required_resources': self.required_resources.__dict__.copy(),
'user_id': self.user_id
}
return ret
def __repr__(self):
return "<Application(name='%s', id='%s', required_resourced='%s')>" % (
self.name, self.id, self.required_resources)
def executions_running(self):
ret = []
for e in self.executions:
@@ -42,17 +42,10 @@ class Application(Base):
return ret
def extract(self):
ret = PlainApplication()
ret.id = self.id
ret.name = self.name
ret.required_resources = self.required_resources
ret.user_id = self.user_id
ret.executions = [x.id for x in self.executions]
ret.type = self.type
return ret
return Application(self)
class SparkApplication(Application):
class SparkApplicationState(ApplicationState):
master_image = Column(String(256))
worker_image = Column(String(256))
@@ -60,63 +60,67 @@ class SparkApplication(Application):
'polymorphic_identity': 'spark-application'
}
def to_dict(self) -> dict:
ret = super().to_dict()
ret["master_image"] = self.master_image
ret["worker_image"] = self.worker_image
return ret
def extract(self):
ret = super().extract()
ret.master_image = self.master_image
ret.worker_image = self.worker_image
return ret
return Application(self)
class SparkNotebookApplication(SparkApplication):
class SparkNotebookApplicationState(SparkApplicationState):
notebook_image = Column(String(256))
__mapper_args__ = {
'polymorphic_identity': 'spark-notebook'
}
def to_dict(self) -> dict:
ret = super().to_dict()
ret["notebook_image"] = self.notebook_image
return ret
def extract(self):
ret = super().extract()
ret.notebook_image = self.notebook_image
return ret
return Application(self)
class SparkSubmitApplication(SparkApplication):
class SparkSubmitApplicationState(SparkApplicationState):
submit_image = Column(String(256))
__mapper_args__ = {
'polymorphic_identity': 'spark-submit'
}
def to_dict(self) -> dict:
ret = super().to_dict()
ret["submit_image"] = self.submit_image
return ret
def extract(self):
ret = super().extract()
ret.submit_image = self.submit_image
return ret
class PlainApplication:
id = None
name = None
required_resources = None
user_id = None
executions = None
type = None
master_image = None
worker_image = None
notebook_image = None
submit_image = None
return Application(self)
class Application:
"""
:type id: int
:type name: str
:type required_resources: ApplicationResources
:type user_id: int
:type type: str
:type master_image: str
:type worker_image: str
:type notebook_image: str
:type submit_image: str
:type executions: list[Execution]
"""
def __init__(self, application: ApplicationState) -> None:
self.id = application.id
self.name = application.name
self.required_resources = application.required_resources
self.user_id = application.user_id
self.type = application.type
if isinstance(application, SparkApplicationState):
self.master_image = application.master_image
self.worker_image = application.worker_image
if isinstance(application, SparkNotebookApplicationState):
self.notebook_image = application.notebook_image
if isinstance(application, SparkSubmitApplicationState):
self.submit_image = application.submit_image
self.executions = []
for e in application.executions:
self.executions.append(e.extract())
def __str__(self):
s = "Application"
s += " - Name: {}".format(self.name)
s += " - Type: {}".format(self.type)
# FIXME add missing fields
return s
@@ -4,15 +4,11 @@ from sqlalchemy.orm import relationship
from common.state import Base
class Cluster(Base):
class ClusterState(Base):
__tablename__ = 'clusters'
id = Column(Integer, primary_key=True)
execution_id = Column(Integer, ForeignKey('executions.id'))
containers = relationship("Container", order_by="Container.id", backref="cluster")
proxies = relationship("Proxy", order_by="Proxy.id", backref="cluster")
def __repr__(self):
return "<Cluster(id='%s', execution_id='%s')>" % (
self.id, self.execution_id)
containers = relationship("ContainerState", order_by="ContainerState.id", backref="cluster")
proxies = relationship("ProxyState", order_by="ProxyState.id", backref="cluster")
@@ -4,7 +4,7 @@ from sqlalchemy.orm import relationship
from common.state import Base
class Container(Base):
class ContainerState(Base):
__tablename__ = 'containers'
id = Column(Integer, primary_key=True)
@@ -13,8 +13,25 @@ class Container(Base):
ip_address = Column(String(16))
readable_name = Column(String(32))
proxies = relationship("Proxy", order_by="Proxy.id", backref="container")
proxies = relationship("ProxyState", order_by="ProxyState.id", backref="container")
def __repr__(self):
return "<Container(name='%s', id='%s', docker_id='%s', cluster_id='%s', ip_address='%s')>" % (
self.readable_name, self.id, self.docker_id, self.cluster_id, self.ip_address)
def extract(self):
"""
Generates a normal object not attached to SQLAlchemy
:rtype : Container
"""
return Container(self)
class Container:
def __init__(self, container: ContainerState):
self.id = container.id
self.docker_id = container.docker_id
self.cluster_id = container.cluster_id
self.ip_address = container.ip_address
self.readable_name = container.readable_name
self.proxies = []
for p in container.proxies:
self.proxies.append(p.extract())
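A brief hypothetical usage of this cascading extraction (the container name is invented; error handling is omitted):

state = AlchemySession()
c_state = state.query(ContainerState).filter_by(readable_name="spark-master").one()
container = c_state.extract()  # plain Container; its proxies are extracted too
state.close()
print(container.ip_address)  # the plain object outlives the session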
@@ -6,7 +6,7 @@ from sqlalchemy.orm import relationship
from common.state import Base
class Execution(Base):
class ExecutionState(Base):
__tablename__ = 'executions'
id = Column(Integer, primary_key=True)
@@ -19,7 +19,7 @@ class Execution(Base):
status = Column(String(32))
termination_notice = Column(Boolean, default=False)
cluster = relationship("Cluster", uselist=False, backref="execution")
cluster = relationship("ClusterState", uselist=False, backref="execution")
type = Column(String(32)) # Needed by sqlalchemy to manage class inheritance
@@ -49,28 +49,11 @@ class Execution(Base):
if c.readable_name == name:
return c
def __repr__(self):
return "<Execution(name='%s', id='%s', assigned_resourced='%s', application_id='%s', )>" % (
self.name, self.id, self.assigned_resources, self.application_id)
def extract(self):
ret = PlainExecution()
ret.id = self.id
ret.name = self.name
ret.assigned_resources = self.assigned_resources
ret.application_id = self.application_id
ret.time_started = self.time_started
ret.time_scheduled = self.time_scheduled
ret.time_finished = self.time_finished
ret.status = self.status
ret.termination_notice = self.termination_notice
if self.cluster is not None:
ret.cluster_id = self.cluster.id
ret.type = self.type
return ret
class SparkSubmitExecution(Execution):
return Execution(self)
class SparkSubmitExecutionState(ExecutionState):
commandline = Column(String(1024))
spark_opts = Column(String(1024))
@@ -79,23 +62,32 @@ class SparkSubmitExecution(Execution):
}
def extract(self):
ret = super().extract()
ret.commandline = self.commandline
ret.spark_opts = self.spark_opts
return ret
class PlainExecution:
id = None
name = None
assigned_resources = None
application_id = None
time_started = None
time_scheduled = None
time_finished = None
status = None
termination_notice = None
cluster_id = None
type = None
commandline = None
spark_opts = None
return Execution(self)
class Execution:
def __init__(self, execution: ExecutionState):
self.id = execution.id
self.name = execution.name
self.assigned_resources = execution.assigned_resources
self.application_id = execution.application_id
self.time_started = execution.time_started
self.time_scheduled = execution.time_scheduled
self.time_finished = execution.time_finished
self.status = execution.status
self.termination_notice = execution.termination_notice
if execution.cluster is not None:
self.cluster_id = execution.cluster.id
else:
self.cluster_id = None
self.type = execution.type
if isinstance(execution, SparkSubmitExecutionState):
self.commandline = execution.commandline
self.spark_opts = execution.spark_opts
self.containers = []
if execution.cluster is not None:
for c in execution.cluster.containers:
self.containers.append(c.extract())
@@ -3,7 +3,7 @@ from sqlalchemy import Column, Integer, String, ForeignKey, DateTime, func
from common.state import Base
class Proxy(Base):
class ProxyState(Base):
__tablename__ = 'proxies'
id = Column(Integer, primary_key=True)
@@ -13,6 +13,15 @@ class Proxy(Base):
service_name = Column(String(32))
last_access = Column(DateTime, default=func.now())
def __repr__(self):
return "<Proxy(service_name='%s', id='%s', internal_url='%s', cluster_id='%s', container_id='%s', last_access='%s')>" % (
self.service_name, self.id, self.internal_url, self.cluster_id, self.container_id, self.last_access)
def extract(self):
return Proxy(self)
class Proxy:
def __init__(self, proxy: ProxyState):
self.id = proxy.id
self.internal_url = proxy.internal_url
self.cluster_id = proxy.cluster_id
self.container_id = proxy.container_id
self.service_name = proxy.service_name
self.last_access = proxy.last_access
@@ -4,26 +4,19 @@ from sqlalchemy.orm import relationship
from common.state import Base
class User(Base):
class UserState(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
email = Column(String(128))
applications = relationship("Application", order_by="Application.id", backref="user")
def __repr__(self):
return "<User(id='%s', email='%s')>" % (
self.id, self.app_id)
applications = relationship("ApplicationState", order_by="ApplicationState.id", backref="user")
def extract(self):
ret = PlainUser()
ret.id = self.id
ret.email = self.email
return ret
return User(self)
class PlainUser:
id = None
email = None
class User:
def __init__(self, user: UserState):
self.id = user.id
self.email = user.email
import time
from common.state import ApplicationState, ExecutionState, ContainerState, ProxyState
class Stats:
def __init__(self):
self.timestamp = None
class SwarmNodeStats(Stats):
def __init__(self, name):
super().__init__()
self.name = name
self.docker_endpoint = None
self.container_count = 0
self.cores_total = 0
self.cores_reserved = 0
self.memory_total = 0
self.memory_reserved = 0
self.labels = {}
def __str__(self):
s = " -- Node {}\n".format(self.name)
s += " -- Docker endpoint: {}\n".format(self.docker_endpoint)
s += " -- Container count: {}\n".format(self.container_count)
s += " -- Memory total: {}\n".format(self.memory_total)
s += " -- Memory reserved: {}\n".format(self.memory_reserved)
s += " -- Cores total: {}\n".format(self.cores_total)
s += " -- Cores reserved: {}\n".format(self.cores_reserved)
s += " -- Labels: {}\n".format(self.labels)
return s
class SwarmStats(Stats):
def __init__(self):
super().__init__()
self.container_count = 0
self.image_count = 0
self.memory_total = 0
self.cores_total = 0
self.placement_strategy = ''
self.active_filters = []
self.nodes = []
def __str__(self):
s = " - Container count: {}\n".format(self.container_count)
s += " - Image count: {}\n".format(self.image_count)
s += " - Memory total: {}\n".format(self.memory_total)
s += " - Cores total: {}\n".format(self.cores_total)
s += " - Placement strategy: {}\n".format(self.placement_strategy)
s += " - Active filters: {}\n".format(self.active_filters)
for node in self.nodes:
s += str(node)
return s
class SchedulerStats(Stats):
def __init__(self):
super().__init__()
self.count_running = 0
self.count_waiting = 0
def __str__(self):
return " - Apps running: {}\n - Apps waiting: {}\n".format(self.count_running, self.count_waiting)
class PlatformStats(Stats):
def __init__(self):
super().__init__()
self.swarm = SwarmStats()
self.scheduler = SchedulerStats()
def __str__(self):
return "Swarm:\n{}\nScheduler:\n{}\n".format(self.swarm, self.scheduler)
from pprint import pformat
from zoe_scheduler.swarm_status import SwarmStatus
from common.state import Application, Execution, SparkSubmitExecution
class Report:
def __init__(self):
self.report = {}
def __str__(self):
return pformat(self.report)
class PlatformStatusReport(Report):
def include_swarm_status(self, sw_status: SwarmStatus):
self.report["swarm"] = sw_status.to_dict()
class ApplicationStatusReport(Report):
def __init__(self, application: Application):
super().__init__()
self.report["executions"] = []
self._app_to_dict(application)
def _app_to_dict(self, application: Application):
self.report.update(application.to_dict())
def add_execution(self, execution: Execution):
exrep = {
'id': execution.id,
'name': execution.name,
'status': execution.status,
'type': execution.type
}
if execution.time_scheduled is None:
exrep['scheduled_at'] = None
else:
exrep['scheduled_at'] = execution.time_scheduled.timestamp()
if execution.time_started is None:
exrep['started_at'] = None
else:
exrep['started_at'] = execution.time_started.timestamp()
if execution.time_finished is None:
exrep['finished_at'] = None
else:
exrep['finished_at'] = execution.time_finished.timestamp()
if isinstance(execution, SparkSubmitExecution):
exrep["commandline"] = execution.commandline
exrep["spark_opts"] = execution.spark_opts
exrep["cluster"] = []
if execution.cluster is None:
self.report['executions'].append(exrep)
return
for c in execution.cluster.containers:
cd = {
'id': c.id,
'docker_id': c.docker_id,
'ip_address': c.ip_address,
'name': c.readable_name,
'proxies': []
}
for p in c.proxies:
pd = {
'id': p.id,
'internal_url': p.internal_url,
'service_name': p.service_name,
'last_access': p.last_access.timestamp()
}
cd['proxies'].append(pd)
exrep["cluster"].append(cd)
self.report['executions'].append(exrep)
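For reference, each entry appended to report['executions'] has this shape (all values invented; timestamps are POSIX floats from datetime.timestamp()):

exrep = {
    'id': 7,
    'name': 'run-1',
    'status': 'running',
    'type': 'spark-submit',
    'scheduled_at': 1440000000.0,
    'started_at': 1440000010.0,
    'finished_at': None,          # execution still running
    'commandline': '...',         # present only for SparkSubmitExecution
    'spark_opts': '...',          # present only for SparkSubmitExecution
    'cluster': [{
        'id': 1,
        'docker_id': '...',
        'ip_address': '10.0.0.2',
        'name': 'spark-master',
        'proxies': [{
            'id': 9,
            'internal_url': '...',
            'service_name': 'Spark Notebook interface',
            'last_access': 1440000020.0,
        }],
    }],
}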
from common.state.execution import Execution
from common.state import Proxy, AlchemySession, Application
from common.state.execution import ExecutionState
from common.state import ProxyState, AlchemySession, ApplicationState
from common.configuration import zoeconf
def generate_log_history_url(execution: Execution) -> str:
def generate_log_history_url(execution: ExecutionState) -> str:
zoe_web_log_history_path = '/api/history/logs/'
return 'http://' + zoeconf.web_server_name + zoe_web_log_history_path + str(execution.id)
def generate_notebook_url(execution: Execution) -> str:
def generate_notebook_url(execution: ExecutionState) -> str:
state = AlchemySession()
c = execution.find_container("spark-notebook")
pr = state.query(Proxy).filter_by(container_id=c.id, service_name="Spark Notebook interface").one()
pr = state.query(ProxyState).filter_by(container_id=c.id, service_name="Spark Notebook interface").one()
return 'http://' + zoeconf.web_server_name + zoeconf.proxy_path_url_prefix + '/{}'.format(pr.id)
def generate_application_binary_url(application: Application) -> str:
def generate_application_binary_url(application: ApplicationState) -> str:
return 'http://' + zoeconf.web_server_name + '/api/applications/download/{}'.format(application.id)
@@ -4,7 +4,7 @@ import logging
from jinja2 import Template
from common.status import SparkSubmitExecution, Execution
from common.state.execution import SparkSubmitExecutionState, ExecutionState
from common.urls import generate_log_history_url, generate_notebook_url
from common.configuration import zoeconf
@@ -49,7 +49,7 @@ def do_duration(seconds):
return template.format(d=d, h=h, m=m, s=s)
def notify_execution_finished(execution: SparkSubmitExecution):
def notify_execution_finished(execution: SparkSubmitExecutionState):
app = execution.application
email = app.user.email
@@ -64,7 +64,7 @@ def notify_execution_finished(execution: SparkSubmitExecution):
send_email(email, subject, APP_FINISH_EMAIL_TEMPLATE, template_vars)
def notify_notebook_notice(execution: Execution):
def notify_notebook_notice(execution: ExecutionState):
app = execution.application
email = app.user.email
@@ -77,7 +77,7 @@ def notify_notebook_notice(execution: Execution):
send_email(email, subject, NOTEBOOK_WARNING_EMAIL_TEMPLATE, template_vars)
def notify_notebook_termination(execution: Execution):
def notify_notebook_termination(execution: ExecutionState):
app = execution.application
email = app.user.email
import logging
log = logging.getLogger(__name__)
from zoe_scheduler.swarm_status import SwarmStatus
from common.stats import PlatformStats
from zoe_scheduler.swarm_client import SwarmClient
from common.status import PlatformStatusReport
log = logging.getLogger(__name__)
class PlatformStatus:
def __init__(self):
self.swarm_status = SwarmStatus()
def __init__(self, scheduler):
self.swarm = SwarmClient()
self.swarm_status = None
self.scheduler = scheduler
self.scheduler_status = None
def update(self):
self.swarm_status = self.swarm.info()
self.scheduler_status = self.scheduler.scheduler_policy.stats()
def generate_report(self) -> PlatformStatusReport:
report = PlatformStatusReport()
report.include_swarm_status(self.swarm_status)
return report
def stats(self):
ret = PlatformStats()
ret.scheduler = self.scheduler_status
ret.swarm = self.swarm_status
return ret
@@ -7,7 +7,7 @@ import logging
from jinja2 import Template
from common.configuration import zoeconf
from common.state import