# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Zoe backend implementation for one or more Docker Engines."""

import logging
import re
import time
from typing import Union

from zoe_lib.config import get_conf
from zoe_lib.state import Service
import zoe_master.backends.base
from zoe_master.backends.docker.api_client import DockerClient
from zoe_master.backends.docker.config import DockerConfig, DockerHostConfig  # pylint: disable=unused-import
from zoe_master.backends.docker.threads import DockerStateSynchronizer
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException, ZoeNotEnoughResourcesException
from zoe_master.metrics.kairosdb import KairosDBInMetrics
from zoe_master.stats import ClusterStats, NodeStats

log = logging.getLogger(__name__)

# Module-level reference to the DockerStateSynchronizer created by
# DockerEngineBackend.init(); it stays None until the backend is initialized.
_checker = None


class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
    """Zoe backend implementation for one or more stand-alone Docker Engines.

    The list of hosts is read from the file named by the ``backend_docker_config_file``
    configuration option. Per-host state is read from the module-level
    DockerStateSynchronizer created in :meth:`init`.
    """

    # Parses "[a/][b/]name[:tag|@digest]"; group 4 is the tag or digest (None when absent).
    _IMAGE_NAME_RE = re.compile(r'^(?:([^/]+)/)?(?:([^/]+)/)?([^@:/]+)(?:[@:](.+))?$')

    # CFS period (microseconds) used to turn a number of cores into a cpu_quota.
    # 100000 is Docker's default period; NOTE(review): assumes Zoe containers use it.
    _CPU_PERIOD = 100000

    def __init__(self, opts):
        super().__init__(opts)
        self.docker_config = DockerConfig(get_conf().backend_docker_config_file).read_config()

    def _get_config(self, host) -> Union[DockerHostConfig, None]:
        """Return the configuration of the host named *host*, or None if it is not configured."""
        return next((conf for conf in self.docker_config if conf.name == host), None)

    @classmethod
    def _check_image_tag(cls, image_name, exc_class):
        """Raise *exc_class* unless *image_name* parses and carries an explicit tag or digest."""
        parsed_name = cls._IMAGE_NAME_RE.search(image_name)
        if parsed_name is None:
            # e.g. more than two "/"-separated prefixes: previously an AttributeError on None
            raise exc_class('Image name {} is not valid'.format(image_name))
        if parsed_name.group(4) is None:
            raise exc_class('Image {} does not have a version tag'.format(image_name))

    @staticmethod
    def _get_node_stats(node_name):
        """Return the stats collected by the synchronizer for *node_name*, or None if unavailable.

        Stats are unavailable before :meth:`init` has run or when the node has not been seen yet.
        """
        if _checker is None:
            return None
        try:
            return _checker.host_stats[node_name]
        except KeyError:
            return None

    @classmethod
    def init(cls, state):
        """Create the module-level DockerStateSynchronizer for *state*.

        Its ``host_stats`` are what :meth:`platform_state` and :meth:`list_available_images` read.
        """
        global _checker
        _checker = DockerStateSynchronizer(state)

    @classmethod
    def shutdown(cls):
        """Stop the DockerStateSynchronizer, if :meth:`init` created one."""
        if _checker is not None:
            _checker.quit()

    def spawn_service(self, service_instance: ServiceInstance):
        """Spawn a service, translating a Zoe Service into a Docker container.

        :param service_instance: the service to start; its ``backend_host`` selects the Docker host
        :return: ``(container_id, external_address, ports)`` taken from the container info
        :raises ZoeStartExecutionFatalException: invalid/untagged image name, or a Docker error
        :raises ZoeStartExecutionRetryException: the host lacks free resources for the reservation
        """
        self._check_image_tag(service_instance.image_name, ZoeStartExecutionFatalException)
        conf = self._get_config(service_instance.backend_host)
        try:
            engine = DockerClient(conf)
            cont_info = engine.spawn_container(service_instance)
        except ZoeNotEnoughResourcesException as e:
            # Must precede the generic ZoeException handler so it maps to a retry, not a failure.
            raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service_instance.name)) from e
        except ZoeException as e:
            raise ZoeStartExecutionFatalException(str(e)) from e

        return cont_info["id"], cont_info['external_address'], cont_info['ports']

    def terminate_service(self, service: Service) -> None:
        """Terminate and delete the container backing *service*.

        The service is marked as terminating first. If the host cannot be reached the error is
        logged and the backend status is left unchanged; otherwise it is set to
        ``BACKEND_DESTROY_STATUS``.
        """
        conf = self._get_config(service.backend_host)
        service.set_terminating()
        try:
            engine = DockerClient(conf)
        except ZoeException as e:
            log.error('Cannot terminate service {}: {}'.format(service.id, str(e)))
            return

        if service.backend_id is not None:
            engine.terminate_container(service.backend_id, delete=True)
        else:
            log.error('Cannot terminate service {}, since it has no backend ID'.format(service.name))
        service.set_backend_status(service.BACKEND_DESTROY_STATUS)

    def platform_state(self) -> ClusterStats:
        """Get the platform state from the stats collected by the synchronizer.

        Configured hosts without collected stats are skipped.
        """
        platform_stats = ClusterStats()
        for host_conf in self.docker_config:  # type: DockerHostConfig
            node_stats = self._get_node_stats(host_conf.name)
            if node_stats is not None:
                platform_stats.nodes.append(node_stats)

        platform_stats.timestamp = time.time()
        return platform_stats

    @staticmethod
    def _get_core_limit(cont):
        """Return a container's CPU limit in cores (cpu_quota / cpu_period), or 0 when cpu_period is 0."""
        if cont['cpu_period'] == 0:
            return 0
        return cont['cpu_quota'] / cont['cpu_period']

    def _update_node_state(self, host_conf: DockerHostConfig, node_stats: NodeStats, get_usage_stats: bool):
        """Refresh *node_stats* in place with the current state of the host described by *host_conf*.

        This sets the online/offline status, container count, totals, reservations and
        per-container limits. When *get_usage_stats* is true it also fills in actual usage, either
        from KairosDB (if enabled) or by querying Docker for each container, which is slow.
        """
        # Copy the configured labels: extending host_conf.labels itself would make the
        # configuration grow with the engine labels on every refresh.
        # NOTE(review): assumes host_conf.labels is a list (the previous `+= set(...)` only worked on lists).
        node_stats.labels = list(host_conf.labels)
        try:
            my_engine = DockerClient(host_conf)
        except ZoeException as e:
            log.error(str(e))
            node_stats.status = 'offline'
            log.info('Node {} is offline'.format(host_conf.name))
            return
        node_stats.status = 'online'

        try:
            container_list = my_engine.list(only_label={'zoe_deployment_name': get_conf().deployment_name})
            info = my_engine.info()
        except ZoeException as e:
            log.error('Cannot read the state of node {}: {}'.format(host_conf.name, e))
            return

        node_stats.container_count = len(container_list)
        node_stats.cores_total = info['NCPU']
        node_stats.memory_total = info['MemTotal']
        if info['Labels'] is not None:
            node_stats.labels.extend(set(info['Labels']))

        # A hard limit equal to the host's total memory is not counted as a reservation.
        node_stats.memory_reserved = sum(cont['memory_hard_limit'] for cont in container_list if cont['memory_hard_limit'] != node_stats.memory_total)
        node_stats.cores_reserved = sum(self._get_core_limit(cont) for cont in container_list)

        stats = {
            cont['id']: {
                'core_limit': self._get_core_limit(cont),
                'mem_limit': cont['memory_hard_limit'],
            }
            for cont in container_list
        }
        node_stats.service_stats = stats

        if not get_usage_stats:
            node_stats.memory_in_use = 0
            node_stats.cores_in_use = 0
            return

        if get_conf().kairosdb_enable:
            kdb = KairosDBInMetrics()
            for cont in container_list:
                stats[cont['id']].update(kdb.get_service_usage(cont['name']))
        else:
            for cont in container_list:
                try:
                    aux = my_engine.stats(cont['id'], stream=False)  # this call is very slow (>~1sec)
                except ZoeException:
                    continue
                stats[cont['id']]['mem_usage'] = aux['memory_stats'].get('usage', 0)
                stats[cont['id']]['cpu_usage'] = self._get_core_usage(aux)

        # Containers whose usage could not be read count as 0 instead of raising KeyError.
        node_stats.memory_in_use = sum(stat.get('mem_usage', 0) for stat in stats.values())
        node_stats.cores_in_use = sum(stat.get('cpu_usage', 0) for stat in stats.values())

    def _get_core_usage(self, stat):
        """Return the CPU time in seconds used between the two samples of a Docker stats reply.

        This is the ``total_usage`` delta (nanoseconds) of ``cpu_stats`` vs ``precpu_stats``,
        divided by 1e9. It approximates cores in use only if the samples are about one second
        apart (presumed for non-streaming stats — TODO confirm). Returns 0.0 when the previous
        sample is missing.
        """
        cpu_time_now = stat['cpu_stats']['cpu_usage']['total_usage']
        try:
            cpu_time_pre = stat['precpu_stats']['cpu_usage']['total_usage']
        except KeyError:
            return 0.0
        return (cpu_time_now - cpu_time_pre) / 1000000000

    def node_list(self):
        """Return a list of node names."""
        return [node.name for node in self.docker_config]

    def service_log(self, service: Service):
        """Get the log of the container backing *service*."""
        conf = self._get_config(service.backend_host)
        engine = DockerClient(conf)
        return engine.logs(service.backend_id, True, False)

    def preload_image(self, image_name):
        """Pull an image from a Docker registry into every configured host.

        A failure on one host, including a host that cannot be reached, is logged and does not
        stop the pull on the remaining hosts.

        :raises ZoeException: the image name is invalid or untagged, or the pull failed on every host
        """
        self._check_image_tag(image_name, ZoeException)
        one_success = False
        for host_conf in self.docker_config:
            log.debug('Pre-loading image {} on host {}'.format(image_name, host_conf.name))
            time_start = time.time()
            try:
                my_engine = DockerClient(host_conf)
                my_engine.pull_image(image_name)
            except ZoeException:
                log.error('Image {} pre-loading failed on host {}'.format(image_name, host_conf.name))
                continue
            one_success = True
            log.debug('Image {} pre-loaded on host {} in {:.2f}s'.format(image_name, host_conf.name, time.time() - time_start))
        if not one_success:
            raise ZoeException('Cannot pull image {}'.format(image_name))

    def list_available_images(self, node_name):
        """List the images available on the specified node; [] if it is offline or has no stats."""
        node_stats = self._get_node_stats(node_name)
        if node_stats is None or node_stats.status == 'offline':
            return []
        return node_stats.images

    def update_service(self, service, cores=None, memory=None):
        """Update the CPU and/or memory reservation of a running service.

        Requested values above the host's capacity (``NCPU`` / ``MemTotal``) are clamped to it.
        A ``None`` value is passed through as None to ``DockerClient.update``.
        NOTE(review): presumably None means "leave unchanged" there — confirm.
        Errors reaching the host are logged and the update is skipped.
        """
        conf = self._get_config(service.backend_host)
        try:
            engine = DockerClient(conf)
        except ZoeException as e:
            log.error(str(e))
            return

        if service.backend_id is None:
            log.error('Cannot update reservations for service {} ({}), since it has no backend ID'.format(service.name, service.id))
            return

        try:
            info = engine.info()
        except ZoeException as e:
            log.error(str(e))
            return

        if cores is not None:
            cores = min(cores, info['NCPU'])
        if memory is not None:
            memory = min(memory, info['MemTotal'])
        # Previously int(None * 100000) raised TypeError when only memory was updated.
        cpu_quota = int(cores * self._CPU_PERIOD) if cores is not None else None
        engine.update(service.backend_id, cpu_quota=cpu_quota, mem_reservation=memory)