# api_client.py
# Copyright (c) 2016, Quang-Nhat Hoang-Xuan
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to the low-level Kubernetes API."""
import json
import logging
import time
from argparse import Namespace
from typing import Dict, Any, List

import humanfriendly
import pykube

from zoe_master.stats import ClusterStats, NodeStats
from zoe_master.backends.service_instance import ServiceInstance
from zoe_lib.version import ZOE_VERSION
from zoe_lib.state import VolumeDescription
from zoe_lib.config import get_conf

log = logging.getLogger(__name__)

# Labels set on the Services and ReplicationControllers built in this module
# and used as the selector when listing or deleting them.
ZOE_LABELS = {
    "app": "zoe",
    "version": ZOE_VERSION,
    "auto-ingress/enabled": "enabled",
}


class KubernetesConf:
    """Kubeconfig holder: the parsed content of a JSON configuration file.

    The decoded document is available as ``self.config``.
    """

    def __init__(self, jsonfile):
        """Load ``jsonfile`` and keep the decoded JSON document.

        Errors from ``open`` or ``json.load`` (missing file, invalid JSON)
        are not caught here and propagate to the caller.
        """
        # Explicit encoding so the result does not depend on the locale.
        with open(jsonfile, 'r', encoding='utf-8') as inp:
            self.config = json.load(inp)


class KubernetesServiceConf:
    """Wrapper for a Kubernetes Service configuration.

    The manifest is kept as a plain dictionary in ``self.conf``; the setters
    below fill it in and ``get_json()`` hands it back.
    """

    def __init__(self):
        self.conf = {
            'kind': 'Service',
            'apiVersion': "v1",
            'metadata': {
                'labels': {},
                'namespace': get_conf().kube_namespace
            },
            'spec': {
                'selector': {},
                'ports': []
            },
        }

    def set_name(self, name):
        """Set ``metadata.name``."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Merge ``lbs`` into ``metadata.labels``; existing keys are overwritten."""
        self.conf['metadata']['labels'].update(lbs)

    def set_ports(self, ports):
        """Replace ``spec.ports`` with one entry per element of ``ports``.

        Each element must expose a ``number`` attribute, which is used for
        both ``port`` and ``targetPort``.  The first port is named ``http``;
        any further port is named ``http-<index>`` because Kubernetes rejects
        a Service whose ports share the same name.
        """
        self.conf['spec']['ports'] = [
            {
                'name': 'http' if index == 0 else 'http-{}'.format(index),
                'port': prt.number,
                'targetPort': prt.number,
            }
            for index, prt in enumerate(ports)
        ]

    def set_selectors(self, selectors: dict):
        """Merge ``selectors`` into ``spec.selector``; existing keys are overwritten."""
        self.conf['spec']['selector'].update(selectors)

    def get_json(self):
        """Return the manifest dictionary (the live object, not a copy)."""
        return self.conf


class KubernetesReplicationControllerConf:
    """Wrapper for a Kubernetes ReplicationController configuration.

    The manifest is kept as a plain dictionary in ``self.conf``.  The pod
    template always holds exactly one container entry, which the
    ``set_spec_container_*`` setters fill in.
    """

    def __init__(self):
        self.conf = {
            'kind': 'ReplicationController',
            'apiVersion': "v1",
            'metadata': {
                'labels': {},
                'namespace': get_conf().kube_namespace
            },
            'spec': {
                'replicas': 1,
                'selector': {},
                'template': {
                    'metadata': {
                        'labels': {}
                    },
                    'spec': {
                        'containers': [{}]
                    }
                },
            }
        }

    def _container(self) -> dict:
        """Return the single container entry of the pod template."""
        return self.conf['spec']['template']['spec']['containers'][0]

    def _limits(self) -> dict:
        """Return ``resources.limits`` of the container, creating it if missing."""
        return self._container().setdefault('resources', {}).setdefault('limits', {})

    def set_name(self, name):
        """Set ``metadata.name``."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Merge ``lbs`` into ``metadata.labels``; existing keys are overwritten."""
        self.conf['metadata']['labels'].update(lbs)

    def set_replicas(self, reps):
        """Set ``spec.replicas``."""
        self.conf['spec']['replicas'] = reps

    def set_spec_selector(self, lbs: dict):
        """Merge ``lbs`` into ``spec.selector``; existing keys are overwritten."""
        self.conf['spec']['selector'].update(lbs)

    def set_temp_meta_labels(self, lbs: dict):
        """Merge ``lbs`` into the labels of the pod template."""
        self.conf['spec']['template']['metadata']['labels'].update(lbs)

    def set_spec_container_image(self, image):
        """Set the container image."""
        self._container()['image'] = image

    def set_spec_container_name(self, name):
        """Set the container name."""
        self._container()['name'] = name

    def set_spec_container_env(self, env):
        """Set the container environment.

        ``env`` is a sequence of ``(name, value)`` pairs.  A dictionary is
        accepted as well and is expanded through ``items()``; iterating it
        directly would index into the key strings instead.
        """
        pairs = env.items() if isinstance(env, dict) else env
        self._container()['env'] = [
            {'name': entry[0], 'value': entry[1]} for entry in pairs
        ]

    def set_spec_container_ports(self, ports):
        """Set the container ports from objects exposing a ``number`` attribute."""
        self._container()['ports'] = [{'containerPort': prt.number} for prt in ports]

    def set_spec_container_mem_limit(self, memlimit):
        """Set the container memory limit.

        ``memlimit`` is divided by 1024*1024 and emitted with a ``Mi``
        suffix; true division is used, so 512 MiB becomes ``"512.0Mi"``.
        Other limits already present (for example ``cpu``) are preserved.
        """
        self._limits()['memory'] = str(memlimit / (1024 * 1024)) + "Mi"

    def set_spec_container_core_limit(self, corelimit):
        """Set the container CPU limit.

        Works whether or not a memory limit has been set before.
        """
        self._limits()['cpu'] = corelimit

    def set_spec_container_volumes(self, volumes: "List[VolumeDescription]", name: str):
        """Set the container volume mounts and the matching pod volumes.

        Only volumes whose ``type`` is ``"host_directory"`` are supported;
        any other type is logged as an error and skipped.  Each supported
        volume is exposed as a ``hostPath`` volume named ``<name>-<index>``
        and mounted at its ``mount_point``.
        """
        supported = []
        for vol in volumes:
            if vol.type != "host_directory":
                log.error('Kubernetes backend does not support volume type {}'.format(vol.type))
                continue
            supported.append(vol)

        mounts = []
        pod_volumes = []
        for index, vol in enumerate(supported):
            # The same generated name ties the mount to its volume.
            vol_name = name + "-" + str(index)
            mounts.append({'mountPath': vol.mount_point, 'name': vol_name})
            pod_volumes.append({'name': vol_name, 'hostPath': {'path': vol.path}})

        self._container()['volumeMounts'] = mounts
        self.conf['spec']['template']['spec']['volumes'] = pod_volumes

    def get_json(self):
        """Return the manifest dictionary (the live object, not a copy)."""
        return self.conf


class KubernetesClient:
    """The Kubernetes client class that wraps the Kubernetes API."""

    def __init__(self, opts: Namespace) -> None:
        """Build the pykube HTTP client from the kubeconfig at ``opts.kube_config_file``."""
        self.api = pykube.HTTPClient(pykube.KubeConfig.from_file(opts.kube_config_file))

    @staticmethod
    def _parse_cpu_quantity(quantity) -> float:
        """Convert a Kubernetes CPU quantity to cores: ``'100m'`` -> 0.1, ``'0.5'`` -> 0.5, ``'2'`` -> 2.0."""
        text = str(quantity).strip()
        if text.endswith('m'):
            return float(text[:-1]) / 1000
        return float(text)

    @staticmethod
    def _replica_state(ready_replicas, no_replicas):
        """Map ready/wanted replica counts to a ``(state, running)`` pair."""
        if ready_replicas <= 0:
            return 'undefined', False
        if ready_replicas <= no_replicas:
            return 'running', True
        # More ready replicas than requested.
        return 'undefined', True

    @staticmethod
    def _deletion_manifest(kind, name, namespace):
        """Build the minimal object description pykube needs to delete ``kind``/``name``."""
        return {
            'apiVersion': 'v1',
            'kind': kind,
            'metadata': {
                'name': name,
                'namespace': namespace
            }
        }

    def spawn_replication_controller(self, service_instance: "ServiceInstance"):
        """Create and start a new replication controller.

        Returns the dictionary produced by ``inspect_replication_controller``
        for the new object (which may be ``None``), or an empty dictionary
        when the creation fails; the failure is logged, not raised.
        """
        config = KubernetesReplicationControllerConf()
        config.set_name(service_instance.name)

        # The same label set goes on the controller, its selector and the
        # pod template.
        name_label = {'service_name': service_instance.name}
        config.set_labels(ZOE_LABELS)
        config.set_labels(name_label)
        config.set_replicas(service_instance.replicas_count)

        config.set_spec_selector(ZOE_LABELS)
        config.set_spec_selector(name_label)

        config.set_temp_meta_labels(ZOE_LABELS)
        config.set_temp_meta_labels(name_label)

        config.set_spec_container_image(service_instance.image_name)
        config.set_spec_container_name(service_instance.name)

        if service_instance.environment:
            config.set_spec_container_env(service_instance.environment)

        if service_instance.ports:
            config.set_spec_container_ports(service_instance.ports)

        if service_instance.memory_limit is not None:
            config.set_spec_container_mem_limit(service_instance.memory_limit.max)

        if service_instance.core_limit is not None:
            config.set_spec_container_core_limit(service_instance.core_limit.max)

        if service_instance.volumes:
            config.set_spec_container_volumes(service_instance.volumes, service_instance.name)

        info = {}

        try:
            pykube.ReplicationController(self.api, config.get_json()).create()
            log.info('Created ReplicationController on Kubernetes cluster')
            info = self.inspect_replication_controller(service_instance.name)
        except Exception as ex:
            log.error(ex)

        return info

    def inspect_replication_controller(self, name):
        """Get information about a specific replication controller.

        Returns a dictionary with ``backend_id``, ``ip_address``,
        ``replicas``, ``readyReplicas``, ``state`` and ``running``, or
        ``None`` when the controller does not exist or the lookup fails.
        """
        try:
            repcon_list = pykube.ReplicationController.objects(self.api).filter(namespace=get_conf().kube_namespace)
            rc_info = repcon_list.get_by_name(name).obj

            no_replicas = rc_info['spec']['replicas']
            # The status carries no 'readyReplicas' key when none is ready.
            ready_replicas = rc_info['status'].get('readyReplicas', 0)
            state, running = self._replica_state(ready_replicas, no_replicas)

            info = {
                "backend_id": rc_info['metadata']['uid'],
                'ip_address': '0.0.0.0',
                'replicas': no_replicas,
                'readyReplicas': ready_replicas,
                'state': state,
                'running': running
            }
        except pykube.exceptions.ObjectDoesNotExist:
            return None
        except Exception as ex:
            log.error(ex)
            return None

        return info

    def replication_controller_list(self):
        """Get the list of replication controllers carrying the Zoe labels.

        Each element is the result of ``inspect_replication_controller``.
        Errors are logged and the entries collected so far are returned.
        """
        repcon_list = pykube.ReplicationController.objects(self.api).filter(namespace=get_conf().kube_namespace, selector=ZOE_LABELS).iterator()
        rclist = []
        try:
            for rep in repcon_list:
                rclist.append(self.inspect_replication_controller(rep.name))
        except Exception as ex:
            log.error(ex)
        return rclist

    def spawn_service(self, service_instance: "ServiceInstance"):
        """Create and start a new Service object.

        Failures are logged, not raised; nothing is returned.
        """
        config = KubernetesServiceConf()

        name_label = {'service_name': service_instance.name}
        config.set_name(service_instance.name)
        config.set_labels(ZOE_LABELS)
        config.set_labels(name_label)

        if service_instance.ports:
            config.set_ports(service_instance.ports)

        config.set_selectors(ZOE_LABELS)
        config.set_selectors(name_label)

        try:
            pykube.Service(self.api, config.get_json()).create()
            log.info('created service on Kubernetes cluster')
        except Exception as ex:
            log.error(ex)

    def inspect_service(self, name) -> Dict[str, Any]:
        """Get information of a specific service.

        Returns a dictionary with ``service_name``, ``port_forwarding`` (one
        ``{'port', 'nodePort'}`` entry per service port) and, when present,
        ``clusterIP``.  ``nodePort`` is ``None`` for ports that have none.
        Returns ``None`` when the lookup fails; the error is logged.
        """
        try:
            service_list = pykube.Service.objects(self.api).filter(namespace=get_conf().kube_namespace)
            spec = service_list.get_by_name(name).obj['spec']

            info = {
                'service_name': name,
                'port_forwarding': []
            }

            if 'clusterIP' in spec:
                info['clusterIP'] = spec['clusterIP']

            # Not every port carries a nodePort, so read it with .get().
            info['port_forwarding'] = [
                {'port': prt['port'], 'nodePort': prt.get('nodePort')}
                for prt in spec['ports']
            ]
        except Exception as ex:
            log.error(ex)
            info = None

        return info

    def terminate(self, name):
        """Terminate a service.

        Deletes the Service, then the ReplicationController, then every pod
        matching the Zoe labels plus ``service_name=name``.  The first
        failure is logged and aborts the remaining steps.
        """
        namespace = get_conf().kube_namespace
        try:
            pykube.Service(self.api, self._deletion_manifest('Service', name, namespace)).delete()
            pykube.ReplicationController(self.api, self._deletion_manifest('ReplicationController', name, namespace)).delete()

            # Build a new dict: adding the key to ZOE_LABELS itself would
            # leak this service name into every later label/selector use.
            pod_selector = dict(ZOE_LABELS, service_name=name)
            pods = pykube.Pod.objects(self.api).filter(namespace=namespace, selector=pod_selector).iterator()
            for pod in pods:
                pykube.Pod(self.api, self._deletion_manifest('Pod', pod.name, namespace)).delete()

            log.info('Service deleted on Kubernetes cluster')
        except Exception as ex:
            log.error(ex)

    def info(self) -> "ClusterStats":  # pylint: disable=too-many-locals
        """Retrieve Kubernetes cluster statistics.

        Node totals come from each node's ``status.allocatable``; reserved
        memory and cores are accumulated from the resource requests of all
        containers of all pods, attributed to the node the pod runs on.
        """
        pl_status = ClusterStats()

        node_list = pykube.Node.objects(self.api).filter(namespace=pykube.all).iterator()
        node_dict = {}

        # Get basic information from nodes, keyed by node name
        for node in node_list:
            nss = NodeStats(node.name)
            allocatable = node.obj['status']['allocatable']
            nss.cores_total = self._parse_cpu_quantity(allocatable['cpu'])
            nss.memory_total = humanfriendly.parse_size(allocatable['memory'])
            nss.labels = node.obj['metadata']['labels']
            node_dict[node.name] = nss

        # Get information from all pods, then accumulate to nodes
        pod_list = pykube.Pod.objects(self.api).filter(namespace=pykube.all).iterator()
        for pod in pod_list:
            # spec.nodeName is the node's name, i.e. the key of node_dict;
            # it is absent while the pod has not been scheduled.
            nss = node_dict.get(pod.obj['spec'].get('nodeName'))
            if nss is None:
                continue
            nss.container_count += 1
            for spec_cont in pod.obj['spec']['containers']:
                requests = spec_cont.get('resources', {}).get('requests', {})
                if 'memory' in requests:
                    nss.memory_reserved = nss.memory_reserved + humanfriendly.parse_size(requests['memory'])
                if 'cpu' in requests:
                    # ex: cpu could be 100m or 0.1
                    cpu_float = self._parse_cpu_quantity(requests['cpu'])
                    nss.cores_reserved = round(nss.cores_reserved + cpu_float, 3)

        cont_total = 0
        mem_total = 0
        cpu_total = 0

        for nss in node_dict.values():
            pl_status.nodes.append(nss)
            cont_total = cont_total + nss.container_count
            mem_total = mem_total + nss.memory_total
            cpu_total = cpu_total + nss.cores_total

        pl_status.container_count = cont_total
        pl_status.memory_total = mem_total
        pl_status.cores_total = cpu_total
        pl_status.timestamp = time.time()

        return pl_status