# Copyright (c) 2016, Quang-Nhat Hoang-Xuan
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to the low-level Kubernetes API."""
import logging
import json
import time
from argparse import Namespace

from typing import Dict, Any, List
import humanfriendly
import pykube

from zoe_master.stats import ClusterStats, NodeStats
from zoe_master.backends.service_instance import ServiceInstance
from zoe_lib.version import ZOE_VERSION
from zoe_lib.state import VolumeDescription, VolumeDescriptionHostPath
from zoe_lib.config import get_conf

log = logging.getLogger(__name__)

# Labels stamped on every Kubernetes object created by this module. The same
# mapping doubles as the label selector when those objects are listed or
# deleted, so it must be treated as read-only.
ZOE_LABELS = {
    "app": "zoe",
    "version": ZOE_VERSION,
    "auto-ingress/enabled": "enabled",
}

class KubernetesConf:
    """Kubernetes configuration loaded from a JSON file.

    The parsed document is exposed unchanged through the ``config`` attribute.
    """

    def __init__(self, jsonfile):
        """Parse ``jsonfile`` and store the result in ``self.config``.

        :param jsonfile: path of the JSON file to read
        :raises OSError: if the file cannot be opened
        :raises ValueError: if the content cannot be decoded or is not valid JSON
        """
        # JSON documents are UTF-8 by specification; being explicit keeps the
        # result independent of the locale the process was started with.
        with open(jsonfile, 'r', encoding='utf-8') as inp:
            self.config = json.load(inp)


class KubernetesServiceConf:
    """Builder for the JSON description of a Kubernetes Service object.

    The description is kept as a plain dictionary in ``self.conf`` and is
    filled in incrementally through the ``set_*`` methods.
    """

    def __init__(self):
        """Start from an empty Service in the configured Kubernetes namespace."""
        self.conf = {
            'kind': 'Service',
            'apiVersion': "v1",
            'metadata': {
                'labels': {},
                'namespace': get_conf().kube_namespace
            },
            'spec': {
                'selector': {},
                'ports': []
            },
        }

    def set_name(self, name):
        """Set the name of the Service."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Add (or overwrite) the given labels on the Service metadata."""
        self.conf['metadata']['labels'].update(lbs)

    def set_ports(self, ports):
        """Replace the Service ports with one entry per element of ``ports``.

        :param ports: iterable of objects exposing a ``number`` attribute;
            each one is published on the same port number it targets

        Kubernetes requires port names to be unique within a Service, so only
        the first port keeps the historical name ``http``; the following ones
        are named ``http-1``, ``http-2``, ...
        """
        entries = []
        for index, prt in enumerate(ports):
            entries.append({
                'name': 'http' if index == 0 else 'http-{}'.format(index),
                'port': prt.number,
                'targetPort': prt.number,
            })
        self.conf['spec']['ports'] = entries

    def set_selectors(self, selectors: dict):
        """Add (or overwrite) the given entries of the pod selector."""
        self.conf['spec']['selector'].update(selectors)

    def get_json(self):
        """Return the Service description as a JSON-serializable dictionary.

        The returned object is the internal dictionary itself, not a copy.
        """
        return self.conf


class KubernetesReplicationControllerConf:
    """Builder for the JSON description of a Kubernetes ReplicationController.

    The description is kept as a plain dictionary in ``self.conf``. The pod
    template always holds exactly one container and every
    ``set_spec_container_*`` method configures that container.
    """

    def __init__(self):
        """Start from a single-replica controller in the configured namespace."""
        self.conf = {
            'kind': 'ReplicationController',
            'apiVersion': "v1",
            'metadata': {
                'labels': {},
                'namespace': get_conf().kube_namespace
            },
            'spec': {
                'replicas': 1,
                'selector': {},
                'template': {
                    'metadata': {
                        'labels': {}
                    },
                    'spec': {
                        'containers': [{}]
                    }
                },
            }
        }

    def _pod_spec(self) -> dict:
        """Return the ``spec`` section of the pod template."""
        return self.conf['spec']['template']['spec']

    def _container(self) -> dict:
        """Return the single container entry of the pod template."""
        return self._pod_spec()['containers'][0]

    def _limits(self) -> dict:
        """Return the container resource limits, creating the mapping on first use."""
        resources = self._container().setdefault('resources', {})
        return resources.setdefault('limits', {})

    def set_name(self, name):
        """Set the name of the ReplicationController."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Add (or overwrite) the given labels on the controller metadata."""
        self.conf['metadata']['labels'].update(lbs)

    def set_replicas(self, reps):
        """Set the requested number of replicas."""
        self.conf['spec']['replicas'] = reps

    def set_spec_selector(self, lbs: dict):
        """Add (or overwrite) the given entries of the controller pod selector."""
        self.conf['spec']['selector'].update(lbs)

    def set_temp_meta_labels(self, lbs: dict):
        """Add (or overwrite) the given labels on the pod template metadata."""
        self.conf['spec']['template']['metadata']['labels'].update(lbs)

    def set_spec_container_image(self, image):
        """Set the image of the container."""
        self._container()['image'] = image

    def set_spec_container_name(self, name):
        """Set the name of the container."""
        self._container()['name'] = name

    def set_spec_container_env(self, env):
        """Replace the container environment.

        :param env: either a mapping of variable names to values or a
            sequence of ``(name, value)`` pairs
        """
        # A mapping is iterated as pairs; iterating it directly would yield
        # only its keys, which cannot be split into name and value.
        pairs = env.items() if isinstance(env, dict) else env
        self._container()['env'] = [{'name': pair[0], 'value': pair[1]} for pair in pairs]

    def set_spec_container_ports(self, ports):
        """Replace the ports exposed by the container.

        :param ports: iterable of objects exposing a ``number`` attribute
        """
        self._container()['ports'] = [{'containerPort': prt.number} for prt in ports]

    def set_spec_container_mem_limit(self, memlimit):
        """Set the memory limit of the container.

        :param memlimit: limit in bytes; it is sent to Kubernetes expressed
            in mebibytes (``Mi``)

        Other limits that are already set on the container are preserved.
        """
        self._limits()['memory'] = str(memlimit / (1024*1024)) + "Mi"

    def set_spec_container_core_limit(self, corelimit):
        """Set the CPU limit of the container.

        Can be called whether or not a memory limit has been set before.
        """
        self._limits()['cpu'] = corelimit

    def set_spec_container_volumes(self, volumes: List[VolumeDescription], name: str):
        """Replace the volumes of the pod and their mounts in the container.

        Only volumes of type ``host_directory`` are supported: each becomes a
        ``hostPath`` volume named ``<name>-<index>`` and is mounted at its
        mount point. Any other volume type is reported in the log and left
        out of the generated description.

        :param volumes: volume descriptions to translate
        :param name: prefix used to build the Kubernetes volume names
        """
        mounts = []
        pod_volumes = []

        for vol in volumes:
            if vol.type != "host_directory":
                log.error('Kubernetes backend does not support volume type {}'.format(vol.type))
                continue

            assert isinstance(vol, VolumeDescriptionHostPath)
            # Mount and volume are created together, so they always share the
            # same name and skipped volumes leave no empty placeholder behind.
            vol_name = name + "-" + str(len(mounts))
            mounts.append({'mountPath': vol.mount_point, 'name': vol_name})
            pod_volumes.append({'name': vol_name, 'hostPath': {'path': vol.path}})

        self._container()['volumeMounts'] = mounts
        self._pod_spec()['volumes'] = pod_volumes

    def get_json(self):
        """Return the controller description as a JSON-serializable dictionary.

        The returned object is the internal dictionary itself, not a copy.
        """
        return self.conf


class KubernetesClient:
    """The Kubernetes client class that wraps the Kubernetes API."""

    def __init__(self, opts: Namespace) -> None:
        """Connect to the cluster described by the ``opts.kube_config_file`` kubeconfig."""
        self.api = pykube.HTTPClient(pykube.KubeConfig.from_file(opts.kube_config_file))

    @staticmethod
    def _replica_state(ready_replicas, no_replicas):
        """Translate replica counters into the reported ``(state, running)`` pair."""
        if ready_replicas <= 0:
            return 'undefined', False
        if ready_replicas <= no_replicas:
            return 'running', True
        # More replicas are ready than requested.
        return 'undefined', True

    @staticmethod
    def _parse_cpu_quantity(quantity) -> float:
        """Convert a Kubernetes CPU quantity (``"100m"``, ``"0.1"``, ``"2"``) to cores."""
        text = str(quantity).strip()
        if text.endswith('m'):
            # The "m" suffix denotes thousandths of a core.
            return float(text[:-1]) / 1000
        return float(text)

    @staticmethod
    def _node_for_pod(pod_obj, node_dict):
        """Return the entry of ``node_dict`` for the node hosting the pod, or None.

        ``node_dict`` is keyed by node name, so the pod ``spec.nodeName`` is
        tried first; ``status.hostIP`` is kept as a fallback for clusters
        whose nodes are named after their IP address.
        """
        candidates = (
            pod_obj.get('spec', {}).get('nodeName'),
            pod_obj.get('status', {}).get('hostIP'),
        )
        for key in candidates:
            if key in node_dict:
                return node_dict[key]
        return None

    def spawn_replication_controller(self, service_instance: ServiceInstance):
        """Create and start a new replication controller.

        :param service_instance: description of the service to run
        :return: the result of :meth:`inspect_replication_controller` for the
            new controller, or an empty dictionary if the creation failed
        """
        labels = dict(ZOE_LABELS)
        labels['service_name'] = service_instance.name

        config = KubernetesReplicationControllerConf()
        config.set_name(service_instance.name)
        config.set_labels(labels)
        config.set_replicas(service_instance.replicas_count)
        config.set_spec_selector(labels)
        config.set_temp_meta_labels(labels)

        config.set_spec_container_image(service_instance.image_name)
        config.set_spec_container_name(service_instance.name)

        if len(service_instance.environment) > 0:
            config.set_spec_container_env(service_instance.environment)

        if len(service_instance.ports) > 0:
            config.set_spec_container_ports(service_instance.ports)

        if service_instance.memory_limit is not None:
            config.set_spec_container_mem_limit(service_instance.memory_limit.max)

        if service_instance.core_limit is not None:
            config.set_spec_container_core_limit(service_instance.core_limit.max)

        if len(service_instance.volumes) > 0:
            config.set_spec_container_volumes(service_instance.volumes, service_instance.name)

        info = {}

        try:
            pykube.ReplicationController(self.api, config.get_json()).create()
            log.info('Created ReplicationController on Kubernetes cluster')
            info = self.inspect_replication_controller(service_instance.name)
        except Exception as ex:
            log.error(ex)

        return info

    def inspect_replication_controller(self, name):
        """Get information about a specific replication controller.

        :param name: name of the replication controller
        :return: a dictionary with the keys ``backend_id``, ``ip_address``,
            ``replicas``, ``readyReplicas``, ``state`` and ``running``, or
            None if the controller does not exist or the query failed
        """
        try:
            repcon_list = pykube.ReplicationController.objects(self.api).filter(namespace=get_conf().kube_namespace)
            rc_info = repcon_list.get_by_name(name).obj

            no_replicas = rc_info['spec']['replicas']
            ready_replicas = rc_info['status'].get('readyReplicas', 0)
            state, running = self._replica_state(ready_replicas, no_replicas)

            info = {
                "backend_id": rc_info['metadata']['uid'],
                'ip_address': '0.0.0.0',
                'replicas': no_replicas,
                'readyReplicas': ready_replicas,
                'state': state,
                'running': running
            }
        except pykube.exceptions.ObjectDoesNotExist:
            return None
        except Exception as ex:
            log.error(ex)
            return None

        return info

    def replication_controller_list(self):
        """Get list of replication controller.

        :return: one :meth:`inspect_replication_controller` result for each
            controller carrying the Zoe labels; if the listing fails part-way
            the entries gathered so far are returned
        """
        repcon_list = pykube.ReplicationController.objects(self.api).filter(namespace=get_conf().kube_namespace, selector=ZOE_LABELS).iterator()
        rclist = []
        try:
            for rep in repcon_list:
                rclist.append(self.inspect_replication_controller(rep.name))
        except Exception as ex:
            log.error(ex)
        return rclist

    def spawn_service(self, service_instance: ServiceInstance):
        """Create and start a new Service object.

        Errors reported by the cluster are logged and not propagated.
        """
        labels = dict(ZOE_LABELS)
        labels['service_name'] = service_instance.name

        config = KubernetesServiceConf()
        config.set_name(service_instance.name)
        config.set_labels(labels)

        if len(service_instance.ports) > 0:
            config.set_ports(service_instance.ports)

        config.set_selectors(labels)

        try:
            pykube.Service(self.api, config.get_json()).create()
            log.info('created service on Kubernetes cluster')
        except Exception as ex:
            log.error(ex)

    def inspect_service(self, name) -> Dict[str, Any]:
        """Get information of a specific service.

        :param name: name of the Service object
        :return: a dictionary with the keys ``service_name``,
            ``port_forwarding`` and, when available, ``clusterIP``; None if
            the lookup failed for any reason
        """
        try:
            service_list = pykube.Service.objects(self.api).filter(namespace=get_conf().kube_namespace)
            srv_info = service_list.get_by_name(name).obj

            info = {
                'service_name': name,
                'port_forwarding': []  # type: List[Dict]
            }

            if 'clusterIP' in srv_info['spec']:
                info['clusterIP'] = srv_info['spec']['clusterIP']

            # NOTE(review): a port entry without 'nodePort' raises KeyError
            # here, which makes the whole call return None - confirm that
            # this is what callers expect for services without node ports.
            info['port_forwarding'] = [
                {'port': port['port'], 'nodePort': port['nodePort']}
                for port in srv_info['spec']['ports']
            ]
        except Exception as ex:
            log.error(ex)
            info = None

        return info

    def terminate(self, name):
        """Terminate a service.

        Deletes the Service, then the ReplicationController, then every pod
        carrying the Zoe labels and ``service_name == name``. The first
        failure is logged and stops the sequence.
        """
        namespace = get_conf().kube_namespace
        del_obj = {
            'apiVersion': 'v1',
            'kind': '',
            'metadata': {
                'name': name,
                'namespace': namespace
            }
        }
        try:
            del_obj['kind'] = 'Service'
            pykube.Service(self.api, del_obj).delete()

            del_obj['kind'] = 'ReplicationController'
            pykube.ReplicationController(self.api, del_obj).delete()

            del_obj['kind'] = 'Pod'
            # Work on a copy: ZOE_LABELS is shared by every other call and
            # must not pick up the name of the service being terminated.
            pod_selector = dict(ZOE_LABELS)
            pod_selector['service_name'] = name
            pods = pykube.Pod.objects(self.api).filter(namespace=namespace, selector=pod_selector).iterator()
            for pod in pods:
                del_obj['metadata']['name'] = str(pod)
                pykube.Pod(self.api, del_obj).delete()

            log.info('Service deleted on Kubernetes cluster')
        except Exception as ex:
            log.error(ex)

    def info(self) -> ClusterStats:  # pylint: disable=too-many-locals
        """Retrieve Kubernetes cluster statistics.

        :return: per-node totals and reservations plus cluster-wide totals,
            timestamped with the current time
        """
        pl_status = ClusterStats()

        # Get basic information from nodes
        node_dict = {}
        for node in pykube.Node.objects(self.api).filter(namespace=pykube.all).iterator():
            allocatable = node.obj['status']['allocatable']
            nss = NodeStats(node.name)
            nss.cores_total = self._parse_cpu_quantity(allocatable['cpu'])
            nss.memory_total = humanfriendly.parse_size(allocatable['memory'])
            nss.labels = node.obj['metadata']['labels']
            node_dict[node.name] = nss

        # Get information from all pods, then accumulate to nodes
        for pod in pykube.Pod.objects(self.api).filter(namespace=pykube.all).iterator():
            nss = self._node_for_pod(pod.obj, node_dict)
            if nss is None:
                # Pod not placed on any known node: nothing to account.
                continue

            nss.container_count += 1
            # Only the first container of the pod is taken into account.
            spec_cont = pod.obj['spec']['containers'][0]
            requests = (spec_cont.get('resources') or {}).get('requests') or {}
            if 'memory' in requests:
                nss.memory_reserved = nss.memory_reserved + humanfriendly.parse_size(requests['memory'])
            if 'cpu' in requests:
                cpu_float = self._parse_cpu_quantity(requests['cpu'])
                nss.cores_reserved = round(nss.cores_reserved + cpu_float, 3)

        cont_total = 0
        mem_total = 0
        cpu_total = 0

        for nss in node_dict.values():
            pl_status.nodes.append(nss)
            cont_total = cont_total + nss.container_count
            mem_total = mem_total + nss.memory_total
            cpu_total = cpu_total + nss.cores_total

        pl_status.container_count = cont_total
        pl_status.memory_total = mem_total
        pl_status.cores_total = cpu_total
        pl_status.timestamp = time.time()

        return pl_status