# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to PostgresQL for Zoe state."""

import logging

from zoe_lib.config import get_conf

log = logging.getLogger(__name__)


class ResourceLimits:
    """A resource limits description.

    Stores a ``min``/``max`` pair together with the ``unit`` label the two
    values are expressed in.
    """

    def __init__(self, data, unit):
        """Build the limits from ``data``.

        :param data: a dict with ``'min'`` and ``'max'`` keys, another
            ``ResourceLimits`` whose values are copied, or a plain number that
            is used for both the minimum and the maximum
        :param unit: label stored unchanged in ``self.unit``
        :raises KeyError: if ``data`` is a dict lacking ``'min'`` or ``'max'``
        :raises TypeError: if ``data`` is none of the supported types
        """
        if isinstance(data, dict):
            self.min = data['min']
            self.max = data['max']
        elif isinstance(data, ResourceLimits):
            self.min = data.min
            self.max = data.max
        elif isinstance(data, (int, float)):
            # Dict values are stored unchecked, so fractional limits were
            # already possible in dict form; accept them as scalars as well.
            self.min = data
            self.max = data
        else:
            raise TypeError("cannot build ResourceLimits from {}".format(type(data).__name__))
        self.unit = unit

        # A limit given as None is normalised to zero.
        if self.min is None:
            self.min = 0
        if self.max is None:
            self.max = 0

    def __add__(self, other):
        """Return a new ``ResourceLimits`` with both bounds summed.

        :param other: a ``ResourceLimits`` carrying the same unit
        :raises NotImplementedError: if ``other`` is not a ``ResourceLimits``
            or its unit differs from this one
        """
        if isinstance(other, ResourceLimits) and self.unit == other.unit:
            res = {
                'min': self.min + other.min,
                'max': self.max + other.max
            }
            return ResourceLimits(res, self.unit)
        else:
            raise NotImplementedError("can only add ResourceLimits expressed in the same unit")


class ResourceReservation:
    """The resources reserved by a Service.

    Exposes two ``ResourceLimits`` attributes: ``memory`` (unit ``"bytes"``)
    and ``cores`` (unit ``'units'``).
    """

    def __init__(self, data):
        """Build the reservation from ``data``.

        :param data: mapping with ``'memory'`` and ``'cores'`` entries, each in
            any form accepted by ``ResourceLimits``
        :raises KeyError: if either entry is missing
        """
        self.memory = ResourceLimits(data['memory'], "bytes")
        self.cores = ResourceLimits(data['cores'], 'units')

    def __add__(self, other):
        """Return a new reservation holding the field-wise sum.

        Returns ``NotImplemented`` when ``other`` is not a
        ``ResourceReservation``.
        """
        if not isinstance(other, ResourceReservation):
            return NotImplemented
        summed = {
            'memory': self.memory + other.memory,
            'cores': self.cores + other.cores
        }
        return ResourceReservation(summed)


class VolumeDescription:
    """A generic description for container volumes."""

    def __init__(self, vtype: str):
        """Record the volume kind.

        :param vtype: identifier of the volume kind, stored in ``self.type``
        """
        self.type = vtype


class VolumeDescriptionHostPath(VolumeDescription):
    """Host-based volumes."""

    def __init__(self, name: str, path: str, readonly: bool):
        """Describe a volume of type ``"host_directory"``.

        :param name: volume name; the mount point is ``'/mnt/'`` followed by it
        :param path: stored unchanged in ``self.path``
        :param readonly: stored unchanged in ``self.readonly``
        """
        super().__init__("host_directory")
        self.path = path
        self.mount_point = '/mnt/' + name
        self.readonly = readonly


class Service:
    """A Zoe Service.

    Holds the stored fields of a single service together with the values
    parsed from its JSON description. The ``set_*`` methods first persist a
    change through ``sql_manager.service_update`` and then mirror it on this
    object.
    """

    # Values used for the ``status`` field.
    TERMINATING_STATUS = "terminating"
    INACTIVE_STATUS = "inactive"
    ACTIVE_STATUS = "active"
    STARTING_STATUS = "starting"
    ERROR_STATUS = "error"
    RUNNABLE_STATUS = "runnable"

    # Values used for the ``backend_status`` field.
    BACKEND_UNDEFINED_STATUS = 'undefined'
    BACKEND_CREATE_STATUS = 'created'
    BACKEND_START_STATUS = 'started'
    BACKEND_DIE_STATUS = 'dead'
    BACKEND_DESTROY_STATUS = 'destroyed'
    BACKEND_OOM_STATUS = 'oom-killed'

    def __init__(self, d, sql_manager):
        """Populate the service from a mapping of stored fields.

        :param d: mapping providing ``id``, ``name``, ``status``,
            ``error_message``, ``execution_id``, ``description``,
            ``service_group``, ``backend_id``, ``backend_status``,
            ``backend_host``, ``ip_address`` and ``essential``
        :param sql_manager: object used by the setters and properties to
            update and query the stored state
        :raises KeyError: if a required field is missing from ``d`` or from
            the description
        """
        self.sql_manager = sql_manager
        self.id = d['id']

        self.name = d['name']
        self.status = d['status']
        self.error_message = d['error_message']
        self.execution_id = d['execution_id']
        self.description = d['description']
        self.service_group = d['service_group']
        self.backend_id = d['backend_id']
        self.backend_status = d['backend_status']
        self.backend_host = d['backend_host']

        self.ip_address = d['ip_address']
        # Drop the prefix length when the address carries a /32 or /128 suffix.
        if self.ip_address is not None and ('/32' in self.ip_address or '/128' in self.ip_address):
            self.ip_address = self.ip_address.split('/')[0]

        self.essential = d['essential']

        # Fields parsed from the JSON description
        try:
            self.image_name = self.description['image']
        except KeyError:
            self.image_name = self.description['docker_image']  # zapp description v2

        self.is_monitor = self.description['monitor']
        self.startup_order = self.description['startup_order']

        # Optional description entries and their defaults.
        self.environment = self.description.get('environment', [])
        self.command = self.description.get('command')
        self.work_dir = self.description.get('work_dir')

        # Test for the key explicitly: wrapping the constructor call in a
        # try/except KeyError would also swallow a KeyError raised by a
        # malformed 'resources' entry and misreport it as a missing
        # 'required_resources'.
        if 'resources' in self.description:
            self.resource_reservation = ResourceReservation(self.description['resources'])
        else:
            self.resource_reservation = ResourceReservation({'memory': self.description['required_resources']['memory'], 'cores': 0})  # ZApp description v2

        try:
            self.volumes = [VolumeDescriptionHostPath(v['name'], v['path'], v['read_only']) for v in self.description['volumes']]
        except KeyError:
            self.volumes = []
        except TypeError:
            # Entries that cannot be indexed by key are read positionally.
            self.volumes = [VolumeDescriptionHostPath(v[0], v[1], v[2]) for v in self.description['volumes']]

        self.labels = self.description.get('labels', [])

    def serialize(self):
        """Generates a dictionary that can be serialized in JSON."""
        return {
            'id': self.id,
            'name': self.name,
            'status': self.status,
            'error_message': self.error_message,
            'execution_id': self.execution_id,
            'description': self.description,
            'service_group': self.service_group,
            'backend_id': self.backend_id,
            'ip_address': self.ip_address,
            'backend_status': self.backend_status,
            'backend_host': self.backend_host,
            'essential': self.essential,
            'proxy_address': self.proxy_address
        }

    def __eq__(self, other):
        """Two services are equal when they have the same ``id``.

        Returns ``NotImplemented`` for operands that are not ``Service``
        instances, so comparing with ``None`` or unrelated objects yields
        ``False`` instead of raising ``AttributeError``.
        """
        if not isinstance(other, Service):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        """Hash on ``id``, consistently with ``__eq__``.

        Defining ``__eq__`` alone would leave instances unhashable.
        """
        return hash(self.id)

    def set_terminating(self):
        """The service is being killed."""
        self.sql_manager.service_update(self.id, status=self.TERMINATING_STATUS)
        self.status = self.TERMINATING_STATUS

    def set_inactive(self):
        """The service is not running.

        Clears ``backend_id``, ``ip_address`` and ``backend_host`` both in the
        stored state and on this object, and resets every port of the service.
        """
        self.sql_manager.service_update(self.id, status=self.INACTIVE_STATUS, backend_id=None, ip_address=None, backend_host=None)
        self.status = self.INACTIVE_STATUS
        for port in self.ports:
            port.reset()
        # Mirror every field that was cleared in the store.
        self.backend_id = None
        self.ip_address = None
        self.backend_host = None

    def set_starting(self):
        """The service is being created by Docker."""
        self.sql_manager.service_update(self.id, status=self.STARTING_STATUS)
        self.status = self.STARTING_STATUS

    def set_runnable(self):
        """The service is elastic and can be started."""
        self.sql_manager.service_update(self.id, status=self.RUNNABLE_STATUS)
        self.status = self.RUNNABLE_STATUS

    def set_active(self, backend_id, ip_address):
        """The service is running and has a valid backend_id.

        :param backend_id: identifier stored in ``backend_id``
        :param ip_address: address stored in ``ip_address``

        Any previous error message is cleared.
        """
        self.sql_manager.service_update(self.id, status=self.ACTIVE_STATUS, backend_id=backend_id, error_message=None, ip_address=ip_address)
        self.error_message = None
        self.ip_address = ip_address
        self.backend_id = backend_id
        self.status = self.ACTIVE_STATUS

    def set_error(self, error_message):
        """The service could not be created/started.

        :param error_message: text stored in ``error_message``
        """
        self.sql_manager.service_update(self.id, status=self.ERROR_STATUS, error_message=error_message)
        self.status = self.ERROR_STATUS
        self.error_message = error_message

    def set_backend_status(self, new_status):
        """Docker has emitted an event related to this service.

        :param new_status: value stored in ``backend_status``
        """
        self.sql_manager.service_update(self.id, backend_status=new_status)
        log.debug("service {}, backend status updated to {}".format(self.id, new_status))
        self.backend_status = new_status

    def assign_backend_host(self, backend_host):
        """Assign this service to a host in particular.

        :param backend_host: value stored in ``backend_host``
        """
        self.sql_manager.service_update(self.id, backend_host=backend_host)
        log.debug('service {} assigned to host {}'.format(self.id, backend_host))
        self.backend_host = backend_host

    @property
    def dns_name(self):
        """Getter for the DNS name of this service as it will be registered in Docker's DNS."""
        return "{}-{}-{}".format(self.name, self.execution_id, get_conf().deployment_name)

    @property
    def user_id(self):
        """Getter for the user_id, that is actually taken from the parent execution."""
        execution = self.sql_manager.execution_list(only_one=True, id=self.execution_id)
        return execution.user_id

    @property
    def ports(self):
        """Getter for the ports exposed by this service."""
        return self.sql_manager.port_list(service_id=self.id)

    @property
    def proxy_address(self):
        """Get proxy address path.

        Returns ``None`` when the service has no ports; otherwise the unique
        name followed by a dot and the configured ``proxy_path``.
        """
        if len(self.ports) == 0:
            return None
        return self.unique_name + "." + get_conf().proxy_path

    def is_dead(self):
        """Returns True if this service is not running.

        That is the case when the backend status is destroyed, oom-killed,
        dead or undefined.
        """
        return self.backend_status in (
            self.BACKEND_DESTROY_STATUS,
            self.BACKEND_OOM_STATUS,
            self.BACKEND_DIE_STATUS,
            self.BACKEND_UNDEFINED_STATUS,
        )

    @property
    def unique_name(self):
        """Returns a name for this service that is unique across multiple Zoe instances running on the same backend."""
        return self.name + '-' + str(self.execution_id) + '-' + get_conf().deployment_name