Commit 70034ec7 authored by Daniele Venzano

Refactor SQL manager, node and service statistics

parent ab35cee8
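
The refactor replaces the per-operation methods on SQLManager (execution_list, execution_new, execution_update, execution_delete, ...) with per-table objects (executions, services, ports) that expose select/insert/update/delete, and moves database initialization from zoe_api.db_init into SQLManager.init_db(). A minimal usage sketch, assuming only the names visible in this diff (the execution name, user and description values are illustrative):

import zoe_lib.config as config
import zoe_lib.state

config.load_configuration()
state = zoe_lib.state.SQLManager(config.get_conf())
state.init_db()                                        # was: zoe_api.db_init.init()

new_id = state.executions.insert('exp1', 'user1', {})  # was: state.execution_new(...); {} stands in for an application description dict
running = state.executions.select(status='running')    # was: state.execution_list(status='running')
state.executions.update(new_id, status='terminated')   # was: state.execution_update(...)
state.executions.delete(new_id)                        # was: state.execution_delete(...)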
@@ -18,7 +18,7 @@
import time
import zoe_lib.config as config
import zoe_api.db_init
import zoe_lib.state.sql_manager
config.load_configuration()
@@ -26,4 +26,5 @@ print("Warning, this script will delete the database tables for the deployment '
print("Sleeping 5 seconds before continuing, hit CTRL-C to stop and think.")
time.sleep(5)
zoe_api.db_init.init(force=True)
state = zoe_lib.state.sql_manager.SQLManager(config.get_conf())
state.init_db(force=True)
@@ -37,23 +37,23 @@ class APIEndpoint:
:type master: zoe_api.master_api.APIManager
:type sql: zoe_lib.sql_manager.SQLManager
"""
def __init__(self, master_api, sql_manager):
def __init__(self, master_api, sql_manager: zoe_lib.state.SQLManager):
self.master = master_api
self.sql = sql_manager
def execution_by_id(self, uid, role, execution_id) -> zoe_lib.state.sql_manager.Execution:
def execution_by_id(self, uid, role, execution_id) -> zoe_lib.state.Execution:
"""Lookup an execution by its ID."""
e = self.sql.execution_list(id=execution_id, only_one=True)
e = self.sql.executions.select(id=execution_id, only_one=True)
if e is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
assert isinstance(e, zoe_lib.state.sql_manager.Execution)
assert isinstance(e, zoe_lib.state.Execution)
if e.user_id != uid and role != 'admin':
raise zoe_api.exceptions.ZoeAuthException()
return e
def execution_list(self, uid, role, **filters):
"""Generate a optionally filtered list of executions."""
execs = self.sql.execution_list(**filters)
execs = self.sql.executions.select(**filters)
ret = [e for e in execs if e.user_id == uid or role == 'admin']
return ret
@@ -64,7 +64,7 @@ class APIEndpoint:
except zoe_lib.exceptions.InvalidApplicationDescription as e:
raise zoe_api.exceptions.ZoeException('Invalid application description: ' + e.message)
def execution_start(self, uid, role, exec_name, application_description): # pylint: disable=unused-argument
def execution_start(self, uid, role, exec_name, application_description): # pylint: disable=unused-argument
"""Start an execution."""
try:
zoe_lib.applications.app_validate(application_description)
@@ -81,7 +81,7 @@ class APIEndpoint:
if len(running_execs) >= GUEST_QUOTA_MAX_EXECUTIONS:
raise zoe_api.exceptions.ZoeException('Guest users cannot run more than one execution at a time, quota exceeded.')
new_id = self.sql.execution_new(exec_name, uid, application_description)
new_id = self.sql.executions.insert(exec_name, uid, application_description)
success, message = self.master.execution_start(new_id)
if not success:
raise zoe_api.exceptions.ZoeException('The Zoe master is unavailable, execution will be submitted automatically when the master is back up ({}).'.format(message))
@@ -90,8 +90,8 @@ class APIEndpoint:
def execution_terminate(self, uid, role, exec_id):
"""Terminate an execution."""
e = self.sql.execution_list(id=exec_id, only_one=True)
assert isinstance(e, zoe_lib.state.sql_manager.Execution)
e = self.sql.executions.select(id=exec_id, only_one=True)
assert isinstance(e, zoe_lib.state.Execution)
if e is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
@@ -108,8 +108,8 @@ class APIEndpoint:
if role != "admin":
raise zoe_api.exceptions.ZoeAuthException()
e = self.sql.execution_list(id=exec_id, only_one=True)
assert isinstance(e, zoe_lib.state.sql_manager.Execution)
e = self.sql.executions.select(id=exec_id, only_one=True)
assert isinstance(e, zoe_lib.state.Execution)
if e is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such execution')
@@ -121,14 +121,14 @@ class APIEndpoint:
status, message = self.master.execution_delete(exec_id)
if status:
self.sql.execution_delete(exec_id)
self.sql.executions.delete(exec_id)
return True, ''
else:
raise zoe_api.exceptions.ZoeException(message)
def service_by_id(self, uid, role, service_id) -> zoe_lib.state.sql_manager.Service:
def service_by_id(self, uid, role, service_id) -> zoe_lib.state.Service:
"""Lookup a service by its ID."""
service = self.sql.service_list(id=service_id, only_one=True)
service = self.sql.services.select(id=service_id, only_one=True)
if service is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such service')
if service.user_id != uid and role != 'admin':
@@ -137,7 +137,7 @@ class APIEndpoint:
def service_list(self, uid, role, **filters):
"""Generate a optionally filtered list of services."""
services = self.sql.service_list(**filters)
services = self.sql.services.select(**filters)
ret = [s for s in services if s.user_id == uid or role == 'admin']
return ret
@@ -145,7 +145,7 @@ class APIEndpoint:
"""Retrieve the logs for the given service.
If stream is True, a file object is returned, otherwise the log contents as a str object.
"""
service = self.sql.service_list(id=service_id, only_one=True)
service = self.sql.services.select(id=service_id, only_one=True)
if service is None:
raise zoe_api.exceptions.ZoeNotFoundException('No such service')
if service.user_id != uid and role != 'admin':
@@ -165,7 +165,7 @@ class APIEndpoint:
def cleanup_dead_executions(self):
"""Terminates all executions with dead "monitor" services."""
log.debug('Starting dead execution cleanup task')
all_execs = self.sql.execution_list(status='running')
all_execs = self.sql.executions.select(status='running')
for execution in all_execs:
for service in execution.services:
if service.description['monitor'] and service.backend_status == service.BACKEND_DIE_STATUS:
@@ -183,7 +183,7 @@ class APIEndpoint:
services_info.append(self.service_by_id(uid, role, service.id))
for port in service.description['ports']:
port_key = str(port['port_number']) + "/" + port['protocol']
backend_port = self.sql.port_list(only_one=True, service_id=service.id, internal_name=port_key)
backend_port = self.sql.ports.select(only_one=True, service_id=service.id, internal_name=port_key)
if backend_port is not None and backend_port.external_ip is not None:
endpoint = port['url_template'].format(**{"ip_port": backend_port.external_ip + ":" + str(backend_port.external_port)})
endpoints.append((port['name'], endpoint))
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Database initialization."""
import psycopg2
import psycopg2.extras
import zoe_api.exceptions
from zoe_lib.config import get_conf
SQL_SCHEMA_VERSION = 5 # ---> Increment this value every time the schema changes !!! <---
def version_table(cur):
"""Create the version table."""
cur.execute("CREATE TABLE IF NOT EXISTS public.versions (deployment text, version integer)")
def schema(cur, deployment_name):
"""Create the schema for the configured deployment name."""
cur.execute("SELECT EXISTS(SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = %s)", (deployment_name,))
if not cur.fetchone()[0]:
cur.execute('CREATE SCHEMA {}'.format(deployment_name))
def check_schema_version(cur, deployment_name):
"""Check if the schema version matches this source code version."""
cur.execute("SELECT version FROM public.versions WHERE deployment = %s", (deployment_name,))
row = cur.fetchone()
if row is None:
cur.execute("INSERT INTO public.versions (deployment, version) VALUES (%s, %s)", (deployment_name, SQL_SCHEMA_VERSION))
schema(cur, deployment_name)
return False # Tables need to be created
else:
if row[0] == SQL_SCHEMA_VERSION:
return True
else:
raise zoe_api.exceptions.ZoeException('SQL database schema version mismatch: need {}, found {}'.format(SQL_SCHEMA_VERSION, row[0]))
def create_tables(cur):
"""Create the Zoe database tables."""
cur.execute('''CREATE TABLE execution (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
user_id TEXT NOT NULL,
description JSON NOT NULL,
status TEXT NOT NULL,
execution_manager_id TEXT NULL,
time_submit TIMESTAMP NOT NULL,
time_start TIMESTAMP NULL,
time_end TIMESTAMP NULL,
error_message TEXT NULL
)''')
cur.execute('''CREATE TABLE service (
id SERIAL PRIMARY KEY,
status TEXT NOT NULL,
error_message TEXT NULL DEFAULT NULL,
description JSON NOT NULL,
execution_id INT REFERENCES execution ON DELETE CASCADE,
service_group TEXT NOT NULL,
name TEXT NOT NULL,
backend_id TEXT NULL DEFAULT NULL,
backend_status TEXT NOT NULL DEFAULT 'undefined',
backend_host TEXT NULL DEFAULT NULL,
ip_address CIDR NULL DEFAULT NULL,
essential BOOLEAN NOT NULL DEFAULT FALSE
)''')
cur.execute('''CREATE TABLE port (
id SERIAL PRIMARY KEY,
service_id INT REFERENCES service ON DELETE CASCADE,
internal_name TEXT NOT NULL,
external_ip INET NULL,
external_port INT NULL,
description JSON NOT NULL
)''')
# Create the oauth_client and oauth_token tables for OAuth2
cur.execute('''CREATE TABLE oauth_client (
identifier TEXT PRIMARY KEY,
secret TEXT,
role TEXT,
redirect_uris TEXT,
authorized_grants TEXT,
authorized_response_types TEXT
)''')
cur.execute('''CREATE TABLE oauth_token (
client_id TEXT PRIMARY KEY,
grant_type TEXT,
token TEXT,
data TEXT,
expires_at TIMESTAMP,
refresh_token TEXT,
refresh_token_expires_at TIMESTAMP,
scopes TEXT,
user_id TEXT
)''')
def init(force=False):
"""DB init entrypoint."""
dsn = 'dbname=' + get_conf().dbname + \
' user=' + get_conf().dbuser + \
' password=' + get_conf().dbpass + \
' host=' + get_conf().dbhost + \
' port=' + str(get_conf().dbport)
conn = psycopg2.connect(dsn)
cur = conn.cursor()
version_table(cur)
cur.execute('SET search_path TO {},public'.format(get_conf().deployment_name))
if force:
cur.execute("DELETE FROM public.versions WHERE deployment = %s", (get_conf().deployment_name,))
cur.execute('DROP SCHEMA IF EXISTS {} CASCADE'.format(get_conf().deployment_name))
if not check_schema_version(cur, get_conf().deployment_name):
create_tables(cur)
conn.commit()
cur.close()
conn.close()
return
@@ -24,7 +24,6 @@ from tornado.web import Application
import zoe_lib.config as config
import zoe_lib.state
import zoe_api.db_init
import zoe_api.api_endpoint
import zoe_api.rest_api
import zoe_api.master_api
@@ -57,10 +56,10 @@ def zoe_web_main() -> int:
log.error("LDAP authentication requested, but 'pyldap' module not installed.")
return 1
zoe_api.db_init.init()
sql_manager = zoe_lib.state.SQLManager(config.get_conf())
sql_manager.init_db()
master_api = zoe_api.master_api.APIManager()
sql_manager = zoe_lib.state.SQLManager(config.get_conf())
api_endpoint = zoe_api.api_endpoint.APIEndpoint(master_api, sql_manager)
app_settings = {
@@ -15,7 +15,7 @@
"""Zoe state management and database classes"""
from zoe_lib.state.base import Base
from zoe_lib.state.base import BaseRecord
from zoe_lib.state.execution import Execution
from zoe_lib.state.sql_manager import SQLManager
from zoe_lib.state.service import Service, VolumeDescription, VolumeDescriptionHostPath
@@ -20,7 +20,7 @@ import logging
log = logging.getLogger(__name__)
class Base:
class BaseRecord:
"""
:type sql_manager: SQLManager
"""
@@ -34,3 +34,44 @@ class Base:
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
raise NotImplementedError
class BaseTable:
"""Common abstraction for all tables."""
def __init__(self, connection, cursor, table_name):
self.table_name = table_name
self.connection = connection
self.cursor = cursor
def create(self):
"""Create this table."""
raise NotImplementedError
def insert(self, **kwargs):
"""Create a new record."""
raise NotImplementedError
def delete(self, record_id):
"""Delete a record from this table."""
query = "DELETE FROM {} WHERE id = %s".format(self.table_name)
self.cursor.execute(query, (record_id,))
self.connection.commit()
def update(self, record_id, **kwargs):
"""Update the state of an execution."""
arg_list = []
value_list = []
for key, value in kwargs.items():
arg_list.append('{} = %s'.format(key))
value_list.append(value)
set_q = ", ".join(arg_list)
value_list.append(record_id)
q_base = 'UPDATE {} SET '.format(self.table_name) + set_q + ' WHERE id=%s'
query = self.cursor.mogrify(q_base, value_list)
self.cursor.execute(query)
self.connection.commit()
def select(self, only_one=False, limit=-1, **kwargs):
"""Select records."""
raise NotImplementedError
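
These generic delete/update helpers build their SQL from the table name and the keyword arguments; a brief sketch of what a call translates to, assuming an open psycopg2 connection and cursor (presumably provided by SQLManager in this codebase) and using the ExecutionTable defined further down as the concrete subclass:

table = ExecutionTable(connection, cursor)
table.update(42, status='error', error_message='worker lost')  # -> UPDATE execution SET ... WHERE id=%s, one placeholder per keyword
table.delete(42)                                               # -> DELETE FROM execution WHERE id = %s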
@@ -20,10 +20,12 @@ import logging
import threading
import functools
from zoe_lib.state.base import BaseRecord, BaseTable
log = logging.getLogger(__name__)
class Execution:
class Execution(BaseRecord):
"""
A Zoe execution.
@@ -42,8 +44,7 @@ class Execution:
TERMINATED_STATUS = "terminated"
def __init__(self, d, sql_manager):
self.sql_manager = sql_manager
self.id = d['id']
super().__init__(d, sql_manager)
self.user_id = d['user_id']
self.name = d['name']
@@ -95,45 +96,45 @@ class Execution:
def set_scheduled(self):
"""The execution has been added to the scheduler queues."""
self._status = self.SCHEDULED_STATUS
self.sql_manager.execution_update(self.id, status=self._status)
self.sql_manager.executions.update(self.id, status=self._status)
def set_image_dl(self):
"""The execution has been added to the scheduler queues."""
self._status = self.IMAGE_DL_STATUS
self.sql_manager.execution_update(self.id, status=self._status)
self.sql_manager.executions.update(self.id, status=self._status)
def set_starting(self):
"""The services of the execution are being created in Swarm."""
self._status = self.STARTING_STATUS
self.sql_manager.execution_update(self.id, status=self._status)
self.sql_manager.executions.update(self.id, status=self._status)
def set_running(self):
"""The execution is running and producing useful work."""
self._status = self.RUNNING_STATUS
self.time_start = datetime.datetime.utcnow()
self.sql_manager.execution_update(self.id, status=self._status, time_start=self.time_start)
self.sql_manager.executions.update(self.id, status=self._status, time_start=self.time_start)
def set_cleaning_up(self):
"""The services of the execution are being terminated."""
self._status = self.CLEANING_UP_STATUS
self.sql_manager.execution_update(self.id, status=self._status)
self.sql_manager.executions.update(self.id, status=self._status)
def set_terminated(self):
"""The execution is not running."""
self._status = self.TERMINATED_STATUS
self.time_end = datetime.datetime.utcnow()
self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)
self.sql_manager.executions.update(self.id, status=self._status, time_end=self.time_end)
def set_error(self):
"""The scheduler encountered an error starting or running the execution."""
self._status = self.ERROR_STATUS
self.time_end = datetime.datetime.utcnow()
self.sql_manager.execution_update(self.id, status=self._status, time_end=self.time_end)
self.sql_manager.executions.update(self.id, status=self._status, time_end=self.time_end)
def set_error_message(self, message):
"""Contains an error message in case the status is 'error'."""
self.error_message = message
self.sql_manager.execution_update(self.id, error_message=self.error_message)
self.sql_manager.executions.update(self.id, error_message=self.error_message)
@property
def is_active(self):
@@ -156,17 +157,17 @@ class Execution:
@property
def services(self):
"""Getter for this execution service list."""
return self.sql_manager.service_list(execution_id=self.id)
return self.sql_manager.services.select(execution_id=self.id)
@property
def essential_services(self):
"""Getter for this execution essential service list."""
return self.sql_manager.service_list(execution_id=self.id, essential=True)
return self.sql_manager.services.select(execution_id=self.id, essential=True)
@property
def elastic_services(self):
"""Getter for this execution elastic service list."""
return self.sql_manager.service_list(execution_id=self.id, essential=False)
return self.sql_manager.services.select(execution_id=self.id, essential=False)
@property
def essential_services_running(self) -> bool:
@@ -213,3 +214,83 @@ class Execution:
def __repr__(self):
return str(self.id)
class ExecutionTable(BaseTable):
"""Abstraction for the execution table in the database."""
def __init__(self, connection, cursor):
super().__init__(connection, cursor, "execution")
def create(self):
"""Create the execution table."""
self.cursor.execute('''CREATE TABLE execution (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
user_id TEXT NOT NULL,
description JSON NOT NULL,
status TEXT NOT NULL,
execution_manager_id TEXT NULL,
time_submit TIMESTAMP NOT NULL,
time_start TIMESTAMP NULL,
time_end TIMESTAMP NULL,
error_message TEXT NULL
)''')
def insert(self, name, user_id, description):
"""Create a new execution in the state."""
status = Execution.SUBMIT_STATUS
time_submit = datetime.datetime.utcnow()
query = self.cursor.mogrify('INSERT INTO execution (id, name, user_id, description, status, time_submit) VALUES (DEFAULT, %s,%s,%s,%s,%s) RETURNING id', (name, user_id, description, status, time_submit))
self.cursor.execute(query)
self.connection.commit()
return self.cursor.fetchone()[0]
def select(self, only_one=False, limit=-1, **kwargs):
"""
Return a list of executions.
:param only_one: only one result is expected
:type only_one: bool
:param limit: limit the result to this number of entries
:type limit: int
:param kwargs: filter executions based on their fields/columns
:return: one or more executions
"""
q_base = 'SELECT * FROM execution'
if len(kwargs) > 0:
q = q_base + " WHERE "
filter_list = []
args_list = []
for key, value in kwargs.items():
if key == 'earlier_than_submit':
filter_list.append('"time_submit" <= to_timestamp(%s)')
elif key == 'earlier_than_start':
filter_list.append('"time_start" <= to_timestamp(%s)')
elif key == 'earlier_than_end':
filter_list.append('"time_end" <= to_timestamp(%s)')
elif key == 'later_than_submit':
filter_list.append('"time_submit" >= to_timestamp(%s)')
elif key == 'later_than_start':
filter_list.append('"time_start" >= to_timestamp(%s)')
elif key == 'later_than_end':
filter_list.append('"time_end" >= to_timestamp(%s)')
else:
filter_list.append('{} = %s'.format(key))
args_list.append(value)
q += ' AND '.join(filter_list)
if limit > 0:
q += ' ORDER BY id DESC LIMIT {}'.format(limit)
query = self.cursor.mogrify(q, args_list)
else:
if limit > 0:
q_base += ' ORDER BY id DESC LIMIT {}'.format(limit)
query = self.cursor.mogrify(q_base)
self.cursor.execute(query)
if only_one:
row = self.cursor.fetchone()
if row is None:
return None
return Execution(row, self)
else:
return [Execution(x, self) for x in self.cursor]
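
A usage sketch for the filters accepted by select() above (the manager name and values are illustrative); note that the earlier_than_*/later_than_* filters are compared via to_timestamp(%s), so they expect Unix timestamps:

import time

one = sql_manager.executions.select(id=42, only_one=True)   # a single Execution, or None
running = sql_manager.executions.select(status='running')   # a list of Execution objects
recent = sql_manager.executions.select(later_than_submit=time.time() - 3600, limit=10)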
@@ -17,32 +17,16 @@
import logging
log = logging.getLogger(__name__)
class ExposedPort:
"""A port on the container that should be exposed."""
def __init__(self, data):
self.proto = data['protocol']
self.number = data['port_number']
self.external_ip = None
self.external_port = None
from zoe_lib.state.base import BaseRecord, BaseTable
def __eq__(self, other):
if isinstance(other, ExposedPort):
return other.proto == self.proto and other.number == self.number
elif isinstance(other, str):
return other == str(self.number) + "/" + self.proto
else:
return NotImplemented
log = logging.getLogger(__name__)
class Port:
class Port(BaseRecord):
"""A tcp or udp port that should be exposed by the backend."""
def __init__(self, d, sql_manager):
self.sql_manager = sql_manager
self.id = d['id']
super().__init__(d, sql_manager)
self.internal_name = d['internal_name']
self.external_ip = d['external_ip']
@@ -68,12 +52,73 @@ class Port:
def activate(self, ext_ip, ext_port):
"""The backend has exposed the port."""
self.sql_manager.port_update(self.id, external_ip=ext_ip, external_port=ext_port)
self.sql_manager.ports.update(self.id, external_ip=ext_ip, external_port=ext_port)
self.external_port = ext_port
self.external_ip = ext_ip
def reset(self):
"""The backend has stopped exposing the port."""
self.sql_manager.port_update(self.id, external_ip=None, external_port=None)
self.sql_manager.ports.update(self.id, external_ip=None, external_port=None)
self.external_port = None
self.external_ip = None
class PortTable(BaseTable):
"""Abstraction for the port table in the database."""
def __init__(self, connection, cursor):
super().__init__(connection, cursor, "port")
def create(self):
"""Create the Port table."""
self.cursor.execute('''CREATE TABLE port (
id SERIAL PRIMARY KEY,
service_id INT REFERENCES service ON DELETE CASCADE,
internal_name TEXT NOT NULL,
external_ip INET NULL,
external_port INT NULL,
description JSON NOT NULL
)''')
def insert(self, service_id, internal_name, description):
"""Adds a new port to the state."""
query = self.cursor.mogrify('INSERT INTO port (id, service_id, internal_name, external_ip, external_port, description) VALUES (DEFAULT, %s, %s, NULL, NULL, %s) RETURNING id', (service_id, internal_name, description))
self.cursor.execute(query)
self.connection.commit()
return self.cursor.fetchone()[0]
def select(self, only_one=False, limit=-1, **kwargs):
"""
Return a list of ports.
:param only_one: only one result is expected
:type only_one: bool
:param limit: limit the result to this number of entries
:type limit: int
:param kwargs: filter ports based on their fields/columns
:return: one or more ports
"""
q_base = 'SELECT * FROM port'
if len(kwargs) > 0:
q = q_base + " WHERE "
filter_list = []
args_list = []
for key, value in kwargs.items():
filter_list.append('{} = %s'.format(key))
args_list.append(value)
q += ' AND '.join(filter_list)
if limit > 0:
q += ' ORDER BY id DESC LIMIT {}'.format(limit)
query = self.cursor.mogrify(q, args_list)
else:
if limit > 0:
q_base += ' ORDER BY id DESC LIMIT {}'.format(limit)
query = self.cursor.mogrify(q_base)
self.cursor.execute(query)
if only_one:
row = self.cursor.fetchone()
if row is None:
return None
return Port(row, self)
else:
return [Port(x, self) for x in self.cursor]
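
Taken together with the Port record above, the typical flow looks like the sketch below (service id, host and port values are illustrative): the API layer looks a port up by service and internal name, and the backend persists or clears its external mapping through activate()/reset():

port = sql_manager.ports.select(service_id=1, internal_name='8888/tcp', only_one=True)
if port is not None:
    port.activate('192.168.45.25', 32768)  # stores external_ip/external_port via ports.update()
    port.reset()                           # clears them when the backend stops exposing the port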
@@ -18,3 +18,4 @@
ZOE_VERSION = '2017.12-beta'
ZOE_API_VERSION = '0.7'
ZOE_APPLICATION_FORMAT_VERSION = 3
SQL_SCHEMA_VERSION = 5 # ---> Increment this value every time the SQL schema changes !!! <---
@@ -52,7 +52,7 @@ class BaseBackend:
"""Terminate the container corresponding to a service."""
raise NotImplementedError
def platform_state(self, usage_stats=False) -> ClusterStats:
def platform_state(self) -> ClusterStats:
"""Get the platform state. This method should fill-in a new ClusterStats object at each call, with fresh statistics on the available nodes and resource availability. This information will be used for taking scheduling decisions."""
raise NotImplementedError
@@ -17,7 +17,6 @@
import logging
import re
import threading
import time
from zoe_lib.config import get_conf
@@ -33,7 +32,7 @@ from zoe_master.stats import ClusterStats, NodeStats
log = logging.getLogger(__name__)
# These two module-level variables hold the references to the monitor and checker threads
# This module-level variable holds the references to the synchro threads
_checker = None
@@ -42,7 +41,6 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
def __init__(self, opts):
super().__init__(opts)
self.docker_config = DockerConfig(get_conf().backend_docker_config_file).read_config()
self.cached_stats = None
def _get_config(self, host) -> DockerHostConfig:
for conf in self.docker_config:
@@ -86,37 +84,15 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
log.error('Cannot terminate service {}, since it has no backend ID'.format(service.name))
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
def platform_state(self, usage_stats=False) -> ClusterStats:
def platform_state(self) -> ClusterStats:
"""Get the platform state."""
time_start = time.time()
platform_stats = ClusterStats()
th_list = []
for host_conf in self.docker_config: # type: DockerHostConfig
node_stats = NodeStats(host_conf.name)
th = threading.Thread(target=self._update_node_state, args=(host_conf, node_stats, usage_stats), name='stats_host_{}'.format(host_conf.name), daemon=True)
th.start()
th_list.append((th, node_stats))
for th, node_stats in th_list:
th.join()
node_stats = _checker.host_stats[host_conf.name]
platform_stats.nodes.append(node_stats)
log.debug('Time for platform stats: {:.2f}s'.format(time.time() - time_start))
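
The refactored platform_state() no longer spawns a thread per host on every call: it reads pre-collected NodeStats objects from the checker thread's host_stats mapping, keyed by host name. The checker itself is not part of the hunks shown above; the following is a purely hypothetical sketch of that pattern, with every name except host_stats invented for illustration:

import threading
import time

class HostStatsChecker:
    """Hypothetical periodic collector keeping one stats entry per Docker host."""
    def __init__(self, host_configs, poll_interval=10):
        self.host_stats = {}          # read by platform_state(), keyed by host name
        self._configs = host_configs
        self._interval = poll_interval
        threading.Thread(target=self._loop, name='synchro', daemon=True).start()

    def _poll(self, host_conf):
        # Placeholder: the real checker would query the Docker engine on this host
        # and fill a NodeStats object with container, memory and core information.
        return {'name': host_conf.name}

    def _loop(self):
        while True:
            for conf in self._configs:
                self.host_stats[conf.name] = self._poll(conf)
            time.sleep(self._interval)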