api_client.py 16.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright (c) 2016, Quang-Nhat Hoang-Xuan
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to the low-level Kubernetes API."""
import logging
import shlex
from argparse import Namespace
from typing import Dict, Any, List

import humanfriendly
import pykube

from zoe_master.stats import ClusterStats, NodeStats
from zoe_master.backends.service_instance import ServiceInstance
from zoe_lib.version import ZOE_VERSION
from zoe_lib.state import VolumeDescription, VolumeDescriptionHostPath
from zoe_lib.config import get_conf
29 30 31

log = logging.getLogger(__name__)

# Labels attached to the Kubernetes objects created by this module; the same
# mapping is also used as a selector when listing or deleting them.
# It is shared module-wide: copy it before adding per-service keys.
ZOE_LABELS = {
    "app": "zoe",
    "version": ZOE_VERSION,
    "auto-ingress/enabled": "enabled"
}
37

38

39 40 41
class KubernetesServiceConf:
    """Wrapper for a Kubernetes Service configuration.

    The manifest is kept as a plain nested dict in ``self.conf`` and is filled
    in through the setters below. The namespace is read from the Zoe
    configuration (``kube_namespace``) when the object is created.
    """

    def __init__(self):
        self.conf = {
            'kind': 'Service',
            'apiVersion': "v1",
            'metadata': {
                'labels': {},
                'namespace': get_conf().kube_namespace
            },
            'spec': {
                'selector': {},
                'ports': []
            },
        }

    def set_name(self, name):
        """Set the name of the Service (``metadata.name``)."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Merge ``lbs`` into the Service labels; existing keys are overwritten."""
        self.conf['metadata']['labels'].update(lbs)

    def set_ports(self, ports):
        """Replace the Service port list.

        Each item of ``ports`` must expose a ``number`` attribute, which is
        used both as ``port`` and as ``targetPort``. Ports are named
        ``http-<index>`` following their position in ``ports``.
        """
        self.conf['spec']['ports'] = [
            {
                'name': 'http-' + str(index),
                'port': prt.number,
                'targetPort': prt.number,
            }
            for index, prt in enumerate(ports)
        ]

    def set_selectors(self, selectors: dict):
        """Merge ``selectors`` into ``spec.selector``; existing keys are overwritten."""
        self.conf['spec']['selector'].update(selectors)

    def get_json(self):
        """Return the manifest dict (the live object, not a copy)."""
        return self.conf

85

86 87 88
class KubernetesReplicationControllerConf:
    """Wrapper for a Kubernetes ReplicationController configuration.

    The manifest is kept as a plain nested dict in ``self.conf``. The pod
    template always holds exactly one container (index 0), which is the one
    every ``set_spec_container_*`` method configures. The namespace is read
    from the Zoe configuration (``kube_namespace``) at construction time.
    """

    def __init__(self):
        self.conf = {
            'kind': 'ReplicationController',
            'apiVersion': "v1",
            'metadata': {
                'labels': {},
                'namespace': get_conf().kube_namespace
            },
            'spec': {
                'replicas': 1,
                'selector': {},
                'template': {
                    'metadata': {
                        'labels': {}
                    },
                    'spec': {
                        'containers': [{}]
                    }
                },
            }
        }

    def _container(self):
        """Return the dict describing the single container of the pod template."""
        return self.conf['spec']['template']['spec']['containers'][0]

    def set_name(self, name):
        """Set the name of the ReplicationController (``metadata.name``)."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Merge ``lbs`` into the controller labels; existing keys are overwritten."""
        self.conf['metadata']['labels'].update(lbs)

    def set_replicas(self, reps):
        """Set the desired number of replicas."""
        self.conf['spec']['replicas'] = reps

    def set_spec_selector(self, lbs: dict):
        """Merge ``lbs`` into ``spec.selector``; existing keys are overwritten."""
        self.conf['spec']['selector'].update(lbs)

    def set_temp_meta_labels(self, lbs: dict):
        """Merge ``lbs`` into the labels of the pod template."""
        self.conf['spec']['template']['metadata']['labels'].update(lbs)

    def set_spec_container_image(self, image):
        """Set the container image."""
        self._container()['image'] = image

    def set_spec_container_name(self, name):
        """Set the container name."""
        self._container()['name'] = name

    def set_spec_container_env(self, env):
        """Replace the container environment.

        ``env`` is an iterable of ``(name, value)`` pairs. A mapping is
        accepted as well and is read as ``name -> value``; previously a dict
        would have been indexed key by key, yielding the first two characters
        of each key instead of the variable.
        """
        pairs = env.items() if isinstance(env, dict) else env
        self._container()['env'] = [
            {'name': pair[0], 'value': pair[1]} for pair in pairs
        ]

    def set_spec_container_ports(self, ports):
        """Replace the container port list; each item must expose ``number``."""
        self._container()['ports'] = [
            {'containerPort': prt.number} for prt in ports
        ]

    def set_spec_container_mem_limit(self, memlimit):
        """Set the container memory limit.

        ``memlimit`` is a number of bytes; it is written as a ``Mi`` quantity.
        Whole values are emitted without a fractional part ("512Mi" rather
        than "512.0Mi"); other values keep their fractional form.
        """
        mebibytes = memlimit / (1024 * 1024)
        if mebibytes == int(mebibytes):
            mebibytes = int(mebibytes)
        # setdefault keeps a CPU limit that may already have been set.
        limits = self._container().setdefault('resources', {}).setdefault('limits', {})
        limits['memory'] = str(mebibytes) + "Mi"

    def set_spec_container_core_limit(self, corelimit):
        """Set the container CPU limit.

        Works whether or not a memory limit was set before: the intermediate
        ``resources``/``limits`` dicts are created on demand.
        """
        limits = self._container().setdefault('resources', {}).setdefault('limits', {})
        limits['cpu'] = corelimit

    def set_spec_container_command(self, command):
        """Set the container command from a command-line string.

        The string is split with shell-like rules, so quoted arguments stay
        together and repeated blanks do not produce empty arguments. If the
        quoting is unbalanced the string is split on whitespace instead.
        """
        try:
            command_arr = shlex.split(command)
        except ValueError:
            log.warning('Cannot parse quoting in command %r, splitting on whitespace', command)
            command_arr = command.split()
        self._container()['command'] = command_arr

    def set_spec_container_volumes(self, volumes: List[VolumeDescription], name: str):
        """Set the container volume mounts and the matching pod volumes.

        Only ``host_directory`` volumes are supported; any other type is
        logged and skipped, without leaving an empty entry in the manifest.
        Volumes are named ``<name>-<index>``, where the index counts the
        supported volumes only.
        """
        mounts = []
        pod_volumes = []

        for vol in volumes:
            if vol.type != "host_directory":
                log.error('Kubernetes backend does not support volume type %s', vol.type)
                continue
            # Narrows the type for static checkers; not an input validation.
            assert isinstance(vol, VolumeDescriptionHostPath)
            vol_name = name + "-" + str(len(mounts))
            mounts.append({'mountPath': vol.mount_point, 'name': vol_name})
            pod_volumes.append({'name': vol_name, 'hostPath': {'path': vol.path}})

        self._container()['volumeMounts'] = mounts
        self.conf['spec']['template']['spec']['volumes'] = pod_volumes

    def get_json(self):
        """Return the manifest dict (the live object, not a copy)."""
        return self.conf

220

221 222 223
class KubernetesClient:
    """The Kubernetes client class that wraps the Kubernetes API.

    All objects are created in, and looked up from, the namespace given by the
    Zoe configuration (``kube_namespace``). Most methods log failures and
    return a neutral value instead of raising; see each method for details.
    """

    def __init__(self, opts: Namespace) -> None:
        """Build the API client from the kubeconfig file ``opts.kube_config_file``."""
        self.api = pykube.HTTPClient(pykube.KubeConfig.from_file(opts.kube_config_file))

    @staticmethod
    def _replica_state(ready_replicas, no_replicas):
        """Map replica counters to a ``(state, running)`` pair."""
        if ready_replicas <= 0:
            return 'undefined', False
        if ready_replicas <= no_replicas:
            return 'running', True
        # More ready replicas than requested: something is alive, state unclear.
        return 'undefined', True

    @staticmethod
    def _parse_cpu(cpu):
        """Convert a Kubernetes CPU quantity ("100m", "0.1", "2", 2) to cores."""
        text = str(cpu).strip()
        if text.endswith('m'):
            return float(text[:-1]) / 1000
        return float(text)

    @staticmethod
    def _port_forwarding(ports):
        """Build the port-forwarding list from the ``spec.ports`` of a Service.

        ``nodePort`` is ``None`` for ports that do not carry one.
        """
        return [{'port': prt['port'], 'nodePort': prt.get('nodePort')} for prt in ports]

    def spawn_replication_controller(self, service_instance: ServiceInstance):
        """Create and start a new replication controller.

        Returns the result of ``inspect_replication_controller`` for the new
        controller (which may be ``None``), or an empty dict when the creation
        fails; the failure is logged, not raised.
        """
        config = KubernetesReplicationControllerConf()
        config.set_name(service_instance.name)

        config.set_labels(ZOE_LABELS)
        config.set_labels({'service_name': service_instance.name})
        config.set_replicas(1)

        config.set_spec_selector(ZOE_LABELS)
        config.set_spec_selector({'service_name': service_instance.name})

        config.set_temp_meta_labels(ZOE_LABELS)
        config.set_temp_meta_labels({'service_name': service_instance.name})

        config.set_spec_container_image(service_instance.image_name)
        config.set_spec_container_name(service_instance.name)

        if service_instance.environment:
            config.set_spec_container_env(service_instance.environment)

        if service_instance.ports:
            config.set_spec_container_ports(service_instance.ports)

        if service_instance.memory_limit is not None:
            config.set_spec_container_mem_limit(service_instance.memory_limit.max)

        if service_instance.core_limit is not None:
            config.set_spec_container_core_limit(service_instance.core_limit.max)

        if service_instance.volumes:
            config.set_spec_container_volumes(service_instance.volumes, service_instance.name)

        if service_instance.command is not None:
            config.set_spec_container_command(service_instance.command)

        info = {}

        try:
            pykube.ReplicationController(self.api, config.get_json()).create()
            log.info('Created ReplicationController on Kubernetes cluster')
            info = self.inspect_replication_controller(service_instance.name)
        except Exception:  # boundary with the Kubernetes API: log and report failure
            log.exception('Cannot create ReplicationController %s', service_instance.name)

        return info

    def inspect_replication_controller(self, name):
        """Get information about a specific replication controller.

        Returns a dict with ``backend_id``, ``ip_address``, ``replicas``,
        ``readyReplicas``, ``state`` and ``running``, or ``None`` when the
        controller does not exist or the lookup fails.
        """
        try:
            repcon_list = pykube.ReplicationController.objects(self.api).filter(namespace=get_conf().kube_namespace)
            rc_info = repcon_list.get_by_name(name).obj

            no_replicas = rc_info['spec']['replicas']
            # 'readyReplicas' is not always present in the status
            ready_replicas = rc_info.get('status', {}).get('readyReplicas', 0)
            # A controller with no ready replica is reported as not running.
            state, running = self._replica_state(ready_replicas, no_replicas)

            return {
                "backend_id": rc_info['metadata']['uid'],
                'ip_address': '0.0.0.0',
                'replicas': no_replicas,
                'readyReplicas': ready_replicas,
                'state': state,
                'running': running,
            }
        except pykube.exceptions.ObjectDoesNotExist:
            return None
        except Exception:
            log.exception('Cannot inspect ReplicationController %s', name)
            return None

    def replication_controller_list(self):
        """Get the list of replication controllers carrying the Zoe labels.

        Each entry is the result of ``inspect_replication_controller`` and can
        therefore be ``None``. On error the entries gathered so far are
        returned.
        """
        rclist = []
        try:
            repcon_list = pykube.ReplicationController.objects(self.api).filter(namespace=get_conf().kube_namespace, selector=ZOE_LABELS).iterator()
            for rep in repcon_list:
                rclist.append(self.inspect_replication_controller(rep.name))
        except Exception:
            log.exception('Cannot list ReplicationControllers')
        return rclist

    def spawn_service(self, service_instance: ServiceInstance):
        """Create and start a new Service object; failures are logged, not raised."""
        config = KubernetesServiceConf()

        config.set_name(service_instance.name)
        config.set_labels(ZOE_LABELS)
        config.set_labels({'service_name': service_instance.name})

        if service_instance.ports:
            config.set_ports(service_instance.ports)

        config.set_selectors(ZOE_LABELS)
        config.set_selectors({'service_name': service_instance.name})

        try:
            pykube.Service(self.api, config.get_json()).create()
            log.info('created service on Kubernetes cluster')
        except Exception:
            log.exception('Cannot create Service %s', service_instance.name)

    def inspect_service(self, name) -> Dict[str, Any]:
        """Get information of a specific service.

        Returns a dict with ``service_name``, ``port_forwarding`` (one
        ``{'port', 'nodePort'}`` dict per service port, ``nodePort`` being
        ``None`` when the port has none) and, when available, ``clusterIP``.
        Returns ``None`` when the lookup fails.
        """
        try:
            service_list = pykube.Service.objects(self.api).filter(namespace=get_conf().kube_namespace)
            spec = service_list.get_by_name(name).obj['spec']

            info = {
                'service_name': name,
                'port_forwarding': self._port_forwarding(spec.get('ports', [])),
            }
            if 'clusterIP' in spec:
                info['clusterIP'] = spec['clusterIP']
        except Exception:
            log.exception('Cannot inspect Service %s', name)
            info = None

        return info

    def terminate(self, name):
        """Terminate a service.

        Deletes the Service, then the ReplicationController, then the Pods
        carrying the same labels. Every step is attempted even if a previous
        one failed, so that a missing Service does not leave the controller
        and its pods behind. Failures are logged, not raised.
        """
        namespace = get_conf().kube_namespace
        complete = True

        # The controller goes before the pods.
        for kind, resource in (('Service', pykube.Service),
                               ('ReplicationController', pykube.ReplicationController)):
            del_obj = {
                'apiVersion': 'v1',
                'kind': kind,
                'metadata': {
                    'name': name,
                    'namespace': namespace
                }
            }
            try:
                resource(self.api, del_obj).delete()
            except Exception:
                log.exception('Cannot delete %s %s', kind, name)
                complete = False

        # Work on a copy: ZOE_LABELS is shared by every other call in this module.
        pod_selector = dict(ZOE_LABELS)
        pod_selector['service_name'] = name
        try:
            pods = pykube.Pod.objects(self.api).filter(namespace=namespace, selector=pod_selector).iterator()
            for pod in pods:
                pod.delete()
        except Exception:
            log.exception('Cannot delete the pods of service %s', name)
            complete = False

        if complete:
            log.info('Service deleted on Kubernetes cluster')

    def info(self) -> ClusterStats:  # pylint: disable=too-many-locals
        """Retrieve Kubernetes cluster statistics.

        Totals come from the ``allocatable`` status of each node; reserved
        resources are the sum of the ``requests`` of every container of the
        pods placed on the node. Pods not placed on a known node are ignored.
        """
        pl_status = ClusterStats()
        node_dict = {}

        # Get basic information from nodes
        for node in pykube.Node.objects(self.api).filter(namespace=pykube.all).iterator():
            allocatable = node.obj['status']['allocatable']
            nss = NodeStats(node.name)
            nss.cores_total = self._parse_cpu(allocatable['cpu'])
            # NOTE(review): confirm how the installed humanfriendly reads "Ki"/"Mi"/"Gi" suffixes
            nss.memory_total = humanfriendly.parse_size(allocatable['memory'])
            nss.labels = node.obj['metadata']['labels']
            node_dict[node.name] = nss

        # Get information from all pods, then accumulate to nodes
        for pod in pykube.Pod.objects(self.api).filter(namespace=pykube.all).iterator():
            # node_dict is keyed by node name, so match on spec.nodeName first;
            # hostIP is kept as a fallback for the original lookup.
            nss = node_dict.get(pod.obj['spec'].get('nodeName'))
            if nss is None:
                nss = node_dict.get(pod.obj.get('status', {}).get('hostIP'))
            if nss is None:
                continue

            for spec_cont in pod.obj['spec']['containers']:
                nss.container_count += 1
                requests = (spec_cont.get('resources') or {}).get('requests') or {}
                if 'memory' in requests:
                    nss.memory_reserved = nss.memory_reserved + humanfriendly.parse_size(requests['memory'])
                if 'cpu' in requests:
                    # ex: cpu could be 100m or 0.1
                    nss.cores_reserved = round(nss.cores_reserved + self._parse_cpu(requests['cpu']), 3)

        for nss in node_dict.values():
            pl_status.nodes.append(nss)

        return pl_status