# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Monitor for the Swarm event stream."""

import logging
import threading
import time

from zoe_lib.config import get_conf
from zoe_lib.state import SQLManager, Service
from zoe_master.backends.docker.api_client import DockerClient
from zoe_master.backends.docker.config import DockerConfig, DockerHostConfig  # pylint: disable=unused-import
from zoe_master.exceptions import ZoeException
from zoe_master.stats import NodeStats

log = logging.getLogger(__name__)

CHECK_INTERVAL = 10


class DockerStateSynchronizer(threading.Thread):
    """Poll every configured Docker host and mirror its state into Zoe.

    One daemon sub-thread per configured host runs :meth:`_host_subthread`,
    which refreshes that host's entry in ``host_stats`` every
    ``CHECK_INTERVAL`` seconds. The instance itself is a daemon thread named
    ``checker`` that acts as a watchdog: it restarts any host sub-thread that
    has died.

    Attributes set by the constructor:
      * ``stop``: event that asks the per-host sub-threads to exit.
      * ``my_stop``: event that asks the watchdog loop in :meth:`run` to exit.
      * ``state``: the SQL state manager used to look up services.
      * ``host_checkers``: list of ``(thread, host_config)`` tuples.
      * ``host_stats``: dict mapping a host name to its ``NodeStats`` object.
    """

    def __init__(self, state: SQLManager) -> None:
        """Start one sub-thread per configured host, then start the watchdog.

        :param state: state manager used to map containers to services
        """
        # name/daemon are passed to the constructor: setName() and setDaemon()
        # are deprecated aliases of the same attributes.
        super().__init__(name='checker', daemon=True)
        self.stop = threading.Event()
        self.my_stop = threading.Event()
        self.state = state
        self.host_checkers = []
        # Must exist before the first sub-thread starts, since each sub-thread
        # registers its NodeStats here as its first action.
        self.host_stats = {}

        for docker_host in DockerConfig(get_conf().backend_docker_config_file).read_config():
            self.host_checkers.append((self._spawn_host_thread(docker_host), docker_host))

        self.start()

    def _spawn_host_thread(self, host_config: DockerHostConfig) -> threading.Thread:
        """Create and start the daemon sub-thread that monitors ``host_config``."""
        th = threading.Thread(target=self._host_subthread, args=(host_config,), name='synchro_' + host_config.name, daemon=True)
        th.start()
        return th

    def _host_subthread(self, host_config: DockerHostConfig):
        """Body of a per-host sub-thread: poll the host until ``stop`` is set.

        Only ``ZoeException`` raised while contacting the engine is handled
        here (the host is marked offline). Any other exception ends the
        thread, and the watchdog in :meth:`run` starts a replacement.
        """
        log.info("Synchro thread for host {} started".format(host_config.name))

        node_stats = NodeStats(host_config.name)
        self.host_stats[host_config.name] = node_stats

        while True:
            time_start = time.time()
            try:
                my_engine = DockerClient(host_config)
                container_list = my_engine.list(only_label={'zoe_deployment_name': get_conf().deployment_name})
                info = my_engine.info()
            except ZoeException as e:
                node_stats.status = 'offline'
                log.error(str(e))
                log.info('Node {} is offline'.format(host_config.name))
            else:
                self._refresh_node_stats(node_stats, host_config, my_engine, container_list, info)
                # Timestamp and validity are written last, after every other
                # field has been refreshed for this cycle.
                node_stats.timestamp = time_start
                node_stats.valid = True

            sleep_time = CHECK_INTERVAL - (time.time() - time_start)
            if sleep_time <= 0:
                log.warning('synchro thread for host {} is late by {:.2f} seconds'.format(host_config.name, -sleep_time))
                sleep_time = 0
            if self.stop.wait(timeout=sleep_time):
                break

        log.info("Synchro thread for host {} stopped".format(host_config.name))

    def _refresh_node_stats(self, node_stats, host_config: DockerHostConfig, my_engine, container_list, info):
        """Copy the engine's current state for one host into ``node_stats``.

        :param node_stats: the ``NodeStats`` object to update in place
        :param host_config: configuration of the host being polled
        :param my_engine: the ``DockerClient`` connected to that host
        :param container_list: containers returned by ``my_engine.list()``
        :param info: dictionary returned by ``my_engine.info()``
        """
        if node_stats.status == 'offline':
            log.info('Node {} is now online'.format(host_config.name))
            node_stats.status = 'online'

        node_stats.container_count = info['Containers']
        node_stats.cores_total = info['NCPU']
        node_stats.memory_total = info['MemTotal']

        # union() returns a new set: its result has to be stored, and storing
        # it in a local also keeps host_config.labels itself untouched.
        labels = host_config.labels
        if info['Labels'] is not None:
            labels = labels.union(info['Labels'])
        node_stats.labels = labels

        # A hard limit equal to the host's total memory is not counted as an
        # allocation.
        node_stats.memory_allocated = sum(cont['memory_hard_limit'] for cont in container_list if cont['memory_hard_limit'] != info['MemTotal'])
        node_stats.cores_allocated = sum(self._core_limit(cont) for cont in container_list)

        stats, memory_reserved, cores_reserved = self._sync_services(host_config, my_engine, container_list)
        node_stats.memory_reserved = memory_reserved
        node_stats.cores_reserved = cores_reserved
        node_stats.service_stats = stats

        node_stats.images = self._list_images(my_engine)

    @staticmethod
    def _core_limit(cont):
        """Return the container's CPU limit in cores (``cpu_quota / cpu_period``).

        A container whose ``cpu_period`` is 0 counts as 0 cores, the same
        contribution such containers make to ``cores_allocated``; this also
        avoids a ZeroDivisionError that would kill the calling thread.
        """
        if cont['cpu_period'] == 0:
            return 0
        return cont['cpu_quota'] / cont['cpu_period']

    def _sync_services(self, host_config: DockerHostConfig, my_engine, container_list):
        """Reconcile the host's containers with the services known to Zoe.

        Containers without a matching service are logged, and terminated if
        they are in the ``Service.BACKEND_DIE_STATUS`` state. For matched
        containers the service backend status is updated.

        :return: a tuple ``(stats, memory_reserved, cores_reserved)`` where
                 ``stats`` maps a service id to its core and memory limits and
                 the other two are the sums of the minimum reservations of the
                 matched services
        """
        stats = {}
        memory_reserved = 0
        cores_reserved = 0
        for cont in container_list:
            service = self.state.services.select(only_one=True, backend_host=host_config.name, backend_id=cont['id'])
            if service is None:
                log.warning('Container {} on host {} has no corresponding service'.format(cont['name'], host_config.name))
                if cont['state'] == Service.BACKEND_DIE_STATUS:
                    log.warning('Terminating dead and orphan container {}'.format(cont['name']))
                    my_engine.terminate_container(cont['id'], delete=True)
                continue

            if service.status == service.TERMINATING_STATUS:
                if service.backend_id is not None:
                    my_engine.terminate_container(service.backend_id, delete=True)
                else:
                    service.set_inactive()

            self._update_service_status(service, cont)
            memory_reserved += service.resource_reservation.memory.min
            cores_reserved += service.resource_reservation.cores.min
            stats[service.id] = {
                'core_limit': self._core_limit(cont),
                'mem_limit': cont['memory_hard_limit']
            }
        return stats, memory_reserved, cores_reserved

    @staticmethod
    def _list_images(my_engine):
        """Return a list of ``{'id', 'size', 'names'}`` dicts for the host's images."""
        suffix = ':latest'
        images = []
        for dk_image in my_engine.list_images():
            # Work on a copy so the image object's own tag list is never modified.
            names = list(dk_image.tags)
            latest = next((name for name in names if name.endswith(suffix)), None)
            if latest is not None:
                # add an image with the name without 'latest' to fake Docker image lookup algorithm
                names.append(latest[:-len(suffix)])
            images.append({
                'id': dk_image.attrs['Id'],
                'size': dk_image.attrs['Size'],
                'names': names
            })
        return images

    def _update_service_status(self, service: Service, container):
        """Store the container's state as the service backend status if it changed."""
        if service.backend_status != container['state']:
            old_status = service.backend_status
            service.set_backend_status(container['state'])
            log.debug('Updated service status, {} from {} to {}'.format(service.name, old_status, container['state']))

    def run(self):
        """The watchdog loop: restart dead host sub-threads until ``my_stop`` is set."""
        log.info("Checker thread started")
        while True:
            if self.my_stop.wait(timeout=CHECK_INTERVAL):
                break
            for index, (th, conf) in enumerate(self.host_checkers):
                if th.is_alive():
                    continue
                log.warning('Thread {} has died, starting a new one.'.format(th.name))
                # Replace the dead entry in place; the list length never
                # changes, so updating it while enumerating is safe.
                self.host_checkers[index] = (self._spawn_host_thread(conf), conf)
        log.info("Checker thread stopped")

    def quit(self):
        """Stop the watchdog and all host sub-threads, and wait for them to exit."""
        self.stop.set()
        self.my_stop.set()
        # Wait for the watchdog first. If the sub-threads were joined while it
        # was still running, it would see them as dead and respawn them during
        # shutdown, changing host_checkers while it is being iterated here.
        self.join()
        for th, conf_ in self.host_checkers:
            th.join()