api_client.py 12.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to the low-level Docker API."""

import logging
19
from typing import List, Callable, Dict, Any
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

import docker
import docker.tls
import docker.errors
import docker.utils
import docker.models.containers

import requests.exceptions

from zoe_lib.config import get_conf
from zoe_lib.state import Service, VolumeDescriptionHostPath
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.docker.config import DockerHostConfig  # pylint: disable=unused-import
from zoe_master.exceptions import ZoeException, ZoeNotEnoughResourcesException

log = logging.getLogger(__name__)

# Fail early, at import time, when the installed docker package does not
# provide the high-level DockerClient API used by this module. The check only
# looks the attribute up: no client object is created as an import side effect.
if not hasattr(docker, 'DockerClient'):
    log.error('Docker package does not have the DockerClient attribute')
    raise ImportError('Wrong Docker library version')


class DockerClient:
    """The client class that wraps the Docker API for a single Docker host."""

    def __init__(self, docker_config: DockerHostConfig, mock_client=None) -> None:
        """
        Build a client for the host described by ``docker_config``.

        :param docker_config: host description; its name, address, tls, tls_cert, tls_key and tls_ca attributes are read here
        :param mock_client: when not None it is stored as the client and neither TLS setup nor a connection is attempted
        :raises ZoeException: when docker-py reports an error while preparing TLS or creating the client
        """
        self.name = docker_config.name
        self.docker_config = docker_config

        # Simplify testing: a mock client needs no TLS material and no reachable engine
        if mock_client is not None:
            self.cli = mock_client
            return

        try:
            # The TLS configuration is built inside the try block so that its errors
            # are reported as ZoeException like any other connection problem
            if docker_config.tls:
                tls = docker.tls.TLSConfig(client_cert=(docker_config.tls_cert, docker_config.tls_key), verify=docker_config.tls_ca)
            else:
                tls = None
            self.cli = docker.DockerClient(base_url=docker_config.address, version="auto", tls=tls)
        except docker.errors.DockerException as e:
            raise ZoeException("Cannot connect to Docker host {} at address {}: {}".format(docker_config.name, docker_config.address, str(e))) from e

    def info(self) -> Dict:
        """Retrieve engine statistics."""
        return self.cli.info()

    def spawn_container(self, service_instance: ServiceInstance) -> Dict[str, Any]:
        """
        Create and start a new container.

        :param service_instance: description of the container to run (image, command, ports, environment, volumes, limits, labels)
        :return: the summary dictionary produced by ``_container_summary`` for the new container
        :raises ZoeNotEnoughResourcesException: when the engine answers that no resources are available to schedule the container
        :raises ZoeException: for a missing image or any other failure while starting the container
        """
        run_args = {
            'detach': True,
            'ports': {},
            'environment': {},
            'volumes': {},
            'working_dir': service_instance.work_dir,
            'mem_limit': 0,
            'mem_reservation': 0,
            'memswap_limit': 0,
            'name': service_instance.name,
            'network_disabled': False,
            'network_mode': get_conf().overlay_network_name,
            'image': service_instance.image_name,
            'command': service_instance.command,
            'hostname': service_instance.hostname,
            'labels': service_instance.labels,
            'cpu_period': 100000,
            'cpu_quota': 100000,
            'log_config': {
                "type": "json-file",
                "config": {}
            }
        }
        for port in service_instance.ports:
            # None as host port: the host-side port is not chosen here
            run_args['ports'][str(port.number) + '/' + port.proto] = None

        for name, value in service_instance.environment:
            run_args['environment'][name] = value

        for volume in service_instance.volumes:
            if volume.type == "host_directory":
                assert isinstance(volume, VolumeDescriptionHostPath)
                run_args['volumes'][volume.path] = {'bind': volume.mount_point, 'mode': ("ro" if volume.readonly else "rw")}
            else:
                log.error('Swarm backend does not support volume type {}'.format(volume.type))

        if service_instance.memory_limit is not None:
            run_args['mem_limit'] = service_instance.memory_limit.max
            run_args['mem_reservation'] = service_instance.memory_limit.min
            # Keep the soft limit strictly below the hard limit when they coincide
            if service_instance.memory_limit.max == service_instance.memory_limit.min:
                run_args['mem_reservation'] -= 1

        if service_instance.core_limit is not None:
            # The quota is expressed in units of cpu_period (100000) per core
            run_args['cpu_quota'] = int(100000 * service_instance.core_limit.min)

        if get_conf().gelf_address != '':
            run_args['log_config'] = {
                "type": "gelf",
                "config": {
                    'gelf-address': get_conf().gelf_address,
                    'labels': ",".join(service_instance.labels)
                }
            }

        # No cleanup is attempted in the handlers below: when run() raises, no
        # container object has been returned to this method, so there is nothing
        # it could call remove() on.
        try:
            cont = self.cli.containers.run(**run_args)
        except docker.errors.ImageNotFound:
            raise ZoeException(message='Image not found')
        except docker.errors.APIError as e:
            # The explanation may arrive as bytes or as str: normalise it before
            # comparing, otherwise the not-enough-resources case is never recognised
            explanation = e.explanation
            if isinstance(explanation, bytes):
                explanation = explanation.decode('utf-8', errors='replace')
            if explanation == 'no resources available to schedule container':
                raise ZoeNotEnoughResourcesException(message=str(e)) from e
            raise ZoeException(message=str(e)) from e
        except Exception as e:
            raise ZoeException(str(e)) from e

        # Fetch the container again so that the summary is built from fresh attributes
        cont = self.cli.containers.get(cont.id)
        return self._container_summary(cont)

    def _container_summary(self, container: docker.models.containers.Container) -> Dict[str, Any]:
        """
        Translate a docker-specific container object into a simple dictionary.

        The result holds: id, ip_address (always an empty dict here), name, labels, external_address,
        host ('N/A' when the container attributes carry no node name), state, running, ports,
        cpu_period, cpu_quota, memory_hard_limit and memory_soft_limit.
        """
        info = {
            "id": container.id,
            "ip_address": {},
            "name": container.name,
            'labels': container.attrs['Config']['Labels'],
            'external_address': self.docker_config.external_address
        }  # type: Dict[str, Any]
        try:
            info['host'] = container.attrs['Node']['Name']
        except KeyError:
            info['host'] = 'N/A'

        status = container.status
        # An out-of-memory kill is flagged by State.OOMKilled on a stopped container,
        # it is not a value of the status string. State may also be a plain string,
        # hence the isinstance check.
        state_attrs = container.attrs.get('State')
        oom_killed = isinstance(state_attrs, dict) and bool(state_attrs.get('OOMKilled'))

        if status in ('running', 'restarting'):
            info["state"] = Service.BACKEND_START_STATUS
            info["running"] = True
        elif status == 'OOMKilled' or (status in ('exited', 'dead') and oom_killed):
            info["state"] = Service.BACKEND_OOM_STATUS
            info["running"] = False
        elif status in ('paused', 'exited', 'dead'):
            info["state"] = Service.BACKEND_DIE_STATUS
            info["running"] = False
        elif status == 'created':
            info["state"] = Service.BACKEND_CREATE_STATUS
            info["running"] = False
        else:
            log.error('Unknown container status: {}'.format(status))
            info["state"] = Service.BACKEND_UNDEFINED_STATUS
            info["running"] = False

        info['ports'] = {}
        port_map = container.attrs['NetworkSettings'].get('Ports')
        if port_map is not None:
            for port, bindings in port_map.items():
                # A missing or empty binding list means the port is not published on the host
                info['ports'][port] = bindings[0]['HostPort'] if bindings else None

        info['cpu_period'] = container.attrs['HostConfig']['CpuPeriod']
        info['cpu_quota'] = container.attrs['HostConfig']['CpuQuota']
        info['memory_hard_limit'] = container.attrs['HostConfig']['Memory']
        info['memory_soft_limit'] = container.attrs['HostConfig']['MemoryReservation']

        return info

    def inspect_container(self, docker_id: str) -> Dict[str, Any]:
        """
        Retrieve information about a container.

        :param docker_id: the container to look up
        :return: the summary dictionary produced by ``_container_summary``
        :raises ZoeException: when the container cannot be retrieved, whatever the reason
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except Exception as e:
            raise ZoeException(str(e)) from e
        return self._container_summary(cont)

    def terminate_container(self, docker_id: str, delete=False) -> None:
        """
        Terminate a container.

        A container that does not exist (or disappears before it can be stopped) is silently ignored.

        :param docker_id: The container to terminate
        :type docker_id: str
        :param delete: If True, also delete the container files
        :type delete: bool
        :return: None
        """
        try:
            cont = self.cli.containers.get(docker_id)
            cont.stop(timeout=5)
        except docker.errors.NotFound:
            return

        if delete:
            try:
                cont.remove(force=True)
            except docker.errors.APIError as e:
                # Removal is best-effort: report the problem and carry on
                log.warning(str(e))

    def event_listener(self, callback: Callable[[str], bool]) -> None:
        """
        An infinite loop that listens for events from Swarm.

        Each event is passed to ``callback``; the loop ends as soon as the callback returns a false value.
        Exceptions raised by the callback are logged and the loop continues with the next event.

        NOTE(review): events are requested with decode=True, so the callback presumably receives decoded
        objects rather than the ``str`` of the annotation - confirm against the callers.
        """
        event_gen = self.cli.events(decode=True)
        while True:
            try:
                event = next(event_gen)
            except requests.exceptions.RequestException:
                log.warning('Docker closed event connection, retrying...')
                event_gen = self.cli.events(decode=True)
                continue

            try:
                res = callback(event)
            except Exception:
                log.exception('Uncaught exception in swarm event callback')
                log.warning('event was: {}'.format(event))
                continue
            if not res:
                break

    def list(self, only_label=None) -> List[dict]:
        """
        List running or defined containers.

        :param only_label: dictionary of label names and values; only containers carrying all of them are returned. None disables the filter.
        :return: a list of container summaries, as produced by ``_container_summary``
        :raises ZoeException: when the container list cannot be retrieved
        """
        try:
            ret = self.cli.containers.list(all=True)
        except docker.errors.APIError as ex:
            raise ZoeException(str(ex)) from ex
        except requests.exceptions.RequestException as ex:
            raise ZoeException(str(ex)) from ex

        wanted = only_label or {}
        conts = []
        for cont_info in ret:
            # A container without labels may report None instead of an empty dictionary
            labels = cont_info.attrs['Config']['Labels'] or {}
            if all(key in labels and labels[key] == value for key, value in wanted.items()):
                conts.append(self._container_summary(cont_info))

        return conts

    def stats(self, docker_id: str, stream: bool):
        """
        Retrieves container stats based on resource usage.

        :param docker_id: the container to query
        :param stream: passed through to the docker-py ``stats`` call
        :return: whatever the docker-py ``stats`` call returns
        :raises ZoeException: when the container is not found, on Docker API errors, on read timeout or on decoding errors
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except docker.errors.NotFound:
            raise ZoeException('Container not found')
        except docker.errors.APIError as e:
            raise ZoeException('Docker API error: {}'.format(e)) from e

        try:
            return cont.stats(stream=stream)
        except docker.errors.APIError as e:
            raise ZoeException('Docker API error: {}'.format(e)) from e
        except requests.exceptions.ReadTimeout:
            raise ZoeException('Read timeout')
        except ValueError:
            raise ZoeException('Docker API decoding error')

    def logs(self, docker_id: str, stream: bool, follow=None):
        """
        Retrieves the logs of the selected container.

        Both stdout and stderr are requested, with timestamps and without limiting the number of lines.

        :param docker_id: the container whose logs are wanted
        :param stream: passed through to the docker-py ``logs`` call
        :param follow: passed through to the docker-py ``logs`` call
        :return: the result of the docker-py ``logs`` call, or None when the container or its logs cannot be retrieved
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except (docker.errors.NotFound, docker.errors.APIError):
            return None

        try:
            return cont.logs(stdout=True, stderr=True, follow=follow, stream=stream, timestamps=True, tail='all')
        except docker.errors.APIError:
            return None

    def list_images(self):
        """Retrieve the list of images available on this node, or an empty list on Docker API errors."""
        try:
            return self.cli.images.list()
        except (docker.errors.NotFound, docker.errors.APIError):
            return []

    def pull_image(self, image_name):
        """
        Pulls an image in the docker engine.

        :param image_name: the image to pull
        :raises ZoeException: when the Docker API reports an error; the error is also logged
        """
        try:
            self.cli.images.pull(image_name)
        except docker.errors.APIError as e:
            log.error('Cannot download image {}: {}'.format(image_name, e))
            raise ZoeException('Cannot download image {}: {}'.format(image_name, e)) from e

    def update(self, docker_id, cpu_quota=None, mem_reservation=None, mem_limit=None):
        """
        Update the resource reservation for a container.

        Only the parameters that are not None are sent to the engine. The operation is best-effort:
        a container that cannot be retrieved is ignored and a failed update is only logged.

        :param docker_id: the container to update
        :param cpu_quota: new CPU quota, or None to leave it untouched
        :param mem_reservation: new memory soft limit, or None to leave it untouched
        :param mem_limit: new memory hard limit, or None to leave it untouched
        """
        kwargs = {}
        if cpu_quota is not None:
            kwargs['cpu_quota'] = cpu_quota
        if mem_reservation is not None:
            kwargs['mem_reservation'] = mem_reservation
        if mem_limit is not None:
            kwargs['mem_limit'] = mem_limit

        try:
            cont = self.cli.containers.get(docker_id)
        except (docker.errors.NotFound, docker.errors.APIError):
            return

        try:
            cont.update(**kwargs)
        except docker.errors.APIError as e:
            # Still best-effort, but leave a trace instead of failing silently
            log.warning('Cannot update resources of container {}: {}'.format(docker_id, e))