Commit 82977f7f authored by Daniele Venzano's avatar Daniele Venzano

Several fixes after testing the Kubernetes back-end with minik8s v.1.12

parent 2be228e3
...@@ -62,3 +62,4 @@ state.zoe ...@@ -62,3 +62,4 @@ state.zoe
/zoe*.conf /zoe*.conf
zoepass.csv zoepass.csv
/docker.conf /docker.conf
/kube.conf
...@@ -32,10 +32,22 @@ How it works ...@@ -32,10 +32,22 @@ How it works
* Create each **replication controller** per each service of a Zapp. Replication Controller assures to have at least a number of **replicas** (pod) always running. * Create each **replication controller** per each service of a Zapp. Replication Controller assures to have at least a number of **replicas** (pod) always running.
* Create a Kubernetes **service** per each **replication controller**, which has the same **labels** and **label selectors** with the associated **replication controller**. The service would help the zapp service be exposed to the network by exposing the same port of the service on all kubernetes nodes. * Create a Kubernetes **service** per each **replication controller**, which has the same **labels** and **label selectors** with the associated **replication controller**. The service would help the zapp service be exposed to the network by exposing the same port of the service on all kubernetes nodes.
Additional notes
----------------
The host running Zoe must be able to translate via DNS/hosts K8S node names and IPs.
Many of the features of the Zoe scheduler have not been implemented for the Kubernetes back-end.
Kubernetes network configuration can be done in many different ways. The Zoe Kubernetes back-end implements a very simple algorithm for setting and retrieving port forwading information to show on the web interface. You will have to customize this implementation to support your own Kubernetes deployment, with the network and port forwarding back-end of your choice.
This version of Zoe has been tested with microk8s v.1.12
References References
---------- ----------
* Kubernetes: https://kubernetes.io/ * Kubernetes: https://kubernetes.io/
* Kubernetes Replication Controller : https://kubernetes.io/docs/user-guide/replication-controller/ * Kubernetes Replication Controller : https://kubernetes.io/docs/user-guide/replication-controller/
* Kubernetes Service: https://kubernetes.io/docs/user-guide/services/ * Kubernetes Service: https://kubernetes.io/docs/user-guide/services/
* Kubernetes Limit and Request: https://kubernetes.io/docs/user-guide/compute-resources/ * Kubernetes Limit and Request: https://kubernetes.io/docs/user-guide/compute-resources/
* Microk8s: https://microk8s.io/
...@@ -243,16 +243,17 @@ class KubernetesClient: ...@@ -243,16 +243,17 @@ class KubernetesClient:
config.set_spec_container_name(service_instance.name) config.set_spec_container_name(service_instance.name)
if len(service_instance.environment) > 0: if len(service_instance.environment) > 0:
config.set_spec_container_env(service_instance.environment) envs = {e[0]: str(e[1]) for e in service_instance.environment}
config.set_spec_container_env(envs)
if len(service_instance.ports) > 0: if len(service_instance.ports) > 0:
config.set_spec_container_ports(service_instance.ports) config.set_spec_container_ports(service_instance.ports)
if service_instance.memory_limit is not None: if service_instance.memory_limit is not None:
config.set_spec_container_mem_limit(service_instance.memory_limit.max) config.set_spec_container_mem_limit(service_instance.memory_limit.min)
if service_instance.core_limit is not None: if service_instance.core_limit is not None:
config.set_spec_container_core_limit(service_instance.core_limit.max) config.set_spec_container_core_limit(service_instance.core_limit.min)
if len(service_instance.volumes) > 0: if len(service_instance.volumes) > 0:
config.set_spec_container_volumes(service_instance.volumes, service_instance.name) config.set_spec_container_volumes(service_instance.volumes, service_instance.name)
...@@ -267,7 +268,7 @@ class KubernetesClient: ...@@ -267,7 +268,7 @@ class KubernetesClient:
log.info('Created ReplicationController on Kubernetes cluster') log.info('Created ReplicationController on Kubernetes cluster')
info = self.inspect_replication_controller(service_instance.name) info = self.inspect_replication_controller(service_instance.name)
except Exception as ex: except Exception as ex:
log.error(ex) log.exception(ex)
return info return info
...@@ -306,7 +307,7 @@ class KubernetesClient: ...@@ -306,7 +307,7 @@ class KubernetesClient:
except pykube.exceptions.ObjectDoesNotExist: except pykube.exceptions.ObjectDoesNotExist:
return None return None
except Exception as ex: except Exception as ex:
log.error(ex) log.exception(ex)
return None return None
return info return info
...@@ -319,7 +320,7 @@ class KubernetesClient: ...@@ -319,7 +320,7 @@ class KubernetesClient:
for rep in repcon_list: for rep in repcon_list:
rclist.append(self.inspect_replication_controller(rep.name)) rclist.append(self.inspect_replication_controller(rep.name))
except Exception as ex: except Exception as ex:
log.error(ex) log.exception(ex)
return rclist return rclist
def spawn_service(self, service_instance: ServiceInstance): def spawn_service(self, service_instance: ServiceInstance):
...@@ -340,7 +341,7 @@ class KubernetesClient: ...@@ -340,7 +341,7 @@ class KubernetesClient:
pykube.Service(self.api, config.get_json()).create() pykube.Service(self.api, config.get_json()).create()
log.info('created service on Kubernetes cluster') log.info('created service on Kubernetes cluster')
except Exception as ex: except Exception as ex:
log.error(ex) log.exception(ex)
def inspect_service(self, name) -> Dict[str, Any]: def inspect_service(self, name) -> Dict[str, Any]:
"""Get information of a specific service.""" """Get information of a specific service."""
...@@ -363,9 +364,9 @@ class KubernetesClient: ...@@ -363,9 +364,9 @@ class KubernetesClient:
for i in range(length): # type: int for i in range(length): # type: int
info['port_forwarding'][i]['port'] = srv_info['spec']['ports'][i]['port'] info['port_forwarding'][i]['port'] = srv_info['spec']['ports'][i]['port']
info['port_forwarding'][i]['nodePort'] = srv_info['spec']['ports'][i]['nodePort'] info['port_forwarding'][i]['nodePort'] = srv_info['spec']['ports'][i]['targetPort']
except Exception as ex: except Exception as ex:
log.error(ex) log.exception(ex)
info = None info = None
return info return info
...@@ -398,7 +399,7 @@ class KubernetesClient: ...@@ -398,7 +399,7 @@ class KubernetesClient:
log.info('Service deleted on Kubernetes cluster') log.info('Service deleted on Kubernetes cluster')
except Exception as ex: except Exception as ex:
log.error(ex) log.exception(ex)
def info(self) -> ClusterStats: # pylint: disable=too-many-locals def info(self) -> ClusterStats: # pylint: disable=too-many-locals
"""Retrieve Kubernetes cluster statistics.""" """Retrieve Kubernetes cluster statistics."""
...@@ -413,12 +414,16 @@ class KubernetesClient: ...@@ -413,12 +414,16 @@ class KubernetesClient:
nss.cores_total = float(node.obj['status']['allocatable']['cpu']) nss.cores_total = float(node.obj['status']['allocatable']['cpu'])
nss.memory_total = humanfriendly.parse_size(node.obj['status']['allocatable']['memory']) nss.memory_total = humanfriendly.parse_size(node.obj['status']['allocatable']['memory'])
nss.labels = node.obj['metadata']['labels'] nss.labels = node.obj['metadata']['labels']
nss.status = 'online'
node_dict[str(socket.gethostbyname(node.name))] = nss node_dict[str(socket.gethostbyname(node.name))] = nss
# Get information from all running pods, then accumulate to nodes # Get information from all running pods, then accumulate to nodes
pod_list = pykube.Pod.objects(self.api).filter(namespace=pykube.all).iterator() pod_list = pykube.Pod.objects(self.api).filter(namespace=pykube.all).iterator()
for pod in pod_list: for pod in pod_list:
host_ip = pod.obj['status']['hostIP'] try:
host_ip = pod.obj['status']['hostIP']
except KeyError:
continue
nss = node_dict[host_ip] nss = node_dict[host_ip]
nss.container_count += 1 nss.container_count += 1
spec_cont = pod.obj['spec']['containers'][0] spec_cont = pod.obj['spec']['containers'][0]
......
...@@ -22,7 +22,7 @@ from zoe_master.backends.kubernetes.api_client import KubernetesClient ...@@ -22,7 +22,7 @@ from zoe_master.backends.kubernetes.api_client import KubernetesClient
from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException, ZoeNotEnoughResourcesException from zoe_master.exceptions import ZoeStartExecutionRetryException, ZoeStartExecutionFatalException, ZoeException, ZoeNotEnoughResourcesException
from zoe_master.backends.service_instance import ServiceInstance from zoe_master.backends.service_instance import ServiceInstance
import zoe_master.backends.base import zoe_master.backends.base
from zoe_master.backends.kubernetes.threads import KubernetesMonitor from zoe_master.backends.kubernetes.threads import KubernetesMonitor, KubernetesStateSynchronizer
from zoe_master.stats import NodeStats, ClusterStats # pylint: disable=unused-import from zoe_master.stats import NodeStats, ClusterStats # pylint: disable=unused-import
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -47,7 +47,7 @@ class KubernetesBackend(zoe_master.backends.base.BaseBackend): ...@@ -47,7 +47,7 @@ class KubernetesBackend(zoe_master.backends.base.BaseBackend):
@classmethod @classmethod
def shutdown(cls): def shutdown(cls):
"""Performs a clean shutdown of the resources used by Swarm backend.""" """Performs a clean shutdown of the resources used by Kubernetes backend."""
_monitor.quit() _monitor.quit()
# _checker.quit() # _checker.quit()
...@@ -56,12 +56,15 @@ class KubernetesBackend(zoe_master.backends.base.BaseBackend): ...@@ -56,12 +56,15 @@ class KubernetesBackend(zoe_master.backends.base.BaseBackend):
try: try:
self.kube.spawn_service(service_instance) self.kube.spawn_service(service_instance)
rc_info = self.kube.spawn_replication_controller(service_instance) rc_info = self.kube.spawn_replication_controller(service_instance)
sr_info = self.kube.inspect_service(service_instance.name)
except ZoeNotEnoughResourcesException: except ZoeNotEnoughResourcesException:
raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service_instance.name)) raise ZoeStartExecutionRetryException('Not enough free resources to satisfy reservation request for service {}'.format(service_instance.name))
except ZoeException as e: except ZoeException as e:
raise ZoeStartExecutionFatalException(str(e)) raise ZoeStartExecutionFatalException(str(e))
return rc_info["backend_id"], rc_info['ip_address'], None ports = {x['port']: x['nodePort'] for x in sr_info['port_forwarding']}
return rc_info["backend_id"], rc_info['ip_address'], ports
def terminate_service(self, service: Service) -> None: def terminate_service(self, service: Service) -> None:
"""Terminate and delete a container.""" """Terminate and delete a container."""
...@@ -81,4 +84,18 @@ class KubernetesBackend(zoe_master.backends.base.BaseBackend): ...@@ -81,4 +84,18 @@ class KubernetesBackend(zoe_master.backends.base.BaseBackend):
def update_service(self, service, cores=None, memory=None): def update_service(self, service, cores=None, memory=None):
"""Update a service reservation.""" """Update a service reservation."""
log.error('Reservation update not implemented in the Swarm back-end') log.error('Reservation update not implemented in the Kubernetes back-end')
def node_list(self):
"""Return a list of node names."""
info = self.kube.info()
return [node.name for node in info.nodes]
def list_available_images(self, node_name):
"""List the images available on the specified node."""
info = self.kube.info()
for node in info.nodes:
if node.name == node_name:
return node.images
return []
...@@ -88,7 +88,7 @@ class KubernetesMonitor(threading.Thread): ...@@ -88,7 +88,7 @@ class KubernetesMonitor(threading.Thread):
self.stop = True self.stop = True
CHECK_INTERVAL = 300 CHECK_INTERVAL = 10
class KubernetesStateSynchronizer(threading.Thread): class KubernetesStateSynchronizer(threading.Thread):
......
...@@ -29,19 +29,20 @@ log = logging.getLogger(__name__) ...@@ -29,19 +29,20 @@ log = logging.getLogger(__name__)
def _digest_application_description(state: SQLManager, execution: Execution): def _digest_application_description(state: SQLManager, execution: Execution):
"""Read an application description and expand it into services that can be deployed.""" """Read an application description and expand it into services that can be deployed."""
nodes = node_list() if get_conf().backend == 'DockerEngine':
images = [] nodes = node_list()
for node in nodes: images = []
images += list_available_images(node) for node in nodes:
images += list_available_images(node)
images = [name for image in images for name in image['names']]
if len(images) == 0: images = [name for image in images for name in image['names']]
log.warning('The image list reported by the back-end is empty') if len(images) == 0:
for service_descr in execution.description['services']: log.warning('The image list reported by the back-end is empty')
if service_descr['image'] not in images: for service_descr in execution.description['services']:
execution.set_error() if service_descr['image'] not in images:
execution.set_error_message('image {} is not available'.format(service_descr['image'])) execution.set_error()
return False execution.set_error_message('image {} is not available'.format(service_descr['image']))
return False
for service_descr in execution.description['services']: for service_descr in execution.description['services']:
essential_count = service_descr['essential_count'] essential_count = service_descr['essential_count']
......
...@@ -57,6 +57,8 @@ class SimulatedNode: ...@@ -57,6 +57,8 @@ class SimulatedNode:
return 'unknown reason' return 'unknown reason'
def _image_is_available(self, image_name) -> bool: def _image_is_available(self, image_name) -> bool:
if get_conf().backend != 'DockerEngine':
return True
for image in self.images: for image in self.images:
if image_name in image['names']: if image_name in image['names']:
return True return True
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment