execution.py 16 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to PostgresQL for Zoe state."""

import datetime
import logging
20
import functools
21

22 23
import psycopg2

24 25 26 27 28
try:
    from kazoo.client import KazooClient
except ImportError:
    KazooClient = None

29
from zoe_lib.state.base import BaseRecord, BaseTable
30
import zoe_lib.config
31

32 33 34
# Module-level logger named after this module; used below to report database errors.
log = logging.getLogger(__name__)


35
class Execution(BaseRecord):
    """
    A Zoe execution.

    Wraps one execution record and exposes helpers that change its status,
    persisting every change through ``sql_manager.executions.update()``.

    NOTE(review): ``self.id`` and ``self.sql_manager`` are used throughout but
    never assigned here; presumably they are set by ``BaseRecord.__init__``.

    :type time_submit: datetime.datetime
    :type time_start: datetime.datetime
    :type time_end: datetime.datetime
    """

    SUBMIT_STATUS = "submitted"
    QUEUED_STATUS = "queued"
    STARTING_STATUS = "starting"
    ERROR_STATUS = "error"
    RUNNING_STATUS = "running"
    CLEANING_UP_STATUS = "cleaning up"
    TERMINATED_STATUS = "terminated"

    def __init__(self, d, sql_manager):
        """
        Build an execution from a dictionary-like record.

        :param d: mapping providing at least ``user_id``, ``name``, ``description``, ``time_submit``,
                  ``time_start``, ``time_end``, ``status``, ``error_message`` and ``size``.
                  Each time field may be a ``datetime.datetime``, a numeric UTC epoch timestamp or None.
        :param sql_manager: state manager forwarded to the base class and used for all queries
        """
        super().__init__(d, sql_manager)

        self.user_id = d['user_id']
        self.name = d['name']
        self.description = d['description']

        # Each field is converted according to its own type: start/end are legitimately
        # None until the execution runs/ends, whatever the type of time_submit is.
        self.time_submit = self._to_datetime(d['time_submit'])
        self.time_start = self._to_datetime(d['time_start'])
        self.time_end = self._to_datetime(d['time_end'])

        self._status = d['status']
        self.error_message = d['error_message']

        if d['size'] is not None:
            self.size = float(d['size'])
        else:
            try:
                self.size = self.description['size']
            except KeyError:
                self.size = self.description['priority']  # zapp format v2

        self.app_name = self.description['name']

    @staticmethod
    def _to_datetime(value):
        """Return value unchanged if it is None or a datetime, otherwise interpret it as a UTC epoch timestamp."""
        if value is None or isinstance(value, datetime.datetime):
            return value
        return datetime.datetime.utcfromtimestamp(value)

    @staticmethod
    def _to_epoch(moment):
        """Return the seconds elapsed between 1970-01-01 and the naive datetime moment, or None if moment is None."""
        if moment is None:
            return None
        return (moment - datetime.datetime(1970, 1, 1)) / datetime.timedelta(seconds=1)

    def serialize(self):
        """
        Generates a dictionary that can be serialized in JSON.

        Times are emitted as seconds since 1970-01-01 (None when not set) and services as a list of service IDs.
        """
        return {
            'id': self.id,
            'user_id': self.user_id,
            'name': self.name,
            'description': self.description,
            'time_submit': self._to_epoch(self.time_submit),
            'time_start': self._to_epoch(self.time_start),
            'time_end': self._to_epoch(self.time_end),
            'status': self._status,
            'error_message': self.error_message,
            'services': [s.id for s in self.services],
            'size': self.size
        }

    def __eq__(self, other):
        """Two executions are equal when they have the same ID; other types are left to Python's default handling."""
        if not isinstance(other, Execution):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash on the same key used for equality.
        return hash(self.id)

    def set_queued(self):
        """The execution has been added to the scheduler queues."""
        self._status = self.QUEUED_STATUS
        self.sql_manager.executions.update(self.id, status=self._status)

    def set_starting(self):
        """The services of the execution are being created in Swarm."""
        self._status = self.STARTING_STATUS
        self.sql_manager.executions.update(self.id, status=self._status)

    @staticmethod
    def _traefik_zk_client(conf):
        """
        Return a started ZooKeeper client for the Træfik configuration store.

        Returns None when ``traefik_zk_ips`` is not configured, or when it is configured
        but the optional kazoo module could not be imported (an error is logged in that case).
        """
        if conf.traefik_zk_ips is None:
            return None
        if KazooClient is None:
            log.error('traefik_zk_ips is configured, but the kazoo module is not installed: proxy routes will not be updated')
            return None
        zk_cli = KazooClient(hosts=conf.traefik_zk_ips)
        zk_cli.start()
        return zk_cli

    @staticmethod
    def _traefik_touch_alias(zk_cli):
        """Create and immediately delete the /traefik/alias node."""
        # NOTE(review): presumably this is what makes Træfik notice the new configuration - confirm
        zk_cli.create('/traefik/alias')
        zk_cli.delete('/traefik/alias')

    def set_running(self):
        """
        The execution is running and producing useful work.

        Records the start time and, if Træfik support is configured, publishes a
        backend and a frontend in ZooKeeper for every service port that has the proxy enabled.
        """
        self._status = self.RUNNING_STATUS
        self.time_start = datetime.datetime.utcnow()
        self.sql_manager.executions.update(self.id, status=self._status, time_start=self.time_start)

        #  This hackish code is here to support dynamic reverse proxying of web interfaces running in Zoe executions (e.g. jupyter)
        #  The idea is to use Træfik to do the reverse proxying, configured to use zookeeper to store dynamic configuration
        #  Zoe updates ZooKeeper whenever an execution runs or is terminated and Træfik creates or deletes the route automatically
        conf = zoe_lib.config.get_conf()
        zk_cli = self._traefik_zk_client(conf)
        if zk_cli is None:
            return
        try:
            for service in self.services:
                for port in service.ports:
                    if not port.enable_proxy:
                        continue
                    proxy_path = '{}/{}'.format(conf.traefik_base_url, port.proxy_key())
                    format_args = {
                        "ip_port": port.external_ip + ":" + str(port.external_port),
                        "proxy_path": proxy_path
                    }
                    endpoint = port.url_template.format(**format_args).encode('utf-8')
                    traefik_name = 'zoe_exec_{}_{}'.format(self.id, port.id)
                    zk_cli.create('/traefik/backends/{}/servers/server/url'.format(traefik_name), endpoint, makepath=True)
                    zk_cli.create('/traefik/frontends/{}/routes/path/rule'.format(traefik_name), 'PathPrefix:{}'.format(proxy_path).encode('utf-8'), makepath=True)
                    zk_cli.create('/traefik/frontends/{}/backend'.format(traefik_name), traefik_name.encode('utf-8'), makepath=True)
            self._traefik_touch_alias(zk_cli)
        finally:
            # Always release the ZooKeeper connection, even if one of the calls above raised
            zk_cli.stop()

    def set_cleaning_up(self):
        """
        The services of the execution are being terminated.

        If Træfik support is configured, also removes from ZooKeeper the backends and
        frontends that set_running() published for this execution.
        """
        self._status = self.CLEANING_UP_STATUS
        self.sql_manager.executions.update(self.id, status=self._status)

        #  See the comment about Træfik in set_running()
        conf = zoe_lib.config.get_conf()
        zk_cli = self._traefik_zk_client(conf)
        if zk_cli is None:
            return
        try:
            for service in self.services:
                for port in service.ports:
                    if not port.enable_proxy:
                        continue
                    traefik_name = 'zoe_exec_{}_{}'.format(self.id, port.id)
                    zk_cli.delete('/traefik/backends/{}'.format(traefik_name), recursive=True)
                    zk_cli.delete('/traefik/frontends/{}'.format(traefik_name), recursive=True)
            self._traefik_touch_alias(zk_cli)
        finally:
            # Always release the ZooKeeper connection, even if one of the calls above raised
            zk_cli.stop()

    def set_terminated(self, reason=None):
        """
        The execution is not running.

        :param reason: optional message stored as the execution error message
        """
        self._status = self.TERMINATED_STATUS
        self.time_end = datetime.datetime.utcnow()
        fields = {'status': self._status, 'time_end': self.time_end}
        if reason is not None:
            # Keep the in-memory record aligned with what is written to the database
            self.error_message = reason
            fields['error_message'] = reason
        self.sql_manager.executions.update(self.id, **fields)

    def set_error(self):
        """The scheduler encountered an error starting or running the execution."""
        self._status = self.ERROR_STATUS
        self.time_end = datetime.datetime.utcnow()
        self.sql_manager.executions.update(self.id, status=self._status, time_end=self.time_end)

    def set_error_message(self, message):
        """Contains an error message in case the status is 'error'."""
        self.error_message = message
        self.sql_manager.executions.update(self.id, error_message=self.error_message)

    def set_size(self, new_size):
        """Changes the size of the execution, for policies that calculate the size automatically."""
        self.size = new_size
        self.sql_manager.executions.update(self.id, size=new_size)

    @property
    def is_active(self):
        """
        Returns False if the execution ended completely

        :return: True for the submitted, queued, starting, running and cleaning up statuses
        """
        active_statuses = (
            self.SUBMIT_STATUS,
            self.QUEUED_STATUS,
            self.RUNNING_STATUS,
            self.STARTING_STATUS,
            self.CLEANING_UP_STATUS,
        )
        return self._status in active_statuses

    @property
    def is_running(self):
        """Returns True if the execution status is 'running'."""
        return self._status == self.RUNNING_STATUS

    @property
    def status(self):
        """Getter for the execution status."""
        return self._status

    @property
    def services(self):
        """Getter for this execution service list (queries the state on every access)."""
        return self.sql_manager.services.select(execution_id=self.id)

    @property
    def essential_services(self):
        """Getter for this execution essential service list."""
        return self.sql_manager.services.select(execution_id=self.id, essential=True)

    @property
    def elastic_services(self):
        """Getter for this execution elastic service list."""
        return self.sql_manager.services.select(execution_id=self.id, essential=False)

    @property
    def essential_services_running(self) -> bool:
        """Returns True if none of the essential services of this execution is dead."""
        return all(not service.is_dead() for service in self.services if service.essential)

    @property
    def all_services_running(self) -> bool:
        """Return True if none of the services of this execution is dead."""
        return all(not service.is_dead() for service in self.services)

    @property
    def all_services_active(self) -> bool:
        """Return True if all services of this execution are active."""
        return all(service.status == service.ACTIVE_STATUS for service in self.services)

    @property
    def running_services_count(self) -> int:
        """Returns the number of services of this execution that are not dead."""
        return sum(1 for service in self.services if not service.is_dead())

    @property
    def services_count(self) -> int:
        """Return the total number of services defined for this execution."""
        return len(self.services)

    @property
    def total_reservations(self):
        """Return the sum of the resources reserved by all services of this execution, or None if there are no services."""
        # Fetch the services only once: every access to self.services runs a query
        reservations = [s.resource_reservation for s in self.services]
        if not reservations:
            return None
        return functools.reduce(lambda x, y: x + y, reservations)

    @property
    def owner(self):
        """Returns the full user object that owns this execution."""
        return self.sql_manager.user.select(only_one=True, id=self.user_id)

    def __repr__(self):
        return str(self.id)
271 272 273 274


class ExecutionTable(BaseTable):
    """Abstraction for the execution table in the database."""

    # Filter keywords that map to a timestamp comparison instead of a plain column equality
    _TIME_FILTERS = {
        'earlier_than_submit': '"time_submit" <= to_timestamp(%s)',
        'earlier_than_start': '"time_start" <= to_timestamp(%s)',
        'earlier_than_end': '"time_end" <= to_timestamp(%s)',
        'later_than_submit': '"time_submit" >= to_timestamp(%s)',
        'later_than_start': '"time_start" >= to_timestamp(%s)',
        'later_than_end': '"time_end" >= to_timestamp(%s)',
    }

    def __init__(self, sql_manager):
        super().__init__(sql_manager, "execution")

    def create(self):
        """Create the execution table."""
        self.cursor.execute('''CREATE TABLE execution (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL,
            user_id INT REFERENCES "user",
            description JSON NOT NULL,
            status TEXT NOT NULL,
            size NUMERIC NOT NULL,
            time_submit TIMESTAMP NOT NULL,
            time_start TIMESTAMP NULL,
            time_end TIMESTAMP NULL,
            error_message TEXT NULL
            )''')

    def insert(self, name, user_id, description):
        """
        Create a new execution in the state.

        The new row gets the 'submitted' status, the current UTC time as submission time
        and its size from ``description['size']``.

        :param name: execution name
        :param user_id: ID of the user owning the execution
        :param description: application description, must contain the ``size`` key
        :return: the ID of the new execution
        """
        status = Execution.SUBMIT_STATUS
        time_submit = datetime.datetime.utcnow()
        query = self.cursor.mogrify('INSERT INTO execution (id, name, user_id, description, status, size, time_submit) VALUES (DEFAULT, %s,%s,%s,%s,%s,%s) RETURNING id', (name, user_id, description, status, description['size'], time_submit))
        self.cursor.execute(query)
        self.sql_manager.commit()
        return self.cursor.fetchone()[0]

    def _build_where(self, filters):
        """
        Translate keyword filters into a WHERE clause.

        :param filters: dictionary of filter name -> value, see select()
        :return: a tuple (clause, args): clause is an empty string when there are no filters,
                 args is the list of values for the placeholders in clause
        :raises ValueError: if a filter name cannot be a column name
        """
        conditions = []
        args = []
        for key, value in filters.items():
            if key in self._TIME_FILTERS:
                conditions.append(self._TIME_FILTERS[key])
            elif key.isidentifier():
                # The column name ends up verbatim in the query text, so accept only plain identifiers
                conditions.append('{} = %s'.format(key))
            else:
                raise ValueError('invalid filter name: {!r}'.format(key))
            args.append(value)
        if not conditions:
            return '', args
        return ' WHERE ' + ' AND '.join(conditions), args

    def select(self, only_one=False, limit=-1, base=0, **kwargs):
        """
        Return a list of executions.

        Besides column names, the following keyword filters compare a time column with
        ``to_timestamp(value)``: ``earlier_than_submit``, ``earlier_than_start``, ``earlier_than_end``,
        ``later_than_submit``, ``later_than_start`` and ``later_than_end``.
        Results are ordered by descending ID only when a positive limit is given.

        :param only_one: only one result is expected
        :type only_one: bool
        :param limit: limit the result to this number of entries, ignored if not positive
        :type limit: int
        :type base: int
        :param base: the base value (offset) to use when limiting result count
        :param kwargs: filter executions based on their fields/columns
        :return: one execution (or None) if only_one is True, otherwise a list of executions;
                 None or an empty list in case of a database error or an invalid filter name
        """
        try:
            where, args = self._build_where(kwargs)
        except ValueError as e:
            # Report an unusable filter the same way as a failed query
            log.error('db error: %s', e)
            return None if only_one else []

        q = 'SELECT * FROM execution' + where
        if limit > 0:
            # Bound parameters instead of string formatting
            q += ' ORDER BY id DESC LIMIT %s OFFSET %s'
            args.extend((limit, base))
        query = self.cursor.mogrify(q, args) if args else self.cursor.mogrify(q)

        try:
            self.cursor.execute(query)
        except psycopg2.Error as e:
            log.error('db error: %s', e)
            return None if only_one else []

        if only_one:
            row = self.cursor.fetchone()
            if row is None:
                return None
            return Execution(row, self.sql_manager)
        return [Execution(x, self.sql_manager) for x in self.cursor]

    def count(self, **kwargs):
        """
        Return the number of executions matching the filters.

        :param kwargs: filter executions based on their fields/columns, same keywords as select()
        :return: the number of matching executions, 0 in case of a database error or an invalid filter name
        """
        try:
            where, args = self._build_where(kwargs)
        except ValueError as e:
            # Report an unusable filter the same way as a failed query
            log.error('db error: %s', e)
            return 0

        q = 'SELECT COUNT(*) FROM execution' + where
        query = self.cursor.mogrify(q, args) if args else self.cursor.mogrify(q)

        try:
            self.cursor.execute(query)
        except psycopg2.Error as e:
            log.error('db error: %s', e)
            return 0

        row = self.cursor.fetchone()
        return row[0]