# api_client.py
# Copyright (c) 2016, Quang-Nhat Hoang-Xuan
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to the low-level Kubernetes API."""
import json
import logging
import time
from argparse import Namespace
from typing import Dict, Any, List

import humanfriendly
import pykube

from zoe_lib.state import VolumeDescription
from zoe_lib.version import ZOE_VERSION
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.stats import ClusterStats, NodeStats

log = logging.getLogger(__name__)

# Labels shared by the Kubernetes objects built in this module: they are set
# as metadata labels and used as label selectors. Treat this dictionary as
# read-only; copy it before adding per-service keys.
ZOE_LABELS = {
    "app": "zoe",
    "version": ZOE_VERSION
}


class KubernetesConf:
    """Kubeconfig class.

    Loads a JSON file and exposes the decoded content as ``self.config``.
    """

    def __init__(self, jsonfile):
        """Read and decode the JSON document stored at path ``jsonfile``.

        Errors raised by ``open`` or ``json.load`` are not caught here.
        """
        with open(jsonfile, 'r') as inp:
            self.config = json.load(inp)


class KubernetesServiceConf:
    """Wrapper for Kubernetes Service configuration.

    Holds a ``v1`` Service manifest of type ``LoadBalancer`` as a plain
    dictionary. The setters fill it in and ``get_json()`` returns it.
    """

    def __init__(self):
        self.conf = {
            'kind': 'Service',
            'apiVersion': "v1",
            'metadata': {
                'labels': {}
            },
            'spec': {
                'selector': {},
                'type': 'LoadBalancer',
                'ports': []
            },
        }

    def set_name(self, name):
        """Set the metadata name of the Service."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Merge ``lbs`` into the metadata labels (existing keys are overwritten)."""
        self.conf['metadata']['labels'].update(lbs)

    def set_ports(self, ports):
        """Replace the Service port list with one entry per item of ``ports``.

        Each entry exposes the port under the same number as ``port`` and
        ``targetPort``. Port names have to be unique within one Service, so
        the first port is called ``http`` and the following ones
        ``http-1``, ``http-2``, ...
        """
        self.conf['spec']['ports'] = [
            {
                'name': 'http' if index == 0 else 'http-{}'.format(index),
                'port': prt,
                'targetPort': prt,
            }
            for index, prt in enumerate(ports)
        ]

    def set_selectors(self, selectors: dict):
        """Merge ``selectors`` into the Service label selector."""
        self.conf['spec']['selector'].update(selectors)

    def get_json(self):
        """Return the manifest dictionary (the live object, not a copy)."""
        return self.conf


class KubernetesReplicationControllerConf:
    """Wrapper for Kubernetes ReplicationController Configuration.

    Holds a ``v1`` ReplicationController manifest as a plain dictionary.
    The pod template contains exactly one container, and all the
    ``set_spec_container_*`` setters operate on that container.
    """

    def __init__(self):
        self.conf = {
            'kind': 'ReplicationController',
            'apiVersion': "v1",
            'metadata': {
                'labels': {}
            },
            'spec': {
                'replicas': 1,
                'selector': {},
                'template': {
                    'metadata': {
                        'labels': {}
                    },
                    'spec': {
                        'containers': [{}]
                    }
                },
            }
        }

    def _container(self) -> dict:
        """Return the single container entry of the pod template."""
        return self.conf['spec']['template']['spec']['containers'][0]

    def _limits(self) -> dict:
        """Return the container ``resources.limits`` mapping, creating it on first use.

        Using ``setdefault`` keeps limits that were already stored, so the
        memory and CPU setters can be called in any order.
        """
        return self._container().setdefault('resources', {}).setdefault('limits', {})

    def set_name(self, name):
        """Set the metadata name of the ReplicationController."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Merge ``lbs`` into the metadata labels of the controller."""
        self.conf['metadata']['labels'].update(lbs)

    def set_replicas(self, reps):
        """Set the requested number of replicas."""
        self.conf['spec']['replicas'] = reps

    def set_spec_selector(self, lbs: dict):
        """Merge ``lbs`` into the controller label selector."""
        self.conf['spec']['selector'].update(lbs)

    def set_temp_meta_labels(self, lbs: dict):
        """Merge ``lbs`` into the labels of the pod template."""
        self.conf['spec']['template']['metadata']['labels'].update(lbs)

    def set_spec_container_image(self, image):
        """Set the container image."""
        self._container()['image'] = image

    def set_spec_container_name(self, name):
        """Set the container name."""
        self._container()['name'] = name

    def set_spec_container_env(self, env: dict):
        """Replace the container environment with one name/value entry per key of ``env``."""
        self._container()['env'] = [
            {'name': key, 'value': value} for key, value in env.items()
        ]

    def set_spec_container_ports(self, ports):
        """Replace the container port list with one ``containerPort`` entry per port."""
        self._container()['ports'] = [{'containerPort': prt} for prt in ports]

    def set_spec_container_mem_limit(self, memlimit):
        """Set the container memory limit.

        ``memlimit`` is divided by 1024*1024 and stored with the ``Mi``
        suffix. A CPU limit that was set earlier is preserved.
        """
        self._limits()['memory'] = str(memlimit / (1024*1024)) + "Mi"

    def set_spec_container_core_limit(self, corelimit):
        """Set the container CPU limit.

        Works whether or not a memory limit has been set before.
        """
        self._limits()['cpu'] = corelimit

    def set_spec_container_volumes(self, volumes: List[VolumeDescription], name: str):
        """Replace the container volume mounts and the pod volumes.

        Only volumes of type ``host_directory`` are mounted, as ``hostPath``
        volumes. Any other type is reported in the log and skipped. Volume
        names are ``<name>-<index>``, with the index counting mounted
        volumes, and are shared between the mount and the volume entry.
        """
        supported = []
        for vol in volumes:
            if vol.type != "host_directory":
                log.error('Kubernetes backend does not support volume type %s', vol.type)
                continue
            supported.append(vol)

        mounts = []
        pod_volumes = []
        for index, vol in enumerate(supported):
            vol_name = '{}-{}'.format(name, index)
            mounts.append({'mountPath': vol.mount_point, 'name': vol_name})
            pod_volumes.append({'name': vol_name, 'hostPath': {'path': vol.path}})

        self._container()['volumeMounts'] = mounts
        self.conf['spec']['template']['spec']['volumes'] = pod_volumes

    def get_json(self):
        """Return the manifest dictionary (the live object, not a copy)."""
        return self.conf


class KubernetesClient:
    """The Kubernetes client class that wraps the Kubernetes API.

    All cluster access goes through the ``pykube`` HTTP client stored in
    ``self.api``.
    """

    def __init__(self, opts: Namespace) -> None:
        """Create the pykube HTTP client from the file ``opts.kube_config_file``."""
        self.api = pykube.HTTPClient(pykube.KubeConfig.from_file(opts.kube_config_file))

    @staticmethod
    def _service_labels(name):
        """Return a new dictionary: ZOE_LABELS plus ``service_name`` set to ``name``.

        A copy is returned so that the module-level ZOE_LABELS is never modified.
        """
        labels = dict(ZOE_LABELS)
        labels['service_name'] = name
        return labels

    @staticmethod
    def _deletion_manifest(kind, name):
        """Return the minimal ``v1`` manifest identifying the object ``name`` of ``kind``."""
        return {
            'apiVersion': 'v1',
            'kind': kind,
            'metadata': {
                'name': name
            }
        }

    @staticmethod
    def _replica_state(ready_replicas, no_replicas):
        """Map replica counters to a ``(state, running)`` pair.

        * no ready replica: ``('undefined', False)``
        * between one and ``no_replicas`` ready replicas: ``('running', True)``
        * more ready replicas than requested: ``('undefined', True)``
        """
        if ready_replicas <= 0:
            return 'undefined', False
        if ready_replicas <= no_replicas:
            return 'running', True
        return 'undefined', True

    @staticmethod
    def _parse_cpu_quantity(quantity) -> float:
        """Convert a CPU quantity to a number of cores.

        Accepts the milli-core form (``"100m"`` is 0.1) as well as plain
        integer or decimal values (``"2"``, ``"0.1"``).
        """
        text = str(quantity).strip()
        if text.endswith('m'):
            return float(text[:-1]) / 1000
        return float(text)

    def spawn_replication_controller(self, service_instance: ServiceInstance):
        """Create and start a new replication controller.

        The manifest is built from the name, replica count, image,
        environment, ports, memory/core limits and volumes of
        ``service_instance``.

        Returns the result of ``inspect_replication_controller()`` for the
        new controller. If the creation raises, the error is logged and an
        empty dictionary is returned.
        """
        labels = self._service_labels(service_instance.name)

        config = KubernetesReplicationControllerConf()
        config.set_name(service_instance.name)
        config.set_labels(labels)
        config.set_replicas(service_instance.replicas_count)
        config.set_spec_selector(labels)
        config.set_temp_meta_labels(labels)
        config.set_spec_container_image(service_instance.image_name)
        config.set_spec_container_name(service_instance.name)

        if service_instance.environment:
            config.set_spec_container_env(service_instance.environment)

        if service_instance.ports:
            config.set_spec_container_ports(service_instance.ports)

        config.set_spec_container_mem_limit(service_instance.memory_limit)

        # A core limit of 0 means "no CPU limit": nothing is written.
        if service_instance.core_limit != 0:
            config.set_spec_container_core_limit(service_instance.core_limit)

        if service_instance.volumes:
            config.set_spec_container_volumes(service_instance.volumes, service_instance.name)

        info = {}

        try:
            pykube.ReplicationController(self.api, config.get_json()).create()
            log.info('Created ReplicationController on Kubernetes cluster')
            info = self.inspect_replication_controller(service_instance.name)
        except Exception as ex:
            log.error(ex)

        return info

    def inspect_replication_controller(self, name):
        """Get information about a specific replication controller.

        Returns a dictionary with the keys ``backend_id``, ``ip_address``
        (always ``'0.0.0.0'``), ``replicas``, ``readyReplicas``, ``state``
        and ``running``. Returns None when the controller does not exist or
        when any other error occurs (that error is logged).
        """
        try:
            rc_info = pykube.ReplicationController.objects(self.api).get_by_name(name).obj

            no_replicas = rc_info['spec']['replicas']
            # 'readyReplicas' is absent from the status until a replica is ready.
            ready_replicas = rc_info['status'].get('readyReplicas', 0)
            state, running = self._replica_state(ready_replicas, no_replicas)

            info = {
                "backend_id": rc_info['metadata']['uid'],
                'ip_address': '0.0.0.0',
                'replicas': no_replicas,
                'readyReplicas': ready_replicas,
                'state': state,
                'running': running
            }
        except pykube.exceptions.ObjectDoesNotExist:
            return None
        except Exception as ex:
            log.error(ex)
            return None

        return info

    def replication_controller_list(self):
        """Get list of replication controller.

        Returns one ``inspect_replication_controller()`` result (possibly
        None) for each controller selected by ZOE_LABELS. If listing fails,
        the error is logged and the entries gathered so far are returned.
        """
        repcon_list = pykube.ReplicationController.objects(self.api).filter(selector=ZOE_LABELS).iterator()
        rclist = []
        try:
            for rep in repcon_list:
                rclist.append(self.inspect_replication_controller(rep.name))
        except Exception as ex:
            log.error(ex)
        return rclist

    def replication_controller_event(self):
        """Get event stream of the replication controller.

        Returns the pykube watch object for the controllers selected by ZOE_LABELS.
        """
        return pykube.ReplicationController.objects(self.api).filter(selector=ZOE_LABELS).watch()

    def spawn_service(self, service_instance: ServiceInstance):
        """Create and start a new Service object.

        The Service is named after ``service_instance``, exposes its ports
        and selects the pods carrying the same labels. Errors are logged,
        not raised. Nothing is returned.
        """
        labels = self._service_labels(service_instance.name)

        config = KubernetesServiceConf()
        config.set_name(service_instance.name)
        config.set_labels(labels)

        if service_instance.ports:
            config.set_ports(service_instance.ports)

        config.set_selectors(labels)

        try:
            pykube.Service(self.api, config.get_json()).create()
            log.info('created service on Kubernetes cluster')
        except Exception as ex:
            log.error(ex)

    def inspect_service(self, name) -> Dict[str, Any]:
        """Get information of a specific service.

        Returns a dictionary with ``service_name``, ``port_forwarding`` (one
        ``{'port', 'nodePort'}`` entry per Service port) and, when present
        in the Service spec, ``clusterIP``. On any error the error is logged
        and None is returned.
        """
        try:
            spec = pykube.Service.objects(self.api).get_by_name(name).obj['spec']

            info = {
                'service_name': name,
                'port_forwarding': [
                    {'port': entry['port'], 'nodePort': entry['nodePort']}
                    for entry in spec['ports']
                ]
            }

            if 'clusterIP' in spec:
                info['clusterIP'] = spec['clusterIP']
        except Exception as ex:
            log.error(ex)
            info = None

        return info

    def terminate(self, name):
        """Terminate a service.

        Deletes, in this order, the Service and the ReplicationController
        called ``name``, then every pod of the ``default`` namespace that
        carries the Zoe labels and ``service_name == name``. The first error
        stops the sequence and is logged.
        """
        try:
            pykube.Service(self.api, self._deletion_manifest('Service', name)).delete()

            pykube.ReplicationController(self.api, self._deletion_manifest('ReplicationController', name)).delete()

            # Work on a copy: adding 'service_name' to ZOE_LABELS itself would
            # leak this service name into every selector built afterwards.
            pod_selector = self._service_labels(name)
            pods = pykube.Pod.objects(self.api).filter(namespace="default", selector=pod_selector).iterator()
            for pod in pods:
                pykube.Pod(self.api, self._deletion_manifest('Pod', pod.name)).delete()

            log.info('Service deleted on Kubernetes cluster')
        except Exception as ex:
            log.error(ex)

    def info(self) -> ClusterStats:  # pylint: disable=too-many-locals
        """Retrieve Kubernetes cluster statistics.

        Node totals come from ``status.allocatable``. Reserved memory and
        cores are accumulated from the resource requests of the first
        container of every pod, in all namespaces. Pods that have no
        ``hostIP`` yet, or whose host matches no known node, are ignored.
        """
        pl_status = ClusterStats()

        node_dict = {}
        # Pods report the address of their host, so every node is reachable
        # both through its name and through each of its addresses.
        nodes_by_host = {}

        # Get basic information from nodes
        for node in pykube.Node.objects(self.api).iterator():
            status = node.obj['status']
            nss = NodeStats(node.name)
            nss.cores_total = self._parse_cpu_quantity(status['allocatable']['cpu'])
            nss.memory_total = humanfriendly.parse_size(status['allocatable']['memory'])
            nss.labels = node.obj['metadata']['labels']
            node_dict[node.name] = nss
            nodes_by_host[node.name] = nss
            for address in status.get('addresses', []):
                nodes_by_host.setdefault(address['address'], nss)

        # Get information from all running pods, then accumulate to nodes
        for pod in pykube.Pod.objects(self.api).filter(namespace=pykube.all).iterator():
            nss = nodes_by_host.get(pod.obj.get('status', {}).get('hostIP'))
            if nss is None:
                continue

            nss.container_count += 1
            requests = pod.obj['spec']['containers'][0].get('resources', {}).get('requests', {})
            if 'memory' in requests:
                nss.memory_reserved = nss.memory_reserved + humanfriendly.parse_size(requests['memory'])
            if 'cpu' in requests:
                # ex: cpu could be 100m or 0.1
                cpu_float = self._parse_cpu_quantity(requests['cpu'])
                nss.cores_reserved = round(nss.cores_reserved + cpu_float, 3)

        cont_total = 0
        mem_total = 0
        cpu_total = 0

        for nss in node_dict.values():
            pl_status.nodes.append(nss)
            cont_total = cont_total + nss.container_count
            mem_total = mem_total + nss.memory_total
            cpu_total = cpu_total + nss.cores_total

        pl_status.container_count = cont_total
        pl_status.memory_total = mem_total
        pl_status.cores_total = cpu_total
        pl_status.timestamp = time.time()

        return pl_status