Commit 873146f5 authored by Daniele Venzano's avatar Daniele Venzano

Split the things in a client and a scheduler, the client will be available as...

Split the things in a client and a scheduler, the client will be available as a cli client or a web application.
parent e128a1c5
......@@ -51,4 +51,11 @@
<primary-key name="PRIMARY" columns="id"/>
</table>
</data-source>
<data-source name="MySQL - @m1 devel" uuid="33b1ec79-4374-4ff8-a8f8-26c89b418a79">
<database-info product="MySQL" version="5.5.44-MariaDB-1ubuntu0.14.04.1" jdbc-version="4.0" driver-name="MySQL Connector Java" driver-version="mysql-connector-java-5.1.35 ( Revision: 5fb9c5849535c13917c2cf9baaece6ef9693ef27 )">
<extra-name-characters>#@</extra-name-characters>
<identifier-quote-string>`</identifier-quote-string>
</database-info>
<case-sensitivity plain-identifiers="exact" quoted-identifiers="exact"/>
</data-source>
</component>
\ No newline at end of file
......@@ -7,5 +7,11 @@
<schema-pattern>caaas.*</schema-pattern>
<default-schemas>caaas.*</default-schemas>
</data-source>
<data-source name="MySQL - @m1 devel" uuid="33b1ec79-4374-4ff8-a8f8-26c89b418a79">
<secret-storage>master_key</secret-storage>
<user-name>caaas_devel</user-name>
<schema-pattern>caaas.*</schema-pattern>
<default-schemas>caaas.*</default-schemas>
</data-source>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="DataSourceManagerImpl" format="xml" hash="3694309854">
<component name="DataSourceManagerImpl" format="xml" hash="270082280">
<data-source source="LOCAL" name="MySQL - @m1" uuid="a32fd6de-3ffa-40c0-9ec8-8953a89c53e0">
<driver-ref>mysql</driver-ref>
<synchronize>true</synchronize>
......@@ -15,5 +15,19 @@
</driver-properties>
<libraries />
</data-source>
<data-source source="LOCAL" name="MySQL - @m1 devel" uuid="33b1ec79-4374-4ff8-a8f8-26c89b418a79">
<driver-ref>mysql</driver-ref>
<synchronize>true</synchronize>
<jdbc-driver>com.mysql.jdbc.Driver</jdbc-driver>
<jdbc-url>jdbc:mysql://localhost:3306</jdbc-url>
<driver-properties>
<property name="zeroDateTimeBehavior" value="convertToNull" />
<property name="tinyInt1isBit" value="false" />
<property name="characterEncoding" value="utf8" />
<property name="characterSetResults" value="utf8" />
<property name="yearIsDateType" value="false" />
</driver-properties>
<libraries />
</data-source>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/caaas/sql.py" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/images/notebook/files/start-notebook.sh" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/caaas_web/sql.py" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/scripts/images/notebook/files/start-notebook.sh" charset="UTF-8" />
<file url="PROJECT" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
......@@ -5,7 +5,7 @@
<entry url="file://$PROJECT_DIR$">
<entryData>
<resourceRoots>
<path value="file://$PROJECT_DIR$/caaas" />
<path value="file://$PROJECT_DIR$/caaas_web" />
</resourceRoots>
</entryData>
</entry>
......
This diff is collapsed.
......@@ -7,8 +7,8 @@ from datetime import datetime
from jinja2 import Template
from caaas.sql import CAaaState
from caaas.config_parser import config
from caaas_web.sql import CAaaState
from caaas_web.config_parser import config
LOOP_INTERVAL = 1 # seconds
ACCESS_TIME_REFRESH_INTERVAL = 60 # seconds
......
import asyncio
import logging
import signal
from caaas_scheduler.rpyc_service import CAaaSSchedulerRPCService
from caaas_scheduler.rpyc_server import RPyCAsyncIOServer
from caaas_scheduler.scheduler import CAaaSSCheduler
def sigint_handler():
    # NOTE(review): uses the module-level `log` and `loop` names that are
    # created in the __main__ block below; only safe to call after they exist.
    log.warning('CTRL-C detected, terminating event loop...')
    loop.stop()


if __name__ == "__main__":
    # Scheduler entry point: configure logging, start the RPyC service on the
    # asyncio event loop, then run until interrupted.
    logging.basicConfig(level=logging.DEBUG)
    # Quiet down chatty third-party loggers.
    logging.getLogger('requests').setLevel(logging.WARNING)
    logging.getLogger('asyncio').setLevel(logging.INFO)
    log = logging.getLogger('caaas')

    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, sigint_handler)

    # Expose the scheduler over RPyC on all interfaces, port 4000, and
    # auto-register with the RPyC registry so clients can discover it by name.
    rpyc_server = RPyCAsyncIOServer(CAaaSSchedulerRPCService, '0.0.0.0', port=4000, auto_register=True)
    rpyc_server.start()
    # Kick off the scheduler's periodic background tasks.
    CAaaSSCheduler().init_tasks()
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        log.warning('CTRL-C detected, terminating event loop...')
    # Wait for the listening server to shut down before closing the loop.
    loop.run_until_complete(rpyc_server.server.wait_closed())
    loop.close()
from caaas_client.client import CAaaSClient
import rpyc
from sqlalchemy.orm.exc import NoResultFound
from common.state import AlchemySession, SparkApplication, User
from common.application_resources import SparkApplicationResources
from common.status import PlatformStatusReport
from common.exceptions import UserIDDoesNotExist
REGISTRY = "10.0.0.2:5000"
MASTER_IMAGE = REGISTRY + "/venza/spark-master:1.4.1"
WORKER_IMAGE = REGISTRY + "/venza/spark-worker:1.4.1"
SHELL_IMAGE = REGISTRY + "/venza/spark-shell:1.4.1"
SUBMIT_IMAGE = REGISTRY + "/venza/spark-submit:1.4.1"
NOTEBOOK_IMAGE = REGISTRY + "/venza/spark-notebook:1.4.1"


class CAaaSClient:
    """Client-side facade: talks to the scheduler over RPyC and to the
    database directly through SQLAlchemy."""

    def __init__(self):
        # Locate the scheduler through the RPyC registry by service name.
        self.server_connection = rpyc.connect_by_service("CAaaSSchedulerRPC")
        self.server = self.server_connection.root
        self.state = AlchemySession()

    def user_new(self, email) -> int:
        """Create a new user record and return its database id."""
        new_user = User(email=email)
        self.state.add(new_user)
        self.state.commit()
        return new_user.id

    def user_get(self, email) -> int:
        """Return the id of the user with the given email.

        Raises NoResultFound when no such user exists.
        """
        return self.state.query(User).filter_by(email=email).one().id

    def platform_status(self) -> PlatformStatusReport:
        """Ask the remote scheduler for the current platform status."""
        return self.server.get_platform_status()

    def spark_application_new(self, user_id: int, worker_count: int, executor_memory: str, executor_cores: int) -> SparkApplication:
        """Create (but do not start) a Spark application owned by *user_id*.

        Raises UserIDDoesNotExist when the user id is unknown.
        """
        try:
            # Existence check only; the row itself is not needed afterwards.
            self.state.query(User).filter_by(id=user_id).one()
        except NoResultFound:
            raise UserIDDoesNotExist(user_id)

        resources = SparkApplicationResources()
        resources.worker_count = worker_count
        resources.worker_resources["memory_limit"] = executor_memory
        resources.worker_resources["worker_cores"] = str(executor_cores)

        app = SparkApplication(master_image=MASTER_IMAGE, worker_image=WORKER_IMAGE, name='empty-cluster', required_resources=resources)
        self.state.add(app)
        self.state.commit()
        return app
import asyncio
async def periodic_routine(func, interval):
    """Sleep *interval* seconds, call *func*, and repeat forever.

    The first call to *func* happens only after one full interval has
    elapsed. Exceptions raised by *func* propagate and terminate the task.
    """
    while True:
        await asyncio.sleep(interval)
        func()


def periodic_task(func, interval) -> asyncio.Task:
    """Schedule *func* to be called every *interval* seconds.

    Returns the asyncio.Task driving the periodic loop; cancel it to stop
    the calls.
    """
    # The generator-based @asyncio.coroutine / yield-from style was removed
    # in Python 3.11; use a native coroutine and ensure_future() instead of
    # constructing asyncio.Task directly.
    return asyncio.ensure_future(periodic_routine(func, interval))
from caaas_scheduler.swarm_client import SwarmClient, ContainerOptions
from common.state import AlchemySession, Cluster, Container, Application, SparkApplication
from common.application_resources import ApplicationResources, SparkApplicationResources
class PlatformManager:
    """Translates application descriptions into running Swarm containers."""

    def __init__(self):
        self.swarm = SwarmClient()

    def start_application(self, application: Application, resources: ApplicationResources):
        """Create and start all containers needed by *application*."""
        state = AlchemySession()
        self._application_to_containers(state, application, resources)
        state.commit()

    # noinspection PyTypeChecker
    def _application_to_containers(self, state, application: Application, resources: ApplicationResources):
        # Dispatch on the concrete application type; only Spark is supported.
        if type(application) is SparkApplication:
            self._spark_app_to_containers(state, application, resources)
        else:
            raise NotImplementedError('%s application are not implemented' % type(application))

    def _spark_app_to_containers(self, state: AlchemySession, application: SparkApplication, resources: SparkApplicationResources):
        """Spawn one master and ``resources.worker_count`` worker containers."""
        cluster = Cluster(app_id=application.id)
        state.add(cluster)

        # Master container.
        master_requirements = resources.master_resources
        master_opts = ContainerOptions()
        if "memory_limit" in master_requirements:
            master_opts.set_memory_limit(master_requirements["memory_limit"])
        image = application.master_image
        master_info = self.swarm.spawn_container(image, master_opts)
        # NOTE(review): master_info comes from SwarmClient.inspect_container();
        # it must expose the "docker_id", "ip_address" and "docker_ip" keys.
        container = Container(docker_id=master_info["docker_id"], ip_address=master_info["ip_address"], readable_name="spark-master")
        container.cluster = cluster
        state.add(container)

        # Worker containers, all pointed at the master's address.
        worker_requirements = resources.worker_resources
        worker_opts = ContainerOptions()
        worker_opts.add_env_variable("SPARK_MASTER_IP", master_info["docker_ip"])
        if "memory_limit" in worker_requirements:
            worker_opts.add_env_variable("SPARK_WORKER_RAM", worker_requirements["memory_limit"])
            worker_opts.set_memory_limit(worker_requirements["memory_limit"])
        if "worker_cores" in worker_requirements:
            worker_opts.add_env_variable("SPARK_WORKER_CORES", worker_requirements["worker_cores"])
        image = application.worker_image
        for i in range(resources.worker_count):
            worker_info = self.swarm.spawn_container(image, worker_opts)
            container = Container(docker_id=worker_info["docker_id"], ip_address=worker_info["ip_address"], readable_name="spark-worker-%d" % i)
            container.cluster = cluster
            state.add(container)

    def terminate_application(self, application: Application):
        """Tear down all of *application*'s containers and purge its DB rows."""
        state = AlchemySession()
        # BUG FIX: filter_by() returns a Query object; .one() is required to
        # obtain the actual Cluster row before accessing .containers.
        cluster = state.query(Cluster).filter_by(app_id=application.id).one()
        containers = cluster.containers
        for c in containers:
            self.swarm.terminate_container(c.docker_id)
            state.delete(c)
        state.delete(cluster)
        state.commit()
import time
from caaas_scheduler.swarm_status import SwarmStatus
from caaas_scheduler.periodic_tasks import periodic_task
from caaas_scheduler.swarm_client import SwarmClient
from common.status import PlatformStatusReport
class PlatformStatus:
    """Caches the most recent Swarm status together with its timestamp."""

    def __init__(self):
        self.swarm = SwarmClient()
        # Start with an empty status until the first update() runs.
        self.swarm_status = SwarmStatus()
        self.swarm_status_timestamp = time.time()

    def update(self):
        """Refresh the cached status by querying the Swarm manager."""
        self.swarm_status = self.swarm.info()
        self.swarm_status_timestamp = time.time()

    def update_task(self, interval):
        """Refresh immediately, then keep refreshing every *interval* seconds."""
        self.update()
        periodic_task(self.update, interval)

    def generate_report(self) -> PlatformStatusReport:
        """Build a status report for clients (currently an empty report)."""
        return PlatformStatusReport()
import asyncio
import logging
import time
import threading
from rpyc.utils.server import UDPRegistryClient, AuthenticationError, Connection, Channel, SocketStream
class RPyCAsyncIOServer:
    """AsyncIO-based RPyC server implementation.

    :param service: the :class:`service <service.Service>` to expose
    :param hostname: the host to bind to. Default is IPADDR_ANY, but you may
                     want to restrict it only to ``localhost`` in some setups
    :param ipv6: whether to create an IPv6 or IPv4 socket. The default is IPv4
    :param port: the TCP port to bind to
    :param backlog: the socket's backlog (passed to ``listen()``)
    :param reuse_addr: whether or not to create the socket with the ``SO_REUSEADDR`` option set.
    :param authenticator: the :ref:`api-authenticators` to use. If ``None``, no authentication
                          is performed.
    :param registrar: the :class:`registrar <rpyc.utils.registry.RegistryClient>` to use.
                      If ``None``, a default :class:`rpyc.utils.registry.UDPRegistryClient`
                      will be used
    :param auto_register: whether or not to register using the *registrar*. By default, the
                          server will attempt to register only if a registrar was explicitly given.
    :param protocol_config: the :data:`configuration dictionary <rpyc.core.protocol.DEFAULT_CONFIG>`
                            that is passed to the RPyC connection
    :param logger: the ``logger`` to use (of the built-in ``logging`` module). If ``None``, a
                   default logger will be created.
    :param listener_timeout: the timeout of the listener socket; set to ``None`` to disable (e.g.
                             on embedded platforms with limited battery)
    """

    def __init__(self, service, hostname="", ipv6=False, port=0,
                 backlog=10, reuse_addr=True, authenticator=None, registrar=None,
                 auto_register=None, protocol_config=None, logger=None, listener_timeout=0.5):
        if not protocol_config:
            protocol_config = {}
        self.service = service
        self.authenticator = authenticator
        self.backlog = backlog
        if auto_register is None:
            # Mirror rpyc's behaviour: register only when a registrar was given.
            self.auto_register = bool(registrar)
        else:
            self.auto_register = auto_register
        self.protocol_config = protocol_config
        self.hostname = hostname
        self.port = port
        if logger is None:
            logger = logging.getLogger("%s/%d" % (self.service.get_service_name(), self.port))
        self.logger = logger
        if "logger" not in self.protocol_config:
            self.protocol_config["logger"] = self.logger
        if registrar is None:
            registrar = UDPRegistryClient(logger=self.logger)
        self.registrar = registrar
        # The asyncio Server object, created by start().
        self.server = None
        # Unused parameters, kept only for signature compatibility with
        # rpyc's threaded servers.
        self.ipv6 = ipv6
        self.reuse_addr = reuse_addr
        self.listener_timeout = listener_timeout

    def close(self):
        """Closes (terminates) the server and all of its clients. If applicable,
        also unregisters from the registry server"""
        # BUG FIX: previously only the registry entry was removed; the
        # listening asyncio server itself was never closed, contradicting
        # this method's documented contract.
        if self.server is not None:
            self.server.close()
        if self.auto_register:
            try:
                self.registrar.unregister(self.port)
            except Exception:
                self.logger.exception("error unregistering services")

    def fileno(self):
        """returns the listener socket's file descriptor"""
        # BUG FIX: return the descriptor (an int), not the socket object.
        return self.server.sockets[0].fileno()

    def _accept_method(self, reader, writer):
        # asyncio.start_server callback for each new client connection.
        self._authenticate_and_serve_client(reader, writer)

    def _authenticate_and_serve_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """Optionally authenticate the peer, then serve RPyC requests on it."""
        if self.authenticator:
            addrinfo = writer.transport.get_extra_info("peername")
            h = addrinfo[0]
            p = addrinfo[1]
            try:
                credentials = self.authenticator(reader, writer)
            except AuthenticationError:
                self.logger.info("[%s]:%s failed to authenticate, rejecting connection", h, p)
                return
            else:
                self.logger.info("[%s]:%s authenticated successfully", h, p)
        else:
            credentials = None
        try:
            self._serve_client(reader, writer, credentials)
        except Exception:
            self.logger.exception("client connection terminated abruptly")
            raise
        writer.close()

    def _serve_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, credentials):
        """Run the RPyC protocol loop over this connection's raw socket."""
        addrinfo = writer.transport.get_extra_info("peername")
        h = addrinfo[0]
        p = addrinfo[1]
        sockname = writer.transport.get_extra_info("sockname")
        sock = writer.transport.get_extra_info("socket")
        if credentials:
            self.logger.info("welcome [%s]:%s (%r)", h, p, credentials)
        else:
            self.logger.info("welcome [%s]:%s", h, p)
        try:
            config = dict(self.protocol_config,
                          credentials=credentials,
                          endpoints=(sockname, addrinfo),
                          logger=self.logger)
            conn = Connection(self.service,
                              Channel(SocketStream(sock)),
                              config=config,
                              _lazy=True)
            conn._init_service()
            # NOTE(review): serve_all() is a blocking loop; while one client
            # is being served the asyncio event loop cannot process others.
            conn.serve_all()
        finally:
            self.logger.info("goodbye [%s]:%s", h, p)

    def _bg_register(self):
        """Background thread body: periodically (re-)register with the registry."""
        interval = self.registrar.REREGISTER_INTERVAL
        self.logger.info("started background auto-register thread (interval = %s)", interval)
        tnext = 0
        while True:
            t = time.time()
            if t >= tnext:
                did_register = False
                aliases = self.service.get_service_aliases()
                try:
                    did_register = self.registrar.register(aliases, self.port, interface=self.hostname)
                except Exception:
                    self.logger.exception("error registering services")
                # If registration worked out, retry to register again after
                # interval time. Otherwise, try to register soon again.
                if did_register:
                    tnext = t + interval
                else:
                    self.logger.info("registering services did not work - retry")
            time.sleep(1)

    def start(self):
        """Starts the server. Use :meth:`close` to stop"""
        loop = asyncio.get_event_loop()
        coro = asyncio.start_server(self._accept_method, self.hostname, self.port, loop=loop, backlog=self.backlog)
        self.server = loop.run_until_complete(coro)
        self.logger.info("server started on [%s]:%s", self.hostname, self.port)
        if self.auto_register:
            t = threading.Thread(target=self._bg_register)
            # Daemon thread so it does not keep the process alive on exit;
            # Thread.setDaemon() is deprecated in favour of the attribute.
            t.daemon = True
            t.start()
import rpyc
from caaas_scheduler.scheduler import caaas_sched
from common.status import PlatformStatusReport
from common.state import AlchemySession, Application
class CAaaSSchedulerRPCService(rpyc.Service):
    """RPyC-facing API of the scheduler; ``exposed_*`` methods are callable
    by remote clients."""

    # Shared module-level scheduler singleton.
    sched = caaas_sched

    def on_connect(self):
        pass

    def on_disconnect(self):
        pass

    def exposed_get_platform_status(self) -> PlatformStatusReport:
        """Return the current platform status report."""
        return self.sched.platform_status.generate_report()

    def exposed_terminate_application(self, application_id: int):
        """Terminate the application identified by *application_id*.

        Always returns True; raises NoResultFound for an unknown id.
        """
        state = AlchemySession()
        application = state.query(Application).filter_by(id=application_id).one()
        self.sched.terminate_application(application)
        state.commit()
        return True
from caaas_scheduler.platform import PlatformManager
from caaas_scheduler.platform_status import PlatformStatus
from common.configuration import conf
from common.state import Application
from common.application_resources import ApplicationResources
class SimpleSchedulerPolicy:
    """FIFO scheduling policy with a trivial core-count admission test."""

    def __init__(self, platform_status: PlatformStatus):
        self.platform_status = platform_status
        self.applications_waiting = []
        self.applications_running = []

    def admission_control(self, application: Application) -> bool:
        """Accept only applications asking for fewer cores than the swarm has."""
        total_cores = self.platform_status.swarm_status.cores_total
        return application.requirements.core_count() < total_cores

    def insert(self, application: Application):
        """Append the application to the waiting queue."""
        self.applications_waiting.append(application)

    def runnable(self) -> (Application, ApplicationResources):
        """Pop the next waiting application, or (None, None) when empty."""
        if not self.applications_waiting:
            return None, None
        app = self.applications_waiting.pop(0)
        # Could modify the amount of resources actually used.
        return app, app.requirements

    def started(self, application: Application):
        """Record that the application is now running."""
        self.applications_running.append(application)

    def terminated(self, application: Application):
        """Drop the application from whichever queue currently holds it."""
        for queue in (self.applications_running, self.applications_waiting):
            if application in queue:
                queue.remove(application)
class CAaaSSCheduler:
    """Main scheduler: ties together the platform manager, the status
    tracker and the scheduling policy."""

    def __init__(self):
        self.platform = PlatformManager()
        self.platform_status = PlatformStatus()
        self.scheduler_policy = SimpleSchedulerPolicy(self.platform_status)

    def init_tasks(self):
        """Start the periodic platform-status refresh task."""
        self.platform_status.update_task(conf["status_refresh_interval"])

    def incoming_application(self, application: Application) -> bool:
        """Try to admit and schedule an application.

        Returns False when rejected by admission control, True otherwise.
        (BUG FIX: the success path previously fell through and returned None,
        making the result inconsistent for callers testing the return value.)
        """
        if not self.scheduler_policy.admission_control(application):
            return False
        self.scheduler_policy.insert(application)
        self.check_runnable()
        return True

    def check_runnable(self):
        """Start the next runnable application, if the policy yields one."""
        application, resources = self.scheduler_policy.runnable()
        if application is None:
            return
        self.platform.start_application(application, resources)
        self.scheduler_policy.started(application)

    def terminate_application(self, application: Application):
        """Stop an application and drop it from the policy's bookkeeping."""
        self.platform.terminate_application(application)
        self.scheduler_policy.terminated(application)


# Module-level singleton used by the RPC service.
caaas_sched = CAaaSSCheduler()
import docker
import docker.utils
from common.configuration import conf
from caaas_scheduler.swarm_status import SwarmStatus
class SwarmClient:
    """Thin wrapper around the Docker Swarm manager API."""

    def __init__(self):
        manager = conf['docker_swarm_manager']
        self.cli = docker.Client(base_url=manager)

    def info(self) -> SwarmStatus:
        """Query the Swarm manager and build a SwarmStatus snapshot."""
        info = self.cli.info()
        pl_status = SwarmStatus()
        pl_status.container_count = info["Containers"]
        pl_status.image_count = info["Images"]
        pl_status.memory_total = info["MemTotal"]
        pl_status.cores_total = info["NCPU"]
        # DriverStatus is a list of [label, value] pairs; the asserts guard
        # against Swarm changing the position of the fields parsed here.
        idx = 1
        assert 'Strategy' in info["DriverStatus"][idx][0]
        pl_status.placement_strategy = info["DriverStatus"][idx][1]
        idx = 2
        assert 'Filters' in info["DriverStatus"][idx][0]
        pl_status.active_filters = info["DriverStatus"][idx][1].split(", ")
        return pl_status

    def spawn_container(self, image, options):
        """Create and start a container from *image* using a ContainerOptions.

        Returns the info dictionary produced by inspect_container().
        """
        host_config = docker.utils.create_host_config(network_mode="bridge",
                                                      binds=options.get_volume_binds(),
                                                      mem_limit=options.get_memory_limit())
        cont = self.cli.create_container(image=image,
                                         environment=options.get_environment(),
                                         network_disabled=False,
                                         host_config=host_config,
                                         detach=True,
                                         volumes=options.get_volumes(),
                                         command=options.get_command())
        self.cli.start(container=cont.get('Id'))
        return self.inspect_container(cont.get('Id'))

    def inspect_container(self, docker_id):
        """Return a dict describing a running container.

        Keys: "docker_id", plus "ip_address" and "docker_ip" (both map to the
        container's IP address, since callers use both names).
        """
        docker_info = self.cli.inspect_container(container=docker_id)
        ip_address = docker_info["NetworkSettings"]["IPAddress"]
        # BUG FIX: PlatformManager reads "docker_id" and "ip_address" from
        # this dict; previously only "docker_ip" was returned, which made
        # every spawn_container() caller fail with a KeyError.
        return {
            "docker_id": docker_id,
            "ip_address": ip_address,
            "docker_ip": ip_address,
        }

    def terminate_container(self, docker_id):
        """Force-remove a container (kills it if it is still running)."""
        self.cli.remove_container(docker_id, force=True)
class ContainerOptions:
    """Collects the options used when spawning a container: environment
    variables, volume binds, command and memory limit."""

    def __init__(self):
        self.env = {}
        self.volume_binds = []
        self.volumes = []
        self.command = ""
        # Default memory limit, Docker-style size string.
        self.memory_limit = '2g'

    def add_env_variable(self, name, value):
        """Set environment variable *name* to *value* inside the container."""
        self.env[name] = value

    def get_environment(self):
        return self.env

    def add_volume_bind(self, path, mountpoint, readonly=False):
        """Bind host *path* to container *mountpoint*, optionally read-only."""
        self.volumes.append(mountpoint)
        # BUG FIX: the original expression `a + b if readonly else "rw"`
        # applied the ternary to the whole concatenation, so non-readonly
        # binds appended just the string "rw" instead of "path:mount:rw".
        mode = "ro" if readonly else "rw"
        self.volume_binds.append(path + ":" + mountpoint + ":" + mode)

    def get_volumes(self):
        return self.volumes

    def get_volume_binds(self):
        return self.volume_binds

    def set_command(self, cmd):
        self.command = cmd

    def get_command(self):
        return self.command

    def set_memory_limit(self, limit):
        self.memory_limit = limit

    def get_memory_limit(self):
        return self.memory_limit
class SwarmNodeStatus:
    """Mutable record describing a single Swarm node."""

    def __init__(self, name):
        self.name = name
        self.docker_endpoint = None
        # Resource counters start at zero until the node is inspected.
        self.container_count = 0
        self.cores_total = 0
        self.cores_reserved = 0
        self.memory_total = 0
        self.memory_reserved = 0
        self.labels = {}
class SwarmStatus:
    """Aggregate state of the whole Swarm cluster."""

    def __init__(self):
        # Cluster-wide counters.
        self.container_count = 0
        self.image_count = 0
        self.memory_total = 0
        self.cores_total = 0
        # Scheduler configuration as reported by Swarm.
        self.placement_strategy = ''
        self.active_filters = []
        # Per-node detail records.
        self.nodes = []
from flask import Flask

# Flask application object shared by the web and API view modules.
app = Flask(__name__)

# NOTE(review): this commit dump contains both the pre-rename (caaas.*) and
# post-rename (caaas_web.*) import lines; only one pair exists in the real
# file. The view modules are imported after `app` is created because they
# register their routes on it as an import side effect.
import caaas.web
import caaas.api
import caaas_web.web
import caaas_web.api
......@@ -2,10 +2,10 @@ from flask import jsonify, request, send_file, abort
import time
from zipfile import is_zipfile
from caaas import app
from caaas.sql import CAaaState
from caaas.spark_app_execution import application_submitted, setup_volume, AppHistory
from caaas.swarm_manager import sm