api_client.py 16.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright (c) 2016, Quang-Nhat Hoang-Xuan
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to the low-level Kubernetes API."""
import logging
import json
import time
20 21 22
from argparse import Namespace

from typing import Dict, Any, List
23
import humanfriendly
24
import pykube
25 26

from zoe_master.stats import ClusterStats, NodeStats
27
from zoe_master.backends.service_instance import ServiceInstance
28
from zoe_lib.version import ZOE_VERSION
29
from zoe_lib.state import VolumeDescription, VolumeDescriptionHostPath
30
from zoe_lib.config import get_conf
31 32 33

log = logging.getLogger(__name__)

# Labels put on every Service / ReplicationController / pod template this
# backend creates; the same mapping is used as the label selector when Zoe's
# ReplicationControllers are listed back from the cluster.
ZOE_LABELS = {
    "app": "zoe",
    "version": ZOE_VERSION,
    "auto-ingress/enabled": "enabled",
}


class KubernetesConf:
    """Kubeconfig class.

    Holds, in ``config``, the object decoded from a JSON kubeconfig file.
    """

    def __init__(self, jsonfile):
        # The whole document is decoded eagerly; a missing file or invalid
        # JSON propagates to the caller.
        with open(jsonfile, 'r') as handle:
            parsed = json.load(handle)
        self.config = parsed


class KubernetesServiceConf:
    """Builder for the JSON manifest of a Kubernetes ``Service`` (API ``v1``).

    The manifest lives in ``self.conf``; the setters fill it in and
    ``get_json`` hands it out.
    """

    def __init__(self):
        metadata = {
            'labels': {},
            'namespace': get_conf().kube_namespace
        }
        spec = {
            'selector': {},
            'ports': []
        }
        self.conf = {
            'kind': 'Service',
            'apiVersion': "v1",
            'metadata': metadata,
            'spec': spec,
        }

    def set_name(self, name):
        """Store ``name`` as ``metadata.name``."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Merge ``lbs`` into ``metadata.labels`` (existing keys are overwritten)."""
        self.conf['metadata']['labels'].update(lbs)

    def set_ports(self, ports):
        """Replace ``spec.ports`` with one entry per item of ``ports``.

        Each item's ``number`` attribute is used both as ``port`` and
        ``targetPort``; entries are named ``http-0``, ``http-1``, ...
        """
        self.conf['spec']['ports'] = [
            {
                'name': 'http-' + str(index),
                'port': port.number,
                'targetPort': port.number,
            }
            for index, port in enumerate(ports)
        ]

    def set_selectors(self, selectors: dict):
        """Merge ``selectors`` into ``spec.selector`` (existing keys are overwritten)."""
        self.conf['spec']['selector'].update(selectors)

    def get_json(self):
        """Return the manifest dict itself (not a copy)."""
        return self.conf


class KubernetesReplicationControllerConf:
    """ Wrapper for Kubernetes ReplicationController Configuration.

    Builds, in ``self.conf``, the manifest of a ReplicationController whose pod
    template holds a single container; every ``set_spec_container_*`` setter
    writes into that container.
    """
    def __init__(self):
        self.conf = {
            'kind': 'ReplicationController',
            'apiVersion': "v1",
            'metadata': {
                'labels': {},
                'namespace': get_conf().kube_namespace
            },
            'spec': {
                'replicas': 1,
                'selector': {},
                'template': {
                    'metadata': {
                        'labels': {}
                    },
                    'spec': {
                        'containers': [{}]
                    }
                },
            }
        }

    def _container(self) -> Dict:
        """Return the single container entry of the pod template."""
        return self.conf['spec']['template']['spec']['containers'][0]

    def _limits(self) -> Dict:
        """Return the container's ``resources.limits`` dict, creating it on
        first use so memory and CPU limits can be set independently, in any order."""
        resources = self._container().setdefault('resources', {})
        return resources.setdefault('limits', {})

    def set_name(self, name):
        """Set the ReplicationController name (``metadata.name``)."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Merge ``lbs`` into ``metadata.labels``."""
        self.conf['metadata']['labels'].update(lbs)

    def set_replicas(self, reps):
        """Set the desired number of replicas (``spec.replicas``)."""
        self.conf['spec']['replicas'] = reps

    def set_spec_selector(self, lbs: dict):
        """Merge ``lbs`` into the pod selector (``spec.selector``)."""
        self.conf['spec']['selector'].update(lbs)

    def set_temp_meta_labels(self, lbs: dict):
        """Merge ``lbs`` into the pod template labels (``spec.template.metadata.labels``)."""
        self.conf['spec']['template']['metadata']['labels'].update(lbs)

    def set_spec_container_image(self, image):
        """Set the container image."""
        self._container()['image'] = image

    def set_spec_container_name(self, name):
        """Set the container name."""
        self._container()['name'] = name

    def set_spec_container_env(self, env: dict):
        """Replace the container environment.

        :param env: iterable whose items are indexed as ``item[0]`` (variable
            name) and ``item[1]`` (value).  NOTE(review): the ``dict``
            annotation does not match this usage -- iterating a real dict yields
            its keys, and ``key[0]`` would be a single character; confirm the
            type callers pass.
        """
        self._container()['env'] = [{'name': item[0], 'value': item[1]} for item in env]

    def set_spec_container_ports(self, ports):
        """Replace the container ports with one ``containerPort`` per item's ``number``."""
        self._container()['ports'] = [{'containerPort': prt.number} for prt in ports]

    def set_spec_container_mem_limit(self, memlimit):
        """Set the container memory limit, keeping any CPU limit already set.

        :param memlimit: presumably bytes -- it is divided by 1024*1024 and
            written with an "Mi" suffix (536870912 -> "512.0Mi").
        """
        self._limits()['memory'] = str(memlimit / (1024 * 1024)) + "Mi"

    def set_spec_container_core_limit(self, corelimit):
        """Set the container CPU limit, keeping any memory limit already set."""
        self._limits()['cpu'] = corelimit

    def set_spec_container_command(self, command):
        """Set the container command from a command-line string.

        The string is split on single spaces, so quoting is not honoured and
        consecutive spaces produce empty arguments.  NOTE(review): consider
        ``shlex.split`` if commands may contain quoted arguments.
        """
        self._container()['command'] = command.split(" ")

    def set_spec_container_volumes(self, volumes: 'List[VolumeDescription]', name: str):
        """Mount the supported volumes into the container.

        Only ``host_directory`` volumes are supported; each becomes a pod-level
        ``hostPath`` volume named ``<name>-<n>`` plus a matching container mount.
        Other volume types are logged and skipped, and leave no entry behind.
        """
        mounts = []
        pod_volumes = []
        for vol in volumes:
            if vol.type != "host_directory":
                log.error('Kubernetes backend does not support volume type {}'.format(vol.type))
                continue
            assert isinstance(vol, VolumeDescriptionHostPath)
            # Numbered by supported volumes only, so names stay contiguous.
            vol_name = name + "-" + str(len(mounts))
            mounts.append({'mountPath': vol.mount_point, 'name': vol_name})
            pod_volumes.append({'name': vol_name, 'hostPath': {'path': vol.path}})

        self._container()['volumeMounts'] = mounts
        self.conf['spec']['template']['spec']['volumes'] = pod_volumes

    def get_json(self):
        """Return the manifest dict itself (not a copy)."""
        return self.conf


class KubernetesClient:
    """The Kubernetes client class that wraps the Kubernetes API."""

    # Suffixes of Kubernetes resource quantities and their multipliers:
    # binary ("Ki", "Mi", ...), decimal ("k", "M", ...) and milli ("m").
    # No entry is a suffix of another, so the order is irrelevant.
    _MEMORY_SUFFIXES = (
        ('Ki', 2 ** 10), ('Mi', 2 ** 20), ('Gi', 2 ** 30),
        ('Ti', 2 ** 40), ('Pi', 2 ** 50), ('Ei', 2 ** 60),
        ('k', 10 ** 3), ('M', 10 ** 6), ('G', 10 ** 9),
        ('T', 10 ** 12), ('P', 10 ** 15), ('E', 10 ** 18),
        ('m', 10 ** -3),
    )

    def __init__(self, opts: Namespace) -> None:
        """Build the pykube HTTP client from the kubeconfig file named by
        ``opts.kube_config_file``."""
        self.api = pykube.HTTPClient(pykube.KubeConfig.from_file(opts.kube_config_file))

    @staticmethod
    def _parse_cpu_quantity(quantity) -> float:
        """Convert a Kubernetes CPU quantity ("100m", "0.1", "2") to cores."""
        text = str(quantity)
        if text.endswith('m'):
            return float(text[:-1]) / 1000
        return float(text)

    @classmethod
    def _parse_memory_quantity(cls, quantity) -> int:
        """Convert a Kubernetes memory quantity ("512Mi", "1G", "1024") to bytes."""
        text = str(quantity)
        for suffix, factor in cls._MEMORY_SUFFIXES:
            if text.endswith(suffix):
                return int(float(text[:-len(suffix)]) * factor)
        # No suffix: a plain byte count, possibly in exponent form ("129e6").
        return int(float(text))

    @staticmethod
    def _replication_controller_state(ready_replicas, no_replicas):
        """Map ready/desired replica counts to the ``(state, running)`` pair
        reported by ``inspect_replication_controller``."""
        if ready_replicas <= 0:
            return 'undefined', False
        if ready_replicas <= no_replicas:
            return 'running', True
        # More replicas ready than requested.
        return 'undefined', True

    def spawn_replication_controller(self, service_instance: 'ServiceInstance'):
        """Create and start a new replication controller.

        The controller has one replica and is named after ``service_instance``.
        Its labels, selector and pod-template labels are ZOE_LABELS plus
        ``service_name``.

        :return: the result of ``inspect_replication_controller`` (possibly
            ``None``), or ``{}`` if creation failed (the error is logged).
        """
        # A fresh dict: ZOE_LABELS itself is shared module state.
        labels = dict(ZOE_LABELS, service_name=service_instance.name)

        config = KubernetesReplicationControllerConf()
        config.set_name(service_instance.name)
        config.set_labels(labels)
        config.set_replicas(1)
        config.set_spec_selector(labels)
        config.set_temp_meta_labels(labels)
        config.set_spec_container_image(service_instance.image_name)
        config.set_spec_container_name(service_instance.name)

        if service_instance.environment:
            config.set_spec_container_env(service_instance.environment)
        if service_instance.ports:
            config.set_spec_container_ports(service_instance.ports)
        if service_instance.memory_limit is not None:
            config.set_spec_container_mem_limit(service_instance.memory_limit.max)
        if service_instance.core_limit is not None:
            config.set_spec_container_core_limit(service_instance.core_limit.max)
        if service_instance.volumes:
            config.set_spec_container_volumes(service_instance.volumes, service_instance.name)
        if service_instance.command is not None:
            config.set_spec_container_command(service_instance.command)

        info = {}
        try:
            pykube.ReplicationController(self.api, config.get_json()).create()
            log.info('Created ReplicationController on Kubernetes cluster')
            info = self.inspect_replication_controller(service_instance.name)
        except Exception as ex:
            log.error(ex)

        return info

    def inspect_replication_controller(self, name):
        """Get information about a specific replication controller.

        :return: dict with ``backend_id`` (the object's uid), ``ip_address``,
            ``replicas``, ``readyReplicas``, ``state`` and ``running``; ``None``
            if the controller does not exist or on any other error (logged).
        """
        try:
            repcon_list = pykube.ReplicationController.objects(self.api).filter(namespace=get_conf().kube_namespace)
            rc_info = repcon_list.get_by_name(name).obj

            no_replicas = rc_info['spec']['replicas']
            # 'status' / 'readyReplicas' are absent until pods become ready.
            ready_replicas = rc_info.get('status', {}).get('readyReplicas', 0)
            state, running = self._replication_controller_state(ready_replicas, no_replicas)

            info = {
                "backend_id": rc_info['metadata']['uid'],
                'ip_address': '0.0.0.0',
                'replicas': no_replicas,
                'readyReplicas': ready_replicas,
                'state': state,
                'running': running,
            }
        except pykube.exceptions.ObjectDoesNotExist:
            return None
        except Exception as ex:
            log.error(ex)
            return None

        return info

    def replication_controller_list(self):
        """Get list of replication controller.

        :return: one ``inspect_replication_controller`` result per
            ReplicationController matching ZOE_LABELS in the configured
            namespace (entries may be ``None``); partial if listing fails
            midway (error logged).
        """
        repcon_list = pykube.ReplicationController.objects(self.api).filter(
            namespace=get_conf().kube_namespace, selector=ZOE_LABELS).iterator()
        rclist = []
        try:
            for rep in repcon_list:
                rclist.append(self.inspect_replication_controller(rep.name))
        except Exception as ex:
            log.error(ex)
        return rclist

    def spawn_service(self, service_instance: 'ServiceInstance'):
        """Create and start a new Service object.

        The Service is named after ``service_instance``, labelled and selecting
        on ZOE_LABELS plus ``service_name``, and exposes the instance's ports.
        Errors are logged, not raised.
        """
        labels = dict(ZOE_LABELS, service_name=service_instance.name)

        config = KubernetesServiceConf()
        config.set_name(service_instance.name)
        config.set_labels(labels)
        if service_instance.ports:
            config.set_ports(service_instance.ports)
        config.set_selectors(labels)

        try:
            pykube.Service(self.api, config.get_json()).create()
            log.info('created service on Kubernetes cluster')
        except Exception as ex:
            log.error(ex)

    def inspect_service(self, name) -> Dict[str, Any]:
        """Get information of a specific service.

        :return: dict with ``service_name``, ``port_forwarding`` (one
            ``{'port': ..., 'nodePort': ...}`` per service port) and, when the
            spec has one, ``clusterIP``; ``None`` on any error (logged).
        """
        try:
            service_list = pykube.Service.objects(self.api).filter(namespace=get_conf().kube_namespace)
            srv_spec = service_list.get_by_name(name).obj['spec']

            info = {
                'service_name': name,
                'port_forwarding': []
            }
            if 'clusterIP' in srv_spec:
                info['clusterIP'] = srv_spec['clusterIP']
            # NOTE(review): a port without 'nodePort' (e.g. a ClusterIP-type
            # service) raises KeyError here and the whole call returns None --
            # confirm that is intended.
            info['port_forwarding'] = [
                {'port': port['port'], 'nodePort': port['nodePort']}
                for port in srv_spec['ports']
            ]
        except Exception as ex:
            log.error(ex)
            info = None

        return info

    def terminate(self, name):
        """Terminate a service.

        Deletes the Service and the ReplicationController called ``name``, then
        every pod labelled with ZOE_LABELS plus ``service_name=name``. Each step
        is attempted even if an earlier one failed; errors are logged, not raised.
        """
        namespace = get_conf().kube_namespace

        def _reference(kind, obj_name):
            """Minimal manifest identifying the object to delete."""
            return {
                'apiVersion': 'v1',
                'kind': kind,
                'metadata': {
                    'name': obj_name,
                    'namespace': namespace
                }
            }

        success = True
        for kind, api_class in (('Service', pykube.Service),
                                ('ReplicationController', pykube.ReplicationController)):
            try:
                api_class(self.api, _reference(kind, name)).delete()
            except Exception as ex:
                log.error(ex)
                success = False

        try:
            # Build a new dict: mutating ZOE_LABELS would leak this service_name
            # into every later use of the module-level labels (e.g. listing).
            pod_selector = dict(ZOE_LABELS, service_name=name)
            pods = pykube.Pod.objects(self.api).filter(namespace=namespace, selector=pod_selector).iterator()
            for pod in pods:
                pykube.Pod(self.api, _reference('Pod', pod.name)).delete()
        except Exception as ex:
            log.error(ex)
            success = False

        if success:
            log.info('Service deleted on Kubernetes cluster')

    def info(self) -> 'ClusterStats':  # pylint: disable=too-many-locals
        """Retrieve Kubernetes cluster statistics.

        Node totals come from each node's ``status.allocatable``. Reservations
        are the summed resource *requests* of all containers of every pod
        scheduled on that node (``spec.nodeName``). Pods not bound to a listed
        node, such as Pending ones, are skipped.
        """
        pl_status = ClusterStats()

        # Get basic information from nodes, keyed by node name.
        node_dict = {}
        for node in pykube.Node.objects(self.api).filter(namespace=pykube.all).iterator():
            nss = NodeStats(node.name)
            allocatable = node.obj['status']['allocatable']
            nss.cores_total = self._parse_cpu_quantity(allocatable['cpu'])
            nss.memory_total = self._parse_memory_quantity(allocatable['memory'])
            nss.labels = node.obj['metadata']['labels']
            node_dict[node.name] = nss

        # Accumulate pod requests onto the node each pod is scheduled on.
        for pod in pykube.Pod.objects(self.api).filter(namespace=pykube.all).iterator():
            nss = node_dict.get(pod.obj['spec'].get('nodeName'))
            if nss is None:
                continue
            nss.container_count += 1
            for container in pod.obj['spec'].get('containers', []):
                requests = container.get('resources', {}).get('requests', {})
                if 'memory' in requests:
                    nss.memory_reserved = nss.memory_reserved + self._parse_memory_quantity(requests['memory'])
                if 'cpu' in requests:
                    nss.cores_reserved = round(nss.cores_reserved + self._parse_cpu_quantity(requests['cpu']), 3)

        for nss in node_dict.values():
            pl_status.nodes.append(nss)

        pl_status.container_count = sum(nss.container_count for nss in node_dict.values())
        pl_status.memory_total = sum(nss.memory_total for nss in node_dict.values())
        pl_status.cores_total = sum(nss.cores_total for nss in node_dict.values())
        pl_status.timestamp = time.time()

        return pl_status