api_client.py 15.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright (c) 2016, Quang-Nhat Hoang-Xuan
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to the low-level Kubernetes API."""
import logging
import json
import time
20 21 22
from argparse import Namespace

from typing import Dict, Any, List
23
import humanfriendly
24
import pykube
25 26

from zoe_master.stats import ClusterStats, NodeStats
27
from zoe_master.backends.service_instance import ServiceInstance
28
from zoe_lib.version import ZOE_VERSION
29
from zoe_lib.state import VolumeDescription
30 31 32

log = logging.getLogger(__name__)

# Labels stamped on every object Zoe creates in Kubernetes; the same dict is
# also used as a selector to find those objects again, so never modify it in
# place -- copy it before adding extra keys.
ZOE_LABELS = dict(app="zoe", version=ZOE_VERSION)
37 38 39


class KubernetesConf:
    """Kubeconfig class: a configuration document loaded from a JSON file."""

    def __init__(self, jsonfile):
        """Load ``jsonfile`` and keep the decoded object in ``self.config``.

        :param jsonfile: path of the JSON document. It is always read as UTF-8
            (the encoding JSON requires) rather than the platform locale
            encoding.
        :raises OSError: if the file cannot be opened.
        :raises ValueError: if the content is not valid JSON.
        """
        with open(jsonfile, 'r', encoding='utf-8') as inp:
            self.config = json.load(inp)
45

46

47 48 49
class KubernetesServiceConf:
    """ Wrapper for Kubernetes Service configuration.

    Fill it through the setters, then hand ``get_json()`` to the API client.
    """
    def __init__(self):
        self.conf = {
            'kind': 'Service',
            'apiVersion': "v1",
            'metadata': {
                'labels': {}
            },
            'spec': {
                'selector': {},
                'ports': []
            },
        }

    def set_name(self, name):
        """Set ``metadata.name`` of the Service."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Merge ``lbs`` into ``metadata.labels``; existing keys are overwritten."""
        self.conf['metadata']['labels'].update(lbs)

    def set_ports(self, ports):
        """Replace ``spec.ports`` with one entry per element of ``ports``.

        Each element must expose a ``number`` attribute, used both as the
        Service port and as the target port on the selected pods.

        Kubernetes rejects a multi-port Service whose ports share a name, so
        only the first port is named ``http``; the following ones get an index
        suffix (``http-1``, ``http-2``, ...).
        """
        self.conf['spec']['ports'] = [
            {
                'name': 'http' if idx == 0 else 'http-{}'.format(idx),
                'port': prt.number,
                'targetPort': prt.number,
            }
            for idx, prt in enumerate(ports)
        ]

    def set_selectors(self, selectors: dict):
        """Merge ``selectors`` into ``spec.selector``; existing keys are overwritten."""
        self.conf['spec']['selector'].update(selectors)

    def get_json(self):
        """Return the Service manifest (the live dict, not a copy)."""
        return self.conf

92

93 94 95
class KubernetesReplicationControllerConf:
    """ Wrapper for Kubernetes ReplicationController Configuration.

    The pod template holds exactly one container; every
    ``set_spec_container_*`` setter configures that container.
    """
    def __init__(self):
        self.conf = {
            'kind': 'ReplicationController',
            'apiVersion': "v1",
            'metadata': {
                'labels': {}
            },
            'spec': {
                'replicas': 1,
                'selector': {},
                'template': {
                    'metadata': {
                        'labels': {}
                    },
                    'spec': {
                        'containers': [{}]
                    }
                },
            }
        }

    def _template_spec(self) -> Dict:
        """Return the pod template's ``spec`` dict (shared with ``self.conf``)."""
        return self.conf['spec']['template']['spec']

    def _container(self) -> Dict:
        """Return the single container dict of the pod template."""
        return self._template_spec()['containers'][0]

    def _limits(self) -> Dict:
        """Return the container's ``resources.limits`` dict, creating it if missing.

        Shared by the memory and cpu setters so they can be called in any
        order without discarding each other's value.
        """
        resources = self._container().setdefault('resources', {})
        return resources.setdefault('limits', {})

    def set_name(self, name):
        """Set ``metadata.name`` of the ReplicationController."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Merge ``lbs`` into ``metadata.labels``."""
        self.conf['metadata']['labels'].update(lbs)

    def set_replicas(self, reps):
        """Set the desired number of replicas."""
        self.conf['spec']['replicas'] = reps

    def set_spec_selector(self, lbs: dict):
        """Merge ``lbs`` into ``spec.selector``."""
        self.conf['spec']['selector'].update(lbs)

    def set_temp_meta_labels(self, lbs: dict):
        """Merge ``lbs`` into the pod template's ``metadata.labels``."""
        self.conf['spec']['template']['metadata']['labels'].update(lbs)

    def set_spec_container_image(self, image):
        """Set the container image."""
        self._container()['image'] = image

    def set_spec_container_name(self, name):
        """Set the container name."""
        self._container()['name'] = name

    def set_spec_container_env(self, env):
        """Set the container environment.

        :param env: either a sequence of ``(name, value)`` pairs (only the
            first two items of each pair are used) or a mapping of name to
            value.
        """
        pairs = env.items() if isinstance(env, dict) else env
        self._container()['env'] = [
            {'name': item[0], 'value': item[1]} for item in pairs
        ]

    def set_spec_container_ports(self, ports):
        """Expose one container port per element of ``ports`` (uses ``.number``)."""
        self._container()['ports'] = [{'containerPort': prt.number} for prt in ports]

    def set_spec_container_mem_limit(self, memlimit):
        """Set the container memory limit; ``memlimit`` is a size in bytes.

        Whole mebibytes are written as ``<n>Mi``; any other value is written as
        an exact byte count. The previous float division produced strings such as
        ``512.0Mi``, or scientific notation for tiny values, which the
        quantity grammar does not accept.
        """
        mebibyte = 1024 * 1024
        nbytes = int(memlimit)
        if nbytes % mebibyte == 0:
            memset = '{}Mi'.format(nbytes // mebibyte)
        else:
            memset = str(nbytes)
        self._limits()['memory'] = memset

    def set_spec_container_core_limit(self, corelimit):
        """Set the container cpu limit (value written as given)."""
        self._limits()['cpu'] = corelimit

    def set_spec_container_volumes(self, volumes: "List[VolumeDescription]", name: str):
        """Mount ``volumes`` into the container as ``hostPath`` volumes.

        Volume ``i`` is named ``<name>-<i>``, mounted at ``vol.mount_point`` and
        backed by the host directory ``vol.path``. Only ``host_directory``
        volumes are supported; other types are logged as errors but are still
        mounted as host paths.
        """
        mounts = []
        host_volumes = []
        for idx, vol in enumerate(volumes):
            if vol.type != "host_directory":
                log.error('Kubernetes backend does not support volume type %s', vol.type)
            vol_name = '{}-{}'.format(name, idx)
            mounts.append({'mountPath': vol.mount_point, 'name': vol_name})
            host_volumes.append({'name': vol_name, 'hostPath': {'path': vol.path}})
        self._container()['volumeMounts'] = mounts
        self._template_spec()['volumes'] = host_volumes

    def get_json(self):
        """Return the ReplicationController manifest (the live dict, not a copy)."""
        return self.conf

212

213 214 215
class KubernetesClient:
    """The Kubernetes client class that wraps the Kubernetes API (via pykube)."""

    # Multipliers for Kubernetes resource-quantity suffixes ("16318196Ki",
    # "100m", "2G", ...). Binary suffixes are two characters, decimal ones one.
    _QUANTITY_SUFFIXES = {
        'Ki': 1024, 'Mi': 1024 ** 2, 'Gi': 1024 ** 3,
        'Ti': 1024 ** 4, 'Pi': 1024 ** 5, 'Ei': 1024 ** 6,
        'n': 1e-9, 'u': 1e-6, 'm': 1e-3,
        'k': 1e3, 'M': 1e6, 'G': 1e9, 'T': 1e12, 'P': 1e15, 'E': 1e18,
    }

    def __init__(self, opts: Namespace) -> None:
        """Connect to the cluster described by ``opts.kube_config_file``."""
        self.api = pykube.HTTPClient(pykube.KubeConfig.from_file(opts.kube_config_file))

    @classmethod
    def _parse_quantity(cls, quantity) -> float:
        """Convert a Kubernetes resource quantity (str or number) to a float.

        Accepts binary suffixes (``Ki`` .. ``Ei``), decimal suffixes
        (``n``, ``u``, ``m``, ``k``, ``M`` .. ``E``) and plain or exponent
        notation (``0.5``, ``1e3``). humanfriendly.parse_size is not used here
        because it reads ``Ki``/``Mi`` as decimal (x1000) units by default.

        :raises ValueError: if ``quantity`` cannot be parsed.
        """
        text = str(quantity).strip()
        for suffix_len in (2, 1):
            suffix = text[-suffix_len:]
            if len(text) > suffix_len and suffix in cls._QUANTITY_SUFFIXES:
                return float(text[:-suffix_len]) * cls._QUANTITY_SUFFIXES[suffix]
        return float(text)

    @staticmethod
    def _replication_controller_info(rc_info: dict) -> dict:
        """Summarise a ReplicationController object (as returned by the API).

        ``state``/``running`` are derived from ``status.readyReplicas``
        (missing means 0): no ready replica -> ``'undefined'``/``False``;
        1..spec.replicas ready -> ``'running'``/``True``; more ready than
        requested (e.g. while scaling down) -> ``'undefined'``/``True``.
        """
        no_replicas = rc_info['spec']['replicas']
        ready_replicas = (rc_info.get('status') or {}).get('readyReplicas') or 0

        if ready_replicas <= 0:
            state, running = 'undefined', False
        elif ready_replicas <= no_replicas:
            state, running = 'running', True
        else:
            state, running = 'undefined', True

        return {
            'backend_id': rc_info['metadata']['uid'],
            'ip_address': '0.0.0.0',
            'replicas': no_replicas,
            'readyReplicas': ready_replicas,
            'state': state,
            'running': running,
        }

    def spawn_replication_controller(self, service_instance: 'ServiceInstance'):
        """Create and start a new replication controller for ``service_instance``.

        :returns: whatever ``inspect_replication_controller`` reports for the new
            controller (may be ``None``), or ``{}`` if creation raised; errors
            are logged, not raised.
        """
        name = service_instance.name
        config = KubernetesReplicationControllerConf()
        config.set_name(name)

        # Object labels, RC selector and pod template labels all carry the Zoe
        # labels plus the service name, so the RC selects exactly its own pods.
        for setter in (config.set_labels, config.set_spec_selector, config.set_temp_meta_labels):
            setter(ZOE_LABELS)
            setter({'service_name': name})
        config.set_replicas(service_instance.replicas_count)

        config.set_spec_container_image(service_instance.image_name)
        config.set_spec_container_name(name)

        if service_instance.environment:
            config.set_spec_container_env(service_instance.environment)

        if service_instance.ports:
            config.set_spec_container_ports(service_instance.ports)

        if service_instance.memory_limit is not None:
            config.set_spec_container_mem_limit(service_instance.memory_limit.max)

        if service_instance.core_limit is not None:
            config.set_spec_container_core_limit(service_instance.core_limit.max)

        if service_instance.volumes:
            config.set_spec_container_volumes(service_instance.volumes, name)

        info = {}
        try:
            pykube.ReplicationController(self.api, config.get_json()).create()
            log.info('Created ReplicationController on Kubernetes cluster')
            info = self.inspect_replication_controller(name)
        except Exception as ex:
            log.error(ex)

        return info

    def inspect_replication_controller(self, name):
        """Get information about a specific replication controller.

        :returns: the summary dict built by ``_replication_controller_info``, or
            ``None`` if the controller does not exist or on any error (logged).
        """
        try:
            rep = pykube.ReplicationController.objects(self.api).get_by_name(name)
            return self._replication_controller_info(rep.obj)
        except pykube.exceptions.ObjectDoesNotExist:
            return None
        except Exception as ex:
            log.error(ex)
            return None

    def replication_controller_list(self):
        """Inspect every replication controller carrying the Zoe labels.

        Entries may be ``None`` (see ``inspect_replication_controller``). On an
        error while listing, the entries collected so far are returned.
        """
        repcon_list = pykube.ReplicationController.objects(self.api).filter(selector=ZOE_LABELS).iterator()
        rclist = []
        try:
            for rep in repcon_list:
                rclist.append(self.inspect_replication_controller(rep.name))
        except Exception as ex:
            log.error(ex)
        return rclist

    def replication_controller_event(self):
        """Return the pykube watch stream of Zoe-labelled replication controllers."""
        return pykube.ReplicationController.objects(self.api).filter(selector=ZOE_LABELS).watch()

    def spawn_service(self, service_instance: 'ServiceInstance'):
        """Create and start a new Service object for ``service_instance``.

        Errors are logged, not raised.
        """
        name = service_instance.name
        config = KubernetesServiceConf()
        config.set_name(name)
        config.set_labels(ZOE_LABELS)
        config.set_labels({'service_name': name})

        if service_instance.ports:
            config.set_ports(service_instance.ports)

        config.set_selectors(ZOE_LABELS)
        config.set_selectors({'service_name': name})

        try:
            pykube.Service(self.api, config.get_json()).create()
            log.info('created service on Kubernetes cluster')
        except Exception as ex:
            log.error(ex)

    def inspect_service(self, name) -> Dict[str, Any]:
        """Get information about the Service called ``name``.

        :returns: a dict with ``service_name``, ``port_forwarding`` (one
            ``{'port', 'nodePort'}`` per Service port) and, when present,
            ``clusterIP``; ``None`` on any error (logged).
        """
        try:
            srv_info = pykube.Service.objects(self.api).get_by_name(name).obj
            spec = srv_info['spec']

            info = {
                'service_name': name,
                'port_forwarding': [],
            }  # type: Dict[str, Any]

            if 'clusterIP' in spec:
                info['clusterIP'] = spec['clusterIP']

            # nodePort exists only for NodePort/LoadBalancer Services; the
            # Service built by spawn_service sets no type, so it can be absent.
            info['port_forwarding'] = [
                {'port': prt['port'], 'nodePort': prt.get('nodePort')}
                for prt in spec.get('ports', [])
            ]
        except Exception as ex:
            log.error(ex)
            return None

        return info

    def terminate(self, name):
        """Terminate a service.

        Deletes the Service and the ReplicationController called ``name``, then
        every pod in the ``default`` namespace labelled with the Zoe labels and
        ``service_name=name``. Each step is attempted even if an earlier one
        fails; failures are logged, not raised.
        """
        success = True

        for kind, obj_class in (('Service', pykube.Service),
                                ('ReplicationController', pykube.ReplicationController)):
            del_obj = {
                'apiVersion': 'v1',
                'kind': kind,
                'metadata': {
                    'name': name
                }
            }
            try:
                obj_class(self.api, del_obj).delete()
            except Exception as ex:
                log.error(ex)
                success = False

        # Build a new dict: updating ZOE_LABELS in place would leak this
        # service_name into the module-wide labels used by later selectors.
        pod_selector = dict(ZOE_LABELS, service_name=name)
        try:
            pods = pykube.Pod.objects(self.api).filter(namespace="default", selector=pod_selector).iterator()
            for pod in pods:
                pod.delete()
        except Exception as ex:
            log.error(ex)
            success = False

        if success:
            log.info('Service deleted on Kubernetes cluster')

    def info(self) -> 'ClusterStats':  # pylint: disable=too-many-locals
        """Retrieve Kubernetes cluster statistics.

        Node capacity comes from each node's ``status.allocatable``; reserved
        memory/cores are the sums of the resource requests of all containers
        of the pods scheduled on that node. ``container_count`` counts pods.
        """
        pl_status = ClusterStats()

        # Basic capacity information, keyed by node name.
        node_dict = {}
        for node in pykube.Node.objects(self.api).iterator():
            nss = NodeStats(node.name)
            allocatable = node.obj['status']['allocatable']
            # Values are quantities and may use suffixes such as "m" or "Ki".
            nss.cores_total = self._parse_quantity(allocatable['cpu'])
            nss.memory_total = int(self._parse_quantity(allocatable['memory']))
            nss.labels = node.obj['metadata'].get('labels', {})
            node_dict[node.name] = nss

        # Accumulate pod requests onto the node each pod is scheduled on.
        # spec.nodeName holds the node's name; unscheduled pods have none.
        for pod in pykube.Pod.objects(self.api).filter(namespace=pykube.all).iterator():
            nss = node_dict.get(pod.obj['spec'].get('nodeName'))
            if nss is None:
                continue
            nss.container_count += 1
            for container in pod.obj['spec'].get('containers', []):
                requests = (container.get('resources') or {}).get('requests') or {}
                if 'memory' in requests:
                    nss.memory_reserved += int(self._parse_quantity(requests['memory']))
                if 'cpu' in requests:
                    # cpu may be written as "100m" or "0.1"
                    nss.cores_reserved = round(nss.cores_reserved + self._parse_quantity(requests['cpu']), 3)

        nodes = list(node_dict.values())
        for nss in nodes:
            pl_status.nodes.append(nss)
        pl_status.container_count = sum(nss.container_count for nss in nodes)
        pl_status.memory_total = sum(nss.memory_total for nss in nodes)
        pl_status.cores_total = sum(nss.cores_total for nss in nodes)
        pl_status.timestamp = time.time()

        return pl_status