api_client.py 14.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to the low-level Docker API."""

import time
import logging
from typing import Iterable, Callable, Dict, Any

import humanfriendly

try:
    from consul import Consul
except ImportError:
    Consul = None

try:
    from kazoo.client import KazooClient
except ImportError:
    KazooClient = None

34
35
36
37
38
import docker
import docker.tls
import docker.errors
import docker.utils
import docker.models.containers
39

40
import requests.exceptions
41
42

from zoe_lib.config import get_conf
43
from zoe_lib.state import Service, VolumeDescriptionHostPath
44
45
from zoe_master.stats import ClusterStats, NodeStats
from zoe_master.backends.service_instance import ServiceInstance
46
from zoe_master.exceptions import ZoeException, ZoeNotEnoughResourcesException
47
48
49

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# The rest of this module relies on the high-level ``docker.DockerClient``
# API. Check for the attribute directly instead of instantiating a throwaway
# client at import time, so that importing this module has no side effects.
if not hasattr(docker, 'DockerClient'):
    log.error('Docker package does not have the DockerClient attribute')
    raise ImportError('Wrong Docker library version')
55

56

57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def zookeeper_swarm(zk_server_list: str, path='/docker') -> str:
    """
    Given a Zookeeper server list, find the currently active Swarm master.

    The leader is read from the node ``<path>/docker/swarm/leader``; with the
    default ``path`` this is ``/docker/docker/swarm/leader``.

    :param zk_server_list: Zookeeper server list
    :param path: Swarm path in Zookeeper
    :return: Swarm master connection string
    """
    path += '/docker/swarm/leader'
    zk_client = KazooClient(hosts=zk_server_list)
    zk_client.start()
    try:
        # get() returns a (data, stat) pair; only the data is needed here.
        master, _ = zk_client.get(path)
    finally:
        # Always stop the session, even when the lookup fails.
        zk_client.stop()
    return master.decode('utf-8')


def consul_swarm(consul_ip: str) -> str:
    """
    Using consul as discovery service, find the currently active Swarm master.

    :param consul_ip: consul ip address
    :return: Swarm master connection string
    :raises ZoeException: if the leader key is missing or has no value
    """
    leader_key = 'docker/swarm/leader'
    consul_client = Consul(consul_ip)
    # kv.get() returns an (index, entry) pair; the entry is None when the key
    # does not exist.
    _, entry = consul_client.kv.get(leader_key)
    if entry is None or entry.get('Value') is None:
        raise ZoeException('Swarm leader key "{}" not found in Consul'.format(leader_key))
    return entry['Value'].decode('utf-8')


class SwarmClient:
    """The Swarm client class that wraps the Docker API."""

    def __init__(self) -> None:
        """
        Resolve the Swarm manager address from the configuration and connect to it.

        The ``backend_swarm_url`` option selects how the manager is found:
        ``zk://`` and ``consul://`` URLs are resolved through the respective
        discovery service, ``http://`` and ``https://`` URLs are used as they
        are (``https://`` also enables TLS with the configured cert/key/CA).

        :raises ZoeException: on an unsupported URL scheme, a missing optional
            package or a failure to create the Docker client
        """
        conf = get_conf()
        url = conf.backend_swarm_url
        tls = False
        # The scheme must be a prefix: the discovery branches strip it by length.
        if url.startswith('zk://'):
            if KazooClient is None:
                raise ZoeException('ZooKeeper URL for Swarm, but the kazoo package is not installed')
            manager = zookeeper_swarm(url[len('zk://'):], conf.backend_swarm_zk_path)
        elif url.startswith('consul://'):
            if Consul is None:
                raise ZoeException('Consul URL for Swarm, but the consul package is not installed')
            manager = consul_swarm(url[len('consul://'):])
        elif url.startswith('http://'):
            manager = url
        elif url.startswith('https://'):
            tls = docker.tls.TLSConfig(client_cert=(conf.backend_swarm_tls_cert, conf.backend_swarm_tls_key),
                                       verify=conf.backend_swarm_tls_ca)
            manager = url
        else:
            raise ZoeException('Unsupported URL scheme for Swarm')

        try:
            self.cli = docker.DockerClient(base_url=manager, version="auto", tls=tls)
        except docker.errors.DockerException as e:
            raise ZoeException("Cannot connect to Docker") from e

    def info(self) -> ClusterStats:
        """
        Retrieve Swarm statistics. The Docker API returns a mess difficult to parse.

        ``SystemStatus`` is a flat list of ``[key, value]`` pairs: a short
        header (role, strategy, filters, node count) followed by a fixed-size
        group of entries for each node.

        :return: the cluster statistics, one ``NodeStats`` per Swarm node
        :raises ZoeException: if the header does not have the expected layout
        """
        info = self.cli.info()
        status = info["SystemStatus"]
        pl_status = ClusterStats()

        def header_value(position: int, expected_key: str) -> str:
            """Return the value at ``position`` after checking that its key matches."""
            if expected_key not in status[position][0]:
                raise ZoeException('Unexpected Swarm info layout: "{}" not found at position {}'.format(expected_key, position))
            return status[position][1]

        # Position 0 holds the role, which is not used.
        pl_status.placement_strategy = header_value(1, 'Strategy')
        pl_status.active_filters = [x.strip() for x in header_value(2, 'Filters').split(", ")]
        node_count = int(header_value(3, 'Nodes'))

        first_node_idx = 4  # the node entries begin right after the header
        entries_per_node = 9  # name, ID, status, containers, CPUs, memory, labels, last update, version
        for node in range(node_count):
            base = first_node_idx + node * entries_per_node
            node_stats = NodeStats(status[base][0].strip())
            node_stats.docker_endpoint = status[base][1]
            # base + 1 is the node ID, which is not used
            if status[base + 2][1] == 'Healthy':
                node_stats.status = 'online'
            else:
                node_stats.status = 'offline'
            node_stats.container_count = int(status[base + 3][1].split(' ')[0])
            cores = status[base + 4][1].split(' / ')  # "<reserved> / <total>"
            node_stats.cores_reserved = int(cores[0])
            node_stats.cores_total = int(cores[1])
            memory = status[base + 5][1].split(' / ')  # "<reserved> / <total>", human readable sizes
            node_stats.memory_reserved = humanfriendly.parse_size(memory[0])
            node_stats.memory_total = humanfriendly.parse_size(memory[1])
            node_stats.labels = status[base + 6][1].split(', ')
            node_stats.last_update = status[base + 7][1]
            node_stats.server_version = status[base + 8][1]

            pl_status.nodes.append(node_stats)
        pl_status.timestamp = time.time()
        return pl_status

    def spawn_container(self, service_instance: ServiceInstance) -> Dict[str, Any]:
        """
        Create and start a new container.

        :param service_instance: description of the container to start
        :return: the summary dictionary built by ``_container_summary``
        :raises ZoeNotEnoughResourcesException: if Swarm reports that no resources are available
        :raises ZoeException: for any other failure, including a missing image
        """
        # Every declared port is published with a None binding.
        port_bindings = {str(port.number) + '/' + port.proto: None for port in service_instance.ports}  # type: Dict[str, Any]

        environment = {name: value for name, value in service_instance.environment}

        volumes = {}
        for volume in service_instance.volumes:
            if volume.type == "host_directory":
                assert isinstance(volume, VolumeDescriptionHostPath)
                volumes[volume.path] = {'bind': volume.mount_point, 'mode': ("ro" if volume.readonly else "rw")}
            else:
                log.error('Swarm backend does not support volume type {}'.format(volume.type))

        if service_instance.memory_limit is not None:
            mem_limit = service_instance.memory_limit.max
        else:
            mem_limit = 0
        # Swarm backend does not support cores in a consistent way, see https://github.com/docker/swarm/issues/475

        if get_conf().gelf_address != '':
            log_config = {
                "type": "gelf",
                "config": {
                    'gelf-address': get_conf().gelf_address,
                    'labels': ",".join(service_instance.labels)
                }
            }
        else:
            log_config = {
                "type": "json-file",
                "config": {}
            }

        # No container object is available for cleanup in the handlers below:
        # if run() raises, nothing has been returned to this method.
        try:
            cont = self.cli.containers.run(image=service_instance.image_name,
                                           command=service_instance.command,
                                           detach=True,
                                           environment=environment,
                                           hostname=service_instance.hostname,
                                           labels=service_instance.labels,
                                           log_config=log_config,
                                           mem_limit=mem_limit,
                                           memswap_limit=0,
                                           name=service_instance.name,
                                           network_disabled=False,
                                           network_mode=get_conf().overlay_network_name,
                                           ports=port_bindings,
                                           working_dir=service_instance.work_dir,
                                           volumes=volumes)
        except docker.errors.ImageNotFound as e:
            raise ZoeException(message='Image not found') from e
        except docker.errors.APIError as e:
            # The explanation may be bytes or str depending on how the SDK
            # could parse the error response.
            explanation = e.explanation
            if isinstance(explanation, bytes):
                explanation = explanation.decode('utf-8', errors='replace')
            if explanation == 'no resources available to schedule container':
                raise ZoeNotEnoughResourcesException(message=str(e)) from e
            raise ZoeException(message=str(e)) from e
        except Exception as e:
            raise ZoeException(str(e)) from e

        # Fetch the container again so that the summary uses fresh attributes.
        cont = self.cli.containers.get(cont.id)
        return self._container_summary(cont)

    def _container_summary(self, container: docker.models.containers.Container) -> Dict[str, Any]:
        """
        Translate a docker-specific container object into a simple dictionary.

        :param container: the Docker SDK container object
        :return: a dictionary with the keys ``id``, ``ip_address``, ``name``,
            ``labels``, ``host``, ``state``, ``running`` and ``ports``
        """
        info = {
            "id": container.id,
            "ip_address": {},
            "name": container.name,
            'labels': container.attrs['Config']['Labels']
        }  # type: Dict[str, Any]
        try:
            info['host'] = container.attrs['Node']['Name']
        except KeyError:
            info['host'] = 'N/A'

        # NOTE(review): 'OOMKilled' is compared against container.status here;
        # confirm that the backend actually reports it as a status value.
        status = container.status
        if status in ('running', 'restarting'):
            info["state"] = Service.BACKEND_START_STATUS
            info["running"] = True
        elif status in ('paused', 'exited'):
            info["state"] = Service.BACKEND_DIE_STATUS
            info["running"] = False
        elif status == 'OOMKilled':
            info["state"] = Service.BACKEND_OOM_STATUS
            info["running"] = False
        elif status == 'created':
            info["state"] = Service.BACKEND_CREATE_STATUS
            info["running"] = False
        else:
            log.error('Unknown container status: {}'.format(status))
            info["state"] = Service.BACKEND_UNDEFINED_STATUS
            info["running"] = False

        info['ports'] = {}
        port_map = container.attrs['NetworkSettings']['Ports']
        if port_map is not None:
            for port, bindings in port_map.items():
                # A missing or empty binding list means the port is not mapped.
                if bindings:
                    info['ports'][port] = (bindings[0]['HostIp'], bindings[0]['HostPort'])
                else:
                    info['ports'][port] = None

        return info

    def inspect_container(self, docker_id: str) -> Dict[str, Any]:
        """
        Retrieve information about a running container.

        :param docker_id: the ID of the container to inspect
        :return: the summary dictionary built by ``_container_summary``
        :raises ZoeException: if the container cannot be retrieved
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except Exception as e:
            raise ZoeException(str(e)) from e
        return self._container_summary(cont)

    def terminate_container(self, docker_id: str, delete=False) -> None:
        """
        Terminate a container.

        A container that cannot be found is silently ignored.

        :param docker_id: The container to terminate
        :type docker_id: str
        :param delete: If True, also delete the container files
        :type delete: bool
        :return: None
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except docker.errors.NotFound:
            return

        cont.stop(timeout=5)
        if not delete:
            return
        try:
            cont.remove(force=True)
        except docker.errors.APIError as e:
            # Removal is best effort: log the problem and carry on.
            log.warning(str(e))

    def event_listener(self, callback: Callable[[str], bool]) -> None:
        """
        An infinite loop that listens for events from Swarm.

        :param callback: called with each decoded event; the loop ends as soon
            as it returns a false value. Exceptions raised by the callback are
            logged and the loop continues.
        """
        event_gen = self.cli.events(decode=True)
        while True:
            try:
                event = next(event_gen)
            except requests.exceptions.RequestException:
                log.warning('Docker closed event connection, retrying...')
                event_gen = self.cli.events(decode=True)
                continue

            try:
                keep_going = callback(event)
            except Exception:
                log.exception('Uncaught exception in swarm event callback')
                log.warning('event was: {}'.format(event))
                continue
            if not keep_going:
                break

    def list(self, only_label=None) -> Iterable[dict]:
        """
        List running or defined containers.

        :param only_label: dictionary of label names and values that a
            container must all have to be listed; None or an empty dictionary
            lists every container
        :return: a list of container summaries, see ``_container_summary``
        :raises ZoeException: if the container list cannot be retrieved
        """
        try:
            ret = self.cli.containers.list(all=True)
        except (docker.errors.APIError, requests.exceptions.RequestException) as ex:
            raise ZoeException(str(ex)) from ex

        wanted = only_label or {}
        conts = []
        for cont_info in ret:
            # Treat a missing label set as an empty one.
            labels = cont_info.attrs['Config']['Labels'] or {}
            if all(key in labels and labels[key] == value for key, value in wanted.items()):
                conts.append(self._container_summary(cont_info))

        return conts

    def logs(self, docker_id: str, stream: bool, follow=None):
        """
        Retrieves the logs of the selected container.

        :param docker_id: the ID of the container
        :param stream: passed through to the Docker SDK ``logs`` call
        :param follow: passed through to the Docker SDK ``logs`` call
        :return: whatever the Docker SDK returns for the log request, or None
            if the container cannot be retrieved or the request fails
        """
        try:
            cont = self.cli.containers.get(docker_id)
        except (docker.errors.NotFound, docker.errors.APIError):
            return None

        try:
            # The container object already identifies the container: logs()
            # takes keyword arguments only.
            return cont.logs(stdout=True, stderr=True, follow=follow, stream=stream, timestamps=True, tail='all')
        except docker.errors.APIError:
            return None