Commit e5eef924 authored by Daniele Venzano's avatar Daniele Venzano

Add a field in the state to distinguish essential services

parent 767c8bd5
......@@ -144,10 +144,10 @@ class APIEndpoint:
log.debug('Starting dead execution cleanup task')
all_execs = self.sql.execution_list()
for execution in all_execs:
if execution.status == execution.RUNNING_STATUS:
if execution.is_running():
for service in execution.services:
if service.description['monitor'] and service.backend_status == service.BACKEND_DIE_STATUS or service.backend_status == service.BACKEND_DESTROY_STATUS:
log.info("Service {} of execution {} died, terminating execution".format(service.name, execution.id))
if service.description['monitor'] and service.is_dead():
log.info("Service {} ({}) of execution {} died, terminating execution".format(service.id, service.name, execution.id))
self.master.execution_terminate(execution.id)
break
log.debug('Cleanup task finished')
......@@ -75,7 +75,8 @@ def create_tables(cur):
name TEXT NOT NULL,
backend_id TEXT NULL DEFAULT NULL,
backend_status TEXT NOT NULL DEFAULT 'undefined',
ip_address CIDR NULL DEFAULT NULL
ip_address CIDR NULL DEFAULT NULL,
essential BOOLEAN NOT NULL DEFAULT FALSE
)''')
......
......@@ -127,6 +127,10 @@ class Execution:
"""
return self._status == self.SCHEDULED_STATUS or self._status == self.RUNNING_STATUS or self._status == self.STARTING_STATUS or self._status == self.CLEANING_UP_STATUS
def is_running(self):
"""Returns True is the execution has at least the essential services running."""
return self._status == self.RUNNING_STATUS
@property
def status(self):
"""Getter for the execution status."""
......@@ -136,3 +140,12 @@ class Execution:
def services(self):
"""Getter for this execution service list."""
return self.sql_manager.service_list(execution_id=self.id)
@property
def essentials_started(self) -> bool:
"""Returns True if all essential services of this execution have started."""
ret = True
for service in self.services:
if service.essential:
ret = ret and not service.is_dead()
return ret
......@@ -18,7 +18,6 @@
import logging
from zoe_lib.config import get_conf
from zoe_master.backends.old_swarm.api_client import SwarmClient
log = logging.getLogger(__name__)
......@@ -80,6 +79,8 @@ class Service:
if self.ip_address is not None and ('/32' in self.ip_address or '/128' in self.ip_address):
self.ip_address = self.ip_address.split('/')[0]
self.essential = d['essential']
# Fields parsed from the JSON description
self.image_name = self.description['docker_image']
self.is_monitor = self.description['monitor']
......@@ -108,7 +109,8 @@ class Service:
'service_group': self.service_group,
'backend_id': self.backend_id,
'ip_address': self.ip_address,
'backend_status': self.backend_status
'backend_status': self.backend_status,
'essential': self.essential
}
def __eq__(self, other):
......@@ -159,3 +161,7 @@ class Service:
"""Getter for the user_id, that is actually taken form the parent execution."""
execution = self.sql_manager.execution_list(only_one=True, id=self.execution_id)
return execution.user_id
def is_dead(self):
"""Returns True if this service is not running."""
return self.backend_status == self.BACKEND_DESTROY_STATUS or self.backend_status == self.BACKEND_OOM_STATUS or self.backend_status == self.BACKEND_DIE_STATUS
......@@ -172,11 +172,11 @@ class SQLManager:
cur.execute(query)
self.conn.commit()
def service_new(self, execution_id, name, service_group, description):
def service_new(self, execution_id, name, service_group, description, is_essential):
"""Adds a new service to the state."""
cur = self._cursor()
status = 'created'
query = cur.mogrify('INSERT INTO service (id, status, error_message, execution_id, name, service_group, description) VALUES (DEFAULT, %s,NULL,%s,%s,%s,%s) RETURNING id', (status, execution_id, name, service_group, description))
query = cur.mogrify('INSERT INTO service (id, status, error_message, execution_id, name, service_group, description, essential) VALUES (DEFAULT, %s,NULL,%s,%s,%s,%s,%s) RETURNING id', (status, execution_id, name, service_group, description, is_essential))
cur.execute(query)
self.conn.commit()
return cur.fetchone()[0]
......@@ -144,40 +144,39 @@ class SwarmClient:
pl_status.memory_total = info["MemTotal"]
pl_status.cores_total = info["NCPU"]
# DriverStatus is a list...
idx = 1
assert 'Strategy' in info["DriverStatus"][idx][0]
pl_status.placement_strategy = info["DriverStatus"][idx][1]
idx = 2
assert 'Filters' in info["DriverStatus"][idx][0]
pl_status.active_filters = [x.strip() for x in info["DriverStatus"][idx][1].split(", ")]
idx = 3
assert 'Nodes' in info["DriverStatus"][idx][0]
node_count = int(info["DriverStatus"][idx][1])
idx = 4
# SystemStatus is a list...
idx = 0 # Role, skip
idx += 1
assert 'Strategy' in info["SystemStatus"][idx][0]
pl_status.placement_strategy = info["SystemStatus"][idx][1]
idx += 1
assert 'Filters' in info["SystemStatus"][idx][0]
pl_status.active_filters = [x.strip() for x in info["SystemStatus"][idx][1].split(", ")]
idx += 1
assert 'Nodes' in info["SystemStatus"][idx][0]
node_count = int(info["SystemStatus"][idx][1])
idx += 1 # At index 4 the nodes begin
for node in range(node_count):
idx2 = 0
node_stats = NodeStats(info["DriverStatus"][idx + node][0])
node_stats.docker_endpoint = info["DriverStatus"][idx + node][1]
idx2 += 1
if 'Status' in info["DriverStatus"][idx + node + idx2][0]: # new docker version
node_stats.status = info["DriverStatus"][idx + node + idx2][1]
idx2 += 1
node_stats.container_count = int(info["DriverStatus"][idx + node + idx2][1])
idx2 += 1
node_stats.cores_reserved = int(info["DriverStatus"][idx + node + idx2][1].split(' / ')[0])
node_stats.cores_total = int(info["DriverStatus"][idx + node + idx2][1].split(' / ')[1])
idx2 += 1
node_stats.memory_reserved = info["DriverStatus"][idx + node + idx2][1].split(' / ')[0]
node_stats.memory_total = info["DriverStatus"][idx + node + idx2][1].split(' / ')[1]
idx2 += 1
node_stats.labels = info["DriverStatus"][idx + node + idx2][1:]
idx2 += 1
node_stats.error = info["DriverStatus"][idx + node + idx2][1]
idx2 += 1
node_stats.last_update = info["DriverStatus"][idx + node + idx2][1]
idx2 += 1
node_stats.server_version = info["DriverStatus"][idx + node + idx2][1]
node_stats = NodeStats(info["SystemStatus"][idx + node][0].strip())
node_stats.docker_endpoint = info["SystemStatus"][idx + node][1]
idx2 += 1 # ID, skip
idx2 += 1 # Status
node_stats.status = info["SystemStatus"][idx + node + idx2][1]
idx2 += 1 # Containers
node_stats.container_count = int(info["SystemStatus"][idx + node + idx2][1].split(' ')[0])
idx2 += 1 # CPUs
node_stats.cores_reserved = int(info["SystemStatus"][idx + node + idx2][1].split(' / ')[0])
node_stats.cores_total = int(info["SystemStatus"][idx + node + idx2][1].split(' / ')[1])
idx2 += 1 # Memory
node_stats.memory_reserved = info["SystemStatus"][idx + node + idx2][1].split(' / ')[0]
node_stats.memory_total = info["SystemStatus"][idx + node + idx2][1].split(' / ')[1]
idx2 += 1 # Labels
node_stats.labels = info["SystemStatus"][idx + node + idx2][1].split(', ')
idx2 += 1 # Last update
node_stats.last_update = info["SystemStatus"][idx + node + idx2][1]
idx2 += 1 # Docker version
node_stats.server_version = info["SystemStatus"][idx + node + idx2][1]
node_stats.memory_reserved = humanfriendly.parse_size(node_stats.memory_reserved)
node_stats.memory_total = humanfriendly.parse_size(node_stats.memory_total)
......
......@@ -73,6 +73,8 @@ class SwarmMonitor(threading.Thread):
service_id = event['Actor']['Attributes']['zoe.service.id'] # type: int
service = self.state.service_list(only_one=True, id=service_id)
if service is None:
return
if 'exec' in event['Action']:
pass
elif 'create' in event['Action']:
......
......@@ -25,9 +25,19 @@ log = logging.getLogger(__name__)
def _digest_application_description(state: SQLManager, execution: Execution):
for service_descr in execution.description['services']:
for counter in range(service_descr['total_count']):
essential_count = service_descr['essential_count']
total_count = service_descr['total_count']
elastic_count = total_count - essential_count
counter = 0
for i in range(essential_count):
name = "{}{}".format(service_descr['name'], counter)
state.service_new(execution.id, name, service_descr['name'], service_descr)
state.service_new(execution.id, name, service_descr['name'], service_descr, True)
counter += 1
for i in range(elastic_count):
name = "{}{}".format(service_descr['name'], counter)
state.service_new(execution.id, name, service_descr['name'], service_descr, False)
counter += 1
assert counter == total_count
def execution_submit(state: SQLManager, scheduler: ZoeBaseScheduler, execution: Execution):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment