Commit 08032d1d authored by Daniele Venzano's avatar Daniele Venzano

Implement monitor thread

Uses the Docker event stream to keep service status in sync
parent 4c3a202b
......@@ -47,7 +47,7 @@ confidence=
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
disable=line-too-long,logging-format-interpolation,too-few-public-methods,too-many-instance-attributes,fixme,too-many-branches,file-ignored,global-statement,redefined-variable-type,no-self-use,too-many-statements,locally-disabled,arguments-differ
disable=line-too-long,logging-format-interpolation,too-few-public-methods,too-many-instance-attributes,fixme,too-many-branches,file-ignored,global-statement,redefined-variable-type,no-self-use,too-many-statements,locally-disabled,arguments-differ,unnecessary-lambda
[REPORTS]
......
......@@ -21,7 +21,7 @@ import psycopg2.extras
import zoe_api.exceptions
from zoe_lib.config import get_conf
SQL_SCHEMA_VERSION = 0 # ---> Increment this value every time the schema changes !!! <---
SQL_SCHEMA_VERSION = 1 # ---> Increment this value every time the schema changes !!! <---
def version_table(cur):
......@@ -72,8 +72,9 @@ def create_tables(cur):
description JSON NOT NULL,
execution_id INT REFERENCES execution,
service_group TEXT NOT NULL,
name TEXT NOT NULL ,
docker_id TEXT NULL DEFAULT NULL
name TEXT NOT NULL,
docker_id TEXT NULL DEFAULT NULL,
docker_status TEXT NOT NULL DEFAULT 'undefined'
)''')
......
......@@ -16,6 +16,7 @@
"""Interface to PostgresQL for Zoe state."""
import datetime
import logging
import psycopg2
import psycopg2.extras
......@@ -23,6 +24,8 @@ import psycopg2.extras
from zoe_lib.config import get_conf
from zoe_lib.swarm_client import SwarmClient
log = logging.getLogger(__name__)
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
......@@ -315,6 +318,12 @@ class Service(Base):
ACTIVE_STATUS = "active"
STARTING_STATUS = "starting"
DOCKER_UNDEFINED_STATUS = 'undefined'
DOCKER_CREATE_STATUS = 'create'
DOCKER_START_STATUS = 'start'
DOCKER_DIE_STATUS = 'die'
DOCKER_DESTROY_STATUS = 'destroy'
def __init__(self, d, sql_manager):
super().__init__(d, sql_manager)
......@@ -325,6 +334,7 @@ class Service(Base):
self.description = d['description']
self.service_group = d['service_group']
self.docker_id = d['docker_id']
self.docker_status = d['docker_status']
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
......@@ -337,7 +347,8 @@ class Service(Base):
'description': self.description,
'service_group': self.service_group,
'docker_id': self.docker_id,
'ip_address': self.ip_address
'ip_address': self.ip_address,
'docker_status': self.docker_status
}
def __eq__(self, other):
......@@ -364,6 +375,22 @@ class Service(Base):
"""The service is running and has a valid docker_id."""
self.sql_manager.service_update(self.id, status=self.ACTIVE_STATUS, docker_id=docker_id)
def set_docker_status(self, new_status):
"""Docker has emitted an event related to this service."""
if new_status == 'create':
new_status = self.DOCKER_CREATE_STATUS
elif new_status == 'start':
new_status = self.DOCKER_START_STATUS
elif new_status == 'die':
new_status = self.DOCKER_DIE_STATUS
elif new_status == 'destroy':
new_status = self.DOCKER_DESTROY_STATUS
else:
log.error('Unknown docker status: {}'.format(new_status))
return
self.sql_manager.service_update(self.id, docker_status=new_status)
@property
def ip_address(self):
"""Getter for the service IP address, queries Swarm as the IP address changes outside our control."""
......
......@@ -31,6 +31,8 @@ import docker
import docker.errors
import docker.utils
import requests.packages
from zoe_master.stats import SwarmStats, SwarmNodeStats
from zoe_lib.exceptions import ZoeLibException
......@@ -301,8 +303,22 @@ class SwarmClient:
def event_listener(self, callback: Callable[[str], bool]) -> None:
"""An infinite loop that listens for events from Swarm."""
for event in self.cli.events(decode=True):
if not callback(event):
event_gen = self.cli.events(decode=True)
while True:
try:
event = next(event_gen)
except requests.packages.urllib3.exceptions.ProtocolError:
log.warning('Docker closed event connection, retrying...')
event_gen = self.cli.events(decode=True)
continue
try:
res = callback(event)
except Exception:
log.exception('Uncaught exception in swarm event callback')
log.warning('event was: {}'.format(event))
continue
if not res:
break
def connect_to_network(self, container_id: str, network_id: str) -> None:
......
......@@ -22,6 +22,7 @@ import logging
from zoe_master.master_api import APIManager
from zoe_master.scheduler import ZoeScheduler
from zoe_master.execution_manager import restart_resubmit_scheduler
from zoe_master.monitor import ZoeMonitor
import zoe_lib.config as config
from zoe_lib.metrics.influxdb import InfluxDBMetricSender
......@@ -56,6 +57,8 @@ def main():
log.info("Initializing scheduler")
scheduler = ZoeScheduler()
monitor = ZoeMonitor(state)
restart_resubmit_scheduler(state, scheduler)
log.info("Starting ZMQ API server...")
......@@ -69,5 +72,6 @@ def main():
log.exception('fatal error')
finally:
scheduler.quit()
monitor.quit()
api_server.quit()
metrics.quit()
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Monitor for the Swarm event stream."""
import logging
import threading
from zoe_lib.swarm_client import SwarmClient
from zoe_lib.config import get_conf
from zoe_lib.sql_manager import SQLManager
log = logging.getLogger(__name__)
class ZoeMonitor(threading.Thread):
"""The monitor."""
def __init__(self, state: SQLManager):
super().__init__()
self.setName('monitor')
self.stop = False
self.state = state
self.setDaemon(True)
self.start()
def run(self):
"""The thread loop."""
swarm = SwarmClient(get_conf())
swarm.event_listener(lambda x: self._event_cb(x))
def _event_cb(self, event: dict) -> bool:
if event['Type'] == 'container':
self._container_event(event)
elif event['Type'] == 'network':
pass
elif event['Type'] == 'image':
pass
else:
log.debug('Unmanaged event type: {}'.format(event['Type']))
log.debug(event)
if self.stop:
return False
else:
return True
def _container_event(self, event: dict):
if 'zoe.deployment_name' not in event['Actor']['Attributes']:
return
if event['Actor']['Attributes']['zoe.deployment_name'] != get_conf().deployment_name:
return
service_id = event['Actor']['Attributes']['zoe.service.id'] # type: int
service = self.state.service_list(only_one=True, id=service_id)
if 'exec' in event['Action']:
pass
elif 'create' in event['Action']:
service.set_docker_status('create')
elif 'start' in event['Action']:
service.set_docker_status('start')
elif 'die' in event['Action']:
service.set_docker_status('die')
elif 'destroy' in event['Action']:
service.set_docker_status('destroy')
else:
log.debug('Unmanaged container action: {}'.format(event['Action']))
def quit(self):
"""Stops the thread."""
self.stop = True
SAMPLE_EVENT = {
'node': {
'Name': 'bf18',
'Id': 'VPCL:E5GW:WON3:2DPV:WFO7:EVNO:ZAKS:V2PA:PGKU:RSM7:AAR3:EAV7',
'Addr': '192.168.47.18:2375',
'Ip': '192.168.47.18'
},
'timeNano': 1469622892143470822,
'Actor': {
'ID': 'e4d3e639c1ec2107262f19cf6e57406cf83e376ef4f131461c3f256d0ce64e13',
'Attributes': {
'node.ip': '192.168.47.18',
'image': 'docker-registry:5000/zoerepo/spark-submit',
'node.name': 'bf18',
'node.addr': '192.168.47.18:2375',
'zoe.service.name': 'spark-submit0',
'name': 'spark-submit0-60-prod',
'zoe.owner': 'milanesio',
'zoe.deployment_name': 'prod',
'com.docker.swarm.id': 'de7515d8839c461523e8326c552b45da0f9bd0f9af4f68d4d5a55429533405d4',
'zoe.execution.id': '60',
'zoe.monitor': 'true',
'zoe.execution.name': 'testebob',
'node.id': 'VPCL:E5GW:WON3:2DPV:WFO7:EVNO:ZAKS:V2PA:PGKU:RSM7:AAR3:EAV7',
'zoe.service.id': '233',
'zoe.type': 'app_service'
}
},
'status': 'start',
'Action': 'start',
'id': 'e4d3e639c1ec2107262f19cf6e57406cf83e376ef4f131461c3f256d0ce64e13',
'time': 1469622892,
'Type': 'container',
'from': 'docker-registry:5000/zoerepo/spark-submit node:bf18'
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment