# swarm_client.py: wrapper around the docker-py client for talking to a Docker Swarm manager.
import logging
import time
from typing import Optional

import docker
import docker.errors
import docker.utils

from common.configuration import zoeconf
from zoe_scheduler.swarm_status import SwarmStatus, SwarmNodeStatus

# Module-level logger, registered under this module's dotted name.
log = logging.getLogger(name=__name__)


class SwarmClient:
    """Wrapper around a docker-py ``Client`` pointed at the Docker Swarm manager.

    The manager address is taken from ``zoeconf.docker_swarm_manager``.
    """

    # (State flag, reported state name, counts as running), tested in order.
    # Docker keeps State.Running set while a container is paused or
    # restarting, so those flags must be tested before Running; otherwise
    # "paused" and "restarting" could never be reported.
    _CONTAINER_STATES = (
        ("Paused", "paused", True),
        ("Restarting", "restarting", True),
        ("Running", "running", True),
        ("OOMKilled", "killed", False),
        ("Dead", "killed", False),
    )

    def __init__(self):
        manager = zoeconf.docker_swarm_manager
        self.cli = docker.Client(base_url=manager)

    @staticmethod
    def _check_driver_entry(driver_status, idx, key):
        """Raise AssertionError unless ``driver_status[idx][0]`` contains *key*.

        An explicit raise (instead of ``assert``) keeps this layout check
        active when Python runs with ``-O``.
        """
        if key not in driver_status[idx][0]:
            raise AssertionError("unexpected DriverStatus[%d] key %r, expected %r"
                                 % (idx, driver_status[idx][0], key))

    def info(self) -> "SwarmStatus":
        """Query the manager's ``info`` endpoint and parse it into a SwarmStatus.

        ``DriverStatus`` is read as a list of ``[key, value]`` entries:
        entry 1 is the placement strategy, entry 2 the comma-separated active
        filters, entry 3 the node count, and from entry 4 on each node uses
        five consecutive entries (see the loop below).

        Raises:
            AssertionError: if entries 1-3 do not carry the expected keys.
        """
        docker_info = self.cli.info()
        pl_status = SwarmStatus()
        pl_status.container_count = docker_info["Containers"]
        pl_status.image_count = docker_info["Images"]
        pl_status.memory_total = docker_info["MemTotal"]
        pl_status.cores_total = docker_info["NCPU"]

        driver_status = docker_info["DriverStatus"]
        self._check_driver_entry(driver_status, 1, 'Strategy')
        pl_status.placement_strategy = driver_status[1][1]
        self._check_driver_entry(driver_status, 2, 'Filters')
        pl_status.active_filters = [x.strip() for x in driver_status[2][1].split(", ")]
        self._check_driver_entry(driver_status, 3, 'Nodes')
        node_count = int(driver_status[3][1])

        # Per node, starting at index 4: [name, endpoint], container count,
        # "reserved / total" CPUs, "reserved / total" memory, labels.
        first_node, entries_per_node = 4, 5
        for node in range(node_count):
            base = first_node + node * entries_per_node
            ns = SwarmNodeStatus(driver_status[base][0])
            ns.docker_endpoint = driver_status[base][1]
            ns.container_count = int(driver_status[base + 1][1])
            cpus = driver_status[base + 2][1].split(' / ')
            ns.cores_reserved = int(cpus[0])
            ns.cores_total = int(cpus[1])
            memory = driver_status[base + 3][1].split(' / ')
            # Memory values are kept as the raw strings Docker reports.
            ns.memory_reserved = memory[0]
            ns.memory_total = memory[1]
            ns.labels = driver_status[base + 4][1:]
            pl_status.nodes.append(ns)

        pl_status.timestamp = time.time()
        return pl_status

    def spawn_container(self, image, options) -> Optional[dict]:
        """Create and start a detached, bridge-networked container from *image*.

        Args:
            image: image name passed to ``create_container``.
            options: object providing ``get_volume_binds``, ``get_memory_limit``,
                ``get_environment``, ``get_volumes`` and ``get_command``
                (e.g. ContainerOptions).

        Returns:
            The dict from ``inspect_container`` for the new container, or None
            if the Docker API reported an error (any partially created
            container is force-removed first) or inspection failed.
        """
        cont = None
        try:
            host_config = docker.utils.create_host_config(network_mode="bridge",
                                                          binds=options.get_volume_binds(),
                                                          mem_limit=options.get_memory_limit())
            cont = self.cli.create_container(image=image,
                                             environment=options.get_environment(),
                                             network_disabled=False,
                                             host_config=host_config,
                                             detach=True,
                                             volumes=options.get_volumes(),
                                             command=options.get_command())
            self.cli.start(container=cont.get('Id'))
        except docker.errors.APIError as e:
            # Log the original failure first so a failing cleanup cannot hide it.
            log.error(str(e))
            if cont is not None:
                try:
                    self.cli.remove_container(container=cont.get('Id'), force=True)
                except docker.errors.APIError as cleanup_err:
                    log.error("failed to remove container %s after spawn error: %s",
                              cont.get('Id'), cleanup_err)
            return None
        return self.inspect_container(cont.get('Id'))

    def inspect_container(self, docker_id) -> Optional[dict]:
        """Summarize a container's network address and run state.

        Returns:
            A dict with keys ``ip_address``, ``docker_id``, ``state`` (one of
            "paused", "restarting", "running", "killed", "unknown") and
            ``running`` (bool), or None if the Docker API call fails.
        """
        try:
            docker_info = self.cli.inspect_container(container=docker_id)
        except docker.errors.APIError:
            return None

        state = docker_info["State"]
        state_name, running = next(
            ((name, is_running) for flag, name, is_running in self._CONTAINER_STATES
             if state.get(flag)),
            ("unknown", False))
        return {
            "ip_address": docker_info["NetworkSettings"]["IPAddress"],
            "docker_id": docker_id,
            "state": state_name,
            "running": running,
        }

    def terminate_container(self, docker_id):
        """Force-remove the container *docker_id* (stopping it if needed)."""
        self.cli.remove_container(docker_id, force=True)

    def log_get(self, docker_id) -> str:
        """Return the full stdout+stderr log of *docker_id* as text."""
        logdata = self.cli.logs(container=docker_id, stdout=True, stderr=True, stream=False, timestamps=False, tail="all")
        # Container output is arbitrary bytes; don't let an invalid UTF-8
        # sequence turn a log fetch into a UnicodeDecodeError.
        return logdata.decode("utf-8", errors="replace")


class ContainerOptions:
    """Accumulates the settings used to create a container.

    SwarmClient.spawn_container reads these through the ``get_*`` accessors.
    """

    def __init__(self):
        self.env = {}            # environment variables, name -> value
        self.volume_binds = []   # "host_path:mountpoint:ro|rw" bind specs
        self.volumes = []        # container-side mount points
        self.command = ""
        self.memory_limit = '2g'

    def add_env_variable(self, name, value):
        """Set environment variable *name* to *value*; a None *value* is ignored."""
        if value is not None:
            self.env[name] = value

    def get_environment(self):
        """Return the environment-variable dict."""
        return self.env

    def add_volume_bind(self, path, mountpoint, readonly=False):
        """Bind-mount host *path* at *mountpoint*, read-only when *readonly*."""
        self.volumes.append(mountpoint)
        # Compute the mode separately: an inline conditional would bind looser
        # than "+" and swallow the whole "path:mountpoint:" prefix.
        mode = "ro" if readonly else "rw"
        self.volume_binds.append(path + ":" + mountpoint + ":" + mode)

    def get_volumes(self):
        """Return the list of container mount points."""
        return self.volumes

    def get_volume_binds(self):
        """Return the list of "path:mountpoint:mode" bind specs."""
        return self.volume_binds

    def set_command(self, cmd):
        """Set the command the container runs."""
        self.command = cmd

    def get_command(self):
        """Return the container command."""
        return self.command

    def set_memory_limit(self, limit):
        """Set the memory limit string passed to Docker (default '2g')."""
        self.memory_limit = limit

    def get_memory_limit(self):
        """Return the memory limit string."""
        return self.memory_limit