Commit e96e1d18 authored by Daniele Venzano's avatar Daniele Venzano

The CLI client is now able to start an empty Spark cluster

parent 873146f5
......@@ -57,5 +57,69 @@
<identifier-quote-string>`</identifier-quote-string>
</database-info>
<case-sensitivity plain-identifiers="exact" quoted-identifiers="exact"/>
<schema name="" catalog="caaas_devel"/>
<table name="applications" schema="" catalog="caaas_devel" type="TABLE">
<column name="id" sqlType="INT" precision="10" scale="0" nullable="false" jdbcType="4" autoIncrement="true"/>
<column name="name" sqlType="VARCHAR" precision="64" scale="0" nullable="true" jdbcType="12"/>
<column name="required_resources" sqlType="BLOB" precision="65535" scale="0" nullable="true" jdbcType="-4"/>
<column name="user_id" sqlType="INT" precision="10" scale="0" nullable="true" jdbcType="4"/>
<column name="type" sqlType="VARCHAR" precision="20" scale="0" nullable="true" jdbcType="12"/>
<column name="master_image" sqlType="VARCHAR" precision="256" scale="0" nullable="true" jdbcType="12"/>
<column name="worker_image" sqlType="VARCHAR" precision="256" scale="0" nullable="true" jdbcType="12"/>
<primary-key name="PRIMARY" columns="id"/>
<foreign-key name="applications_ibfk_1" columns="user_id" ref-table="users" ref-schema="" ref-catalog="caaas_devel" ref-columns="id" update-rule="3" delete-rule="3" deferrability="2"/>
<exported-key name="executions_ibfk_1" table="executions" schema="" catalog="caaas_devel" columns="application_id"/>
</table>
<table name="clusters" schema="" catalog="caaas_devel" type="TABLE">
<column name="id" sqlType="INT" precision="10" scale="0" nullable="false" jdbcType="4" autoIncrement="true"/>
<column name="execution_id" sqlType="INT" precision="10" scale="0" nullable="true" jdbcType="4"/>
<primary-key name="PRIMARY" columns="id"/>
<foreign-key name="clusters_ibfk_1" columns="execution_id" ref-table="executions" ref-schema="" ref-catalog="caaas_devel" ref-columns="id" update-rule="3" delete-rule="3" deferrability="2"/>
<exported-key name="containers_ibfk_1" table="containers" schema="" catalog="caaas_devel" columns="cluster_id"/>
<exported-key name="proxies_ibfk_1" table="proxies" schema="" catalog="caaas_devel" columns="cluster_id"/>
</table>
<table name="containers" schema="" catalog="caaas_devel" type="TABLE">
<column name="id" sqlType="INT" precision="10" scale="0" nullable="false" jdbcType="4" autoIncrement="true"/>
<column name="docker_id" sqlType="VARCHAR" precision="128" scale="0" nullable="true" jdbcType="12"/>
<column name="cluster_id" sqlType="INT" precision="10" scale="0" nullable="true" jdbcType="4"/>
<column name="ip_address" sqlType="VARCHAR" precision="16" scale="0" nullable="true" jdbcType="12"/>
<column name="readable_name" sqlType="VARCHAR" precision="32" scale="0" nullable="true" jdbcType="12"/>
<primary-key name="PRIMARY" columns="id"/>
<foreign-key name="containers_ibfk_1" columns="cluster_id" ref-table="clusters" ref-schema="" ref-catalog="caaas_devel" ref-columns="id" update-rule="3" delete-rule="3" deferrability="2"/>
<exported-key name="proxies_ibfk_2" table="proxies" schema="" catalog="caaas_devel" columns="container_id"/>
</table>
<table name="executions" schema="" catalog="caaas_devel" type="TABLE">
<column name="id" sqlType="INT" precision="10" scale="0" nullable="false" jdbcType="4" autoIncrement="true"/>
<column name="name" sqlType="VARCHAR" precision="64" scale="0" nullable="true" jdbcType="12"/>
<column name="assigned_resources" sqlType="BLOB" precision="65535" scale="0" nullable="true" jdbcType="-4"/>
<column name="application_id" sqlType="INT" precision="10" scale="0" nullable="true" jdbcType="4"/>
<column name="time_scheduled" sqlType="DATETIME" precision="19" scale="0" nullable="true" jdbcType="93"/>
<column name="time_started" sqlType="DATETIME" precision="19" scale="0" nullable="true" jdbcType="93"/>
<column name="time_finished" sqlType="DATETIME" precision="19" scale="0" nullable="true" jdbcType="93"/>
<column name="status" sqlType="VARCHAR" precision="32" scale="0" nullable="true" jdbcType="12"/>
<column name="type" sqlType="VARCHAR" precision="20" scale="0" nullable="true" jdbcType="12"/>
<column name="commandline" sqlType="VARCHAR" precision="1024" scale="0" nullable="true" jdbcType="12"/>
<column name="spark_opts" sqlType="VARCHAR" precision="1024" scale="0" nullable="true" jdbcType="12"/>
<primary-key name="PRIMARY" columns="id"/>
<foreign-key name="executions_ibfk_1" columns="application_id" ref-table="applications" ref-schema="" ref-catalog="caaas_devel" ref-columns="id" update-rule="3" delete-rule="3" deferrability="2"/>
<exported-key name="clusters_ibfk_1" table="clusters" schema="" catalog="caaas_devel" columns="execution_id"/>
</table>
<table name="proxies" schema="" catalog="caaas_devel" type="TABLE">
<column name="id" sqlType="INT" precision="10" scale="0" nullable="false" jdbcType="4" autoIncrement="true"/>
<column name="internal_url" sqlType="VARCHAR" precision="1024" scale="0" nullable="true" jdbcType="12"/>
<column name="cluster_id" sqlType="INT" precision="10" scale="0" nullable="true" jdbcType="4"/>
<column name="container_id" sqlType="INT" precision="10" scale="0" nullable="true" jdbcType="4"/>
<column name="service_name" sqlType="VARCHAR" precision="32" scale="0" nullable="true" jdbcType="12"/>
<column name="last_access" sqlType="DATETIME" precision="19" scale="0" nullable="true" jdbcType="93"/>
<primary-key name="PRIMARY" columns="id"/>
<foreign-key name="proxies_ibfk_1" columns="cluster_id" ref-table="clusters" ref-schema="" ref-catalog="caaas_devel" ref-columns="id" update-rule="3" delete-rule="3" deferrability="2"/>
<foreign-key name="proxies_ibfk_2" columns="container_id" ref-table="containers" ref-schema="" ref-catalog="caaas_devel" ref-columns="id" update-rule="3" delete-rule="3" deferrability="2"/>
</table>
<table name="users" schema="" catalog="caaas_devel" type="TABLE">
<column name="id" sqlType="INT" precision="10" scale="0" nullable="false" jdbcType="4" autoIncrement="true"/>
<column name="email" sqlType="VARCHAR" precision="128" scale="0" nullable="true" jdbcType="12"/>
<primary-key name="PRIMARY" columns="id"/>
<exported-key name="applications_ibfk_1" table="applications" schema="" catalog="caaas_devel" columns="user_id"/>
</table>
</data-source>
</component>
\ No newline at end of file
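The schema snapshot above captures the central change in this commit: clusters now hang off executions instead of applications. A minimal SQLAlchemy sketch of the two models implied by that foreign key (the real classes live in common.state, which this diff does not show, so names and column options here are assumptions):

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

Base = declarative_base()

class Execution(Base):
    __tablename__ = 'executions'
    id = Column(Integer, primary_key=True)
    name = Column(String(64))
    application_id = Column(Integer, ForeignKey('applications.id'))
    status = Column(String(32))

class Cluster(Base):
    __tablename__ = 'clusters'
    id = Column(Integer, primary_key=True)
    # was app_id before this commit; each cluster now belongs to one execution
    execution_id = Column(Integer, ForeignKey('executions.id'))
    execution = relationship(Execution)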
......@@ -10,8 +10,8 @@
<data-source name="MySQL - @m1 devel" uuid="33b1ec79-4374-4ff8-a8f8-26c89b418a79">
<secret-storage>master_key</secret-storage>
<user-name>caaas_devel</user-name>
<schema-pattern>caaas.*</schema-pattern>
<default-schemas>caaas.*</default-schemas>
<schema-pattern>caaas_devel.*</schema-pattern>
<default-schemas>caaas_devel.*</default-schemas>
</data-source>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="DataSourceManagerImpl" format="xml" hash="270082280">
<component name="DataSourceManagerImpl" format="xml" hash="330654499">
<data-source source="LOCAL" name="MySQL - @m1" uuid="a32fd6de-3ffa-40c0-9ec8-8953a89c53e0">
<driver-ref>mysql</driver-ref>
<synchronize>true</synchronize>
......
......@@ -4,7 +4,7 @@ import signal
from caaas_scheduler.rpyc_service import CAaaSSchedulerRPCService
from caaas_scheduler.rpyc_server import RPyCAsyncIOServer
from caaas_scheduler.scheduler import CAaaSSCheduler
from caaas_scheduler.scheduler import caaas_sched
def sigint_handler():
......@@ -22,7 +22,7 @@ if __name__ == "__main__":
rpyc_server = RPyCAsyncIOServer(CAaaSSchedulerRPCService, '0.0.0.0', port=4000, auto_register=True)
rpyc_server.start()
CAaaSSCheduler().init_tasks()
caaas_sched.init_tasks()
try:
loop.run_forever()
......
[docker]
swarm-manager = tcp://127.0.0.1:2380
volume-path = /mnt/cephfs/caaas
[db]
user = caaas
pass = changeme
server = 127.0.0.1
db = caaas
[proxy]
base_url = http://some-host/some-path
apache_config = /tmp/caaas-proxy.conf
apache_access_log = /var/log/apache2/access.log
[caaas]
history_per_user_count = 20
history_path = /var/cache/caaas/app_history
cleanup_thread_interval = 5
base_flask_url = http://bigfoot-m2.eurecom.fr
cleanup_notebooks_older_than = 24
cleanup_notebooks_warning = 22
[smtp]
user = user@gmail.com
pass = pass
server = smtp.gmail.com:25
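common.configuration (imported as conf by the scheduler) is not part of this diff; a minimal sketch of how a file like the one above is typically loaded with the standard library, using section and option names taken from the sample while the loading code itself is an assumption:

import configparser

parser = configparser.ConfigParser()
parser.read('caaas.ini')  # hypothetical location of the file above

swarm_manager = parser.get('docker', 'swarm-manager')                 # tcp://127.0.0.1:2380
db_user = parser.get('db', 'user')                                    # caaas
cleanup_interval = parser.getint('caaas', 'cleanup_thread_interval')  # 5
smtp_server = parser.get('smtp', 'server')                            # smtp.gmail.com:25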
import rpyc
from sqlalchemy.orm.exc import NoResultFound
from common.state import AlchemySession, SparkApplication, User
from common.state import AlchemySession, SparkApplication, User, Application, Cluster, SparkSubmitExecution, Execution
from common.application_resources import SparkApplicationResources
from common.status import PlatformStatusReport
from common.exceptions import UserIDDoesNotExist
from common.exceptions import UserIDDoesNotExist, ApplicationStillRunning
REGISTRY = "10.0.0.2:5000"
MASTER_IMAGE = REGISTRY + "/venza/spark-master:1.4.1"
......@@ -42,8 +42,48 @@ class CAaaSClient:
resources = SparkApplicationResources()
resources.worker_count = worker_count
resources.worker_resources["memory_limit"] = executor_memory
resources.worker_resources["worker_cores"] = str(executor_cores)
app = SparkApplication(master_image=MASTER_IMAGE, worker_image=WORKER_IMAGE, name='empty-cluster', required_resources=resources)
resources.worker_resources["cores"] = executor_cores
app = SparkApplication(master_image=MASTER_IMAGE, worker_image=WORKER_IMAGE, name='empty-cluster', required_resources=resources, user_id=user_id)
self.state.add(app)
self.state.commit()
return app
def spark_application_get(self, application_id):
try:
return self.state.query(SparkApplication).filter_by(id=application_id).one()
except NoResultFound:
return None
def execution_spark_submit_new(self, application: Application, name, commandline, spark_options):
execution = SparkSubmitExecution(name=name,
application_id=application.id,
status="submitted",
commandline=commandline,
spark_opts=spark_options)
self.state.add(execution)
self.state.commit()
return self.server.execution_schedule(execution.id)
def execution_spark_cluster_new(self, application: Application, name):
execution = Execution(name=name,
application_id=application.id,
status="submitted")
self.state.add(execution)
self.state.commit()
return self.server.execution_schedule(execution.id)
def application_remove(self, application: Application):
running = self.state.query(Cluster).join(Execution).filter(Execution.application_id == application.id).count()
if running > 0:
raise ApplicationStillRunning(application)
self.state.delete(application)
self.state.commit()
def application_status(self, application: Application):
return self.server.application_status(application.id)
def execution_get(self, execution_id: int) -> Execution:
return self.state.query(Execution).filter_by(id=execution_id).one()
def execution_terminate(self, execution: Execution):
self.server.terminate_execution(execution.id)
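Together these methods give the CLI a two-step flow: create an application once, then schedule executions against it. A usage sketch (ids and sizing values are placeholders):

client = CAaaSClient()
# user_id, worker_count, executor_memory, executor_cores (placeholder values)
app = client.spark_application_new(1, 2, '4g', 2)
# Start an empty Spark cluster; the return value is the boolean from
# execution_schedule, so False means admission control refused it.
if client.execution_spark_cluster_new(app, 'empty-cluster'):
    print(client.application_status(app))
else:
    print('admission control refused the execution')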
from caaas_scheduler.swarm_client import SwarmClient, ContainerOptions
from common.state import AlchemySession, Cluster, Container, Application, SparkApplication
from common.state import AlchemySession, Cluster, Container, Application, SparkApplication, Proxy, Execution
from common.application_resources import ApplicationResources, SparkApplicationResources
from common.exceptions import CannotCreateCluster
class PlatformManager:
def __init__(self):
self.swarm = SwarmClient()
def start_application(self, application: Application, resources: ApplicationResources):
def start_execution(self, execution_id: int, resources: ApplicationResources) -> bool:
state = AlchemySession()
self._application_to_containers(state, application, resources)
execution = state.query(Execution).filter_by(id=execution_id).one()
execution.assigned_resources = resources
self._application_to_containers(state, execution)
execution.set_started()
state.commit()
return True
# noinspection PyTypeChecker
def _application_to_containers(self, state, application: Application, resources: ApplicationResources):
if type(application) is SparkApplication:
self._spark_app_to_containers(state, application, resources)
def _application_to_containers(self, state, execution: Execution):
if type(execution.application) is SparkApplication:
self._spark_app_to_containers(state, execution)
else:
raise NotImplementedError('%s application are not implemented' % type(application))
raise NotImplementedError('%s applications are not implemented' % type(execution.application))
def _spark_app_to_containers(self, state: AlchemySession, application: SparkApplication, resources: SparkApplicationResources):
cluster = Cluster(app_id=application.id)
def _spark_app_to_containers(self, state: AlchemySession, execution: Execution):
application = execution.application
resources = execution.assigned_resources
cluster = Cluster(execution_id=execution.id)
state.add(cluster)
# Master
master_requirements = resources.master_resources
......@@ -30,32 +37,49 @@ class PlatformManager:
master_opts.set_memory_limit(master_requirements["memory_limit"])
image = application.master_image
master_info = self.swarm.spawn_container(image, master_opts)
container = Container(docker_id=master_info["docker_id"], ip_address=master_info["ip_address"], readable_name="spark-master")
container.cluster = cluster
if master_info is None:
raise CannotCreateCluster(application)
container = Container(docker_id=master_info["docker_id"], ip_address=master_info["ip_address"], readable_name="spark-master", cluster=cluster)
state.add(container)
master_web_url = "http://" + master_info["ip_address"] + ":8080"
master_proxy = Proxy(service_name="Spark master web interface", container=container, cluster=cluster, internal_url=master_web_url)
state.add(master_proxy)
# Workers
worker_requirements = resources.worker_resources
worker_opts = ContainerOptions()
worker_opts.add_env_variable("SPARK_MASTER_IP", master_info["docker_ip"])
worker_opts.add_env_variable("SPARK_MASTER_IP", master_info["ip_address"])
if "memory_limit" in worker_requirements:
worker_opts.add_env_variable("SPARK_WORKER_RAM", worker_requirements["memory_limit"])
worker_opts.set_memory_limit(worker_requirements["memory_limit"])
if "worker_cores" in worker_requirements:
worker_opts.add_env_variable("SPARK_WORKER_CORES", worker_requirements["worker_cores"])
if "cores" in worker_requirements:
worker_opts.add_env_variable("SPARK_WORKER_CORES", worker_requirements["cores"])
image = application.worker_image
workers_docker_id = []
for i in range(resources.worker_count):
worker_info = self.swarm.spawn_container(image, worker_opts)
if worker_info is None:
self.swarm.terminate_container(master_info["docker_id"])
for j in range(i):
self.swarm.terminate_container(workers_docker_id[j])
raise CannotCreateCluster(application)
workers_docker_id.append(worker_info["docker_id"])
container = Container(docker_id=worker_info["docker_id"], ip_address=worker_info["ip_address"], readable_name="spark-worker-%d" % i)
container.cluster = cluster
state.add(container)
worker_web_url = "http://" + worker_info["ip_address"] + ":8081"
worker_proxy = Proxy(service_name="Spark worker web interface", container=container, cluster=cluster, internal_url=worker_web_url)
state.add(worker_proxy)
def terminate_application(self, application: Application):
def terminate_execution(self, execution: Execution):
state = AlchemySession()
cluster = state.query(Cluster).filter_by(app_id=application.id)
cluster = state.query(Cluster).filter_by(execution_id=execution.id).one()
containers = cluster.containers
for c in containers:
self.swarm.terminate_container(c.docker_id)
state.delete(c)
for p in cluster.proxies:
state.delete(p)
state.delete(cluster)
execution.set_terminated()
state.commit()
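ContainerOptions is imported from caaas_scheduler.swarm_client but its body is outside this diff; a sketch consistent with the calls made in this file and in SwarmClient.spawn_container further down (every name here is inferred, not confirmed):

class ContainerOptions:
    def __init__(self):
        self.env = {}
        self.volume_binds = []  # 'host_path:container_path:mode' strings for docker-py
        self.volumes = []       # container-side mount points
        self.memory_limit = None
        self.command = None

    def add_env_variable(self, name, value):
        self.env[name] = value

    def set_memory_limit(self, limit):
        self.memory_limit = limit

    def get_environment(self):
        # presumably passed to create_container() in the elided part of the diff
        return self.env

    def get_volume_binds(self):
        return self.volume_binds

    def get_volumes(self):
        return self.volumes

    def get_memory_limit(self):
        return self.memory_limit

    def get_command(self):
        return self.command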
......@@ -10,12 +10,10 @@ from common.status import PlatformStatusReport
class PlatformStatus:
def __init__(self):
self.swarm_status = SwarmStatus()
self.swarm_status_timestamp = time.time()
self.swarm = SwarmClient()
def update(self):
self.swarm_status = self.swarm.info()
self.swarm_status_timestamp = time.time()
def update_task(self, interval):
self.update()
......@@ -23,4 +21,5 @@ class PlatformStatus:
def generate_report(self) -> PlatformStatusReport:
report = PlatformStatusReport()
report.include_swarm_status(self.swarm_status)
return report
import asyncio
import logging
import time
import threading
from rpyc.utils.server import UDPRegistryClient, AuthenticationError, Connection, Channel, SocketStream
from caaas_scheduler.periodic_tasks import periodic_task
class RPyCAsyncIOServer:
"""AsyncIO RpyC server implementation
......@@ -130,27 +131,11 @@ class RPyCAsyncIOServer:
self.logger.info("goodbye [%s]:%s", h, p)
def _bg_register(self):
interval = self.registrar.REREGISTER_INTERVAL
self.logger.info("started background auto-register thread (interval = %s)", interval)
tnext = 0
while True:
t = time.time()
if t >= tnext:
did_register = False
aliases = self.service.get_service_aliases()
try:
did_register = self.registrar.register(aliases, self.port, interface=self.hostname)
except Exception:
self.logger.exception("error registering services")
# If registration worked out, retry to register again after
# interval time. Otherwise, try to register soon again.
if did_register:
tnext = t + interval
else:
self.logger.info("registering services did not work - retry")
time.sleep(1)
aliases = self.service.get_service_aliases()
try:
self.registrar.register(aliases, self.port, interface=self.hostname)
except Exception:
self.logger.exception("error registering services")
def start(self):
"""Starts the server. Use :meth:`close` to stop"""
......@@ -160,8 +145,5 @@ class RPyCAsyncIOServer:
self.logger.info("server started on [%s]:%s", self.hostname, self.port)
if self.auto_register:
t = threading.Thread(target=self._bg_register)
t.setDaemon(True)
t.start()
self._bg_register()
periodic_task(self._bg_register, self.registrar.REREGISTER_INTERVAL)
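periodic_task comes from the new caaas_scheduler.periodic_tasks module, which this diff does not include. Given that the server runs on an asyncio event loop, it is presumably a small helper along these lines:

import asyncio

def periodic_task(func, interval):
    """Sketch: run func() every interval seconds on the asyncio event loop."""
    def _run():
        func()
        asyncio.get_event_loop().call_later(interval, _run)
    asyncio.get_event_loop().call_later(interval, _run)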
......@@ -2,8 +2,8 @@ import rpyc
from caaas_scheduler.scheduler import caaas_sched
from common.status import PlatformStatusReport
from common.state import AlchemySession, Application
from common.status import PlatformStatusReport, ApplicationStatusReport
from common.state import AlchemySession, Application, Execution
class CAaaSSchedulerRPCService(rpyc.Service):
......@@ -19,9 +19,26 @@ class CAaaSSchedulerRPCService(rpyc.Service):
pl_status = self.sched.platform_status.generate_report()
return pl_status
def exposed_terminate_application(self, application_id: int):
def exposed_terminate_execution(self, execution_id: int) -> bool:
state = AlchemySession()
application = state.query(Application).filter_by(id=application_id).one()
self.sched.terminate_application(application)
execution = state.query(Execution).filter_by(id=execution_id).one()
self.sched.terminate_execution(execution)
state.commit()
return True
def exposed_execution_schedule(self, execution_id: int) -> bool:
state = AlchemySession()
execution = state.query(Execution).filter_by(id=execution_id).one()
ret = self.sched.incoming(execution)
if ret:
execution.set_scheduled()
state.commit()
return ret
def exposed_application_status(self, application_id: int):
state = AlchemySession()
application = state.query(Application).filter_by(id=application_id).one()
report = ApplicationStatusReport(application)
for e in application.executions:
report.add_execution(e)
return report
import asyncio
from caaas_scheduler.platform import PlatformManager
from caaas_scheduler.platform_status import PlatformStatus
from caaas_scheduler.periodic_tasks import periodic_task
from common.configuration import conf
from common.state import Application
from common.state import Application, Execution
from common.application_resources import ApplicationResources
class SimpleSchedulerPolicy:
def __init__(self, platform_status: PlatformStatus):
self.platform_status = platform_status
self.applications_waiting = []
self.applications_running = []
self.waiting_list = []
self.running_list = []
def admission_control(self, application: Application) -> bool:
if application.requirements.core_count() < self.platform_status.swarm_status.cores_total:
def admission_control(self, required_resources: ApplicationResources) -> bool:
if required_resources.core_count() < self.platform_status.swarm_status.cores_total:
return True
else:
return False
def insert(self, application: Application):
self.applications_waiting.append(application)
def insert(self, execution_id: int, resources: ApplicationResources):
self.waiting_list.append((execution_id, resources))
def runnable(self) -> (Application, ApplicationResources):
def runnable(self) -> (int, ApplicationResources):
try:
app = self.applications_waiting.pop(0)
exec_id, resources = self.waiting_list.pop(0)
except IndexError:
return None, None
assigned_resources = app.requirements # Could modify the amount of resource actually used
return app, assigned_resources
assigned_resources = resources # Could modify the amount of resource assigned before running
return exec_id, assigned_resources
def started(self, execution_id: int, resources: ApplicationResources):
self.running_list.append((execution_id, resources))
def terminated(self, execution_id: int):
if self.find_execution_running(execution_id):
self.running_list = [x for x in self.running_list if x[0] != execution_id]
if self.find_execution_waiting(execution_id):
self.waiting_list = [x for x in self.waiting_list if x[0] != execution_id]
def started(self, application: Application):
self.applications_running.append(application)
def find_execution_running(self, exec_id) -> bool:
for e, r in self.running_list:
if e == exec_id:
return True
return False
def terminated(self, application: Application):
if application in self.applications_running:
self.applications_running.remove(application)
if application in self.applications_waiting:
self.applications_waiting.remove(application)
def find_execution_waiting(self, exec_id) -> bool:
for e, r in self.waiting_list:
if e == exec_id:
return True
return False
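The rewritten policy is a plain FIFO queue keyed by execution id rather than by Application object. A short illustration of its lifecycle (platform_status and resources stand in for real objects):

policy = SimpleSchedulerPolicy(platform_status)
if policy.admission_control(resources):   # enough cores free in the swarm?
    policy.insert(42, resources)          # queue execution 42 with its resources
exec_id, assigned = policy.runnable()     # pops (42, resources) in FIFO order
policy.started(exec_id, assigned)         # track it as running
policy.terminated(exec_id)                # drops it from the running/waiting lists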
class CAaaSSCheduler:
......@@ -48,24 +64,29 @@ class CAaaSSCheduler:
def init_tasks(self):
self.platform_status.update_task(conf["status_refresh_interval"])
periodic_task(self.schedule, conf['scheduler_task_interval'])
def incoming_application(self, application: Application):
if not self.scheduler_policy.admission_control(application):
def incoming(self, execution: Execution) -> bool:
if not self.scheduler_policy.admission_control(execution.application.required_resources):
return False
self.scheduler_policy.insert(application)
self.check_runnable()
self.scheduler_policy.insert(execution.id, execution.application.required_resources)
asyncio.get_event_loop().call_soon(self._check_runnable)
return True
def check_runnable(self):
application, resources = self.scheduler_policy.runnable()
if application is None:
def _check_runnable(self): # called periodically, does not use state to keep database load low
execution_id, resources = self.scheduler_policy.runnable()
if execution_id is None:
return
self.platform.start_application(application, resources)
self.scheduler_policy.started(application)
if self.platform.start_execution(execution_id, resources):
self.scheduler_policy.started(execution_id, resources)
def schedule(self):
self._check_runnable()
def terminate_application(self, application: Application):
self.platform.terminate_application(application)
self.scheduler_policy.terminated(application)
def terminate_execution(self, execution: Execution):
self.platform.terminate_execution(execution)
self.scheduler_policy.terminated(execution.id)
caaas_sched = CAaaSSCheduler()
import time
import logging
log = logging.getLogger(__name__)
import docker
import docker.utils
import docker.errors
from common.configuration import conf
......@@ -27,9 +32,10 @@ class SwarmClient:
assert 'Filters' in info["DriverStatus"][idx][0]
pl_status.active_filters = info["DriverStatus"][idx][1].split(", ")
pl_status.timestamp = time.time()
return pl_status
def spawn_container(self, image, options):
def spawn_container(self, image, options) -> dict:
host_config = docker.utils.create_host_config(network_mode="bridge",
binds=options.get_volume_binds(),
mem_limit=options.get_memory_limit())
......@@ -40,13 +46,23 @@ class SwarmClient:
detach=True,
volumes=options.get_volumes(),
command=options.get_command())
self.cli.start(container=cont.get('Id'))
return self.inspect_container(cont.get('Id'))
try:
self.cli.start(container=cont.get('Id'))
info = self.inspect_container(cont.get('Id'))
except docker.errors.APIError as e:
self.cli.remove_container(container=cont.get('Id'), force=True)
log.error(str(e))
return None
return info
def inspect_container(self, docker_id):
docker_info = self.cli.inspect_container(container=docker_id)
def inspect_container(self, docker_id) -> dict:
try:
docker_info = self.cli.inspect_container(container=docker_id)
except docker.errors.APIError:
return None
info = {
"docker_ip": docker_info["NetworkSettings"]["IPAddress"]
"ip_address": docker_info["NetworkSettings"]["IPAddress"],
"docker_id": docker_id
}
return info
......
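With the new error handling, spawn_container either returns the inspection dict or None after force-removing the half-created container. A usage sketch (image name and options are placeholders):

swarm = SwarmClient()
opts = ContainerOptions()
opts.set_memory_limit('1g')
info = swarm.spawn_container('10.0.0.2:5000/venza/spark-master:1.4.1', opts)
if info is None:
    print('creation failed: the container was removed and the error logged')
else:
    print(info['ip_address'], info['docker_id'])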
import time
class SwarmNodeStatus:
def __init__(self, name):
self.name = name
......@@ -19,3 +22,4 @@ class SwarmStatus:
self.placement_strategy = ''
self.active_filters = []
self.nodes = []
self.timestamp = time.time()
......@@ -30,7 +30,61 @@ def user_get_cmd(args):
def spark_app_new_cmd(args):
client = CAaaSClient()
application = client.spark_application_new(args.user_id, args.worker_count, args.executor_memory, args.executor_cores)
client.schedule_application(application)
print("Spark application added with ID: {}".format(application.id))
def run_spark_submit_cmd(args):
client = CAaaSClient()
application = client.spark_application_get(args.id)
if application is None:
print("Error: application {} does not exist".format(args.id))
return
ret = client.execution_spark_submit_new(application, args.name, args.cmd, args.spark_opts)
if ret:
print("Application scheduled successfully, use the app-inspect command to check its status")
else:
print("Admission control refused to run the application specified")
def run_empty_cmd(args):
client = CAaaSClient()
application = client.spark_application_get(args.id)
if application is None:
print("Error: application {} does not exist".format(args.id))
return
ret = client.execution_spark_cluster_new(application, args.name)
if ret:
print("Application scheduled successfully, use the app-inspect command to check its status")
else:
print("Admission control refused to run the application specified")
def app_rm_cmd(args):
client = CAaaSClient()
application = client.spark_application_get(args.id)
if application is None:
print("Error: application {} does not exist".format(args.id))
return
client.application_remove(application)
def app_inspect_cmd(args):
client = CAaaSClient()
application = client.spark_application_get(args.id)
if application is None:
print("Error: application {} does not exist".format(args.id))
return
app_report = client.application_status(application)
print(app_report)
def exec_kill_cmd(args):
client = CAaaSClient()
execution = client.execution_get(args.id)
if execution is None:
print("Error: execution {} does not exist".format(args.id))
return
client.execution_terminate(execution)
def process_arguments() -> Namespace:
......@@ -42,22 +96,48 @@ def process_arguments() -> Namespace:
argparser_status.set_defaults(func=status_cmd)
argparser_user_new = subparser.add_parser('user-new', help="Create a new user")
argparser_user_new.add_argument('--email', required=True, help="User email address")
argparser_user_new.add_argument('email', help="User email address")
argparser_user_new.set_defaults(func=user_new_cmd)
argparser_user_get = subparser.add_parser('user-get', help="Get the user id for an existing user")
argparser_user_get.add_argument('--email', required=True, help="User email address")
argparser_user_get.add_argument('email', help="User email address")
argparser_user_get.set_defaults(func=user_get_cmd)
argparser_setup_db = subparser.add_parser('setup-db', help="Create the tables in the database")
argparser_setup_db.set_defaults(func=setup_db_cmd)
argparser_spark_cluster_create = subparser.add_parser('spark-app-new', help="Create an empty Spark cluster")
argparser_spark_cluster_create.set_defaults(func=spark_app_new_cmd)
argparser_spark_cluster_create = subparser.add_parser('spark-app-new', help="Setup a new Spark submit application")
argparser_spark_cluster_create.add_argument('--user-id', type=int, required=True, help='Application owner')
argparser_spark_cluster_create.add_argument('--name', required=True, help='Application name')