sql_manager.py 14.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to PostgreSQL for Zoe state."""

18
import datetime
19
import logging
20 21 22

import psycopg2
import psycopg2.extras
23

24 25
from zoe_lib.config import get_conf
from zoe_lib.swarm_client import SwarmClient
26

27 28
log = logging.getLogger(__name__)

# Register psycopg2's Json wrapper as the adapter for dict, so that dictionaries
# passed as query parameters are adapted through psycopg2.extras.Json.
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
30 31 32


class SQLManager:
    """
    The SQLManager class, should be used as a singleton.

    It owns a single psycopg2 connection and exposes create/list/update/delete
    helpers for the ``execution`` and ``service`` tables. Rows are returned
    wrapped in Execution and Service objects.
    """

    def __init__(self, conf):
        """
        Store the connection parameters and open the database connection.

        :param conf: configuration object providing the dbuser, dbpass, dbhost,
                     dbport, dbname and deployment_name attributes
        """
        self.user = conf.dbuser
        self.password = conf.dbpass
        self.host = conf.dbhost
        self.port = conf.dbport
        self.dbname = conf.dbname
        # The deployment name is used as the PostgreSQL schema name.
        self.schema = conf.deployment_name
        self.conn = None
        self._connect()

    def _connect(self):
        """(Re)open the connection to the database with the stored parameters."""
        # Keyword arguments instead of a hand-concatenated DSN string: psycopg2
        # does the quoting, so a value containing spaces, quotes or backslashes
        # (typically the password) cannot corrupt the connection string.
        self.conn = psycopg2.connect(
            dbname=self.dbname,
            user=self.user,
            password=self.password,
            host=self.host,
            port=self.port,
        )

    def _cursor(self):
        """
        Return a DictCursor whose search_path points at the deployment schema.

        If creating the cursor raises psycopg2.InterfaceError, the connection
        is re-opened once and the cursor creation is retried.
        """
        try:
            cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        except psycopg2.InterfaceError:
            self._connect()
            cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        # The schema name is interpolated as-is: it comes from the configuration
        # (deployment_name), not from query parameters.
        cur.execute('SET search_path TO {},public'.format(self.schema))
        return cur

    def _select(self, table, filters):
        """
        Run ``SELECT *`` on a table, optionally restricted by equality filters.

        :param table: name of the table to query
        :param filters: mapping of column name -> required value, may be empty
        :return: the cursor on which the query has been executed
        """
        cur = self._cursor()
        query = 'SELECT * FROM ' + table
        if not filters:
            cur.execute(query)
            return cur
        # Column names are interpolated into the statement, values are passed
        # as bound parameters. Callers must pass trusted column names only.
        columns = list(filters)
        query += ' WHERE ' + ' AND '.join('{} = %s'.format(col) for col in columns)
        cur.execute(query, [filters[col] for col in columns])
        return cur

    def _update(self, table, row_id, fields):
        """
        Set the given columns on the row of a table identified by its id.

        :param table: name of the table to update
        :param row_id: value of the ``id`` column of the row to update
        :param fields: mapping of column name -> new value
        """
        if not fields:
            # Nothing to set: an UPDATE with an empty SET clause is invalid SQL.
            return
        cur = self._cursor()
        columns = list(fields)
        assignments = ', '.join('{} = %s'.format(col) for col in columns)
        values = [fields[col] for col in columns]
        values.append(row_id)
        cur.execute('UPDATE ' + table + ' SET ' + assignments + ' WHERE id=%s', values)
        self.conn.commit()

    def execution_list(self, only_one=False, **kwargs):
        """
        Return a list of executions.

        Filter values are compared with ``=``; a value of None is rendered as
        ``= NULL``, which in SQL matches no rows.

        :param only_one: only one result is expected
        :type only_one: bool
        :param kwargs: filter executions based on their fields/columns
        :return: with only_one, a single Execution or None when nothing matches;
                 otherwise a (possibly empty) list of Execution objects
        """
        cur = self._select('execution', kwargs)
        if only_one:
            row = cur.fetchone()
            return None if row is None else Execution(row, self)
        return [Execution(row, self) for row in cur]

    def execution_update(self, exec_id, **kwargs):
        """
        Update the state of an execution.

        :param exec_id: id of the execution to update
        :param kwargs: columns to set, as column=value pairs; with no pairs the
                       call does nothing
        """
        self._update('execution', exec_id, kwargs)

    def execution_new(self, name, user_id, description):
        """
        Create a new execution in the state.

        The execution is created in the submitted status, with the current
        local time as submission time.

        :return: the id of the new execution
        """
        cur = self._cursor()
        status = Execution.SUBMIT_STATUS
        time_submit = datetime.datetime.now()
        cur.execute(
            'INSERT INTO execution (id, name, user_id, description, status, time_submit) '
            'VALUES (DEFAULT, %s,%s,%s,%s,%s) RETURNING id',
            (name, user_id, description, status, time_submit),
        )
        new_id = cur.fetchone()[0]
        self.conn.commit()
        return new_id

    def execution_delete(self, execution_id):
        """Delete an execution and its services from the state."""
        cur = self._cursor()
        # Services are deleted first, then the execution they belong to; both
        # statements are committed together.
        cur.execute("DELETE FROM service WHERE execution_id = %s", (execution_id,))
        cur.execute("DELETE FROM execution WHERE id = %s", (execution_id,))
        self.conn.commit()

    def service_list(self, only_one=False, **kwargs):
        """
        Return a list of services.

        Filter values are compared with ``=``; a value of None is rendered as
        ``= NULL``, which in SQL matches no rows.

        :param only_one: only one result is expected
        :type only_one: bool
        :param kwargs: filter services based on their fields/columns
        :return: with only_one, a single Service or None when nothing matches;
                 otherwise a (possibly empty) list of Service objects
        """
        cur = self._select('service', kwargs)
        if only_one:
            row = cur.fetchone()
            return None if row is None else Service(row, self)
        return [Service(row, self) for row in cur]

    def service_update(self, service_id, **kwargs):
        """
        Update the state of an existing service.

        :param service_id: id of the service to update
        :param kwargs: columns to set, as column=value pairs; with no pairs the
                       call does nothing
        """
        self._update('service', service_id, kwargs)

    def service_new(self, execution_id, name, service_group, description):
        """
        Adds a new service to the state.

        :return: the id of the new service
        """
        cur = self._cursor()
        # NOTE(review): 'created' is not one of the Service.*_STATUS constants
        # (it equals Service.DOCKER_CREATE_STATUS) - confirm this is the
        # intended initial value for the status column.
        status = 'created'
        cur.execute(
            'INSERT INTO service (id, status, error_message, execution_id, name, service_group, description) '
            'VALUES (DEFAULT, %s,NULL,%s,%s,%s,%s) RETURNING id',
            (status, execution_id, name, service_group, description),
        )
        new_id = cur.fetchone()[0]
        self.conn.commit()
        return new_id
183 184 185 186 187 188 189 190 191 192 193 194 195 196


class Base:
    """
    Common base for the objects built from rows of the Zoe state.

    :type sql_manager: SQLManager
    """

    def __init__(self, d, sql_manager):
        """
        Keep a reference to the SQL manager and read the object id.

        :param d: row-like mapping, only the 'id' key is read here
        :param sql_manager: manager kept in the sql_manager attribute
        :type sql_manager: SQLManager
        """
        self.sql_manager = sql_manager
        self.id = d['id']

    def serialize(self):
        """
        Generates a dictionary that can be serialized in JSON.

        :raises NotImplementedError: always, subclasses must override this method
        """
        raise NotImplementedError


class Execution(Base):
    """
    A Zoe execution.

    :type time_submit: datetime.datetime
    :type time_start: datetime.datetime
    :type time_end: datetime.datetime
    """

    SUBMIT_STATUS = "submitted"
    SCHEDULED_STATUS = "scheduled"
    STARTING_STATUS = "starting"
    ERROR_STATUS = "error"
    RUNNING_STATUS = "running"
    CLEANING_UP_STATUS = "cleaning up"
    TERMINATED_STATUS = "terminated"

    def __init__(self, d, sql_manager):
        """
        Build an execution from a row-like mapping.

        :param d: mapping with the keys id, user_id, name, description,
                  time_submit, time_start, time_end, status and error_message
        :param sql_manager: manager used to persist the state changes
        :type sql_manager: SQLManager
        """
        super().__init__(d, sql_manager)

        self.user_id = d['user_id']
        self.name = d['name']
        self.description = d['description']

        # Each time field is converted on its own value: it can be a datetime,
        # a POSIX timestamp or None.
        self.time_submit = self._to_datetime(d['time_submit'])
        self.time_start = self._to_datetime(d['time_start'])
        self.time_end = self._to_datetime(d['time_end'])

        self._status = d['status']
        self.error_message = d['error_message']

    @staticmethod
    def _to_datetime(value):
        """Return value as a datetime: None and datetime objects pass through, anything else is read as a POSIX timestamp."""
        if value is None or isinstance(value, datetime.datetime):
            return value
        return datetime.datetime.fromtimestamp(value)

    def serialize(self):
        """
        Generates a dictionary that can be serialized in JSON.

        Times are converted to POSIX timestamps, time_start and time_end are
        None when not set. The 'services' entry holds the ids of the services
        of this execution, which are read through the SQL manager.
        """
        return {
            'id': self.id,
            'user_id': self.user_id,
            'name': self.name,
            'description': self.description,
            'time_submit': self.time_submit.timestamp(),
            'time_start': None if self.time_start is None else self.time_start.timestamp(),
            'time_end': None if self.time_end is None else self.time_end.timestamp(),
            'status': self._status,
            'error_message': self.error_message,
            'services': [s.id for s in self.services]
        }

    def __eq__(self, other):
        """Two executions are equal when they have the same id."""
        if not isinstance(other, Execution):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        """Hash on the id, consistent with __eq__, so executions can be used in sets and as dict keys."""
        return hash(self.id)

    def set_scheduled(self):
        """The execution has been added to the scheduler queues."""
        self._status = self.SCHEDULED_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

    def set_starting(self):
        """The services of the execution are being created in Swarm."""
        self._status = self.STARTING_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

    def set_running(self):
        """The execution is running and producing useful work."""
        self._status = self.RUNNING_STATUS
        self.time_start = datetime.datetime.now()
        self.sql_manager.execution_update(self.id, status=self._status, time_start=self.time_start)

    def set_cleaning_up(self):
        """The services of the execution are being terminated."""
        self._status = self.CLEANING_UP_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

    def set_terminated(self):
        """The execution is not running."""
        self._status = self.TERMINATED_STATUS
        self.time_end = datetime.datetime.now()
        self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)

    def set_error(self):
        """The scheduler encountered an error starting or running the execution."""
        self._status = self.ERROR_STATUS
        self.time_end = datetime.datetime.now()
        self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)

    def set_error_message(self, message):
        """Contains an error message in case the status is 'error'."""
        self.error_message = message
        self.sql_manager.execution_update(self.id, error_message=self.error_message)

    def is_active(self):
        """
        Returns True if the execution is in the scheduler

        :return: True when the status is scheduled, running, starting or cleaning up
        """
        active_statuses = (
            self.SCHEDULED_STATUS,
            self.RUNNING_STATUS,
            self.STARTING_STATUS,
            self.CLEANING_UP_STATUS,
        )
        return self._status in active_statuses

    @property
    def status(self):
        """Getter for the execution status."""
        return self._status

    @property
    def services(self):
        """Getter for this execution service list."""
        return self.sql_manager.service_list(execution_id=self.id)


class Service(Base):
    """A Zoe Service."""

    TERMINATING_STATUS = "terminating"
    INACTIVE_STATUS = "inactive"
    ACTIVE_STATUS = "active"
    STARTING_STATUS = "starting"
    ERROR_STATUS = "error"

    DOCKER_UNDEFINED_STATUS = 'undefined'
    DOCKER_CREATE_STATUS = 'created'
    DOCKER_START_STATUS = 'started'
    DOCKER_DIE_STATUS = 'dead'
    DOCKER_DESTROY_STATUS = 'destroyed'
    DOCKER_OOM_STATUS = 'oom-killed'

    def __init__(self, d, sql_manager):
        """
        Build a service from a row-like mapping.

        :param d: mapping with the keys id, name, status, error_message,
                  execution_id, description, service_group, docker_id and
                  docker_status
        :param sql_manager: manager used to persist the state changes
        :type sql_manager: SQLManager
        """
        super().__init__(d, sql_manager)

        self.name = d['name']
        self.status = d['status']
        self.error_message = d['error_message']
        self.execution_id = d['execution_id']
        self.description = d['description']
        self.service_group = d['service_group']
        self.docker_id = d['docker_id']
        self.docker_status = d['docker_status']

    def serialize(self):
        """
        Generates a dictionary that can be serialized in JSON.

        The 'ip_address' entry is read through the ip_address property, which
        queries Swarm when the Docker status is 'started'.
        """
        return {
            'id': self.id,
            'name': self.name,
            'status': self.status,
            'error_message': self.error_message,
            'execution_id': self.execution_id,
            'description': self.description,
            'service_group': self.service_group,
            'docker_id': self.docker_id,
            'ip_address': self.ip_address,
            'docker_status': self.docker_status
        }

    def __eq__(self, other):
        """Two services are equal when they have the same id."""
        if not isinstance(other, Service):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        """Hash on the id, consistent with __eq__, so services can be used in sets and as dict keys."""
        return hash(self.id)

    @property
    def dns_name(self):
        """Getter for the DNS name of this service as it will be registered in Docker's DNS."""
        return "{}-{}-{}".format(self.name, self.execution_id, get_conf().deployment_name)

    def set_terminating(self):
        """The service is being killed."""
        self.sql_manager.service_update(self.id, status=self.TERMINATING_STATUS)
        self.status = self.TERMINATING_STATUS

    def set_inactive(self):
        """The service is not running."""
        self.sql_manager.service_update(self.id, status=self.INACTIVE_STATUS, docker_id=None)
        self.status = self.INACTIVE_STATUS
        # Mirror the database: the stored docker_id has just been cleared.
        self.docker_id = None

    def set_starting(self):
        """The service is being created by Docker."""
        self.sql_manager.service_update(self.id, status=self.STARTING_STATUS)
        self.status = self.STARTING_STATUS

    def set_active(self, docker_id):
        """The service is running and has a valid docker_id."""
        self.sql_manager.service_update(self.id, status=self.ACTIVE_STATUS, docker_id=docker_id, error_message=None)
        self.error_message = None
        self.status = self.ACTIVE_STATUS
        # Mirror the database, so that ip_address inspects the new container id.
        self.docker_id = docker_id

    def set_error(self, error_message):
        """The service could not be created/started."""
        self.sql_manager.service_update(self.id, status=self.ERROR_STATUS, error_message=error_message)
        self.status = self.ERROR_STATUS
        self.error_message = error_message

    def set_docker_status(self, new_status):
        """Docker has emitted an event related to this service."""
        self.sql_manager.service_update(self.id, docker_status=new_status)
        log.debug("service %s, status updated to %s", self.id, new_status)
        self.docker_status = new_status

    @property
    def ip_address(self):
        """
        Getter for the service IP address, queries Swarm as the IP address changes outside our control.

        :return: an empty dict when the Docker status is not 'started', otherwise
                 the address reported by Swarm for the configured overlay network
        """
        if self.docker_status != self.DOCKER_START_STATUS:
            return {}
        swarm = SwarmClient(get_conf())
        s_info = swarm.inspect_container(self.docker_id)
        return s_info['ip_address'][get_conf().overlay_network_name]

    @property
    def user_id(self):
        """Getter for the user_id, that is actually taken from the parent execution."""
        execution = self.sql_manager.execution_list(only_one=True, id=self.execution_id)
        return execution.user_id