# [GITLAB] - UPGRADE TO v12 on Wednesday, 18 December, at 11:30 AM
#
# api_client.py (12 KB)
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to the low-level Docker API."""

import logging
from typing import Iterable, Callable, Dict, Any

import docker
import docker.tls
import docker.errors
import docker.utils
import docker.models.containers

import requests.exceptions

from zoe_lib.config import get_conf
from zoe_lib.state import Service, VolumeDescriptionHostPath
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.docker.config import DockerHostConfig  # pylint: disable=unused-import
from zoe_master.exceptions import ZoeException, ZoeNotEnoughResourcesException

log = logging.getLogger(__name__)

# Verify that the installed docker package provides the high-level DockerClient
# class this module is built on. Only the attribute is checked: instantiating a
# client here would create (and leak) a client object at import time and, with
# some library versions, may try to contact the daemon, turning an unrelated
# connection problem into an import failure.
if not hasattr(docker, 'DockerClient'):
    log.error('Docker package does not have the DockerClient attribute')
    raise ImportError('Wrong Docker library version')


class DockerClient:
    """The client class that wraps the Docker API."""

    def __init__(self, docker_config: DockerHostConfig, mock_client=None) -> None:
        """
        Create a client bound to a single Docker host.

        :param docker_config: host parameters (name, address, TLS material, external address)
        :param mock_client: if not None, used as the underlying client instead of connecting
        :raises ZoeException: if the Docker client cannot be created for the configured address
        """
        self.name = docker_config.name
        self.docker_config = docker_config

        if docker_config.tls:
            tls = docker.tls.TLSConfig(client_cert=(docker_config.tls_cert, docker_config.tls_key), verify=docker_config.tls_ca)
        else:
            tls = None

        # Simplify testing
        if mock_client is not None:
            self.cli = mock_client
            return

        try:
            self.cli = docker.DockerClient(base_url=docker_config.address, version="auto", tls=tls)
        except docker.errors.DockerException as e:
            raise ZoeException("Cannot connect to Docker host {} at address {}: {}".format(docker_config.name, docker_config.address, str(e))) from e

    def info(self) -> Dict:
        """Retrieve engine statistics."""
        return self.cli.info()

    @staticmethod
    def _is_no_resources_error(error: docker.errors.APIError) -> bool:
        """Tell whether an APIError carries the 'no resources available' scheduling refusal."""
        explanation = error.explanation
        # docker-py yields a str when the error body is JSON and raw bytes otherwise,
        # so normalise before comparing.
        if isinstance(explanation, bytes):
            explanation = explanation.decode('utf-8', errors='replace')
        return explanation == 'no resources available to schedule container'

    def spawn_container(self, service_instance: ServiceInstance) -> Dict[str, Any]:
        """
        Create and start a new container.

        :param service_instance: description of the container to run
        :return: the container summary, as built by _container_summary
        :raises ZoeNotEnoughResourcesException: if the engine reports no resources available to schedule the container
        :raises ZoeException: for any other failure while creating or starting the container
        """
        conf = get_conf()
        cpu_period = 100000
        run_args = {
            'detach': True,
            'ports': {},
            'environment': {},
            'volumes': {},
            'working_dir': service_instance.work_dir,
            'mem_limit': 0,
            'mem_reservation': 0,
            'memswap_limit': 0,
            'name': service_instance.name,
            'network_disabled': False,
            'network_mode': conf.overlay_network_name,
            'image': service_instance.image_name,
            'command': service_instance.command,
            'hostname': service_instance.hostname,
            'labels': service_instance.labels,
            'cpu_period': cpu_period,
            'cpu_quota': cpu_period,  # one full core unless core_limit says otherwise
            'log_config': {
                "type": "json-file",
                "config": {}
            }
        }

        for port in service_instance.ports:
            # Exposed but not published: the engine picks the host port
            run_args['ports']['{}/{}'.format(port.number, port.proto)] = None

        for name, value in service_instance.environment:
            run_args['environment'][name] = value

        for volume in service_instance.volumes:
            if volume.type == "host_directory":
                assert isinstance(volume, VolumeDescriptionHostPath)
                run_args['volumes'][volume.path] = {'bind': volume.mount_point, 'mode': ("ro" if volume.readonly else "rw")}
            else:
                log.error('Swarm backend does not support volume type %s', volume.type)

        if service_instance.memory_limit is not None:
            run_args['mem_limit'] = service_instance.memory_limit.max
            run_args['mem_reservation'] = service_instance.memory_limit.min
            # Keep the soft limit strictly below the hard limit when both are equal
            if service_instance.memory_limit.max == service_instance.memory_limit.min:
                run_args['mem_reservation'] -= 1

        if service_instance.core_limit is not None:
            run_args['cpu_quota'] = int(cpu_period * service_instance.core_limit.max)

        if conf.gelf_address != '':
            run_args['log_config'] = {
                "type": "gelf",
                "config": {
                    'gelf-address': conf.gelf_address,
                    'labels': ",".join(service_instance.labels)
                }
            }

        # ImageNotFound derives from APIError, so it must be handled first.
        try:
            cont = self.cli.containers.run(**run_args)
        except docker.errors.ImageNotFound as e:
            raise ZoeException(message='Image not found') from e
        except docker.errors.APIError as e:
            if self._is_no_resources_error(e):
                raise ZoeNotEnoughResourcesException(message=str(e)) from e
            raise ZoeException(message=str(e)) from e
        except Exception as e:
            raise ZoeException(str(e)) from e

        # Re-fetch so that attrs reflect the started container
        cont = self.cli.containers.get(cont.id)
        return self._container_summary(cont)

    def _container_summary(self, container: docker.models.containers.Container) -> Dict[str, Any]:
        """
        Translate a docker-specific container object into a simple dictionary.

        :param container: the container to summarise
        :return: dict with id, name, labels, host, state, running flag, ports, CPU and memory limits
        """
        attrs = container.attrs
        info = {
            "id": container.id,
            "ip_address": {},
            "name": container.name,
            'labels': attrs['Config']['Labels'],
            'external_address': self.docker_config.external_address
        }  # type: Dict[str, Any]

        # 'Node' is only present when talking to a Swarm endpoint
        info['host'] = (attrs.get('Node') or {}).get('Name', 'N/A')

        status = container.status
        if status in ('running', 'restarting'):
            info["state"] = Service.BACKEND_START_STATUS
            info["running"] = True
        elif status in ('paused', 'exited'):
            info["state"] = Service.BACKEND_DIE_STATUS
            info["running"] = False
        elif status == 'OOMKilled':
            # NOTE(review): Docker reports OOM kills through State.OOMKilled rather than
            # as a status value, so this branch may never match -- confirm.
            info["state"] = Service.BACKEND_OOM_STATUS
            info["running"] = False
        elif status == 'created':
            info["state"] = Service.BACKEND_CREATE_STATUS
            info["running"] = False
        else:
            log.error('Unknown container status: %s', status)
            info["state"] = Service.BACKEND_UNDEFINED_STATUS
            info["running"] = False

        info['ports'] = {}
        ports = attrs['NetworkSettings'].get('Ports')
        if ports is not None:
            for port, bindings in ports.items():
                # Unpublished ports have no bindings (None or an empty list)
                info['ports'][port] = bindings[0]['HostPort'] if bindings else None

        host_config = attrs['HostConfig']
        info['cpu_period'] = host_config['CpuPeriod']
        info['cpu_quota'] = host_config['CpuQuota']
        info['memory_hard_limit'] = host_config['Memory']
        info['memory_soft_limit'] = host_config['MemoryReservation']

        return info

    def inspect_container(self, docker_id: str) -> Dict[str, Any]:
        """
        Retrieve information about a running container.

        :param docker_id: id or name of the container
        :return: the container summary, as built by _container_summary
        :raises ZoeException: if the container cannot be retrieved
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except Exception as e:
            raise ZoeException(str(e)) from e
        return self._container_summary(cont)

    def terminate_container(self, docker_id: str, delete=False) -> None:
        """
        Terminate a container.

        :param docker_id: The container to terminate
        :type docker_id: str
        :param delete: If True, also delete the container files
        :type delete: bool
        :return: None
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except docker.errors.NotFound:
            # Already gone: nothing to terminate
            return

        cont.stop(timeout=5)
        if delete:
            try:
                cont.remove(force=True)
            except docker.errors.APIError as e:
                log.warning(str(e))

    def event_listener(self, callback: Callable[[Dict[str, Any]], bool]) -> None:
        """
        An infinite loop that listens for events from Swarm.

        The event stream is reopened whenever the connection fails or the stream ends.
        Exceptions raised by the callback are logged and the loop continues.

        :param callback: called with each decoded event; returning a falsy value stops the loop
        """
        event_gen = self.cli.events(decode=True)
        while True:
            try:
                event = next(event_gen)
            except (requests.exceptions.RequestException, StopIteration):
                # StopIteration means the daemon ended the stream cleanly; without
                # catching it here it would escape the loop as an exception.
                log.warning('Docker closed event connection, retrying...')
                event_gen = self.cli.events(decode=True)
                continue

            try:
                res = callback(event)
            except Exception:
                log.exception('Uncaught exception in swarm event callback')
                log.warning('event was: %s', event)
                continue
            if not res:
                break

    def list(self, only_label=None) -> Iterable[dict]:
        """
        List running or defined containers.

        :param only_label: filter containers with only a certain label
        :return: a list of containers
        :raises ZoeException: if the container list cannot be retrieved
        """
        try:
            ret = self.cli.containers.list(all=True)
        except (docker.errors.APIError, requests.exceptions.RequestException) as ex:
            raise ZoeException(str(ex)) from ex
        if only_label is None:
            only_label = {}
        conts = []
        for cont in ret:
            # Containers created without labels report None instead of an empty mapping
            labels = cont.attrs['Config']['Labels'] or {}
            if all(key in labels and labels[key] == value for key, value in only_label.items()):
                conts.append(self._container_summary(cont))

        return conts

    def stats(self, docker_id: str, stream: bool):
        """
        Retrieve container stats based on resource usage.

        :param docker_id: id or name of the container
        :param stream: passed through to Container.stats
        :return: whatever Container.stats returns for the given stream flag
        :raises ZoeException: on missing container, API errors, read timeouts or decoding errors
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except docker.errors.NotFound as e:
            raise ZoeException('Container not found') from e
        except docker.errors.APIError as e:
            raise ZoeException('Docker API error: {}'.format(e)) from e

        try:
            return cont.stats(stream=stream)
        except docker.errors.APIError as e:
            raise ZoeException('Docker API error: {}'.format(e)) from e
        except requests.exceptions.ReadTimeout as e:
            raise ZoeException('Read timeout') from e
        except ValueError as e:
            raise ZoeException('Docker API decoding error') from e

    def logs(self, docker_id: str, stream: bool, follow=None):
        """
        Retrieve the logs of the selected container (stdout and stderr, with timestamps, all lines).

        :param docker_id: id or name of the container
        :param stream: passed through to Container.logs
        :param follow: passed through to Container.logs
        :return: the result of Container.logs, or None if the container or its logs cannot be retrieved
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except (docker.errors.NotFound, docker.errors.APIError):
            return None

        try:
            return cont.logs(stdout=True, stderr=True, follow=follow, stream=stream, timestamps=True, tail='all')
        except docker.errors.APIError:
            return None

    def list_images(self):
        """Retrieve the list of images available on this node (empty list on API errors)."""
        try:
            return self.cli.images.list()
        except (docker.errors.NotFound, docker.errors.APIError):
            return []

    def pull_image(self, image_name):
        """
        Pull an image in the docker engine.

        :param image_name: the image reference to pull
        :raises ZoeException: if the engine reports an error while pulling
        """
        try:
            self.cli.images.pull(image_name)
        except docker.errors.APIError as e:
            log.error('Cannot download image %s: %s', image_name, e)
            raise ZoeException('Cannot download image {}: {}'.format(image_name, e)) from e