Commit fcaed6f3 authored by Daniele Venzano's avatar Daniele Venzano

Use threads instead of asyncio

parent 94e2b695
#!/usr/bin/env python3
import argparse
import asyncio
import logging
log = logging.getLogger('zoe')
import signal
from rpyc.utils.server import ThreadedServer
from zoe_scheduler.rpyc_service import ZoeSchedulerRPCService
from zoe_scheduler.rpyc_server import RPyCAsyncIOServer
from zoe_scheduler.scheduler import zoe_sched
from zoe_scheduler.periodic_tasks import PeriodicTaskManager
log = logging.getLogger('zoe')
loop = None
rpyc_server = None
......@@ -38,24 +38,26 @@ def main():
args = process_arguments()
if args.debug:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('asyncio').setLevel(logging.INFO)
else:
logging.basicConfig(level=logging.INFO)
logging.getLogger('asyncio').setLevel(logging.WARNING)
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('rpyc').setLevel(logging.WARNING)
rpyc_logger = logging.getLogger('rpyc')
rpyc_logger.setLevel(logging.WARNING)
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, sigint_handler)
rpyc_server = RPyCAsyncIOServer(ZoeSchedulerRPCService, '0.0.0.0', port=4000, auto_register=True)
rpyc_server.start()
tm = PeriodicTaskManager()
# rpyc_server = RPyCAsyncIOServer(ZoeSchedulerRPCService, '0.0.0.0', port=4000, auto_register=True)
rpyc_server = ThreadedServer(ZoeSchedulerRPCService, '0.0.0.0', port=4000,
auto_register=not args.rpyc_no_auto_register,
protocol_config={"allow_public_attrs": True},
logger=rpyc_logger)
zoe_sched.init_tasks()
zoe_sched.init_tasks(tm)
loop.run_forever()
rpyc_server.start()
loop.close()
tm.stop_all()
if __name__ == "__main__":
main()
import asyncio
from threading import Thread, Event
import logging
log = logging.getLogger(__name__)
class PeriodicTask:
def __init__(self, func, interval):
self.func = func
self.interval = interval
self._loop = asyncio.get_event_loop()
self._set()
class PeriodicTaskManager:
"""
This is for internal tasks of the Zoe scheduler component, do not confuse this for the application scheduler.
"""
def __init__(self):
self.tasks = []
self.terminate = Event()
self.terminate.clear()
def _set(self):
self._handler = self._loop.call_later(self.interval, self._run)
def add_task(self, name, func, delay):
th = Thread(name=name, target=self._generic_task, args=(name, delay, func))
th.daemon = True
th.start()
def _run(self):
try:
self.func()
except:
log.exception("Exception in periodic task")
self._set()
def _generic_task(self, name, delay, func):
log.info("Task {} started".format(name))
while True:
func()
stop = self.terminate.wait(delay)
if stop:
break
log.info("Task {} ended".format(name))
def stop(self):
self._handler.cancel()
def stop_all(self):
self.terminate.set()
for t in self.tasks:
t.join()
log.info("All tasks terminated")
import asyncio
import logging
log = logging.getLogger("rpyc")
from rpyc.utils.server import UDPRegistryClient, AuthenticationError, Connection, Channel, SocketStream
from zoe_scheduler.periodic_tasks import PeriodicTask
class RPyCAsyncIOServer:
"""AsyncIO RpyC server implementation
:param service: the :class:`service <service.Service>` to expose
:param hostname: the host to bind to. Default is IPADDR_ANY, but you may
want to restrict it only to ``localhost`` in some setups
:param ipv6: whether to create an IPv6 or IPv4 socket. The default is IPv4
:param port: the TCP port to bind to
:param backlog: the socket's backlog (passed to ``listen()``)
:param reuse_addr: whether or not to create the socket with the ``SO_REUSEADDR`` option set.
:param authenticator: the :ref:`api-authenticators` to use. If ``None``, no authentication
is performed.
:param registrar: the :class:`registrar <rpyc.utils.registry.RegistryClient>` to use.
If ``None``, a default :class:`rpyc.utils.registry.UDPRegistryClient`
will be used
:param auto_register: whether or not to register using the *registrar*. By default, the
server will attempt to register only if a registrar was explicitly given.
:param protocol_config: the :data:`configuration dictionary <rpyc.core.protocol.DEFAULT_CONFIG>`
that is passed to the RPyC connection
:param listener_timeout: the timeout of the listener socket; set to ``None`` to disable (e.g.
on embedded platforms with limited battery)
"""
def __init__(self, service, hostname="", ipv6=False, port=0,
backlog=10, reuse_addr=True, authenticator=None, registrar=None,
auto_register=None, protocol_config=None, listener_timeout=0.5):
if not protocol_config:
protocol_config = {}
self.service = service
self.authenticator = authenticator
self.backlog = backlog
if auto_register is None:
self.auto_register = bool(registrar)
else:
self.auto_register = auto_register
self.protocol_config = {"allow_public_attrs": True}
if protocol_config is not None:
self.protocol_config.update(protocol_config)
self.hostname = hostname
self.port = port
self.logger = log
if registrar is None:
registrar = UDPRegistryClient(logger = self.logger)
self.registrar = registrar
self.register_task = None
# The asyncio Server object
self.server = None
# Unused parameters
self.ipv6 = ipv6
self.reuse_addr = reuse_addr
self.listener_timeout = listener_timeout
def close(self):
"""Closes (terminates) the server and all of its clients. If applicable,
also unregisters from the registry server"""
if self.auto_register:
try:
self.registrar.unregister(self.port)
except Exception:
self.logger.exception("error unregistering services")
def fileno(self):
"""returns the listener socket's file descriptor"""
return self.server.sockets[0]
@asyncio.coroutine
def _accept_method(self, reader, writer):
self._authenticate_and_serve_client(reader, writer)
def _authenticate_and_serve_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
if self.authenticator:
addrinfo = writer.transport.get_extra_info("peername")
h = addrinfo[0]
p = addrinfo[1]
try:
credentials = self.authenticator(reader, writer)
except AuthenticationError:
self.logger.info("[%s]:%s failed to authenticate, rejecting connection", h, p)
return
else:
self.logger.info("[%s]:%s authenticated successfully", h, p)
else:
credentials = None
try:
self._serve_client(reader, writer, credentials)
except Exception:
self.logger.exception("client connection terminated abruptly")
raise
writer.close()
def _serve_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, credentials):
addrinfo = writer.transport.get_extra_info("peername")
h = addrinfo[0]
p = addrinfo[1]
sockname = writer.transport.get_extra_info("sockname")
sock = writer.transport.get_extra_info("socket")
if credentials:
self.logger.info("welcome [%s]:%s (%r)", h, p, credentials)
else:
self.logger.info("welcome [%s]:%s", h, p)
try:
config = dict(self.protocol_config,
credentials=credentials,
endpoints=(sockname, addrinfo),
logger=self.logger)
conn = Connection(self.service,
Channel(SocketStream(sock)),
config=config,
_lazy=True)
conn._init_service()
conn.serve_all()
finally:
self.logger.info("goodbye [%s]:%s", h, p)
def _bg_register(self):
aliases = self.service.get_service_aliases()
try:
self.registrar.register(aliases, self.port, interface=self.hostname)
except:
self.logger.exception("error registering services")
def start(self):
"""Starts the server. Use :meth:`close` to stop"""
loop = asyncio.get_event_loop()
coro = asyncio.start_server(self._accept_method, self.hostname, self.port, loop=loop, backlog=self.backlog)
self.server = loop.run_until_complete(coro)
self.logger.info("server started on [%s]:%s", self.hostname, self.port)
if self.auto_register:
self._bg_register()
self.register_task = PeriodicTask(self._bg_register, self.registrar.REREGISTER_INTERVAL)
def stop(self):
self.register_task.stop()
self.server.close()
import asyncio
import logging
log = logging.getLogger(__name__)
from zoe_scheduler.platform import PlatformManager
from zoe_scheduler.platform_status import PlatformStatus
from zoe_scheduler.periodic_tasks import PeriodicTask
from zoe_scheduler.periodic_tasks import PeriodicTaskManager
from zoe_scheduler.proxy_manager import pm
from common.configuration import conf
from common.state import Execution
from common.application_resources import ApplicationResources
log = logging.getLogger(__name__)
class SimpleSchedulerPolicy:
def __init__(self, platform_status: PlatformStatus):
......@@ -64,28 +64,18 @@ class ZoeScheduler:
self.platform = PlatformManager()
self.platform_status = PlatformStatus()
self.scheduler_policy = SimpleSchedulerPolicy(self.platform_status)
self.tasks = []
def init_tasks(self):
self.platform_status.update()
tsk = PeriodicTask(self.platform_status.update, conf["status_refresh_interval"])
self.tasks.append(tsk)
tsk = PeriodicTask(self.schedule, conf['scheduler_task_interval'])
self.tasks.append(tsk)
tsk = PeriodicTask(pm.update_proxy_access_timestamps, conf['proxy_update_accesses'])
self.tasks.append(tsk)
tsk = PeriodicTask(self.platform.check_executions_health, conf["check_health"])
self.tasks.append(tsk)
def stop_tasks(self):
for tsk in self.tasks:
tsk.stop()
def init_tasks(self, tm: PeriodicTaskManager):
tm.add_task("platform status updater", self.platform_status.update, conf["status_refresh_interval"])
tm.add_task("scheduler", self.schedule, conf['scheduler_task_interval'])
tm.add_task("proxy access timestamp updater", pm.update_proxy_access_timestamps, conf['proxy_update_accesses'])
tm.add_task("execution health checker", self.platform.check_executions_health, conf["check_health"])
def incoming(self, execution: Execution) -> bool:
if not self.scheduler_policy.admission_control(execution.application.required_resources):
return False
self.scheduler_policy.insert(execution.id, execution.application.required_resources)
asyncio.get_event_loop().call_soon(self._check_runnable)
self._check_runnable()
return True
def _check_runnable(self): # called periodically, does not use state to keep database load low
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment