Commit d92e83ee authored by Daniele Venzano

Refactor state classes in their own package/files

parent 4006c3ad
......@@ -25,7 +25,7 @@ import time
import zoe_lib.applications
import zoe_lib.config as config
from zoe_lib.configargparse import ArgumentParser, FileType
from zoe_lib.sql_manager import Execution, Service
from zoe_lib.state.sql_manager import Execution, Service
from zoe_lib.swarm_client import SwarmClient
from zoe_master.execution_manager import _digest_application_description
from zoe_master.zapp_to_docker import execution_to_containers, terminate_execution
......
......@@ -18,15 +18,14 @@
import logging
import re
from zoe_lib.config import get_conf
import zoe_lib.sql_manager
import zoe_api.exceptions
import zoe_api.master_api
import zoe_lib.applications
import zoe_lib.exceptions
import zoe_lib.state
from zoe_lib.config import get_conf
from zoe_lib.swarm_client import SwarmClient
import zoe_api.master_api
import zoe_api.exceptions
log = logging.getLogger(__name__)
......@@ -39,14 +38,14 @@ class APIEndpoint:
"""
def __init__(self):
self.master = zoe_api.master_api.APIManager()
self.sql = zoe_lib.sql_manager.SQLManager(get_conf())
self.sql = zoe_lib.state.SQLManager(get_conf())
def execution_by_id(self, uid, role, execution_id) -> zoe_lib.sql_manager.Execution:
def execution_by_id(self, uid, role, execution_id) -> zoe_lib.state.sql_manager.Execution:
"""Lookup an execution by its ID."""
e = self.sql.execution_list(id=execution_id, only_one=True)
if e is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
assert isinstance(e, zoe_lib.sql_manager.Execution)
assert isinstance(e, zoe_lib.state.sql_manager.Execution)
if e.user_id != uid and role != 'admin':
raise zoe_api.exceptions.ZoeAuthException()
return e
......@@ -78,7 +77,7 @@ class APIEndpoint:
def execution_terminate(self, uid, role, exec_id):
"""Terminate an execution."""
e = self.sql.execution_list(id=exec_id, only_one=True)
assert isinstance(e, zoe_lib.sql_manager.Execution)
assert isinstance(e, zoe_lib.state.sql_manager.Execution)
if e is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
......@@ -93,7 +92,7 @@ class APIEndpoint:
def execution_delete(self, uid, role, exec_id):
"""Delete an execution."""
e = self.sql.execution_list(id=exec_id, only_one=True)
assert isinstance(e, zoe_lib.sql_manager.Execution)
assert isinstance(e, zoe_lib.state.sql_manager.Execution)
if e is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
......@@ -110,7 +109,7 @@ class APIEndpoint:
else:
raise zoe_api.exceptions.ZoeException(message)
def service_by_id(self, uid, role, service_id) -> zoe_lib.sql_manager.Service:
def service_by_id(self, uid, role, service_id) -> zoe_lib.state.sql_manager.Service:
"""Lookup a service by its ID."""
service = self.sql.service_list(id=service_id, only_one=True)
if service is None:
......@@ -145,7 +144,7 @@ class APIEndpoint:
def retry_submit_error_executions(self):
"""Resubmit any execution forgotten by the master."""
waiting_execs = self.sql.execution_list(status=zoe_lib.sql_manager.Execution.SUBMIT_STATUS)
waiting_execs = self.sql.execution_list(status=zoe_lib.state.sql_manager.Execution.SUBMIT_STATUS)
if waiting_execs is None or len(waiting_execs) == 0:
return
e = waiting_execs[0]
......
......@@ -19,9 +19,9 @@ import logging
import os
import shutil
from zoe_lib.sql_manager import Execution
from zoe_lib.swarm_client import SwarmClient
from zoe_lib.config import get_conf
from zoe_lib.state import Execution
from zoe_lib.swarm_client import SwarmClient
log = logging.getLogger(__name__)
......
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Zoe state management and database classes"""
from zoe_lib.state.base import Base
from zoe_lib.state.execution import Execution
from zoe_lib.state.sql_manager import SQLManager
from zoe_lib.state.service import Service
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interface to PostgresQL for Zoe state."""
import logging
log = logging.getLogger(__name__)
class Base:
    """
    Minimal state object: keeps an ID and a reference to the SQL manager.

    :type sql_manager: SQLManager
    """

    def __init__(self, d, sql_manager):
        """
        Store the manager and the object ID read from ``d``.

        :param d: mapping that contains at least the ``id`` key
        :type sql_manager: SQLManager
        :raises KeyError: if ``d`` has no ``id`` entry
        """
        self.sql_manager = sql_manager
        self.id = d['id']

    def serialize(self):
        """Generates a dictionary that can be serialized in JSON."""
        # No generic representation exists here; calling this always raises.
        raise NotImplementedError
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interface to PostgresQL for Zoe state."""
import datetime
import logging
log = logging.getLogger(__name__)
class Execution:
    """
    A Zoe execution.

    The time fields in the input dictionary may be ``datetime`` objects,
    numeric POSIX timestamps (such as the ones produced by ``serialize()``)
    or ``None`` when the corresponding event has not happened yet.

    :type time_submit: datetime.datetime
    :type time_start: datetime.datetime
    :type time_end: datetime.datetime
    """

    SUBMIT_STATUS = "submitted"
    SCHEDULED_STATUS = "scheduled"
    STARTING_STATUS = "starting"
    ERROR_STATUS = "error"
    RUNNING_STATUS = "running"
    CLEANING_UP_STATUS = "cleaning up"
    TERMINATED_STATUS = "terminated"

    def __init__(self, d, sql_manager):
        """
        Build an execution from a dictionary of its fields.

        :param d: mapping with the keys ``id``, ``user_id``, ``name``,
                  ``description``, ``time_submit``, ``time_start``,
                  ``time_end``, ``status`` and ``error_message``
        :param sql_manager: object used to persist status changes and to
                            look up the services of this execution
        :raises KeyError: if one of the keys above is missing from ``d``
        """
        self.sql_manager = sql_manager
        self.id = d['id']

        self.user_id = d['user_id']
        self.name = d['name']
        self.description = d['description']

        # Each time field is converted on its own: previously all three
        # branches tested d['time_submit'] and time_end was never set when
        # the input carried timestamps.
        self.time_submit = self._to_datetime(d['time_submit'])
        self.time_start = self._to_datetime(d['time_start'])
        self.time_end = self._to_datetime(d['time_end'])

        self._status = d['status']
        self.error_message = d['error_message']

    @staticmethod
    def _to_datetime(value):
        """Return ``value`` as a datetime; ``None`` and datetimes pass through unchanged."""
        if value is None or isinstance(value, datetime.datetime):
            return value
        return datetime.datetime.fromtimestamp(value)

    def serialize(self):
        """Generates a dictionary that can be serialized in JSON."""
        return {
            'id': self.id,
            'user_id': self.user_id,
            'name': self.name,
            'description': self.description,
            'time_submit': self.time_submit.timestamp(),
            'time_start': None if self.time_start is None else self.time_start.timestamp(),
            'time_end': None if self.time_end is None else self.time_end.timestamp(),
            'status': self._status,
            'error_message': self.error_message,
            'services': [s.id for s in self.services]
        }

    def __eq__(self, other):
        """Two executions are equal when they have the same ID."""
        if not isinstance(other, Execution):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        # Defined together with __eq__ so that instances stay hashable.
        return hash(self.id)

    def set_scheduled(self):
        """The execution has been added to the scheduler queues."""
        self._status = self.SCHEDULED_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

    def set_starting(self):
        """The services of the execution are being created in Swarm."""
        self._status = self.STARTING_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

    def set_running(self):
        """The execution is running and producing useful work."""
        self._status = self.RUNNING_STATUS
        self.time_start = datetime.datetime.now()
        self.sql_manager.execution_update(self.id, status=self._status, time_start=self.time_start)

    def set_cleaning_up(self):
        """The services of the execution are being terminated."""
        self._status = self.CLEANING_UP_STATUS
        self.sql_manager.execution_update(self.id, status=self._status)

    def set_terminated(self):
        """The execution is not running."""
        self._status = self.TERMINATED_STATUS
        self.time_end = datetime.datetime.now()
        self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)

    def set_error(self):
        """The scheduler encountered an error starting or running the execution."""
        self._status = self.ERROR_STATUS
        self.time_end = datetime.datetime.now()
        self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)

    def set_error_message(self, message):
        """Contains an error message in case the status is 'error'."""
        self.error_message = message
        self.sql_manager.execution_update(self.id, error_message=self.error_message)

    def is_active(self):
        """
        Returns True if the execution is in the scheduler

        :return: True when the status is scheduled, running, starting or cleaning up
        """
        return self._status in (
            self.SCHEDULED_STATUS,
            self.RUNNING_STATUS,
            self.STARTING_STATUS,
            self.CLEANING_UP_STATUS,
        )

    @property
    def status(self):
        """Getter for the execution status."""
        return self._status

    @property
    def services(self):
        """Getter for this execution service list."""
        # Queried on every access, nothing is cached on the instance.
        return self.sql_manager.service_list(execution_id=self.id)
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interface to PostgresQL for Zoe state."""
import logging
from zoe_lib.config import get_conf
from zoe_lib.swarm_client import SwarmClient
log = logging.getLogger(__name__)
class Service:
    """
    A Zoe Service.

    Status changes are written through ``sql_manager.service_update`` and
    mirrored on the instance attributes.
    """

    TERMINATING_STATUS = "terminating"
    INACTIVE_STATUS = "inactive"
    ACTIVE_STATUS = "active"
    STARTING_STATUS = "starting"
    ERROR_STATUS = "error"

    DOCKER_UNDEFINED_STATUS = 'undefined'
    DOCKER_CREATE_STATUS = 'created'
    DOCKER_START_STATUS = 'started'
    DOCKER_DIE_STATUS = 'dead'
    DOCKER_DESTROY_STATUS = 'destroyed'
    DOCKER_OOM_STATUS = 'oom-killed'

    def __init__(self, d, sql_manager):
        """
        Build a service from a dictionary of its fields.

        :param d: mapping with the keys ``id``, ``name``, ``status``,
                  ``error_message``, ``execution_id``, ``description``,
                  ``service_group``, ``docker_id`` and ``docker_status``
        :param sql_manager: object used to persist changes and to look up
                            the parent execution
        :raises KeyError: if one of the keys above is missing from ``d``
        """
        self.sql_manager = sql_manager
        self.id = d['id']

        self.name = d['name']
        self.status = d['status']
        self.error_message = d['error_message']
        self.execution_id = d['execution_id']
        self.description = d['description']
        self.service_group = d['service_group']
        self.docker_id = d['docker_id']
        self.docker_status = d['docker_status']

    def serialize(self):
        """Generates a dictionary that can be serialized in JSON."""
        return {
            'id': self.id,
            'name': self.name,
            'status': self.status,
            'error_message': self.error_message,
            'execution_id': self.execution_id,
            'description': self.description,
            'service_group': self.service_group,
            'docker_id': self.docker_id,
            'ip_address': self.ip_address,
            'docker_status': self.docker_status
        }

    def __eq__(self, other):
        """Two services are equal when they have the same ID."""
        if not isinstance(other, Service):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        # Defined together with __eq__ so that instances stay hashable.
        return hash(self.id)

    @property
    def dns_name(self):
        """Getter for the DNS name of this service as it will be registered in Docker's DNS."""
        return "{}-{}-{}".format(self.name, self.execution_id, get_conf().deployment_name)

    def set_terminating(self):
        """The service is being killed."""
        self.sql_manager.service_update(self.id, status=self.TERMINATING_STATUS)
        self.status = self.TERMINATING_STATUS

    def set_inactive(self):
        """The service is not running."""
        self.sql_manager.service_update(self.id, status=self.INACTIVE_STATUS, docker_id=None)
        self.status = self.INACTIVE_STATUS
        # Keep the in-memory copy aligned with the value just stored.
        self.docker_id = None

    def set_starting(self):
        """The service is being created by Docker."""
        self.sql_manager.service_update(self.id, status=self.STARTING_STATUS)
        self.status = self.STARTING_STATUS

    def set_active(self, docker_id):
        """The service is running and has a valid docker_id."""
        self.sql_manager.service_update(self.id, status=self.ACTIVE_STATUS, docker_id=docker_id, error_message=None)
        self.error_message = None
        # ip_address reads self.docker_id, so it must follow the stored value.
        self.docker_id = docker_id
        self.status = self.ACTIVE_STATUS

    def set_error(self, error_message):
        """The service could not be created/started."""
        self.sql_manager.service_update(self.id, status=self.ERROR_STATUS, error_message=error_message)
        self.status = self.ERROR_STATUS
        self.error_message = error_message

    def set_docker_status(self, new_status):
        """Docker has emitted an event related to this service."""
        self.sql_manager.service_update(self.id, docker_status=new_status)
        log.debug("service %s, status updated to %s", self.id, new_status)
        self.docker_status = new_status

    @property
    def ip_address(self):
        """
        Getter for the service IP address, queries Swarm as the IP address changes outside our control.

        Returns an empty dictionary, without contacting Swarm, when the
        Docker status is not 'started'.
        """
        if self.docker_status != self.DOCKER_START_STATUS:
            return {}
        swarm = SwarmClient(get_conf())
        s_info = swarm.inspect_container(self.docker_id)
        return s_info['ip_address'][get_conf().overlay_network_name]

    @property
    def user_id(self):
        """Getter for the user_id, that is actually taken from the parent execution."""
        execution = self.sql_manager.execution_list(only_one=True, id=self.execution_id)
        return execution.user_id
# Copyright (c) 2016, Daniele Venzano
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -21,8 +21,8 @@ import logging
import psycopg2
import psycopg2.extras
from zoe_lib.config import get_conf
from zoe_lib.swarm_client import SwarmClient
from .service import Service
from .execution import Execution
log = logging.getLogger(__name__)
......@@ -180,235 +180,3 @@ class SQLManager:
cur.execute(query)
self.conn.commit()
return cur.fetchone()[0]
class Base:
"""
:type sql_manager: SQLManager
"""
def __init__(self, d, sql_manager):
"""
:type sql_manager: SQLManager
"""
self.sql_manager = sql_manager
self.id = d['id']
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
raise NotImplementedError
class Execution(Base):
"""
A Zoe execution.
:type time_submit: datetime.datetime
:type time_start: datetime.datetime
:type time_end: datetime.datetime
"""
SUBMIT_STATUS = "submitted"
SCHEDULED_STATUS = "scheduled"
STARTING_STATUS = "starting"
ERROR_STATUS = "error"
RUNNING_STATUS = "running"
CLEANING_UP_STATUS = "cleaning up"
TERMINATED_STATUS = "terminated"
def __init__(self, d, sql_manager):
super().__init__(d, sql_manager)
self.user_id = d['user_id']
self.name = d['name']
self.description = d['description']
if isinstance(d['time_submit'], datetime.datetime):
self.time_submit = d['time_submit']
else:
self.time_submit = datetime.datetime.fromtimestamp(d['time_submit'])
if isinstance(d['time_submit'], datetime.datetime):
self.time_start = d['time_start']
else:
self.time_start = datetime.datetime.fromtimestamp(d['time_start'])
if isinstance(d['time_submit'], datetime.datetime):
self.time_end = d['time_end']
else:
self.time_submit = datetime.datetime.fromtimestamp(d['time_start'])
self._status = d['status']
self.error_message = d['error_message']
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
return {
'id': self.id,
'user_id': self.user_id,
'name': self.name,
'description': self.description,
'time_submit': self.time_submit.timestamp(),
'time_start': None if self.time_start is None else self.time_start.timestamp(),
'time_end': None if self.time_end is None else self.time_end.timestamp(),
'status': self._status,
'error_message': self.error_message,
'services': [s.id for s in self.services]
}
def __eq__(self, other):
return self.id == other.id
def set_scheduled(self):
"""The execution has been added to the scheduler queues."""
self._status = self.SCHEDULED_STATUS
self.sql_manager.execution_update(self.id, status=self._status)
def set_starting(self):
"""The services of the execution are being created in Swarm."""
self._status = self.STARTING_STATUS
self.sql_manager.execution_update(self.id, status=self._status)
def set_running(self):
"""The execution is running and producing useful work."""
self._status = self.RUNNING_STATUS
self.time_start = datetime.datetime.now()
self.sql_manager.execution_update(self.id, status=self._status, time_start=self.time_start)
def set_cleaning_up(self):
"""The services of the execution are being terminated."""
self._status = self.CLEANING_UP_STATUS
self.sql_manager.execution_update(self.id, status=self._status)
def set_terminated(self):
"""The execution is not running."""
self._status = self.TERMINATED_STATUS
self.time_end = datetime.datetime.now()
self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)
def set_error(self):
"""The scheduler encountered an error starting or running the execution."""
self._status = self.ERROR_STATUS
self.time_end = datetime.datetime.now()
self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)
def set_error_message(self, message):
"""Contains an error message in case the status is 'error'."""
self.error_message = message
self.sql_manager.execution_update(self.id, error_message=self.error_message)
def is_active(self):
"""
Returns True if the execution is in the scheduler
:return:
"""
return self._status == self.SCHEDULED_STATUS or self._status == self.RUNNING_STATUS or self._status == self.STARTING_STATUS or self._status == self.CLEANING_UP_STATUS
@property
def status(self):
"""Getter for the execution status."""
return self._status
@property
def services(self):
"""Getter for this execution service list."""
return self.sql_manager.service_list(execution_id=self.id)
class Service(Base):
"""A Zoe Service."""
TERMINATING_STATUS = "terminating"
INACTIVE_STATUS = "inactive"
ACTIVE_STATUS = "active"
STARTING_STATUS = "starting"
ERROR_STATUS = "error"
DOCKER_UNDEFINED_STATUS = 'undefined'
DOCKER_CREATE_STATUS = 'created'
DOCKER_START_STATUS = 'started'
DOCKER_DIE_STATUS = 'dead'
DOCKER_DESTROY_STATUS = 'destroyed'
DOCKER_OOM_STATUS = 'oom-killed'
def __init__(self, d, sql_manager):
super().__init__(d, sql_manager)
self.name = d['name']
self.status = d['status']
self.error_message = d['error_message']
self.execution_id = d['execution_id']
self.description = d['description']
self.service_group = d['service_group']
self.docker_id = d['docker_id']
self.docker_status = d['docker_status']
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
return {
'id': self.id,
'name': self.name,
'status': self.status,
'error_message': self.error_message,
'execution_id': self.execution_id,
'description': self.description,
'service_group': self.service_group,
'docker_id': self.docker_id,
'ip_address': self.ip_address,
'docker_status': self.docker_status
}
def __eq__(self, other):
return self.id == other.id
@property
def dns_name(self):
"""Getter for the DNS name of this service as it will be registered in Docker's DNS."""
return "{}-{}-{}".format(self.name, self.execution_id, get_conf().deployment_name)
def set_terminating(self):
"""The service is being killed."""
self.sql_manager.service_update(self.id, status=self.TERMINATING_STATUS)
self.status = self.TERMINATING_STATUS
def set_inactive(self):
"""The service is not running."""
self.sql_manager.service_update(self.id, status=self.INACTIVE_STATUS, docker_id=None)
self.status = self.INACTIVE_STATUS
def set_starting(self):
"""The service is being created by Docker."""
self.sql_manager.service_update(self.id, status=self.STARTING_STATUS)
self.status = self.STARTING_STATUS
def set_active(self, docker_id):
"""The service is running and has a valid docker_id."""
self.sql_manager.service_update(self.id, status=self.ACTIVE_STATUS, docker_id=docker_id, error_message=None)
self.error_message = None
self.status = self.ACTIVE_STATUS
def set_error(self, error_message):
"""The service could not be created/started."""
self.sql_manager.service_update(self.id, status=self.ERROR_STATUS, error_message=error_message)
self.status = self.ERROR_STATUS
self.error_message = error_message
def set_docker_status(self, new_status):
"""Docker has emitted an event related to this service."""
self.sql_manager.service_update(self.id, docker_status=new_status)
log.debug("service {}, status updated to {}".format(self.id, new_status))
self.docker_status = new_status
@property
def ip_address(self):
"""Getter for the service IP address, queries Swarm as the IP address changes outside our control."""
if self.docker_status != self.DOCKER_START_STATUS:
return {}
swarm = SwarmClient(get_conf())
s_info = swarm.inspect_container(self.docker_id)
return s_info['ip_address'][get_conf().overlay_network_name]
@property
def user_id(self):
"""Getter for the user_id, that is actually taken form the parent execution."""
execution = self.sql_manager.execution_list(only_one=True, id=self.execution_id)
return execution.user_id
......@@ -19,9 +19,9 @@ import logging
import threading
import time
from zoe_lib.swarm_client import SwarmClient
from zoe_lib.config import get_conf
from zoe_lib.sql_manager import SQLManager, Service
from zoe_lib.state import SQLManager, Service
from zoe_lib.swarm_client import SwarmClient
log = logging.getLogger(__name__)
......
......@@ -19,16 +19,15 @@
import logging
from zoe_master.master_api import APIManager
import zoe_master.scheduler
from zoe_master.execution_manager import restart_resubmit_scheduler
from zoe_master.monitor import ZoeMonitor
from zoe_master.consistency import ZoeSwarmChecker
import zoe_lib.config as config
import zoe_master.scheduler
from zoe_lib.metrics.influxdb import InfluxDBMetricSender
from zoe_lib.metrics.logging import LogMetricSender
from zoe_lib.sql_manager import SQLManager
from zoe_lib.state import SQLManager
from zoe_master.consistency import ZoeSwarmChecker
from zoe_master.execution_manager import restart_resubmit_scheduler
from zoe_master.master_api import APIManager
from zoe_master.monitor import ZoeMonitor
log = logging.getLogger("main")
LOG_FORMAT = '%(asctime)-15s %(levelname)s %(threadName)s->%(name)s: %(message)s'
......
......@@ -17,9 +17,8 @@
import logging
from zoe_lib.sql_manager import Execution, SQLManager
from zoe_lib import exec_logs
from zoe_lib.state import Execution, SQLManager
from zoe_master.scheduler import ZoeBaseScheduler
log = logging.getLogger(__name__)
......
......@@ -21,10 +21,9 @@ import time
import zmq