# swarm_manager.py
import logging
import time
from uuid import uuid4 as uuid

from docker import Client
from docker import errors as docker_errors
from docker.utils import create_host_config

from caaas import CAaaState
from caaas import SparkClusterDescription
from utils import config
10 11 12 13 14 15 16 17 18 19 20

REGISTRY = "10.0.0.2:5000"
MASTER_IMAGE = REGISTRY + "/venza/spark-master:1.4.1"
WORKER_IMAGE = REGISTRY + "/venza/spark-worker:1.4.1"
SHELL_IMAGE = REGISTRY + "/venza/spark-shell:1.4.1"
SUBMIT_IMAGE = REGISTRY + "/venza/spark-submit:1.4.1"
CONTAINER_IMAGE = REGISTRY + "/venza/spark-notebook:1.4.1"


def get_uuid():
    return str(uuid())
21 22 23 24 25 26 27 28


class SwarmStatus:
    def __init__(self):
        self.num_nodes = 0
        self.num_containers = 0


Daniele Venzano's avatar
Daniele Venzano committed
29
class SwarmManager:
30 31 32
    def __init__(self):
        self.status = SwarmStatus()
        self.cli = None
Daniele Venzano's avatar
Daniele Venzano committed
33
        self.last_update_timestamp = 0
34

Daniele Venzano's avatar
Daniele Venzano committed
35 36 37
    def connect(self):
        manager = config.get_swarm_manager_address()
        self.cli = Client(base_url=manager)
38 39 40 41 42 43

    def update_status(self):
        assert self.cli is not None
        info = self.cli.info()
        self.status.num_containers = info["Containers"]
        self.status.num_nodes = info["DriverStatus"][3][1]
Daniele Venzano's avatar
Daniele Venzano committed
44
        self.last_update_timestamp = time.time()
45

46
    def get_notebook(self, user_id):
Daniele Venzano's avatar
Daniele Venzano committed
47
        db = CAaaState()
48 49 50 51 52 53 54
        nb = db.get_notebook(user_id)
        if nb is None:
            self.start_cluster_with_notebook(user_id)
            nb = db.get_notebook(user_id)
        return nb["address"]

    def start_cluster_with_notebook(self, user_id):
55
        cluster_descr = SparkClusterDescription()
Daniele Venzano's avatar
Daniele Venzano committed
56
        cluster_descr.for_spark_notebook()
57
        self._create_new_spark_cluster(user_id, "notebook", cluster_descr, with_notebook=True)
58

59
    def _create_new_spark_cluster(self, user_id, name, cluster_descr, with_notebook):
Daniele Venzano's avatar
Daniele Venzano committed
60
        db = CAaaState()
61 62
        try:
            cluster_id = db.new_cluster(user_id, name)
63
            master_info = self._spawn_spark_master(cluster_id, user_id, cluster_descr)
64
            db.set_master_address(cluster_id, master_info["spark_master_address"])
65 66
            for i in range(cluster_descr.num_workers):
                self._spawn_spark_worker(cluster_id, user_id, cluster_descr, master_info)
67
            if with_notebook:
68
                self._spawn_spark_notebook(cluster_id, user_id, cluster_descr, master_info)
69 70 71 72
        except docker_errors.APIError as e:
            print("Error starting container: " + str(e.explanation))
            # FIXME: should rollback all changes to DB

73
    def _spawn_spark_master(self, cluster_id, user_id, cluster_descr):
Daniele Venzano's avatar
Daniele Venzano committed
74
        db = CAaaState()
75 76 77 78 79 80 81 82 83
        options = {
            "environment": {},
        }
        info = self._spawn_container(MASTER_IMAGE, options)
        info["spark_master_address"] = "http://" + info["docker_ip"] + ":8080"
        cont_id = db.new_container(cluster_id, user_id, info["docker_id"], info["docker_ip"], "spark-master")
        db.new_proxy_entry(get_uuid(), cluster_id, info["spark_master_address"], "spark-master", cont_id)
        return info

84
    def _spawn_spark_worker(self, cluster_id, user_id, cluster_descr, master_info):
Daniele Venzano's avatar
Daniele Venzano committed
85
        db = CAaaState()
86 87 88
        options = {
            "environment": {
                "SPARK_MASTER_IP": master_info["docker_ip"],
89 90
                "SPARK_WORKER_RAM": cluster_descr.executor_ram_size,
                "SPARK_WORKER_CORES": cluster_descr.worker_cores
91 92 93 94 95 96 97
            },
        }
        info = self._spawn_container(WORKER_IMAGE, options)
        cont_id = db.new_container(cluster_id, user_id, info["docker_id"], info["docker_ip"], "spark-worker")
        db.new_proxy_entry(get_uuid(), cluster_id, "http://" + info["docker_ip"] + ":8081", "spark-worker", cont_id)
        return info

98
    def _spawn_spark_notebook(self, cluster_id, user_id, cluster_descr, master_info):
Daniele Venzano's avatar
Daniele Venzano committed
99
        db = CAaaState()
100 101 102 103
        proxy_id = get_uuid()
        options = {
            "environment": {
                "SPARK_MASTER_IP": master_info["docker_ip"],
104
                "SPARK_EXECUTOR_RAM": cluster_descr.executor_ram_size,
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
                "PROXY_ID": proxy_id
            },
        }
        info = self._spawn_container(CONTAINER_IMAGE, options)
        cont_id = db.new_container(cluster_id, user_id, info["docker_id"], info["docker_ip"], "spark-notebook")
        db.new_proxy_entry(proxy_id, cluster_id, "http://" + info["docker_ip"] + ":9000/proxy/" + proxy_id, "spark-notebook", cont_id)
        db.new_notebook(cluster_id, "http://bigfoot-m2.eurecom.fr/proxy/" + proxy_id, user_id, cont_id)
        return info

    def _spawn_container(self, image, options):
        host_config = create_host_config(network_mode="bridge")
        cont = self.cli.create_container(image=image,
                                         environment=options["environment"],
                                         network_disabled=False,
                                         host_config=host_config,
                                         detach=True)
        self.cli.start(container=cont.get('Id'))
        docker_info = self.cli.inspect_container(container=cont.get('Id'))
        info = {
            "docker_id": cont.get("Id"),
            "docker_ip": docker_info["NetworkSettings"]["IPAddress"]
        }
        return info

129
    def _terminate_container(self, container_id, docker_id, contents):
Daniele Venzano's avatar
Daniele Venzano committed
130
        db = CAaaState()
131 132 133 134 135 136 137
        db.remove_proxy(container_id)
        if contents == "spark-notebook":
            db.remove_notebook(container_id)
        self.cli.remove_container(docker_id, force=True)
        db.remove_container(container_id)

    def terminate_cluster(self, cluster_id):
Daniele Venzano's avatar
Daniele Venzano committed
138
        db = CAaaState()
139 140 141 142 143 144
        cont_list = db.get_containers(cluster_id=cluster_id)
        for cid, cinfo in cont_list.items():
            self._terminate_container(cid, cinfo["docker_id"], cinfo["contents"])
        db.remove_cluster(cluster_id)
        return True