# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to the low-level Docker API."""

import logging
from typing import List, Callable, Dict, Any

import docker
import docker.tls
import docker.errors
import docker.utils
import docker.models.containers

import requests.exceptions

from zoe_lib.config import get_conf
from zoe_lib.state import Service, VolumeDescriptionHostPath
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.docker.config import DockerHostConfig  # pylint: disable=unused-import
from zoe_master.exceptions import ZoeException, ZoeNotEnoughResourcesException

log = logging.getLogger(__name__)

# Fail fast at import time when the installed docker package does not expose
# the high-level ``DockerClient`` class this module is built on. The check is
# a plain attribute probe: no client object is created (and left unclosed)
# just to find out whether the class exists.
if not hasattr(docker, 'DockerClient'):
    log.error('Docker package does not have the DockerClient attribute')
    raise ImportError('Wrong Docker library version')


class DockerClient:
    """The client class that wraps the Docker API."""
    def __init__(self, docker_config: DockerHostConfig, mock_client=None) -> None:
        """
        Bind this wrapper to a single Docker host.

        :param docker_config: description of the host; its name, address and TLS settings are read here
        :param mock_client: optional ready-made client object used instead of opening a connection
        :raises ZoeException: if the Docker client cannot be created for the configured address
        """
        self.name = docker_config.name
        self.docker_config = docker_config

        # TLS is optional: without it the client is created with tls=None
        tls = None
        if docker_config.tls:
            tls = docker.tls.TLSConfig(
                client_cert=(docker_config.tls_cert, docker_config.tls_key),
                verify=docker_config.tls_ca
            )

        # Simplify testing: a caller-supplied client replaces the real connection
        if mock_client is not None:
            self.cli = mock_client
            return

        try:
            self.cli = docker.DockerClient(base_url=docker_config.address, version="auto", tls=tls)
        except docker.errors.DockerException as err:
            message = "Cannot connect to Docker host {} at address {}: {}".format(docker_config.name, docker_config.address, str(err))
            raise ZoeException(message)

    def info(self) -> Dict:
        """Return the engine statistics reported by the wrapped Docker client's ``info()`` call."""
        engine_info = self.cli.info()
        return engine_info

    def spawn_container(self, service_instance: ServiceInstance) -> Dict[str, Any]:
        """
        Create and start a new container for a service instance.

        :param service_instance: description of the container to run (image, command, ports, environment, volumes and resource limits are read from it)
        :return: the summary dictionary built by ``_container_summary`` for the new container
        :raises ZoeNotEnoughResourcesException: if Docker reports that no resources are available to schedule the container
        :raises ZoeException: if the image is not found or the container cannot be started for any other reason
        """
        run_args = {
            'detach': True,
            'ports': {},
            'environment': {},
            'volumes': {},
            'working_dir': service_instance.work_dir,
            'mem_limit': 0,
            'mem_reservation': 0,
            'memswap_limit': 0,
            'name': service_instance.name,
            'network_disabled': False,
            'network_mode': service_instance.network,
            'image': service_instance.image_name,
            'command': service_instance.command,
            'hostname': service_instance.hostname,
            'labels': service_instance.labels,
            'cpu_period': 100000,
            'cpu_quota': 100000,
            'log_config': {
                "type": "json-file",
                "config": {}
            }
        }  # type: Dict[str, Any]

        # A None binding leaves the choice of the host port to Docker
        for port in service_instance.ports:
            run_args['ports'][str(port.number) + '/' + port.proto] = None

        for name, value in service_instance.environment:
            run_args['environment'][name] = value

        for volume in service_instance.volumes:
            if volume.type == "host_directory":
                assert isinstance(volume, VolumeDescriptionHostPath)
                # The Docker SDK expects the keys 'bind' and 'mode' in each volume specification
                run_args['volumes'][volume.path] = {'bind': volume.mount_point, 'mode': ("ro" if volume.readonly else "rw")}
            else:
                log.error('Docker backend does not support volume type {}'.format(volume.type))

        if service_instance.memory_limit is not None:
            run_args['mem_limit'] = service_instance.memory_limit.min

        if service_instance.core_limit is not None:
            # Scale the minimum core limit by the cpu_period configured above
            run_args['cpu_quota'] = int(100000 * service_instance.core_limit.min)

        if service_instance.shm_size > 0:
            run_args['shm_size'] = service_instance.shm_size

        # When a GELF endpoint is configured, it replaces the default json-file log driver
        gelf_address = get_conf().gelf_address
        if gelf_address != '':
            run_args['log_config'] = {
                "type": "gelf",
                "config": {
                    'gelf-address': gelf_address,
                    'labels': ",".join(service_instance.labels)
                }
            }

        # When run() raises, no container object has been returned, so there is
        # no handle available here to remove a partially created container.
        # NOTE(review): Docker may still have created the container before failing - confirm whether cleanup by name is needed.
        try:
            cont = self.cli.containers.run(**run_args)
        except docker.errors.ImageNotFound:
            raise ZoeException(message='Image not found')
        except docker.errors.APIError as e:
            # The explanation can be bytes or str depending on the SDK version
            explanation = e.explanation
            if isinstance(explanation, bytes):
                explanation = explanation.decode('utf-8', 'replace')
            if explanation == 'no resources available to schedule container':
                raise ZoeNotEnoughResourcesException(message=str(e))
            raise ZoeException(message=str(e))
        except Exception as e:
            raise ZoeException(str(e))

        # Fetch the container again so that the summary is built from fresh attributes
        cont = self.cli.containers.get(cont.id)
        return self._container_summary(cont)

    def _container_summary(self, container: docker.models.containers.Container) -> Dict[str, Any]:
        """
        Translate a docker-specific container object into a simple dictionary.

        :param container: the Docker SDK container object to summarize; its ``attrs`` and ``status`` are read
        :return: a dictionary with identity, labels, host, state, port mappings and CPU/memory limits
        """
        info = {
            "id": container.id,
            "ip_address": {},
            "name": container.name,
            'labels': container.attrs['Config']['Labels'],
            'external_address': self.docker_config.external_address
        }  # type: Dict[str, Any]

        # The 'Node' entry is not always present in the attributes; fall back to a placeholder
        try:
            info['host'] = container.attrs['Node']['Name']
        except KeyError:
            info['host'] = 'N/A'

        status = container.status
        if status in ('running', 'restarting', 'removing'):
            info["state"] = Service.BACKEND_START_STATUS
            info["running"] = True
        elif status in ('paused', 'exited', 'dead'):
            info["state"] = Service.BACKEND_DIE_STATUS
            info["running"] = False
        elif status == 'OOMKilled':
            # NOTE(review): Docker appears to report OOM kills through a separate state flag rather
            # than as a status string - confirm that this branch can actually be reached.
            info["state"] = Service.BACKEND_OOM_STATUS
            info["running"] = False
        elif status == 'created':
            info["state"] = Service.BACKEND_CREATE_STATUS
            info["running"] = False
        else:
            log.error('Unknown container status: {}'.format(status))
            info["state"] = Service.BACKEND_UNDEFINED_STATUS
            info["running"] = False

        info['ports'] = {}
        port_map = container.attrs['NetworkSettings'].get('Ports') or {}
        for port, bindings in port_map.items():
            # A port without bindings (None, or an empty list) has no host port to report
            info['ports'][port] = bindings[0]['HostPort'] if bindings else None

        host_config = container.attrs['HostConfig']
        info['cpu_period'] = host_config['CpuPeriod']
        info['cpu_quota'] = host_config['CpuQuota']
        info['memory_hard_limit'] = host_config['Memory']
        info['memory_soft_limit'] = host_config['MemoryReservation']

        return info

    def inspect_container(self, docker_id: str) -> Dict[str, Any]:
        """
        Look up a container by ID and return its summary dictionary.

        :param docker_id: the ID of the container to inspect
        :return: the dictionary built by ``_container_summary``
        :raises ZoeException: if the container lookup fails for any reason
        """
        try:
            container = self.cli.containers.get(docker_id)
        except Exception as err:
            raise ZoeException(str(err))
        else:
            return self._container_summary(container)

    def terminate_container(self, docker_id: str, delete=False) -> None:
        """
        Stop a container or, on request, forcibly remove it.

        A container that no longer exists is silently ignored; other Docker API
        errors raised while stopping or removing are logged as warnings.

        :param docker_id: The container to terminate
        :type docker_id: str
        :param delete: If True, remove the container (forced) instead of stopping it
        :type delete: bool
        :return: None
        """
        try:
            container = self.cli.containers.get(docker_id)
        except docker.errors.NotFound:
            # Nothing to terminate
            return

        try:
            if not delete:
                container.stop(timeout=5)
            else:
                container.remove(force=True)
        except docker.errors.NotFound:
            # The container disappeared between the lookup and the call
            pass
        except docker.errors.APIError as err:
            log.warning(str(err))

    def event_listener(self, callback: Callable[[str], bool]) -> None:
        """
        Listen for Docker events and pass each one to a callback, until the callback asks to stop.

        The event stream is requested with ``decode=True``. When the stream fails
        or ends, a new one is opened and listening continues. An exception
        raised by the callback is logged and the loop moves on to the next event.

        :param callback: called with each event; a false return value ends the loop
        :return: None
        """
        event_gen = self.cli.events(decode=True)
        while True:
            try:
                event = next(event_gen)
            except (requests.exceptions.RequestException, StopIteration):
                # A dropped connection can surface either as a requests error or as
                # the stream simply ending (StopIteration); reconnect in both cases.
                log.warning('Docker closed event connection, retrying...')
                event_gen = self.cli.events(decode=True)
                continue

            try:
                keep_listening = callback(event)
            except Exception:
                log.exception('Uncaught exception in swarm event callback')
                log.warning('event was: {}'.format(event))
                continue
            if not keep_listening:
                break

    def list(self, only_label=None) -> List[dict]:
        """
        List running or defined containers.

        :param only_label: optional dictionary of label names and values; only containers carrying all of them are returned
        :return: a list of container summary dictionaries, as built by ``_container_summary``
        :raises ZoeException: if the container list cannot be retrieved from Docker
        """
        try:
            containers = self.cli.containers.list(all=True)
        except (docker.errors.APIError, requests.exceptions.RequestException) as ex:
            raise ZoeException(str(ex))

        if only_label is None:
            only_label = {}

        conts = []
        for cont_info in containers:
            # A container without labels may report None instead of an empty dictionary
            labels = cont_info.attrs['Config']['Labels'] or {}
            if all(key in labels and labels[key] == value for key, value in only_label.items()):
                conts.append(self._container_summary(cont_info))

        return conts

    def stats(self, docker_id: str, stream: bool):
        """
        Retrieve the resource usage statistics of a container.

        :param docker_id: the ID of the container to query
        :param stream: passed through to the container's ``stats`` call
        :return: whatever the container's ``stats`` call returns
        :raises ZoeException: if the container is not found, on Docker API errors, on read timeouts or on decoding errors
        """
        try:
            container = self.cli.containers.get(docker_id)
        except docker.errors.NotFound:
            raise ZoeException('Container not found')
        except docker.errors.APIError as err:
            raise ZoeException('Docker API error: {}'.format(err))

        try:
            usage = container.stats(stream=stream)
        except docker.errors.APIError as err:
            raise ZoeException('Docker API error: {}'.format(err))
        except requests.exceptions.ReadTimeout:
            raise ZoeException('Read timeout')
        except ValueError:
            raise ZoeException('Docker API decoding error')
        return usage

    def logs(self, docker_id: str, stream: bool, follow=None):
        """
        Retrieve the logs of the selected container.

        Both stdout and stderr are requested, with timestamps and without a tail limit.

        :param docker_id: the ID of the container whose logs are requested
        :param stream: passed through to the container's ``logs`` call
        :param follow: passed through to the container's ``logs`` call
        :return: the result of the container's ``logs`` call, or None if the container cannot be found or the Docker API fails
        """
        # NotFound is a kind of APIError in the Docker SDK, so a single handler
        # covers both the lookup and the log retrieval.
        try:
            container = self.cli.containers.get(docker_id)
            return container.logs(stdout=True, stderr=True, follow=follow, stream=stream, timestamps=True, tail='all')
        except (docker.errors.NotFound, docker.errors.APIError):
            return None

    def list_images(self):
        """Return the images known to this Docker client, or an empty list when the Docker API call fails."""
        try:
            images = self.cli.images.list()
        except (docker.errors.NotFound, docker.errors.APIError):
            return []
        else:
            return images

    def pull_image(self, image_name):
        """
        Pull an image into the Docker engine.

        :param image_name: the name of the image to pull
        :raises ZoeException: if the Docker API reports an error; the same message is also logged
        """
        try:
            self.cli.images.pull(image_name)
        except docker.errors.APIError as err:
            message = 'Cannot download image {}: {}'.format(image_name, err)
            log.error(message)
            raise ZoeException(message)

    def update(self, docker_id, cpu_quota=None, mem_reservation=None, mem_limit=None):
        """
        Update the resource reservation for a container.

        This is a best-effort operation: a missing container is ignored and
        Docker API errors are logged as warnings instead of being raised.

        :param docker_id: the ID of the container to update
        :param cpu_quota: new CPU quota, left unchanged when None
        :param mem_reservation: new memory reservation, left unchanged when None
        :param mem_limit: new memory limit, left unchanged when None
        :return: None
        """
        requested = (
            ('cpu_quota', cpu_quota),
            ('mem_reservation', mem_reservation),
            ('mem_limit', mem_limit),
        )
        # Only the settings that were actually given are sent to Docker
        kwargs = {name: value for name, value in requested if value is not None}

        try:
            cont = self.cli.containers.get(docker_id)
        except docker.errors.NotFound:
            # The container no longer exists: nothing to update
            return
        except docker.errors.APIError as e:
            log.warning('Cannot retrieve container {} for update: {}'.format(docker_id, e))
            return

        try:
            cont.update(**kwargs)
        except docker.errors.APIError as e:
            log.warning('Cannot update container {}: {}'.format(docker_id, e))