Commit 913b0099 authored by Daniele Venzano

When deleting an application remove all data associated with it

Rewrite periodic tasks
Update proxy config file at the right times
Implement periodic task for checking dead containers and expired notebooks
parent 0feab244
@@ -8,5 +8,7 @@ conf = {
     'redis_db': 0,
     'apache-proxy-config-file': '/tmp/zoe-proxy.conf',
     'apache-log-file': '/var/log/apache2/access.log',
-    'proxy_update_accesses': 300
+    'proxy_update_accesses': 300,
+    'check_health': 30,
+    'notebook_max_age_no_activity': 24
 }
@@ -23,6 +23,12 @@ def application_data_download(application: Application) -> bytes:
     return r.get(key)
 
+def application_data_delete(application: Application):
+    r = _connect()
+    key = "app-{}".format(application.id)
+    r.delete(key)
+
 def logs_archive_upload(execution: Execution, data: bytes) -> bool:
     r = _connect()
     key = "log-{}".format(execution.id)
@@ -33,3 +39,9 @@ def logs_archive_download(execution: Execution) -> bytes:
     r = _connect()
     key = "log-{}".format(execution.id)
     return r.get(key)
+
+def logs_archive_delete(execution: Execution):
+    r = _connect()
+    key = "log-{}".format(execution.id)
+    r.delete(key)
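These helpers complete the object-storage API: every `app-{id}` and `log-{id}` key written on upload can now be removed. A minimal sketch of the round-trip, assuming `_connect()` wraps redis-py (the connection parameters here are hypothetical, only the key scheme comes from the diff):

```python
import redis

def _connect():
    # Hypothetical connection settings; the real values come from conf.
    return redis.StrictRedis(host='localhost', port=6379, db=0)

r = _connect()
r.set("app-42", b"...application data...")  # what application_data_upload would store
data = r.get("app-42")                      # application_data_download
r.delete("app-42")                          # application_data_delete
```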
@@ -34,6 +34,13 @@ class Application(Base):
         return "<Application(name='%s', id='%s', required_resourced='%s')>" % (
             self.name, self.id, self.required_resources)
 
+    def executions_running(self):
+        ret = []
+        for e in self.executions:
+            if e.status == "running":
+                ret.append(e)
+        return ret
+
 class SparkApplication(Application):
     master_image = Column(String(256))
...
 from datetime import datetime
 
-from sqlalchemy import Column, Integer, String, PickleType, DateTime, ForeignKey
+from sqlalchemy import Column, Integer, String, PickleType, DateTime, ForeignKey, Boolean
 from sqlalchemy.orm import relationship
 
 from common.state import Base
@@ -17,6 +17,7 @@ class Execution(Base):
     time_started = Column(DateTime)
     time_finished = Column(DateTime)
     status = Column(String(32))
+    termination_notice = Column(Boolean, default=False)
 
     cluster = relationship("Cluster", uselist=False, backref="execution")
@@ -43,6 +44,11 @@ class Execution(Base):
         self.status = "terminated"
         self.time_finished = datetime.now()
 
+    def find_container(self, name):
+        for c in self.cluster.containers:
+            if c.readable_name == name:
+                return c
+
     def __repr__(self):
         return "<Execution(name='%s', id='%s', assigned_resourced='%s', application_id='%s', )>" % (
             self.name, self.id, self.assigned_resources, self.application_id)
...
@@ -15,6 +15,7 @@ if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
     logging.getLogger('requests').setLevel(logging.WARNING)
     logging.getLogger('asyncio').setLevel(logging.INFO)
+    logging.getLogger('rpyc').setLevel(logging.WARNING)
     log = logging.getLogger('zoe')
 
     loop = asyncio.get_event_loop()
...
@@ -126,6 +126,11 @@ class ZoeClient:
         running = self.state.query(Execution).filter_by(application_id=application.id, time_finished=None).count()
         if running > 0:
             raise ApplicationStillRunning(application)
+
+        storage.application_data_delete(application)
+        for e in application.executions:
+            self.execution_delete(e)
+
         self.state.delete(application)
         self.state.commit()
@@ -146,5 +151,11 @@ class ZoeClient:
     def execution_terminate(self, execution: Execution):
         self.server.terminate_execution(execution.id)
 
+    def execution_delete(self, execution: Execution):
+        if execution.status == "running":
+            raise ApplicationStillRunning(execution.application)
+        storage.logs_archive_delete(execution)
+        self.state.delete(execution)
+
     def log_get(self, container_id: int) -> str:
         return self.server.log_get(container_id)
 import asyncio
+import logging
+
+log = logging.getLogger(__name__)
 
-@asyncio.coroutine
-def periodic_routine(func, interval):
-    while True:
-        yield from asyncio.sleep(interval)
-        func()
-
-def periodic_task(func, interval) -> asyncio.Task:
-    return asyncio.Task(periodic_routine(func, interval))
+class PeriodicTask(object):
+    def __init__(self, func, interval):
+        self.func = func
+        self.interval = interval
+        self._loop = asyncio.get_event_loop()
+        self._set()
+
+    def _set(self):
+        self._handler = self._loop.call_later(self.interval, self._run)
+
+    def _run(self):
+        try:
+            self.func()
+        except:
+            log.exception("Exception in periodic task")
+        self._set()
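The rewrite swaps the old `asyncio.sleep` coroutine loop for a `loop.call_later` chain that reschedules itself even when the callback raises, so one failing tick no longer kills the whole periodic task. A minimal usage sketch (the `tick` callback is hypothetical; real callers pass methods like `ZoeScheduler.schedule`):

```python
import asyncio

from zoe_scheduler.periodic_tasks import PeriodicTask

def tick():
    # Hypothetical callback; an exception here is logged, not fatal.
    print("tick")

loop = asyncio.get_event_loop()
PeriodicTask(tick, 2)  # runs tick() every 2 seconds via loop.call_later
loop.run_forever()
```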
-from sqlalchemy.orm import object_session
+from datetime import datetime, timedelta
+import logging
+log = logging.getLogger(__name__)
+from io import BytesIO
+import zipfile
 
 from zoe_scheduler.swarm_client import SwarmClient, ContainerOptions
+from zoe_scheduler.proxy_manager import pm
-from common.state import AlchemySession, Cluster, Container, SparkApplication, Proxy, Execution, SparkNotebookApplication, SparkSubmitApplication, SparkSubmitExecution
+from common.state import AlchemySession, Application, Cluster, Container, SparkApplication, Proxy, Execution, SparkNotebookApplication, SparkSubmitApplication, SparkSubmitExecution
 from common.application_resources import ApplicationResources
 from common.exceptions import CannotCreateCluster
 from common.configuration import conf
+from common.object_storage import logs_archive_upload
 
 class PlatformManager:
@@ -19,6 +25,7 @@ class PlatformManager:
         self._application_to_containers(state, execution)
         execution.set_started()
         state.commit()
+        pm.update_proxy()
         return True
 
     def _application_to_containers(self, state: AlchemySession, execution: Execution):
@@ -150,15 +157,57 @@ class PlatformManager:
     def terminate_execution(self, state: AlchemySession, execution: Execution):
         cluster = execution.cluster
+        logs = []
         if cluster is not None:
             containers = cluster.containers
             for c in containers:
+                logs.append((c.readable_name, c.ip_address, self.log_get(c)))
                 self.swarm.terminate_container(c.docker_id)
                 state.delete(c)
             for p in cluster.proxies:
                 state.delete(p)
             state.delete(cluster)
         execution.set_terminated()
+        self._archive_execution_logs(execution, logs)
+        pm.update_proxy()
 
     def log_get(self, container: Container) -> str:
         return self.swarm.log_get(container.docker_id)
+
+    def _archive_execution_logs(self, execution: Execution, logs: list):
+        zipdata = BytesIO()
+        with zipfile.ZipFile(zipdata, "w", compression=zipfile.ZIP_DEFLATED) as logzip:
+            for c in logs:
+                fname = c[0] + "-" + c[1] + ".txt"
+                logzip.writestr(fname, c[2])
+        logs_archive_upload(execution, zipdata.getvalue())
+
+    def is_container_alive(self, container: Container) -> bool:
+        ret = self.swarm.inspect_container(container.docker_id)
+        return ret["running"]
+
+    def check_executions_health(self):
+        log.debug("Running check health task")
+        state = AlchemySession()
+        all_containers = state.query(Container).all()
+        for c in all_containers:
+            if not self.is_container_alive(c):
+                self._container_died(state, c)
+
+        notebooks = state.query(SparkNotebookApplication).all()
+        for nb in notebooks:
+            execs = nb.executions_running()
+            for e in execs:
+                c = e.find_container("spark-notebook")
+                if c is not None:
+                    pr = state.query(Proxy).filter_by(container_id=c.id, service_name="Spark Notebook interface")
+                    if datetime.now() - pr.last_access > timedelta(hours=conf["notebook_max_age_no_activity"]):
+                        self.terminate_execution(state, e)
+
+        state.commit()
+
+    def _container_died(self, state: AlchemySession, container: Container):
+        if container.readable_name == "spark-submit" or container.readable_name == "spark-master":
+            self.terminate_execution(state, container.cluster.execution)
+        else:
+            log.warning("Container {} (ID: {}) died unexpectedly")
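Before a cluster is torn down, each container's log is captured and `_archive_execution_logs` zips them in memory, so nothing touches disk before the archive lands in the object store. A sketch of reading such an archive back, assuming `data` came from `logs_archive_download(execution)` (the function below is illustrative, not part of the commit):

```python
import zipfile
from io import BytesIO

def list_archived_logs(data: bytes):
    # Entries are named "<readable_name>-<ip_address>.txt", per _archive_execution_logs.
    with zipfile.ZipFile(BytesIO(data)) as logzip:
        for name in logzip.namelist():
            print(name, len(logzip.read(name)), "bytes")
```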
-import time
+import logging
+log = logging.getLogger(__name__)
 
 from zoe_scheduler.swarm_status import SwarmStatus
-from zoe_scheduler.periodic_tasks import periodic_task
 from zoe_scheduler.swarm_client import SwarmClient
 from common.status import PlatformStatusReport
@@ -13,12 +13,9 @@ class PlatformStatus:
         self.swarm = SwarmClient()
 
     def update(self):
+        log.debug("Running platform status update task")
         self.swarm_status = self.swarm.info()
 
-    def update_task(self, interval):
-        self.update()
-        periodic_task(self.update, interval)
-
     def generate_report(self) -> PlatformStatusReport:
         report = PlatformStatusReport()
         report.include_swarm_status(self.swarm_status)
...
@@ -51,14 +51,14 @@ class ProxyManager:
         jinja_template = Template(ENTRY_TEMPLATE)
         node_list = []
         for p in proxy_entries:
-            netloc = urlparse(p["internal_url"])[1]
-            node_list.append((netloc, p["id"]))
+            netloc = urlparse(p.internal_url)[1]
+            node_list.append((netloc, p.id))
         for p in proxy_entries:
-            netloc = urlparse(p["internal_url"])[1]
+            netloc = urlparse(p.internal_url)[1]
             jinja_dict = {
-                "proxy_id": p["id"],
-                "proxy_url": p["internal_url"],
-                "service_name": p["service_name"],
+                "proxy_id": p.id,
+                "proxy_url": p.internal_url,
+                "service_name": p.service_name,
                 "netloc": netloc,
                 "nodes": node_list
             }
@@ -67,7 +67,7 @@ class ProxyManager:
         return output
 
     def _commit_and_reload(self, generated_file):
-        open(self.filepath, "w").write(generated_file)
+        open(self.apache_conf_filepath, "w").write(generated_file)
         system("sudo service apache2 reload")
         log.info("Apache reloaded")
@@ -77,6 +77,7 @@ class ProxyManager:
         self._commit_and_reload(output)
 
     def update_proxy_access_timestamps(self):
+        log.debug("Running update proxy accesses task")
         regex = re.compile('[0-9.]+ - - \[(.*)\] "GET /proxy/([0-9a-z\-]+)/')
         logf = open(self.apache_access_log, 'r')
         last_accesses = {}
@@ -88,11 +89,12 @@ class ProxyManager:
                 last_accesses[proxy_id] = timestamp
 
         state = AlchemySession()
-        for proxy in state.get_proxies():
+        for proxy in state.query(Proxy).all():
             proxy_id = proxy['id']
             if proxy_id in last_accesses:
                 proxy = state.query(Proxy).filter_by(id=proxy_id).one()
                 proxy.last_access = last_accesses[proxy_id]
+                proxy.container.cluster.execution.termination_notice = False
 
         state.commit()
 
 pm = ProxyManager()
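The access-timestamp task scans the Apache access log for `GET /proxy/<id>/` requests and keeps the newest timestamp per proxy id; any hit also clears the execution's `termination_notice` flag, so active notebooks are not reaped. A sketch of what the regex extracts from a single log line (the line itself is a made-up example in Apache's default format):

```python
import re

regex = re.compile('[0-9.]+ - - \[(.*)\] "GET /proxy/([0-9a-z\-]+)/')
line = '192.168.1.5 - - [21/Sep/2015:10:12:01 +0200] "GET /proxy/4f2c-notebook/ HTTP/1.1" 200 512'
match = re.match(regex, line)
if match is not None:
    timestamp, proxy_id = match.group(1), match.group(2)
    # -> ('21/Sep/2015:10:12:01 +0200', '4f2c-notebook')
```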
 import asyncio
 import logging
+log = logging.getLogger("rpyc")
 
 from rpyc.utils.server import UDPRegistryClient, AuthenticationError, Connection, Channel, SocketStream
 
-from zoe_scheduler.periodic_tasks import periodic_task
+from zoe_scheduler.periodic_tasks import PeriodicTask
 
 class RPyCAsyncIOServer:
@@ -24,15 +25,13 @@ class RPyCAsyncIOServer:
                           server will attempt to register only if a registrar was explicitly given.
     :param protocol_config: the :data:`configuration dictionary <rpyc.core.protocol.DEFAULT_CONFIG>`
                             that is passed to the RPyC connection
-    :param logger: the ``logger`` to use (of the built-in ``logging`` module). If ``None``, a
-                   default logger will be created.
     :param listener_timeout: the timeout of the listener socket; set to ``None`` to disable (e.g.
                              on embedded platforms with limited battery)
     """
 
     def __init__(self, service, hostname="", ipv6=False, port=0,
                  backlog=10, reuse_addr=True, authenticator=None, registrar=None,
-                 auto_register=None, protocol_config=None, logger=None, listener_timeout=0.5):
+                 auto_register=None, protocol_config=None, listener_timeout=0.5):
         if not protocol_config:
             protocol_config = {}
@@ -49,11 +48,7 @@ class RPyCAsyncIOServer:
         self.hostname = hostname
         self.port = port
 
-        if logger is None:
-            logger = logging.getLogger("%s/%d" % (self.service.get_service_name(), self.port))
-        self.logger = logger
-        if "logger" not in self.protocol_config:
-            self.protocol_config["logger"] = self.logger
+        self.logger = log
 
         if registrar is None:
             registrar = UDPRegistryClient(logger = self.logger)
         self.registrar = registrar
@@ -145,4 +140,4 @@ class RPyCAsyncIOServer:
         self.logger.info("server started on [%s]:%s", self.hostname, self.port)
         if self.auto_register:
             self._bg_register()
-        periodic_task(self._bg_register, self.registrar.REREGISTER_INTERVAL)
+        PeriodicTask(self._bg_register, self.registrar.REREGISTER_INTERVAL)
 import asyncio
+import logging
+log = logging.getLogger(__name__)
 
 from zoe_scheduler.platform import PlatformManager
 from zoe_scheduler.platform_status import PlatformStatus
-from zoe_scheduler.periodic_tasks import periodic_task
+from zoe_scheduler.periodic_tasks import PeriodicTask
 from zoe_scheduler.proxy_manager import pm
 from common.configuration import conf
@@ -64,9 +66,11 @@ class ZoeScheduler:
         self.scheduler_policy = SimpleSchedulerPolicy(self.platform_status)
 
     def init_tasks(self):
-        self.platform_status.update_task(conf["status_refresh_interval"])
-        periodic_task(self.schedule, conf['scheduler_task_interval'])
-        periodic_task(pm.update_proxy_access_timestamps, conf['proxy_update_accesses'])
+        self.platform_status.update()
+        PeriodicTask(self.platform_status.update, conf["status_refresh_interval"])
+        PeriodicTask(self.schedule, conf['scheduler_task_interval'])
+        PeriodicTask(pm.update_proxy_access_timestamps, conf['proxy_update_accesses'])
+        PeriodicTask(self.platform.check_executions_health, conf["check_health"])
 
     def incoming(self, execution: Execution) -> bool:
         if not self.scheduler_policy.admission_control(execution.application.required_resources):
@@ -84,6 +88,7 @@ class ZoeScheduler:
         self.scheduler_policy.started(execution_id, resources)
 
     def schedule(self):
+        log.debug("Running schedule task")
         self._check_runnable()
 
     def terminate_execution(self, state, execution: Execution):
...
@@ -64,6 +64,24 @@ class SwarmClient:
             "ip_address": docker_info["NetworkSettings"]["IPAddress"],
             "docker_id": docker_id
         }
+        if docker_info["State"]["Running"]:
+            info["state"] = "running"
+            info["running"] = True
+        elif docker_info["State"]["Paused"]:
+            info["state"] = "paused"
+            info["running"] = True
+        elif docker_info["State"]["Restarting"]:
+            info["state"] = "restarting"
+            info["running"] = True
+        elif docker_info["State"]["OOMKilled"]:
+            info["state"] = "killed"
+            info["running"] = False
+        elif docker_info["State"]["Dead"]:
+            info["state"] = "killed"
+            info["running"] = False
+        else:
+            info["state"] = "unknown"
+            info["running"] = False
        return info
 
     def terminate_container(self, docker_id):
...
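This branch chain collapses Docker's per-container `State` flags into the single `state`/`running` pair that `is_container_alive` checks. A sketch of the raw structure being inspected, assuming the docker-py 1.x client of that era (the endpoint and container id below are hypothetical):

```python
from docker import Client  # docker-py 1.x

client = Client(base_url='tcp://swarm-manager:2375')
docker_info = client.inspect_container('some-container-id')
# docker_info["State"] is a dict of booleans such as:
# {"Running": True, "Paused": False, "Restarting": False,
#  "OOMKilled": False, "Dead": False, ...}
```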
@@ -69,6 +69,14 @@ def app_rm_cmd(args):
     if application is None:
         print("Error: application {} does not exist".format(args.id))
         return
+    if args.force:
+        a = client.application_status(application)
+        for eid in a["executions"]:
+            e = client.execution_get(eid)
+            if e.status == "running":
+                print("Terminating execution {}".format(e.name))
+                client.execution_terminate(e)
+
     client.application_remove(application)
@@ -154,7 +162,7 @@ def process_arguments() -> Namespace:
     argparser_app_rm = subparser.add_parser('app-rm', help="Delete an application")
     argparser_app_rm.add_argument('id', type=int, help="Application id")
-    # argparser_app_rm.add_argument('-f', '--force', action="store_true", help="Kill also all active executions, if any") TODO
+    argparser_app_rm.add_argument('-f', '--force', action="store_true", help="Kill also all active executions, if any")
     argparser_app_rm.set_defaults(func=app_rm_cmd)
 
     argparser_app_inspect = subparser.add_parser('app-inspect', help="Gather details about an application and its active executions")
...
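With the previously commented-out `--force` flag now enabled, removing an application can also terminate its running executions first. Assuming the client script is invoked directly (the entry-point name is not shown in this diff and is hypothetical), usage would look like:

```
$ ./zoe.py app-rm --force 42
Terminating execution notebook-1
```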