Commit b6151ff6 authored by qhoangxuan's avatar qhoangxuan

add ingress label, fix event stream

parent 4cb22ae3
......@@ -27,15 +27,16 @@ from zoe_master.stats import ClusterStats, NodeStats
from zoe_master.backends.service_instance import ServiceInstance
from zoe_lib.version import ZOE_VERSION
from zoe_lib.state import VolumeDescription
from zoe_lib.config import get_conf
log = logging.getLogger(__name__)
ZOE_LABELS = {
"app": "zoe",
"version": ZOE_VERSION
"version": ZOE_VERSION,
"auto-ingress/enabled" : "enabled"
}
class KubernetesConf:
"""Kubeconfig class"""
def __init__(self, jsonfile):
......@@ -51,7 +52,8 @@ class KubernetesServiceConf:
'kind': 'Service',
'apiVersion': "v1",
'metadata': {
'labels': {}
'labels': {},
'namespace': get_conf().kube_namespace
},
'spec': {
'selector': {},
......@@ -97,7 +99,8 @@ class KubernetesReplicationControllerConf:
'kind': 'ReplicationController',
'apiVersion': "v1",
'metadata': {
'labels': {}
'labels': {},
'namespace': get_conf().kube_namespace
},
'spec': {
'replicas': 1,
......@@ -265,7 +268,7 @@ class KubernetesClient:
def inspect_replication_controller(self, name):
"""Get information about a specific replication controller."""
try:
repcon_list = pykube.ReplicationController.objects(self.api)
repcon_list = pykube.ReplicationController.objects(self.api).filter(namespace=get_conf().kube_namespace)
rep = repcon_list.get_by_name(name)
rc_info = rep.obj
......@@ -304,7 +307,7 @@ class KubernetesClient:
def replication_controller_list(self):
"""Get list of replication controller."""
repcon_list = pykube.ReplicationController.objects(self.api).filter(selector=ZOE_LABELS).iterator()
repcon_list = pykube.ReplicationController.objects(self.api).filter(namespace=get_conf().kube_namespace, selector=ZOE_LABELS).iterator()
rclist = []
try:
for rep in repcon_list:
......@@ -313,11 +316,6 @@ class KubernetesClient:
log.error(ex)
return rclist
def replication_controller_event(self):
"""Get event stream of the replication controller."""
rc_stream = pykube.ReplicationController.objects(self.api).filter(selector=ZOE_LABELS).watch()
return rc_stream
def spawn_service(self, service_instance: ServiceInstance):
"""Create and start a new Service object."""
config = KubernetesServiceConf()
......@@ -342,7 +340,7 @@ class KubernetesClient:
def inspect_service(self, name) -> Dict[str, Any]:
"""Get information of a specific service."""
try:
service_list = pykube.Service.objects(self.api)
service_list = pykube.Service.objects(self.api).filter(namespace=get_conf().kube_namespace)
service = service_list.get_by_name(name)
srv_info = service.obj
......@@ -374,7 +372,8 @@ class KubernetesClient:
'apiVersion': 'v1',
'kind': '',
'metadata': {
'name': name
'name': name,
'namespace': get_conf().kube_namespace
}
}
try:
......@@ -387,7 +386,7 @@ class KubernetesClient:
del_obj['kind'] = 'Pod'
pod_selector = ZOE_LABELS
pod_selector['service_name'] = name
pods = pykube.Pod.objects(self.api).filter(namespace="default", selector=pod_selector).iterator()
pods = pykube.Pod.objects(self.api).filter(namespace=get_conf().kube_namespace, selector=pod_selector).iterator()
for pod in pods:
del_obj['metadata']['name'] = str(pod)
pykube.Pod(self.api, del_obj).delete()
......@@ -400,7 +399,7 @@ class KubernetesClient:
"""Retrieve Kubernetes cluster statistics."""
pl_status = ClusterStats()
node_list = pykube.Node.objects(self.api).iterator()
node_list = pykube.Node.objects(self.api).filter(namespace=pykube.all).iterator()
node_dict = {}
# Get basic information from nodes
......
......@@ -18,6 +18,7 @@
import logging
import threading
import time
import pykube
from zoe_lib.config import get_conf
from zoe_lib.state import SQLManager, Service
......@@ -43,38 +44,43 @@ class KubernetesMonitor(threading.Thread):
"""An infinite loop that listens for events from Kubernetes."""
log.info("Monitor thread started")
while True: # pylint: disable=too-many-nested-blocks
for event in self.kube.replication_controller_event():
log.debug('%s: %s', event.object.name, event.type)
if event.type != 'DELETED' and event.type != 'ADDED':
rc_info = self.kube.inspect_replication_controller(event.object.name)
if rc_info:
rc_uid = rc_info['backend_id']
service = self.state.service_list(only_one=True, backend_id=rc_uid)
if event.object.name not in self.service_id:
self.service_id[event.object.name] = service.id
if service is not None:
if rc_info['readyReplicas'] == 0:
log.debug('Number replicas: 0')
service.set_backend_status(service.BACKEND_UNDEFINED_STATUS)
elif rc_info['readyReplicas'] < rc_info['replicas']:
logstr = 'Number replicas: ' + str(rc_info['readyReplicas'])
log.debug(logstr)
service.set_backend_status(service.BACKEND_CREATE_STATUS)
elif rc_info['readyReplicas'] == rc_info['replicas']:
if service.backend_status != service.BACKEND_START_STATUS:
log.debug('Reached desired number of replicas')
service.set_backend_status(service.BACKEND_START_STATUS)
else:
if event.type != 'ADDED':
if event.object.name in self.service_id:
sid = self.service_id[event.object.name]
self.service_id.pop(event.object.name)
service = self.state.service_list(only_one=True, id=sid)
log.debug("Kubernetes service event stream")
try:
watch = pykube.ReplicationController.objects(self.kube.api, namespace=get_conf().kube_namespace).watch()
for event in watch:
log.debug('%s: %s', event.object.name, event.type)
if event.type != 'DELETED' and event.type != 'ADDED':
rc_info = self.kube.inspect_replication_controller(event.object.name)
if rc_info:
rc_uid = rc_info['backend_id']
service = self.state.service_list(only_one=True, backend_id=rc_uid)
if event.object.name not in self.service_id:
self.service_id[event.object.name] = service.id
if service is not None:
log.info('Destroyed all replicas')
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
time.sleep(1)
if rc_info['readyReplicas'] == 0:
log.debug('Number replicas: 0')
service.set_backend_status(service.BACKEND_UNDEFINED_STATUS)
elif rc_info['readyReplicas'] < rc_info['replicas']:
logstr = 'Number replicas: ' + str(rc_info['readyReplicas'])
log.debug(logstr)
service.set_backend_status(service.BACKEND_CREATE_STATUS)
elif rc_info['readyReplicas'] == rc_info['replicas']:
if service.backend_status != service.BACKEND_START_STATUS:
log.debug('Reached desired number of replicas')
service.set_backend_status(service.BACKEND_START_STATUS)
else:
if event.type != 'ADDED':
if event.object.name in self.service_id:
sid = self.service_id[event.object.name]
self.service_id.pop(event.object.name)
service = self.state.service_list(only_one=True, id=sid)
if service is not None:
log.info('Destroyed all replicas')
service.set_backend_status(service.BACKEND_DESTROY_STATUS)
time.sleep(1)
except Exception as ex:
log.error(ex)
log.debug("Kubernetes service event stream ended, start new stream")
time.sleep(2)
def quit(self):
......@@ -101,6 +107,7 @@ class KubernetesStateSynchronizer(threading.Thread):
"""Loop through the pods and try to update the service status."""
found = False
for rep in repcon_list:
log.debug("%s - %s", rep['backend_id'], service.backend_id)
if rep['backend_id'] == service.backend_id:
found = True
if rep['running'] is False:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment