Commit d6c04c0d authored by Daniele Venzano's avatar Daniele Venzano

Remove event monitor thread, do as kubernetes does

parent 2dd3b62c
......@@ -123,7 +123,7 @@ class SwarmStateSynchronizer(threading.Thread):
swarm = SwarmClient(get_conf())
while not self.stop:
service_list = self.state.service_list()
container_list = swarm.list(only_label={'zoe.deployment_name': get_conf().deployment_name})
container_list = swarm.list(only_label={'zoe_deployment_name': get_conf().deployment_name})
for service in service_list:
assert isinstance(service, Service)
......
......@@ -50,6 +50,7 @@ from zoe_lib.config import get_conf
from zoe_lib.exceptions import ZoeLibException, ZoeNotEnoughResourcesException
from zoe_master.stats import ClusterStats, NodeStats
from zoe_master.backends.service_instance import ServiceInstance
from zoe_lib.state import Service
log = logging.getLogger(__name__)
......@@ -100,7 +101,6 @@ class SwarmClient:
manager = url
else:
raise ZoeLibException('Unsupported URL scheme for Swarm')
log.debug('Connecting to Swarm at {}'.format(manager))
self.cli = docker.DockerClient(base_url=manager, version="auto")
def info(self) -> ClusterStats:
......@@ -233,23 +233,26 @@ class SwarmClient:
info["ip_address"][net] = None
if container.status == 'running':
info["state"] = "running"
info["state"] = Service.BACKEND_START_STATUS
info["running"] = True
elif container.status == "paused":
info["state"] = "paused"
elif container.status == 'paused':
info["state"] = Service.BACKEND_DIE_STATUS
info["running"] = False
elif container.status == 'restarting':
info["state"] = "restarting"
info["state"] = Service.BACKEND_START_STATUS
info["running"] = True
elif container.status == 'OOMKilled' or container.status == 'exited':
info["state"] = "killed"
elif container.status == 'OOMKilled':
info["state"] = Service.BACKEND_OOM_STATUS
info["running"] = False
elif container.status == 'exited':
info["state"] = Service.BACKEND_DIE_STATUS
info["running"] = False
elif container.status == 'created':
info["state"] = 'created'
info["state"] = Service.BACKEND_CREATE_STATUS
info["running"] = False
else:
log.error('Unknown container status: {}'.format(container.status))
info["state"] = "unknown"
info["state"] = Service.BACKEND_UNDEFINED_STATUS
info["running"] = False
info['ports'] = {}
......@@ -264,7 +267,6 @@ class SwarmClient:
else:
info['ports'][port] = None
return info
def inspect_container(self, docker_id: str) -> Dict[str, Any]:
......
......@@ -23,7 +23,7 @@ from zoe_master.backends.old_swarm_new_api.api_client import SwarmClient
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException
import zoe_master.backends.base
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.backends.old_swarm_new_api.threads import SwarmMonitor, SwarmStateSynchronizer
from zoe_master.backends.old_swarm_new_api.threads import SwarmStateSynchronizer
from zoe_master.stats import NodeStats, ClusterStats # pylint: disable=unused-import
log = logging.getLogger(__name__)
......@@ -37,19 +37,17 @@ class OldSwarmNewAPIBackend(zoe_master.backends.base.BaseBackend):
"""Zoe backend implementation for old-style stand-alone Docker Swarm."""
def __init__(self, opts):
super().__init__(opts)
self.swarm = SwarmClient(opts)
self.swarm = SwarmClient()
@classmethod
def init(cls, state):
"""Initializes Swarm backend starting the event monitoring thread."""
global _monitor, _checker
_monitor = SwarmMonitor(state)
_checker = SwarmStateSynchronizer(state)
@classmethod
def shutdown(cls):
"""Performs a clean shutdown of the resources used by Swarm backend."""
_monitor.quit()
_checker.quit()
def spawn_service(self, service_instance: ServiceInstance):
......
......@@ -25,74 +25,7 @@ from zoe_master.backends.old_swarm_new_api.api_client import SwarmClient
log = logging.getLogger(__name__)
class SwarmMonitor(threading.Thread):
"""The monitor."""
def __init__(self, state: SQLManager) -> None:
super().__init__()
self.setName('monitor')
self.stop = False
self.state = state
self.setDaemon(True)
self.start()
def run(self):
"""The thread loop."""
log.info("Monitor thread started")
swarm = SwarmClient(get_conf())
while True:
try:
swarm.event_listener(lambda x: self._event_cb(x))
except Exception:
log.exception('Exception in monitor thread')
time.sleep(1) # wait a bit before retrying the connection
def _event_cb(self, event: dict) -> bool:
if event['Type'] == 'container':
self._container_event(event)
elif event['Type'] == 'volume' or event['Type'] == 'network' or event['Type'] == 'swarm' or event['Type'] == 'image':
pass
else:
log.debug('Unmanaged event type: {}'.format(event['Type']))
log.debug(str(event))
if self.stop:
return False
else:
return True
def _container_event(self, event: dict):
if 'zoe.deployment_name' not in event['Actor']['Attributes']:
return
if event['Actor']['Attributes']['zoe.deployment_name'] != get_conf().deployment_name:
return
service_id = event['Actor']['Attributes']['zoe.service.id'] # type: int
service = self.state.service_list(only_one=True, id=service_id)
if service is None:
return
if 'create' in event['Action']:
service.set_backend_status(service.BACKEND_CREATE_STATUS)
elif 'start' in event['Action']:
service.set_backend_status(service.BACKEND_START_STATUS)
elif 'die' in event['Action'] or 'kill' in event['Action'] or 'stop' in event['Action']:
service.set_backend_status(service.BACKEND_DIE_STATUS)
elif 'oom' in event['Action']:
service.set_backend_status(service.BACKEND_OOM_STATUS)
log.warning('Service {} got killed by an OOM condition'.format(service.id))
elif 'destroy' in event['Action']:
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
else:
log.debug('Unmanaged container action: {}'.format(event['Action']))
def quit(self):
"""Stops the thread."""
self.stop = True
CHECK_INTERVAL = 300
CHECK_INTERVAL = 10
class SwarmStateSynchronizer(threading.Thread):
......@@ -107,31 +40,43 @@ class SwarmStateSynchronizer(threading.Thread):
self.start()
def _find_dead_service(self, container_list, service: Service):
"""Loop through the containers and try to update the service status."""
found = False
for container in container_list:
if container['id'] == service.backend_id:
found = True
if container['status'] == 'exited':
log.info('resetting status of service {}, died with no event'.format(service.name))
service.set_backend_status(service.BACKEND_DIE_STATUS)
if not found:
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
def _update_service_status(self, service: Service, container):
"""Update the service status."""
if service.backend_status != container['state']:
old_status = service.backend_status
service.set_backend_status(container['state'])
log.debug('Updated service status, {} from {} to {}'.format(service.name, old_status, container['state']))
def run(self):
"""The thread loop."""
log.info("Checker thread started")
swarm = SwarmClient(get_conf())
while not self.stop:
swarm = SwarmClient()
service_list = self.state.service_list()
container_list = swarm.list(only_label={'zoe.deployment_name': get_conf().deployment_name})
containers = {}
for cont in container_list:
containers[cont['id']] = cont
services = {}
for serv in service_list:
services[serv.backend_id] = serv
for service in service_list:
assert isinstance(service, Service)
if service.backend_status == service.BACKEND_DESTROY_STATUS or service.backend_status == service.BACKEND_DIE_STATUS:
continue
self._find_dead_service(container_list, service)
if service.backend_id in containers:
self._update_service_status(service, containers[service.backend_id])
else:
if service.backend_status == service.BACKEND_DESTROY_STATUS:
continue
else:
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
for container in container_list:
if container['id'] not in services:
log.warning('Found an unknown Zoe container, killing it: {} {}'.format(container['name'], container['labels']))
swarm.terminate_container(container['id'], True)
time.sleep(CHECK_INTERVAL)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment