sql_manager.py 14.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16 17
"""Interface to PostgresQL for Zoe state."""

18
import datetime
Daniele Venzano's avatar
Daniele Venzano committed
19
import logging
20 21 22

import psycopg2
import psycopg2.extras
23

24 25
from zoe_lib.config import get_conf
from zoe_lib.swarm_client import SwarmClient
26

Daniele Venzano's avatar
Daniele Venzano committed
27 28
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

29
# Register psycopg2's Json wrapper as the adapter for dict, so that plain
# dictionaries can be passed directly as query parameters.
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
30 31 32


class SQLManager:
    """
    Access layer to the PostgreSQL database that holds the Zoe state.

    The class should be used as a singleton: one connection is opened when the
    object is created and it is shared by all the methods.
    """

    def __init__(self, conf):
        """
        Store the connection parameters and open the connection.

        :param conf: configuration object, the attributes ``dbuser``, ``dbpass``,
                     ``dbhost``, ``dbport``, ``dbname`` and ``deployment_name``
                     are read from it
        """
        self.user = conf.dbuser
        self.password = conf.dbpass
        self.host = conf.dbhost
        self.port = conf.dbport
        self.dbname = conf.dbname
        # The deployment name is used as the name of the database schema (see _cursor)
        self.schema = conf.deployment_name
        self.conn = None
        self._connect()

    def _connect(self):
        """Open a new connection and store it in ``self.conn``."""
        # Keyword arguments instead of a hand-concatenated DSN string: a value
        # containing spaces or quotes (a password, for example) would corrupt
        # the DSN, while here psycopg2 takes care of the escaping.
        self.conn = psycopg2.connect(
            dbname=self.dbname,
            user=self.user,
            password=self.password,
            host=self.host,
            port=self.port
        )

    def _cursor(self):
        """
        Return a new DictCursor with the search path set to the deployment schema.

        If the cursor cannot be created because of a psycopg2 InterfaceError the
        connection is opened again and the creation is retried once.
        """
        try:
            cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        except psycopg2.InterfaceError:
            self._connect()
            cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        # NOTE: the schema name is pasted into the statement as-is, it has to
        # come from a trusted source (here: the configuration).
        cur.execute('SET search_path TO {},public'.format(self.schema))
        return cur

    @staticmethod
    def _column_clauses(fields, separator):
        """
        Build the ``column = %s`` fragments for a WHERE or a SET clause.

        :param fields: dictionary mapping column names to values
        :param separator: string used to join the fragments (``' AND '`` or ``', '``)
        :return: a tuple (sql fragment, list of values in the matching order)
        :raises ValueError: if a column name is not a plain identifier
        """
        clauses = []
        values = []
        for column, value in fields.items():
            # Column names cannot be passed as query parameters and end up
            # verbatim in the SQL text, so anything that is not a plain
            # identifier is refused.
            if not column.isidentifier():
                raise ValueError('invalid column name: {!r}'.format(column))
            clauses.append('{} = %s'.format(column))
            values.append(value)
        return separator.join(clauses), values

    def _select(self, table, row_class, only_one, filters):
        """
        Select rows from a table and wrap each of them in ``row_class``.

        :param table: name of the table to query
        :param row_class: class instantiated as ``row_class(row, self)`` for each row
        :param only_one: if True return the first row only (or None if there is no row)
        :param filters: dictionary of column/value pairs that rows must match
        :return: one object, None or a list of objects
        """
        query = 'SELECT * FROM ' + table
        values = None
        if filters:
            where, values = self._column_clauses(filters, ' AND ')
            query += ' WHERE ' + where

        cur = self._cursor()
        cur.execute(query, values)
        if only_one:
            row = cur.fetchone()
            if row is None:
                return None
            return row_class(row, self)
        return [row_class(row, self) for row in cur]

    def _update(self, table, row_id, fields):
        """
        Update the given columns of the row identified by ``row_id`` and commit.

        :param table: name of the table to update
        :param row_id: value of the ``id`` column of the row to update
        :param fields: dictionary of column/value pairs to write
        """
        if not fields:
            # Without columns the statement would be 'UPDATE ... SET  WHERE ...',
            # a syntax error: there is nothing to write, so do nothing.
            return
        assignments, values = self._column_clauses(fields, ', ')
        values.append(row_id)
        cur = self._cursor()
        cur.execute('UPDATE ' + table + ' SET ' + assignments + ' WHERE id=%s', values)
        self.conn.commit()

    def execution_list(self, only_one=False, **kwargs):
        """
        Return a list of executions.

        :param only_one: only one result is expected
        :type only_one: bool
        :param kwargs: filter executions based on their fields/columns
        :return: one or more executions
        """
        return self._select('execution', Execution, only_one, kwargs)

    def execution_update(self, exec_id, **kwargs):
        """
        Update the state of an execution.

        :param exec_id: id of the execution to update
        :param kwargs: columns to update and their new values
        """
        self._update('execution', exec_id, kwargs)

    def execution_new(self, name, user_id, description):
        """
        Create a new execution in the state.

        The execution is created in the submitted status, with the current
        time as submission time.

        :return: the id generated by the database for the new execution
        """
        cur = self._cursor()
        query = 'INSERT INTO execution (id, name, user_id, description, status, time_submit) VALUES (DEFAULT, %s,%s,%s,%s,%s) RETURNING id'
        cur.execute(query, (name, user_id, description, Execution.SUBMIT_STATUS, datetime.datetime.now()))
        new_id = cur.fetchone()[0]
        self.conn.commit()
        return new_id

    def execution_delete(self, execution_id):
        """Delete an execution and its services from the state."""
        cur = self._cursor()
        # Services are deleted first, then the execution they refer to
        query = "DELETE FROM service WHERE execution_id = %s"
        cur.execute(query, (execution_id,))
        query = "DELETE FROM execution WHERE id = %s"
        cur.execute(query, (execution_id,))
        self.conn.commit()

    def service_list(self, only_one=False, **kwargs):
        """
        Return a list of services.

        :param only_one: only one result is expected
        :type only_one: bool
        :param kwargs: filter services based on their fields/columns
        :return: one or more services
        """
        return self._select('service', Service, only_one, kwargs)

    def service_update(self, service_id, **kwargs):
        """
        Update the state of an existing service.

        :param service_id: id of the service to update
        :param kwargs: columns to update and their new values
        """
        self._update('service', service_id, kwargs)

    def service_new(self, execution_id, name, service_group, description):
        """
        Add a new service to the state.

        The service is created in the 'created' status and without error message.

        :return: the id generated by the database for the new service
        """
        cur = self._cursor()
        status = 'created'
        query = 'INSERT INTO service (id, status, error_message, execution_id, name, service_group, description) VALUES (DEFAULT, %s,NULL,%s,%s,%s,%s) RETURNING id'
        cur.execute(query, (status, execution_id, name, service_group, description))
        new_id = cur.fetchone()[0]
        self.conn.commit()
        return new_id
183 184 185 186 187 188 189 190 191 192 193 194 195 196


class Base:
    """
    Common ancestor of the objects stored in the state.

    It keeps the row id and a reference to the SQL manager that loaded the row.

    :type sql_manager: SQLManager
    """
    def __init__(self, d, sql_manager):
        """
        :param d: mapping with the columns of a database row, only ``id`` is read here
        :type sql_manager: SQLManager
        """
        self.sql_manager = sql_manager
        self.id = d['id']

    def __repr__(self):
        """Short representation with class name and id, useful in logs and debuggers."""
        return '{}(id={!r})'.format(type(self).__name__, self.id)

    def serialize(self):
        """Generates a dictionary that can be serialized in JSON, subclasses must implement it."""
        raise NotImplementedError


class Execution(Base):
    """
    A Zoe execution.

    :type time_submit: datetime.datetime
    :type time_start: datetime.datetime
    :type time_end: datetime.datetime
    """

    SUBMIT_STATUS = "submitted"
    SCHEDULED_STATUS = "scheduled"
    STARTING_STATUS = "starting"
    ERROR_STATUS = "error"
    RUNNING_STATUS = "running"
    CLEANING_UP_STATUS = "cleaning up"
    TERMINATED_STATUS = "terminated"

    def __init__(self, d, sql_manager):
        """
        :param d: mapping with the columns of one row of the execution table, the
                  time columns can hold datetime objects, POSIX timestamps or None
        :type sql_manager: SQLManager
        """
        super().__init__(d, sql_manager)

        self.user_id = d['user_id']
        self.name = d['name']
        self.description = d['description']

        # Each time column is converted on its own value. Checking the type of
        # time_submit for all three would send a None time_start/time_end to
        # fromtimestamp() and would never set time_end for numeric timestamps.
        self.time_submit = self._to_datetime(d['time_submit'])
        self.time_start = self._to_datetime(d['time_start'])
        self.time_end = self._to_datetime(d['time_end'])

        self._status = d['status']
        self.error_message = d['error_message']

    @staticmethod
    def _to_datetime(value):
        """
        Convert a time column to a datetime object.

        :param value: a datetime, a POSIX timestamp or None
        :return: the datetime, or None if the value is None (time not yet recorded)
        """
        if value is None or isinstance(value, datetime.datetime):
            return value
        return datetime.datetime.fromtimestamp(value)

    def serialize(self):
        """Generates a dictionary that can be serialized in JSON."""
        return {
            'id': self.id,
            'user_id': self.user_id,
            'name': self.name,
            'description': self.description,
            'time_submit': self.time_submit.timestamp(),
            'time_start': None if self.time_start is None else self.time_start.timestamp(),
            'time_end': None if self.time_end is None else self.time_end.timestamp(),
            'status': self._status,
            'error_message': self.error_message,
            # The services property queries the database
            'services': [s.id for s in self.services]
        }

    def __eq__(self, other):
        """Two executions are equal when they have the same id."""
        if not isinstance(other, Execution):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        """Hash consistent with __eq__, defining __eq__ alone would make executions unhashable."""
        return hash(self.id)

    def set_scheduled(self):
        """The execution has been added to the scheduler queues."""
        self._status = self.SCHEDULED_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

    def set_starting(self):
        """The services of the execution are being created in Swarm."""
        self._status = self.STARTING_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

    def set_running(self):
        """The execution is running and producing useful work, the start time is recorded."""
        self._status = self.RUNNING_STATUS
        self.time_start = datetime.datetime.now()
        self.sql_manager.execution_update(self.id, status=self._status, time_start=self.time_start)

    def set_cleaning_up(self):
        """The services of the execution are being terminated."""
        self._status = self.CLEANING_UP_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

    def set_terminated(self):
        """The execution is not running, the end time is recorded."""
        self._status = self.TERMINATED_STATUS
        self.time_end = datetime.datetime.now()
        self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)

    def set_error(self):
        """The scheduler encountered an error starting or running the execution, the end time is recorded."""
        self._status = self.ERROR_STATUS
        self.time_end = datetime.datetime.now()
        self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)

    def set_error_message(self, message):
        """Contains an error message in case the status is 'error'."""
        self.error_message = message
        self.sql_manager.execution_update(self.id, error_message=self.error_message)

    def is_active(self):
        """
        Returns True if the execution is in the scheduler.

        :return: True when the status is scheduled, starting, running or cleaning up
        """
        return self._status in (self.SCHEDULED_STATUS, self.RUNNING_STATUS, self.STARTING_STATUS, self.CLEANING_UP_STATUS)

    @property
    def status(self):
        """Getter for the execution status."""
        return self._status

    @property
    def services(self):
        """Getter for this execution service list, the list is read from the database at each access."""
        return self.sql_manager.service_list(execution_id=self.id)


class Service(Base):
    """A Zoe Service."""

    TERMINATING_STATUS = "terminating"
    INACTIVE_STATUS = "inactive"
    ACTIVE_STATUS = "active"
    STARTING_STATUS = "starting"
    ERROR_STATUS = "error"

    DOCKER_UNDEFINED_STATUS = 'undefined'
    DOCKER_CREATE_STATUS = 'created'
    DOCKER_START_STATUS = 'started'
    DOCKER_DIE_STATUS = 'dead'
    DOCKER_DESTROY_STATUS = 'destroyed'
    DOCKER_OOM_STATUS = 'oom-killed'

    def __init__(self, d, sql_manager):
        """
        :param d: mapping with the columns of one row of the service table
        :type sql_manager: SQLManager
        """
        super().__init__(d, sql_manager)

        self.name = d['name']
        self.status = d['status']
        self.error_message = d['error_message']
        self.execution_id = d['execution_id']
        self.description = d['description']
        self.service_group = d['service_group']
        self.docker_id = d['docker_id']
        self.docker_status = d['docker_status']

    def serialize(self):
        """Generates a dictionary that can be serialized in JSON."""
        return {
            'id': self.id,
            'name': self.name,
            'status': self.status,
            'error_message': self.error_message,
            'execution_id': self.execution_id,
            'description': self.description,
            'service_group': self.service_group,
            'docker_id': self.docker_id,
            # The ip_address property may query Swarm
            'ip_address': self.ip_address,
            'docker_status': self.docker_status
        }

    def __eq__(self, other):
        """Two services are equal when they have the same id."""
        if not isinstance(other, Service):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        """Hash consistent with __eq__, defining __eq__ alone would make services unhashable."""
        return hash(self.id)

    @property
    def dns_name(self):
        """Getter for the DNS name of this service as it will be registered in Docker's DNS."""
        return "{}-{}-{}".format(self.name, self.execution_id, get_conf().deployment_name)

    def set_terminating(self):
        """The service is being killed."""
        self.sql_manager.service_update(self.id, status=self.TERMINATING_STATUS)
        self.status = self.TERMINATING_STATUS

    def set_inactive(self):
        """The service is not running, its docker_id is cleared."""
        self.sql_manager.service_update(self.id, status=self.INACTIVE_STATUS, docker_id=None)
        self.status = self.INACTIVE_STATUS
        # Keep the object aligned with what has just been written to the database
        self.docker_id = None

    def set_starting(self):
        """The service is being created by Docker."""
        self.sql_manager.service_update(self.id, status=self.STARTING_STATUS)
        self.status = self.STARTING_STATUS

    def set_active(self, docker_id):
        """The service is running and has a valid docker_id."""
        self.sql_manager.service_update(self.id, status=self.ACTIVE_STATUS, docker_id=docker_id, error_message=None)
        self.error_message = None
        self.status = self.ACTIVE_STATUS
        # Without this the ip_address property would inspect the old container id
        self.docker_id = docker_id

    def set_error(self, error_message):
        """The service could not be created/started."""
        self.sql_manager.service_update(self.id, status=self.ERROR_STATUS, error_message=error_message)
        self.status = self.ERROR_STATUS
        self.error_message = error_message

    def set_docker_status(self, new_status):
        """Docker has emitted an event related to this service."""
        self.sql_manager.service_update(self.id, docker_status=new_status)
        # Arguments are passed to the logger, the message is built only if debug is enabled
        log.debug("service %s, status updated to %s", self.id, new_status)
        self.docker_status = new_status

    @property
    def ip_address(self):
        """
        Getter for the service IP address, queries Swarm as the IP address changes outside our control.

        :return: the address on the overlay network, or an empty dictionary
                 if the Docker status of the service is not 'started'
        """
        if self.docker_status != self.DOCKER_START_STATUS:
            return {}
        conf = get_conf()
        swarm = SwarmClient(conf)
        s_info = swarm.inspect_container(self.docker_id)
        return s_info['ip_address'][conf.overlay_network_name]

    @property
    def user_id(self):
        """Getter for the user_id, that is actually taken from the parent execution."""
        execution = self.sql_manager.execution_list(only_one=True, id=self.execution_id)
        return execution.user_id