execution.py 15.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to PostgresQL for Zoe state."""

import datetime
import logging
20
import functools
21

22 23
import psycopg2

24 25 26 27 28
try:
    from kazoo.client import KazooClient
except ImportError:
    KazooClient = None

29
from zoe_lib.state.base import BaseRecord, BaseTable
30
import zoe_lib.config
31

32 33 34
log = logging.getLogger(__name__)


35
class Execution(BaseRecord):
    """
    A Zoe execution.

    :type time_submit: datetime.datetime
    :type time_start: datetime.datetime | None
    :type time_end: datetime.datetime | None
    """

    SUBMIT_STATUS = "submitted"
    QUEUED_STATUS = "queued"
    STARTING_STATUS = "starting"
    ERROR_STATUS = "error"
    RUNNING_STATUS = "running"
    CLEANING_UP_STATUS = "cleaning up"
    TERMINATED_STATUS = "terminated"

    # Statuses for which is_active returns True.
    _ACTIVE_STATUSES = (SUBMIT_STATUS, QUEUED_STATUS, RUNNING_STATUS, STARTING_STATUS, CLEANING_UP_STATUS)

    def __init__(self, d, sql_manager):
        """
        Build an execution from a database row or a dictionary with the same keys.

        Timestamps may be datetime objects or UNIX epoch numbers (the format produced
        by serialize()); None is kept as None.
        """
        super().__init__(d, sql_manager)

        self.user_id = d['user_id']
        self.name = d['name']
        self.description = d['description']

        # Each timestamp is converted on its own: time_start and time_end are NULL
        # until the execution starts/ends (see set_running / set_terminated).
        self.time_submit = self._to_datetime(d['time_submit'])
        self.time_start = self._to_datetime(d['time_start'])
        self.time_end = self._to_datetime(d['time_end'])

        self._status = d['status']
        self.error_message = d['error_message']

        if d['size'] is not None:
            self.size = float(d['size'])
        else:
            try:
                self.size = self.description['size']
            except KeyError:
                self.size = self.description['priority']  # zapp format v2

    @staticmethod
    def _to_datetime(value):
        """Return value as a naive UTC datetime; accepts datetime, epoch seconds or None."""
        if value is None or isinstance(value, datetime.datetime):
            return value
        return datetime.datetime.utcfromtimestamp(value)

    @staticmethod
    def _to_epoch(value):
        """Return a naive UTC datetime as UNIX epoch seconds (float), or None for None."""
        if value is None:
            return None
        return (value - datetime.datetime(1970, 1, 1)) / datetime.timedelta(seconds=1)

    def serialize(self):
        """Generates a dictionary that can be serialized in JSON."""
        return {
            'id': self.id,
            'user_id': self.user_id,
            'name': self.name,
            'description': self.description,
            'time_submit': self._to_epoch(self.time_submit),
            'time_start': self._to_epoch(self.time_start),
            'time_end': self._to_epoch(self.time_end),
            'status': self._status,
            'error_message': self.error_message,
            'services': [s.id for s in self.services],
            'size': self.size
        }

    def __eq__(self, other):
        if not isinstance(other, Execution):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash on the same key.
        return hash(self.id)

    def set_queued(self):
        """The execution has been added to the scheduler queues."""
        self._status = self.QUEUED_STATUS
        self.sql_manager.executions.update(self.id, status=self._status)

    def set_starting(self):
        """The services of the execution are being created in Swarm."""
        self._status = self.STARTING_STATUS
        self.sql_manager.executions.update(self.id, status=self._status)

    @staticmethod
    def _traefik_zk_connect():
        """
        Return a started KazooClient for the Traefik ZooKeeper ensemble.

        Returns None when traefik_zk_ips is not configured. Also returns None (after
        logging an error) when kazoo is not installed.
        """
        zk_ips = zoe_lib.config.get_conf().traefik_zk_ips
        if zk_ips is None:
            return None
        if KazooClient is None:
            log.error('traefik_zk_ips is set but the kazoo library is not available, skipping Traefik configuration')
            return None
        zk = KazooClient(hosts=zk_ips)
        zk.start()
        return zk

    @staticmethod
    def _traefik_close(zk):
        """Stop the ZooKeeper session and release the client's resources."""
        zk.stop()
        zk.close()

    @staticmethod
    def _traefik_notify(zk):
        """Create and immediately delete a dummy node under /traefik."""
        # NOTE(review): presumably this fires a ZooKeeper watch so Traefik reloads
        # its configuration -- confirm against the Traefik ZK backend behaviour.
        zk.create('/traefik/alias')
        zk.delete('/traefik/alias')

    def _traefik_name(self, port):
        """Name used for both the Traefik backend and frontend of one exposed port."""
        return 'zoe_exec_{}_{}'.format(self.id, port.id)

    def set_running(self):
        """The execution is running and producing useful work."""
        self._status = self.RUNNING_STATUS
        self.time_start = datetime.datetime.utcnow()
        self.sql_manager.executions.update(self.id, status=self._status, time_start=self.time_start)
        #  This hackish code is here to support dynamic reverse proxying of web interfaces running in Zoe executions (e.g. jupyter)
        #  The idea is to use Træfik to do the reverse proxying, configured to use zookeeper to store dynamic configuration
        #  Zoe updates ZooKeeper whenever an execution runs or is terminated and Træfik creates or deletes the route automatically
        zk = self._traefik_zk_connect()
        if zk is None:
            return
        try:
            base_url = zoe_lib.config.get_conf().traefik_base_url
            for service in self.services:
                for port in service.ports:
                    endpoint = port.url_template.format(**{"ip_port": port.external_ip + ":" + str(port.external_port)})
                    traefik_name = self._traefik_name(port)
                    rule = 'PathPrefix:{}/{}'.format(base_url, port.proxy_key())
                    # ZooKeeper node values must be bytes.
                    zk.create('/traefik/backends/{}/servers/server/url'.format(traefik_name), endpoint.encode('utf-8'), makepath=True)
                    zk.create('/traefik/frontends/{}/routes/path/rule'.format(traefik_name), rule.encode('utf-8'), makepath=True)
                    zk.create('/traefik/frontends/{}/backend'.format(traefik_name), traefik_name.encode('utf-8'), makepath=True)
            self._traefik_notify(zk)
        finally:
            self._traefik_close(zk)

    def set_cleaning_up(self):
        """The services of the execution are being terminated."""
        self._status = self.CLEANING_UP_STATUS
        self.sql_manager.executions.update(self.id, status=self._status)
        # Remove the Traefik routes created by set_running (one backend and one frontend per port).
        zk = self._traefik_zk_connect()
        if zk is None:
            return
        try:
            for service in self.services:
                for port in service.ports:
                    traefik_name = self._traefik_name(port)
                    for path in ('/traefik/backends/{}'.format(traefik_name), '/traefik/frontends/{}'.format(traefik_name)):
                        # The route may never have been created (e.g. the execution failed while starting).
                        if zk.exists(path):
                            zk.delete(path, recursive=True)
            self._traefik_notify(zk)
        finally:
            self._traefik_close(zk)

    def set_terminated(self, reason=None):
        """
        The execution is not running.

        :param reason: optional message stored as the execution error message
        """
        self._status = self.TERMINATED_STATUS
        self.time_end = datetime.datetime.utcnow()
        if reason is not None:
            # Keep the in-memory record consistent with what is written to the database.
            self.error_message = reason
            self.sql_manager.executions.update(self.id, status=self._status, time_end=self.time_end, error_message=reason)
        else:
            self.sql_manager.executions.update(self.id, status=self._status, time_end=self.time_end)

    def set_error(self):
        """The scheduler encountered an error starting or running the execution."""
        self._status = self.ERROR_STATUS
        self.time_end = datetime.datetime.utcnow()
        self.sql_manager.executions.update(self.id, status=self._status, time_end=self.time_end)

    def set_error_message(self, message):
        """Contains an error message in case the status is 'error'."""
        self.error_message = message
        self.sql_manager.executions.update(self.id, error_message=self.error_message)

    def set_size(self, new_size):
        """Changes the size of the execution, for policies that calculate the size automatically."""
        self.size = new_size
        self.sql_manager.executions.update(self.id, size=new_size)

    @property
    def is_active(self):
        """Returns False if the execution ended completely (terminated or error)."""
        return self._status in self._ACTIVE_STATUSES

    @property
    def is_running(self):
        """Returns True if the execution has at least the essential services running."""
        return self._status == self.RUNNING_STATUS

    @property
    def status(self):
        """Getter for the execution status."""
        return self._status

    @property
    def services(self):
        """Getter for this execution service list."""
        return self.sql_manager.services.select(execution_id=self.id)

    @property
    def essential_services(self):
        """Getter for this execution essential service list."""
        return self.sql_manager.services.select(execution_id=self.id, essential=True)

    @property
    def elastic_services(self):
        """Getter for this execution elastic service list."""
        return self.sql_manager.services.select(execution_id=self.id, essential=False)

    @property
    def essential_services_running(self) -> bool:
        """Returns True if all essential services of this execution have started."""
        return all(not (service.essential and service.is_dead()) for service in self.services)

    @property
    def all_services_running(self) -> bool:
        """Return True if all services of this execution are running."""
        return all(not service.is_dead() for service in self.services)

    @property
    def all_services_active(self) -> bool:
        """Return True if all services of this execution are active."""
        return all(service.status == service.ACTIVE_STATUS for service in self.services)

    @property
    def running_services_count(self) -> int:
        """Returns the number of services of this execution that are running."""
        return sum(1 for service in self.services if not service.is_dead())

    @property
    def services_count(self) -> int:
        """Return the total number of services defined for this execution."""
        return len(self.services)

    @property
    def total_reservations(self):
        """Return the union/sum of resources reserved by all services of this execution, or None if it has none."""
        services = self.services  # one database query instead of two
        if len(services) == 0:
            return None
        return functools.reduce(lambda x, y: x + y, [s.resource_reservation for s in services])

    @property
    def owner(self):
        """Returns the full user object that owns this execution."""
        return self.sql_manager.user.select(only_one=True, **{'id': self.user_id})

    def __repr__(self):
        return str(self.id)


class ExecutionTable(BaseTable):
    """Abstraction for the execution table in the database."""

    # Filters accepted by select() and count() besides plain column equality.
    # The value is passed to PostgreSQL's to_timestamp(), i.e. UNIX epoch seconds.
    _TIME_FILTERS = {
        'earlier_than_submit': '"time_submit" <= to_timestamp(%s)',
        'earlier_than_start': '"time_start" <= to_timestamp(%s)',
        'earlier_than_end': '"time_end" <= to_timestamp(%s)',
        'later_than_submit': '"time_submit" >= to_timestamp(%s)',
        'later_than_start': '"time_start" >= to_timestamp(%s)',
        'later_than_end': '"time_end" >= to_timestamp(%s)',
    }

    def __init__(self, sql_manager):
        super().__init__(sql_manager, "execution")

    def create(self):
        """Create the execution table."""
        self.cursor.execute('''CREATE TABLE execution (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL,
            user_id INT REFERENCES "user",
            description JSON NOT NULL,
            status TEXT NOT NULL,
            size NUMERIC NOT NULL,
            time_submit TIMESTAMP NOT NULL,
            time_start TIMESTAMP NULL,
            time_end TIMESTAMP NULL,
            error_message TEXT NULL
            )''')

    def insert(self, name, user_id, description):
        """
        Create a new execution in the state.

        :param description: the ZApp description; its size is read from 'size', or 'priority' for zapp format v2
        :return: the id of the new execution
        """
        status = Execution.SUBMIT_STATUS
        time_submit = datetime.datetime.utcnow()
        try:
            size = description['size']
        except KeyError:
            size = description['priority']  # zapp format v2, same fallback as Execution.__init__
        query = self.cursor.mogrify('INSERT INTO execution (id, name, user_id, description, status, size, time_submit) VALUES (DEFAULT, %s,%s,%s,%s,%s,%s) RETURNING id', (name, user_id, description, status, size, time_submit))
        self.cursor.execute(query)
        new_id = self.cursor.fetchone()[0]
        self.sql_manager.commit()
        return new_id

    def _where_clause(self, filters):
        """
        Translate keyword filters into a SQL WHERE clause.

        :param filters: mapping of column name (or a key of _TIME_FILTERS) to the value to match
        :return: (clause, args) -- clause is '' when there are no filters, otherwise ' WHERE ...'
        :raises ValueError: if a filter name is not a plain identifier
        """
        conditions = []
        args = []
        for key, value in filters.items():
            if key in self._TIME_FILTERS:
                conditions.append(self._TIME_FILTERS[key])
            elif key.isidentifier():
                # Column names cannot be query parameters, so they are interpolated;
                # refuse anything that is not a plain identifier.
                conditions.append('{} = %s'.format(key))
            else:
                raise ValueError('invalid filter name: {!r}'.format(key))
            args.append(value)
        if not conditions:
            return '', args
        return ' WHERE ' + ' AND '.join(conditions), args

    def select(self, only_one=False, limit=-1, base=0, **kwargs):
        """
        Return a list of executions.

        :param only_one: only one result is expected
        :type only_one: bool
        :param limit: limit the result to this number of entries
        :type limit: int
        :type base: int
        :param base: the base value to use when limiting result count
        :param kwargs: filter executions based on their fields/columns
        :return: one or more executions
        """
        empty_result = None if only_one else []
        try:
            where, args = self._where_clause(kwargs)
        except ValueError as e:
            log.error('db error: {}'.format(e))
            return empty_result

        q = 'SELECT * FROM execution' + where
        if limit > 0:
            q += ' ORDER BY id DESC LIMIT %s OFFSET %s'
            args += [limit, base]
        query = self.cursor.mogrify(q, args if args else None)

        try:
            self.cursor.execute(query)
        except psycopg2.Error as e:
            log.error('db error: {}'.format(e))
            return empty_result

        if only_one:
            row = self.cursor.fetchone()
            if row is None:
                return None
            return Execution(row, self.sql_manager)
        return [Execution(x, self.sql_manager) for x in self.cursor]

    def count(self, **kwargs):
        """
        Return the number of executions matching the given filters.

        :param kwargs: filter executions based on their fields/columns (same filters as select())
        :return: the number of matching executions, 0 on error
        """
        try:
            where, args = self._where_clause(kwargs)
        except ValueError as e:
            log.error('db error: {}'.format(e))
            return 0
        query = self.cursor.mogrify('SELECT COUNT(*) FROM execution' + where, args if args else None)

        try:
            self.cursor.execute(query)
        except psycopg2.Error as e:
            log.error('db error: {}'.format(e))
            return 0

        row = self.cursor.fetchone()
        return row[0]