Commit 29de9836 authored by Quang-Nhat HOANG-XUAN's avatar Quang-Nhat HOANG-XUAN Committed by GitHub
Browse files

Revert "Revert "I29 t2.1.3""

parent 6b6bb402
.. _kube_backend:
Kubernetes backend for Zoe
==========================
Overview
--------
* Running Zoe on a Kubernetes cluster instead of Swarm
* Support High availability for Zapp services
Requirements
------------
* A running Kubernetes cluster, version 1.4.7
* pykube python library, version >=0.14.0
How it works
------------
1. Zoe configuration file:
* ``--backend``: put Kubernetes instead of Docker Swarm
* ``--kube-config-file``: the configuration file of kubernetes cluster
2. Zoe:
* Zapp Description:
* Add new field: ``replicas``, if users doesn't specify this value, the default value for each service would be ``1``.
* In field ``require_resources``, the ``cores`` field can be float.
* Idea:
* Create each **replication controller** per each service of a Zapp. Replication Controller assures to have at least a number of **replicas** (pod) always running.
* Create a Kubernetes **service** per each **replication controller**, which has the same **labels** and **label selectors** with the associated **replication controller**. The service would help the zapp service be exposed to the network by exposing the same port of the service on all kubernetes nodes.
References
----------
* Kubernetes: https://kubernetes.io/
* Kubernetes Replication Controller : https://kubernetes.io/docs/user-guide/replication-controller/
* Kubernetes Service: https://kubernetes.io/docs/user-guide/services/
* Kubernetes Limit and Request: https://kubernetes.io/docs/user-guide/compute-resources/
......@@ -8,3 +8,4 @@ psycopg2>=2.6.1
pyzmq>=15.2.0
typing
python-consul
pykube>=0.14.0
......@@ -18,10 +18,12 @@
import docker
import time
import logging
import random
import zoe_api.proxy.base
import zoe_api.api_endpoint
from zoe_master.backends.old_swarm.api_client import SwarmClient
from zoe_master.backends.kubernetes.api_client import KubernetesClient
from zoe_lib.config import get_conf
log = logging.getLogger(__name__)
......@@ -56,24 +58,38 @@ class ApacheProxy(zoe_api.proxy.base.BaseProxy):
#Start proxifying by adding entry to use proxypass and proxypassreverse in apache2 config file
for srv in exe.services:
swarm = SwarmClient(get_conf())
s_info = swarm.inspect_container(srv.backend_id)
ip, p = None, None
portList = s_info['ports']
for k,v in portList.items():
exposedPort = k.split('/tcp')[0]
if v != None:
ip = v[0]
p = v[1]
base_path = '/zoe/' + uid + '/' + str(id) + '/' + srv.name + '/' + exposedPort
original_path = str(ip) + ':' + str(p) + base_path
if ip is not None and p is not None:
log.info('Proxifying %s', srv.name + ' port ' + exposedPort)
self.dispatch_to_docker(base_path, original_path)
if get_conf().backend == 'OldSwarm':
swarm = SwarmClient(get_conf())
s_info = swarm.inspect_container(srv.docker_id)
portList = s_info['ports']
for s in portList.values():
if s != None:
ip = s[0]
p = s[1]
else:
kube = KubernetesClient(get_conf())
s_info = kube.inspect_service(srv.dns_name)
kubeNodes = kube.info().nodes
hostIP = random.choice(kubeNodes).name
while 'nodePort' not in s_info['port_forwarding'][0]:
log.info('Waiting for service get started before proxifying...')
s_info = kube.inspect_service(srv.dns_name)
time.sleep(0.5)
ip = hostIP
p = s_info['port_forwarding'][0]['nodePort']
base_path = '/zoe/' + uid + '/' + str(id) + '/' + srv.name
original_path = str(ip) + ':' + str(p) + base_path
if ip is not None and p is not None:
log.info('Proxifying %s', srv.name + ' port ' + exposedPort)
self.dispatch_to_docker(base_path, original_path)
except Exception as ex:
log.error(ex)
......@@ -90,7 +106,7 @@ class ApacheProxy(zoe_api.proxy.base.BaseProxy):
'',
'</VirtualHost>']
docker_client = SwarmClient(get_conf()).cli
docker_client = docker.Client(base_url=get_conf().proxy_docker_sock)
delCommand = "sed -i '$ d' " + get_conf().proxy_config_file # /etc/apache2/sites-available/all.conf"
delID = docker_client.exec_create(get_conf().proxy_container, delCommand)
......@@ -109,7 +125,7 @@ class ApacheProxy(zoe_api.proxy.base.BaseProxy):
def unproxify(self, uid, role, id):
log.info('Unproxifying for user %s - execution %s', uid, str(id))
pattern = '/zoe\/' + uid + '\/' + str(id) + '/d'
docker_client = docker.Client(base_url="unix:///var/run/docker.sock")
docker_client = docker.Client(base_url=get_conf().proxy_docker_sock)
delCommand = 'sed -i "' + pattern + '" ' + get_conf().proxy_config_file # /etc/apache2/sites-available/all.conf'
delID = docker_client.exec_create(get_conf().proxy_container, delCommand)
docker_client.exec_start(delID)
......@@ -22,6 +22,7 @@ import re
from zoe_lib.exceptions import InvalidApplicationDescription
import zoe_lib.version
from zoe_lib.config import get_conf
log = logging.getLogger(__name__)
......@@ -124,10 +125,11 @@ def _service_check(data):
int(data['required_resources']['memory'])
except ValueError:
raise InvalidApplicationDescription(msg="required_resources -> memory field should be an int")
try:
int(data['required_resources']['cores'])
float(data['required_resources']['cores'])
except ValueError:
raise InvalidApplicationDescription(msg="required_resources -> cores field should be an int")
raise InvalidApplicationDescription(msg="required_resources -> cores field should be a float")
if 'environment' in data:
if not hasattr(data['environment'], '__iter__'):
......@@ -152,6 +154,16 @@ def _service_check(data):
if 'constraints' in data and not hasattr(data['constraints'], '__iter__'):
raise InvalidApplicationDescription(msg='networks should be an iterable')
if get_conf().backend == 'Kubernetes':
if 'replicas' not in data:
data['replicas'] = 1
try:
int(data['replicas'])
except ValueError:
raise InvalidApplicationDescription(msg="replicas field should be an int")
def _port_check(data):
"""Check the port description schema."""
......
......@@ -89,11 +89,13 @@ def load_configuration(test_conf=None):
argparser.add_argument('--proxy-container', help='Proxy container name', default='apache2')
argparser.add_argument('--proxy-config-file', help='Config file path of apache/nginx proxy container', default='/etc/apache2/sites-available/config.conf')
argparser.add_argument('--proxy-path', help='Proxy base path', default='127.0.0.1')
argparser.add_argument('--proxy-docker-sock', help='Docker sock url which proxy container uses', default='unix://var/run/docker.sock')
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeSimpleScheduler', 'ZoeElasticScheduler'], default='ZoeSimpleScheduler')
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
argparser.add_argument('--backend', choices=['OldSwarm'], default='OldSwarm')
argparser.add_argument('--backend', choices=['OldSwarm', 'Kubernetes'], default='OldSwarm')
argparser.add_argument('--kube-config-file', help='Kubernetes configuration file', default='/opt/zoe/kube.conf')
argparser.add_argument('--cookie-secret', help='secret used to encrypt cookies', default='changeme')
......
......@@ -111,6 +111,11 @@ class Service:
self.volumes = [VolumeDescription(v) for v in self.description['volumes']]
self.ports = [ExposedPort(p) for p in self.description['ports']]
if 'replicas' in self.description:
self.replicas = self.description['replicas']
else:
self.replicas = 1
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
return {
......
......@@ -23,6 +23,7 @@ from zoe_lib.state import Execution, Service
from zoe_master.backends.base import BaseBackend
from zoe_master.backends.old_swarm.backend import OldSwarmBackend
from zoe_master.backends.kubernetes.backend import KubernetesBackend
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException
log = logging.getLogger(__name__)
......@@ -33,6 +34,8 @@ def _get_backend() -> BaseBackend:
backend_name = get_conf().backend
if backend_name == 'OldSwarm':
return OldSwarmBackend(get_conf())
elif backend_name == 'Kubernetes':
return KubernetesBackend(get_conf())
else:
log.error('Unknown backend selected')
assert False
......
# Copyright (c) 2016, Quang-Nhat Hoang-Xuan
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interface to the low-level Kubernetes API."""
try:
import pykube
except ImportError:
pykube = None
import operator
import logging
import json
import time
import humanfriendly
from typing import Iterable, Callable, Dict, Any, Union
from zoe_master.stats import ClusterStats, NodeStats
from zoe_lib.version import ZOE_VERSION
from argparse import Namespace
from typing import Dict, Any
log = logging.getLogger(__name__)
zoe_labels = {"app": "zoe", "version": ZOE_VERSION}
class DockerContainerOptions:
"""Wrapper for the Docker container options."""
def __init__(self):
self.env = {}
self.volume_binds = []
self.volumes = []
self.command = ""
self.memory_limit = 2 * (1024**3)
self.cores_limit = 0
self.name = ''
self.ports = []
self.network_name = 'bridge'
self.restart = True
self.labels = []
self.gelf_log_address = ''
self.constraints = []
self.replicas = 1
def add_constraint(self, constraint):
"""Add a placement constraint (use docker syntax)."""
self.constraints.append(constraint)
def add_env_variable(self, name: str, value: Union[str, None]) -> None:
"""Add an environment variable to the container definition."""
self.env[name] = value
@property
def environment(self) -> Dict[str, Union[str, None]]:
"""Access the environment variables."""
return self.env
def add_volume_bind(self, path: str, mountpoint: str, readonly=False) -> None:
"""Add a volume to the container."""
self.volumes.append(mountpoint)
self.volume_binds.append(path + ":" + mountpoint + ":" + ("ro" if readonly else "rw"))
def get_volumes(self) -> Iterable[str]:
"""Get the volumes in Docker format."""
return self.volumes
def get_volume_binds(self) -> Iterable[str]:
"""Get the volumes in another Docker format."""
return self.volume_binds
def set_command(self, cmd):
"""Setter for the command to run in the container."""
self.command = cmd
def get_command(self) -> str:
"""Getter for the command to run in the container."""
return self.command
def set_memory_limit(self, limit: int):
"""Setter for the memory limit of the container."""
self.memory_limit = limit
def get_memory_limit(self) -> int:
"""Getter for the memory limit of the container."""
return self.memory_limit
def set_cores_limit(self, limit: float):
"""Setter for the cores limit of the container."""
self.cores_limit = limit
def get_cores_limit(self):
"""Getter for the cores limit of the container."""
return self.cores_limit
@property
def restart_policy(self) -> Dict[str, str]:
"""Getter for the restart policy of the container."""
if self.restart:
return {'Name': 'always'}
else:
return {}
def set_replicas(self, reps: int):
self.replicas = reps
def get_replicas(self) -> int:
return self.replicas
class KubernetesConf:
def __init__(self, jsonfile):
self.config = {}
with open(jsonfile, 'r') as f:
self.config = json.load(f)
class KubernetesServiceConf:
""" Wrapper for Kubernetes Service configuration """
def __init__(self):
self.conf = {}
self.conf['kind'] = 'Service'
self.conf['apiVersion'] = "v1"
self.conf['metadata'] = {}
self.conf['metadata']['labels'] = {}
self.conf['spec'] = {}
self.conf['spec']['selector'] = {}
self.conf['spec']['type'] = 'LoadBalancer'
def setName(self, name):
self.conf['metadata']['name'] = name
def setLabels(self, lb: dict):
for k in lb:
self.conf['metadata']['labels'][k] = lb[k]
def setPorts(self, ports):
self.conf['spec']['ports'] = [{} for _ in range(len(ports))]
count = 0
for p in ports:
self.conf['spec']['ports'][count]['name'] = 'http'
self.conf['spec']['ports'][count]['port'] = p
self.conf['spec']['ports'][count]['targetPort'] = p
count = count + 1
def setSelectors(self, selectors: dict):
for k in selectors:
self.conf['spec']['selector'][k] = selectors[k]
def getJson(self):
return self.conf
class KubernetesReplicationControllerConf:
""" Wrapper for Kubernetes ReplicationController Configuration """
def __init__(self):
self.conf = {}
self.conf['kind'] = 'ReplicationController'
self.conf['apiVersion'] = "v1"
self.conf['metadata'] = {}
self.conf['metadata']['labels'] = {}
self.conf['spec'] = {}
self.conf['spec']['replicas'] = 1
self.conf['spec']['selector'] = {}
self.conf['spec']['template'] = {}
self.conf['spec']['template']['metadata'] = {}
self.conf['spec']['template']['metadata']['labels'] = {}
self.conf['spec']['template']['spec'] = {}
self.conf['spec']['template']['spec']['containers'] = [{}]
def setName(self, name):
self.conf['metadata']['name'] = name
def setLabels(self, lb: dict):
for k in lb:
self.conf['metadata']['labels'][k] = lb[k]
def setReplicas(self, reps):
self.conf['spec']['replicas'] = reps
def setSpecSelector(self, lb: dict):
for k in lb:
self.conf['spec']['selector'][k] = lb[k]
def setSpecTemplateMetadataLabels(self, lb: dict):
for k in lb:
self.conf['spec']['template']['metadata']['labels'][k] = lb[k]
def setSpecTemplateSpecContainerImage(self, image):
self.conf['spec']['template']['spec']['containers'][0]['image'] = image
def setSpecTemplateSpecContainerName(self, name):
self.conf['spec']['template']['spec']['containers'][0]['name'] = name
def setSpecTemplateSpecContainerEnv(self, env: dict):
self.conf['spec']['template']['spec']['containers'][0]['env'] = [{} for _ in range(len(env))]
count = 0
for k in env:
self.conf['spec']['template']['spec']['containers'][0]['env'][count]['name'] = k
self.conf['spec']['template']['spec']['containers'][0]['env'][count]['value'] = env[k]
count = count + 1
def setSpecTemplateSpecContainerPorts(self, ports):
self.conf['spec']['template']['spec']['containers'][0]['ports'] = [{} for _ in range(len(ports))]
count = 0
for p in ports:
self.conf['spec']['template']['spec']['containers'][0]['ports'][count]['containerPort'] = p
count = count + 1
def setSpecTemplateSpecContainerMemLimit(self, memlimit):
memset = str(memlimit / (1024*1024)) + "Mi"
self.conf['spec']['template']['spec']['containers'][0]['resources'] = {}
self.conf['spec']['template']['spec']['containers'][0]['resources']['limits'] = {}
self.conf['spec']['template']['spec']['containers'][0]['resources']['limits']['memory'] = memset
def setSpecTemplateSpecContainerCoreLimit(self, corelimit):
self.conf['spec']['template']['spec']['containers'][0]['resources']['limits']['cpu'] = corelimit
def setSpecTemplateSpecContainerVolumes(self, volumes, name):
self.conf['spec']['template']['spec']['containers'][0]['volumeMounts'] = [{} for _ in range(len(volumes))]
count = 0
for v in volumes:
vsplit = v.split(':')
self.conf['spec']['template']['spec']['containers'][0]['volumeMounts'][count]['mountPath'] = vsplit[0]
self.conf['spec']['template']['spec']['containers'][0]['volumeMounts'][count]['name'] = name + "-" + str(count)
count = count + 1
self.conf['spec']['template']['spec']['volumes'] = [{} for _ in range(len(volumes))]
count = 0
for v in volumes:
vsplit = v.split(':')
self.conf['spec']['template']['spec']['volumes'][count]['name'] = name + "-" + str(count)
self.conf['spec']['template']['spec']['volumes'][count]['hostPath'] = {}
self.conf['spec']['template']['spec']['volumes'][count]['hostPath']['path'] = vsplit[1]
count = count + 1
def getJson(self):
return self.conf
class KubernetesClient:
"""The Kubernetes client class that wraps the Kubernetes API."""
def __init__(self, opts: Namespace) -> None:
try:
self.api = pykube.HTTPClient(pykube.KubeConfig.from_file(opts.kube_config_file))
except Exception as e:
log.error(e)
def spawn_replication_controller(self, image: str, options: DockerContainerOptions):
"""Create and start a new replication controller."""
config = KubernetesReplicationControllerConf()
config.setName(options.name)
config.setLabels(zoe_labels)
config.setLabels({'service_name' : options.name})
config.setReplicas(options.get_replicas())
config.setSpecSelector(zoe_labels)
config.setSpecSelector({'service_name' : options.name})
config.setSpecTemplateMetadataLabels(zoe_labels)
config.setSpecTemplateMetadataLabels({'service_name': options.name})
config.setSpecTemplateSpecContainerImage(image)
config.setSpecTemplateSpecContainerName(options.name)
if len(options.environment) > 0:
config.setSpecTemplateSpecContainerEnv(options.environment)
if len(options.ports) > 0:
config.setSpecTemplateSpecContainerPorts(options.ports)
config.setSpecTemplateSpecContainerMemLimit(options.get_memory_limit())
if options.get_cores_limit() != 0:
config.setSpecTemplateSpecContainerCoreLimit(options.get_cores_limit())
if len(list(options.get_volume_binds())) > 0:
config.setSpecTemplateSpecContainerVolumes(list(options.get_volume_binds()), options.name)
info = {}
try:
pykube.ReplicationController(self.api, config.getJson()).create()
log.info('Created ReplicationController on Kubernetes cluster')
info = self.inspect_replication_controller(options.name)
except Exception as ex:
log.error(ex)
return info
def inspect_replication_controller(self, name):
"""Get information about a specific replication controller."""
try:
repconList = pykube.ReplicationController.objects(self.api)
rc = repconList.get_by_name(name)
rc_info = rc.obj
info = {
"backend_id": rc_info['metadata']['uid']
}
info['ip_address'] = '0.0.0.0'
no_replicas = rc_info['spec']['replicas']
if 'readyReplicas' in rc_info['status']:
ready_replicas = rc_info['status']['readyReplicas']
else:
ready_replicas = 0
info['replicas'] = no_replicas
info['readyReplicas'] = ready_replicas
if ready_replicas <=0 :
info['state'] = 'undefined'
info['running'] = False
if ready_replicas > 0 and ready_replicas <= no_replicas:
info['state'] = 'running'
info['running'] = True
else:
info['state'] = 'undefined'
info['running'] = True
except pykube.exceptions.ObjectDoesNotExist as ex:
return None
except Exception as ex:
log.error(ex)
return None
return info
def replication_controller_list(self):
"""Get list of replication controller."""
repconList = pykube.ReplicationController.objects(self.api).filter(selector=zoe_labels).iterator()
rclist = []
try:
for rc in repconList:
rclist.append(self.inspect_replication_controller(rc.name))
except Exception as ex:
log.error(ex)
return rclist
def replication_controller_event(self):
"""Get event stream of the replication controller."""
rcStream = pykube.ReplicationController.objects(self.api).filter(selector=zoe_labels).watch()
return rcStream
def spawn_service(self, image: str, options: DockerContainerOptions):
"""Create and start a new Service object."""
config = KubernetesServiceConf()
config.setName(options.name)
config.setLabels(zoe_labels)
config.setLabels({'service_name' : options.name})
if len(options.ports) > 0: