[GITLAB] - UPGRADE TO v12 on Wednesday the 18th of December at 11.30AM

execution.py 7.53 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to PostgresQL for Zoe state."""

import datetime
import logging
20
import threading
21
import functools
22 23 24 25 26 27 28 29 30 31 32 33 34 35

log = logging.getLogger(__name__)


class Execution:
    """
    A Zoe execution.

    :type time_submit: datetime.datetime
    :type time_start: datetime.datetime
    :type time_end: datetime.datetime
    """

    SUBMIT_STATUS = "submitted"
36
    IMAGE_DL_STATUS = "image download"
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
    SCHEDULED_STATUS = "scheduled"
    STARTING_STATUS = "starting"
    ERROR_STATUS = "error"
    RUNNING_STATUS = "running"
    CLEANING_UP_STATUS = "cleaning up"
    TERMINATED_STATUS = "terminated"

    def __init__(self, d, sql_manager):
        self.sql_manager = sql_manager
        self.id = d['id']

        self.user_id = d['user_id']
        self.name = d['name']
        self.description = d['description']

        if isinstance(d['time_submit'], datetime.datetime):
            self.time_submit = d['time_submit']
        else:
55
            self.time_submit = datetime.datetime.utcfromtimestamp(d['time_submit'])
56 57 58 59

        if isinstance(d['time_submit'], datetime.datetime):
            self.time_start = d['time_start']
        else:
60
            self.time_start = datetime.datetime.utcfromtimestamp(d['time_start'])
61 62 63 64

        if isinstance(d['time_submit'], datetime.datetime):
            self.time_end = d['time_end']
        else:
65
            self.time_submit = datetime.datetime.utcfromtimestamp(d['time_start'])
66 67 68 69

        self._status = d['status']
        self.error_message = d['error_message']

70 71 72 73
        try:
            self.size = self.description['size']
        except KeyError:
            self.size = self.description['priority']  # zapp format v2
74

75 76
        self.termination_lock = threading.Lock()

77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
    def serialize(self):
        """Generates a dictionary that can be serialized in JSON."""
        return {
            'id': self.id,
            'user_id': self.user_id,
            'name': self.name,
            'description': self.description,
            'time_submit': self.time_submit.timestamp(),
            'time_start': None if self.time_start is None else self.time_start.timestamp(),
            'time_end': None if self.time_end is None else self.time_end.timestamp(),
            'status': self._status,
            'error_message': self.error_message,
            'services': [s.id for s in self.services]
        }

    def __eq__(self, other):
        return self.id == other.id

    def set_scheduled(self):
        """The execution has been added to the scheduler queues."""
        self._status = self.SCHEDULED_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

100 101 102 103 104
    def set_image_dl(self):
        """The execution has been added to the scheduler queues."""
        self._status = self.IMAGE_DL_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

105 106 107 108 109 110 111 112
    def set_starting(self):
        """The services of the execution are being created in Swarm."""
        self._status = self.STARTING_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

    def set_running(self):
        """The execution is running and producing useful work."""
        self._status = self.RUNNING_STATUS
113
        self.time_start = datetime.datetime.utcnow()
114 115 116 117 118 119 120 121 122 123
        self.sql_manager.execution_update(self.id, status=self._status, time_start=self.time_start)

    def set_cleaning_up(self):
        """The services of the execution are being terminated."""
        self._status = self.CLEANING_UP_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

    def set_terminated(self):
        """The execution is not running."""
        self._status = self.TERMINATED_STATUS
124
        self.time_end = datetime.datetime.utcnow()
125 126 127 128 129
        self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)

    def set_error(self):
        """The scheduler encountered an error starting or running the execution."""
        self._status = self.ERROR_STATUS
130
        self.time_end = datetime.datetime.utcnow()
131 132 133 134 135 136 137
        self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)

    def set_error_message(self, message):
        """Contains an error message in case the status is 'error'."""
        self.error_message = message
        self.sql_manager.execution_update(self.id, error_message=self.error_message)

138
    @property
139 140
    def is_active(self):
        """
141
        Returns False if the execution ended completely
142 143
        :return:
        """
144
        return self._status == self.SCHEDULED_STATUS or self._status == self.RUNNING_STATUS or self._status == self.STARTING_STATUS or self._status == self.CLEANING_UP_STATUS or self._status == self.IMAGE_DL_STATUS
145

146
    @property
147 148 149 150
    def is_running(self):
        """Returns True is the execution has at least the essential services running."""
        return self._status == self.RUNNING_STATUS

151 152 153 154 155 156 157 158 159
    @property
    def status(self):
        """Getter for the execution status."""
        return self._status

    @property
    def services(self):
        """Getter for this execution service list."""
        return self.sql_manager.service_list(execution_id=self.id)
160

161 162 163 164 165 166 167 168 169 170
    @property
    def essential_services(self):
        """Getter for this execution essential service list."""
        return self.sql_manager.service_list(execution_id=self.id, essential=True)

    @property
    def elastic_services(self):
        """Getter for this execution elastic service list."""
        return self.sql_manager.service_list(execution_id=self.id, essential=False)

171
    @property
172
    def essential_services_running(self) -> bool:
173 174
        """Returns True if all essential services of this execution have started."""
        for service in self.services:
175 176 177 178 179 180
            if service.essential and service.is_dead():
                return False
        return True

    @property
    def all_services_running(self) -> bool:
181
        """Return True if all services of this execution are running."""
182 183 184 185 186
        for service in self.services:
            if service.is_dead():
                return False
        return True

187 188 189 190 191 192 193 194
    @property
    def all_services_active(self) -> bool:
        """Return True if all services of this execution are active."""
        for service in self.services:
            if service.status != service.ACTIVE_STATUS:
                return False
        return True

195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
    @property
    def running_services_count(self) -> int:
        """Returns the number of services of this execution that are running."""
        count = 0
        for service in self.services:
            if not service.is_dead():
                count += 1
        return count

    @property
    def services_count(self) -> int:
        """Return the total number of services defined for this execution."""
        return len(self.services)

    @property
    def total_reservations(self):
        """Return the union/sum of resources reserved by all services of this execution."""
212
        return functools.reduce(lambda x, y: x + y, [s.resource_reservation for s in self.services])
213 214 215

    def __repr__(self):
        return str(self.id)