simulated_platform.py 9.47 KB
Newer Older
1 2
"""Classes to hold the system state and simulated container/service placements"""

3
import logging
4 5
import random
from typing import List
6

7
from zoe_lib.state import Execution, Service
8
from zoe_lib.config import get_conf
9
from zoe_master.stats import ClusterStats, NodeStats
10 11
from zoe_master.backends.interface import list_available_images

12

13 14
log = logging.getLogger(__name__)

15 16 17 18 19

class SimulatedNode:
    """A simulated node where containers can be run.

    Starts from the resource figures of a real node snapshot (NodeStats) and
    keeps a list of services tentatively placed on it by the simulation, so
    that free memory/cores can be recomputed as placements are added/removed.
    """
    def __init__(self, real_node: NodeStats):
        # Reservations already present on the real node in the snapshot.
        self.real_reservations = {
            "memory": real_node.memory_reserved,
            "cores": real_node.cores_reserved
        }
        # Resources still unreserved on the real node; simulated placements
        # are subtracted from these by node_free_memory()/node_free_cores().
        self.real_free_resources = {
            "memory": real_node.memory_total - real_node.memory_reserved,
            "cores": real_node.cores_total - real_node.cores_reserved
        }
        self.real_active_containers = real_node.container_count
        self.services = []  # services placed on this node by the simulation
        self.name = real_node.name
        self.labels = real_node.labels
        self.images = list_available_images(self.name)
        log.debug('Node {}: m {:.2f}GB | c {} | l {} | ncont {}'.format(self.name, self.node_free_memory() / (1024 ** 3), self.node_free_cores(), list(self.labels), self.container_count))

    def service_fits(self, service: Service) -> bool:
        """Checks whether a service can fit in this node.

        A service fits when the node is not labelled 'disabled', the node has
        every label the service requires, there is enough free memory and
        cores for the service's minimum reservation, and its image is available.
        """
        if 'disabled' in self.labels:
            return False
        ret = set(service.labels).issubset(self.labels)
        # NOTE(review): memory uses a strict '<' while cores use '<=' — a service
        # needing exactly all the free memory is rejected. Presumably intentional
        # headroom; confirm before changing.
        ret = ret and service.resource_reservation.memory.min < self.node_free_memory()
        ret = ret and service.resource_reservation.cores.min <= self.node_free_cores()
        ret = ret and self._image_is_available(service.image_name)
        return ret

    def service_why_unfit(self, service) -> str:
        """Generate an explanation of why the service does not fit this node.

        Checks the same conditions as service_fits (in a different order) and
        describes the first one that fails. Resource shortfalls are reported as
        positive amounts: how much more the service needs than is free.
        """
        if 'disabled' in self.labels:
            return 'host disabled by the administrator'
        free_memory = self.node_free_memory()
        if service.resource_reservation.memory.min >= free_memory:
            # required - free: the shortfall (the previous free - required was negative)
            return 'needs {} more bytes of memory'.format(service.resource_reservation.memory.min - free_memory)
        free_cores = self.node_free_cores()
        if service.resource_reservation.cores.min > free_cores:
            return 'needs {} more cores'.format(service.resource_reservation.cores.min - free_cores)
        if not set(service.labels).issubset(self.labels):
            return 'service requires labels {} to be defined on the node'.format(service.labels)
        if not self._image_is_available(service.image_name):
            return 'image {} is not available on node {}'.format(service.image_name, self.name)
        return 'unknown reason'

    def _image_is_available(self, image_name) -> bool:
        """Return True if image_name is among this node's images.

        The check is only performed for the 'DockerEngine' backend; for any
        other backend every image is considered available.
        """
        if get_conf().backend != 'DockerEngine':
            return True
        return any(image_name in image['names'] for image in self.images)

    def service_add(self, service):
        """Add a service in this node if it fits; return whether it was added."""
        if not self.service_fits(service):
            return False
        self.services.append(service)
        return True

    def service_remove(self, service):
        """Remove a service from this node; return False if it was not placed here."""
        try:
            self.services.remove(service)
        except ValueError:
            return False
        return True

    @property
    def container_count(self):
        """Return the number of containers on this node (real + simulated)."""
        return self.real_active_containers + len(self.services)

    def node_free_memory(self):
        """Return the amount of free memory for this node.

        Computed as the real free memory minus the minimum memory reservation
        of every simulated service. A warning is logged if the result is negative.
        """
        simulated_reservation = sum(service.resource_reservation.memory.min for service in self.services)
        free = self.real_free_resources['memory'] - simulated_reservation
        if free < 0:
            log.warning('More memory reserved than there is free on node {}: {}'.format(self.name, free))
        return free

    def node_free_cores(self):
        """Return the amount of free cores available in this node.

        Computed as the real free cores minus the minimum core reservation of
        every simulated service. A warning is logged if the result is negative.
        """
        simulated_reservation = sum(service.resource_reservation.cores.min for service in self.services)
        free = self.real_free_resources['cores'] - simulated_reservation
        if free < 0:
            log.warning('More cores reserved than there are free on node {}: {}'.format(self.name, free))
        return free

    def __repr__(self):
        out = 'SN {} | m {:.2f}GB | c {}'.format(self.name, self.node_free_memory() / (1024 ** 3), self.node_free_cores())
        return out
112 113 114 115


class SimulatedPlatform:
    """A simulated cluster, composed by simulated nodes.

    Only nodes whose status is 'online' in the ClusterStats snapshot are
    included. Nodes are kept in self.nodes, keyed by node name.
    """
    def __init__(self, platform_status: ClusterStats):
        self.nodes = {
            node.name: SimulatedNode(node)
            for node in platform_status.nodes
            if node.status == 'online'
        }

    def _select_node_policy(self, node_list: List[SimulatedNode]) -> SimulatedNode:
        """Pick one node from a non-empty candidate list using the configured placement policy.

        The "waterfill" and "average" policies sort node_list in place. An
        unknown policy is logged as an error and the first node is returned.
        """
        policy = get_conf().placement_policy
        if policy == "random":
            selected = random.choice(node_list)
        elif policy == "waterfill":
            # lowest label count first, then biggest container_count first
            node_list.sort(key=lambda n: (len(n.labels), -n.container_count))
            selected = node_list[0]
        elif policy == "average":
            # lowest label count first, then smallest container_count first
            node_list.sort(key=lambda n: (len(n.labels), n.container_count))
            selected = node_list[0]
        else:
            log.error('Unknown placement policy: {}'.format(policy))
            selected = node_list[0]

        for node in node_list:
            log.debug(' -> {}: {} {}'.format(node.name, len(node.labels), node.container_count))
        return selected

    def _find_candidate_nodes(self, service, *, log_rejections=False):
        """Return (nodes where service fits, concatenated rejection reasons for the other nodes).

        If log_rejections is True, each rejection is also logged at debug level.
        """
        candidate_nodes = []
        reason_parts = []
        for node in self.nodes.values():
            if node.service_fits(service):
                candidate_nodes.append(node)
                continue
            # Compute the explanation once; it is used for both the summary and the log.
            why = node.service_why_unfit(service)
            reason_parts.append('node {}: {} ## '.format(node.name, why))
            if log_rejections:
                log.debug('node rejected: {}'.format(why))
        return candidate_nodes, ''.join(reason_parts)

    def allocate_essential(self, execution: Execution) -> bool:
        """Try to find an allocation for essential services.

        All-or-nothing: if any essential service fits on no node, every
        essential service of the execution is removed from the simulation
        again and False is returned. Returns True if all were placed.
        """
        for service in execution.essential_services:
            candidate_nodes, reasons = self._find_candidate_nodes(service, log_rejections=True)
            if not candidate_nodes:  # this service does not fit anywhere
                self.deallocate_essential(execution)
                log.info('Cannot fit essential service {} anywhere, reasons: {}'.format(service.id, reasons))
                return False
            log.debug('Node selection for service {} with {} policy'.format(service.id, get_conf().placement_policy))
            selected_node = self._select_node_policy(candidate_nodes)
            selected_node.service_add(service)
        return True

    def deallocate_essential(self, execution: Execution):
        """Remove all essential services from the simulated cluster"""
        for service in execution.essential_services:
            for node in self.nodes.values():
                if node.service_remove(service):
                    break

    def allocate_elastic(self, execution: Execution) -> bool:
        """Try to find an allocation for elastic services.

        Services already active (and not being terminated by the backend) are
        skipped. Each placed service is marked runnable. Services that fit
        nowhere are logged and skipped. Returns True if at least one service was placed.
        """
        at_least_one_allocated = False
        for service in execution.elastic_services:
            if service.status == service.ACTIVE_STATUS and service.backend_status != service.BACKEND_DIE_STATUS:
                continue
            candidate_nodes, reasons = self._find_candidate_nodes(service)
            if not candidate_nodes:  # this service does not fit anywhere
                log.info('Cannot fit elastic service {} anywhere, reasons: {}'.format(service.id, reasons))
                continue
            log.debug('Node selection for service {} with {} policy'.format(service.id, get_conf().placement_policy))
            selected_node = self._select_node_policy(candidate_nodes)
            selected_node.service_add(service)
            service.set_runnable()
            at_least_one_allocated = True
        return at_least_one_allocated

    def deallocate_elastic(self, execution: Execution):
        """Remove all elastic services from the simulated cluster, marking removed ones inactive."""
        for service in execution.elastic_services:
            for node in self.nodes.values():
                if node.service_remove(service):
                    service.set_inactive()
                    break

    def aggregated_free_memory(self):
        """Return the amount of free memory across all nodes"""
        return sum(node.node_free_memory() for node in self.nodes.values())

    def get_service_allocation(self):
        """Return a map of service IDs to nodes where they have been allocated."""
        return {
            service.id: node_id
            for node_id, node in self.nodes.items()
            for service in node.services
        }

    def __repr__(self):
        return ''.join(str(node) + " # " for node in self.nodes.values())