# Copyright (c) 2016, Quang-Nhat Hoang-Xuan
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interface to the low-level Kubernetes API."""

try:
    import pykube
except ImportError:
    pykube = None

from argparse import Namespace
from typing import Iterable, Dict, Any, Union

import logging
import json
import time
import humanfriendly

from zoe_master.stats import ClusterStats, NodeStats
from zoe_lib.version import ZOE_VERSION

log = logging.getLogger(__name__)

# Labels stamped on, and used as selectors for, the Kubernetes objects this
# module creates. Shared module-wide: copy it before adding per-service keys.
ZOE_LABELS = dict(app="zoe", version=ZOE_VERSION)

class DockerContainerOptions:
    """Container options collected in Docker-API vocabulary.

    A fresh instance carries the defaults set in ``__init__``; the add_*/set_*
    methods fill it in and the back-end reads it back through the getters.
    """

    def __init__(self):
        # Identity, command and placement
        self.name = ''
        self.command = ""
        self.constraints = []
        self.replicas = 1
        self.labels = []
        # Environment and storage
        self.env = {}
        self.volumes = []
        self.volume_binds = []
        # Resources: memory in bytes (2 GiB by default); 0 cores = no CPU limit
        self.memory_limit = 2 * (1024**3)
        self.cores_limit = 0
        # Networking, restart behaviour and logging
        self.ports = []
        self.network_name = 'bridge'
        self.restart = True
        self.gelf_log_address = ''

    def add_constraint(self, constraint):
        """Append a placement constraint, written in Docker syntax."""
        self.constraints.append(constraint)

    def add_env_variable(self, name: str, value: Union[str, None]) -> None:
        """Define (or overwrite) the environment variable ``name``."""
        self.env[name] = value

    @property
    def environment(self) -> Dict[str, Union[str, None]]:
        """The environment-variable mapping itself (not a copy)."""
        return self.env

    def add_volume_bind(self, path: str, mountpoint: str, readonly=False) -> None:
        """Record a bind of ``path`` onto ``mountpoint`` as ``path:mountpoint:ro|rw``."""
        self.volumes.append(mountpoint)
        mode = "ro" if readonly else "rw"
        self.volume_binds.append(":".join([path, mountpoint, mode]))

    def get_volumes(self) -> Iterable[str]:
        """Return the mount points registered through ``add_volume_bind``."""
        return self.volumes

    def get_volume_binds(self) -> Iterable[str]:
        """Return the full ``path:mountpoint:mode`` bind strings."""
        return self.volume_binds

    def set_command(self, cmd):
        """Store the command the container should run."""
        self.command = cmd

    def get_command(self) -> str:
        """Return the command the container should run."""
        return self.command

    def set_memory_limit(self, limit: int):
        """Store the container memory limit, in bytes."""
        self.memory_limit = limit

    def get_memory_limit(self) -> int:
        """Return the container memory limit, in bytes."""
        return self.memory_limit

    def set_cores_limit(self, limit: float):
        """Store the container CPU-cores limit."""
        self.cores_limit = limit

    def get_cores_limit(self):
        """Return the container CPU-cores limit."""
        return self.cores_limit

    @property
    def restart_policy(self) -> Dict[str, str]:
        """Docker restart policy: ``{'Name': 'always'}`` when restart is on, else ``{}``."""
        return {'Name': 'always'} if self.restart else {}

    def set_replicas(self, reps: int):
        """Store how many replicas of the container to run."""
        self.replicas = reps

    def get_replicas(self) -> int:
        """Return how many replicas of the container to run."""
        return self.replicas

class KubernetesConf:
    """Kubeconfig class: the decoded contents of a JSON configuration file."""

    def __init__(self, jsonfile):
        """Load ``jsonfile`` and keep its decoded contents in ``self.config``.

        :param jsonfile: path of the JSON file to read
        :raises OSError: if the file cannot be opened
        :raises ValueError: if the file is not valid UTF-8 JSON
        """
        # JSON text is UTF-8 (RFC 8259); don't depend on the locale's default
        # encoding, which differs between hosts.
        with open(jsonfile, 'r', encoding='utf-8') as inp:
            self.config = json.load(inp)

class KubernetesServiceConf:
    """ Wrapper for Kubernetes Service configuration

    Builds the manifest of a ``v1`` Service of type ``LoadBalancer``; the
    setters fill it in and ``get_json`` returns the resulting dict.
    """
    def __init__(self):
        self.conf = {
            'kind': 'Service',
            'apiVersion': "v1",
            'metadata': {'labels': {}},
            'spec': {'selector': {}, 'type': 'LoadBalancer'},
        }

    def set_name(self, name):
        """Set ``metadata.name`` of the Service."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Merge ``lbs`` into ``metadata.labels`` (existing keys are overwritten)."""
        self.conf['metadata']['labels'].update(lbs)

    def set_ports(self, ports):
        """Expose every port in ``ports``, forwarding each to the same target port.

        Kubernetes requires the ports of a multi-port Service to have unique
        names, so the first port keeps the name ``http`` and the following
        ones are named ``http-1``, ``http-2``, ...
        """
        self.conf['spec']['ports'] = [
            {
                'name': 'http' if idx == 0 else 'http-{}'.format(idx),
                'port': prt,
                'targetPort': prt,
            }
            for idx, prt in enumerate(ports)
        ]

    def set_selectors(self, selectors: dict):
        """Merge ``selectors`` into ``spec.selector`` (existing keys are overwritten)."""
        self.conf['spec']['selector'].update(selectors)

    def get_json(self):
        """Return the manifest dict (the live object, not a copy)."""
        return self.conf

class KubernetesReplicationControllerConf:
    """ Wrapper for Kubernetes ReplicationController Configuration

    Builds the manifest of a ``v1`` ReplicationController whose pod template
    holds a single container; the setters fill it in and ``get_json``
    returns the resulting dict.
    """
    def __init__(self):
        self.conf = {
            'kind': 'ReplicationController',
            'apiVersion': "v1",
            'metadata': {'labels': {}},
            'spec': {
                'replicas': 1,
                'selector': {},
                'template': {
                    'metadata': {'labels': {}},
                    'spec': {'containers': [{}]},
                },
            },
        }

    def _container(self):
        """Return the dict of the (single) container in the pod template."""
        return self.conf['spec']['template']['spec']['containers'][0]

    def _limits(self):
        """Return the container's ``resources.limits`` dict, creating it if missing."""
        return self._container().setdefault('resources', {}).setdefault('limits', {})

    def set_name(self, name):
        """Set ``metadata.name`` of the controller."""
        self.conf['metadata']['name'] = name

    def set_labels(self, lbs: dict):
        """Merge ``lbs`` into the controller's ``metadata.labels``."""
        self.conf['metadata']['labels'].update(lbs)

    def set_replicas(self, reps):
        """Set the desired number of pod replicas."""
        self.conf['spec']['replicas'] = reps

    def set_spec_selector(self, lbs: dict):
        """Merge ``lbs`` into ``spec.selector`` (which pods the controller manages)."""
        self.conf['spec']['selector'].update(lbs)

    def set_temp_meta_labels(self, lbs: dict):
        """Merge ``lbs`` into the labels of the pod template."""
        self.conf['spec']['template']['metadata']['labels'].update(lbs)

    def set_spec_container_image(self, image):
        """Set the container image."""
        self._container()['image'] = image

    def set_spec_container_name(self, name):
        """Set the container name."""
        self._container()['name'] = name

    def set_spec_container_env(self, env: dict):
        """Set the container environment from ``env`` (variable name -> value)."""
        self._container()['env'] = [{'name': k, 'value': env[k]} for k in env]

    def set_spec_container_ports(self, ports):
        """Declare each port in ``ports`` as a container port."""
        self._container()['ports'] = [{'containerPort': prt} for prt in ports]

    def set_spec_container_mem_limit(self, memlimit):
        """Set the container memory limit.

        :param memlimit: limit in bytes; written as a ``Mi`` quantity
        Only ``resources.limits.memory`` is touched, so a CPU limit that was
        already set is kept.
        """
        memset = str(memlimit / (1024*1024)) + "Mi"
        self._limits()['memory'] = memset

    def set_spec_container_core_limit(self, corelimit):
        """Set the container CPU limit; may be called before or after the memory limit."""
        self._limits()['cpu'] = corelimit

    def set_spec_container_volumes(self, volumes, name):
        """Mount host directories into the container through ``hostPath`` volumes.

        :param volumes: bind strings in the ``path:mountpoint[:ro|rw]`` form
            built by ``DockerContainerOptions.add_volume_bind`` (host path
            first, container mount point second)
        :param name: prefix of the generated volume names (``<name>-<index>``)
        """
        mounts = []
        host_volumes = []
        for idx, vol in enumerate(volumes):
            parts = vol.split(':')
            vol_name = name + "-" + str(idx)
            mounts.append({
                'mountPath': parts[1],
                'name': vol_name,
                # Honour the read-only flag of the bind instead of dropping it.
                'readOnly': len(parts) > 2 and parts[2] == 'ro',
            })
            host_volumes.append({'name': vol_name, 'hostPath': {'path': parts[0]}})

        self._container()['volumeMounts'] = mounts
        self.conf['spec']['template']['spec']['volumes'] = host_volumes

    def get_json(self):
        """Return the manifest dict (the live object, not a copy)."""
        return self.conf

class KubernetesClient:
    """The Kubernetes client class that wraps the Kubernetes API."""

    # Binary (power-of-two) suffixes of Kubernetes resource quantities.
    # Depending on its version, humanfriendly.parse_size() may read "Ki"/"Mi"
    # as decimal units, so these are converted here; anything else is left
    # to humanfriendly.
    _BINARY_SUFFIXES = {
        'Ki': 1024, 'Mi': 1024 ** 2, 'Gi': 1024 ** 3,
        'Ti': 1024 ** 4, 'Pi': 1024 ** 5, 'Ei': 1024 ** 6,
    }

    def __init__(self, opts: Namespace) -> None:
        """Connect to the cluster described by the kubeconfig at ``opts.kube_config_file``.

        :raises ImportError: if the pykube package could not be imported
        """
        if pykube is None:
            raise ImportError('pykube is required by the Kubernetes back-end but could not be imported')
        self.api = pykube.HTTPClient(pykube.KubeConfig.from_file(opts.kube_config_file))

    @classmethod
    def _parse_memory(cls, quantity) -> int:
        """Convert a Kubernetes memory quantity ('512Mi', '1G', '1048576') to bytes."""
        text = str(quantity)
        multiplier = cls._BINARY_SUFFIXES.get(text[-2:])
        if multiplier is not None:
            return int(float(text[:-2]) * multiplier)
        return humanfriendly.parse_size(text)

    @staticmethod
    def _parse_cpu(quantity) -> float:
        """Convert a Kubernetes CPU quantity ('100m', '0.1', '2') to a number of cores."""
        text = str(quantity)
        if text.endswith('m'):
            return float(text[:-1]) / 1000
        return float(text)

    def spawn_replication_controller(self, image: str, options: DockerContainerOptions):
        """Create and start a new replication controller.

        The controller, its selector and its pod template are labelled with
        ZOE_LABELS plus ``service_name=options.name``.

        :returns: the dict from ``inspect_replication_controller`` for the new
            controller, or ``{}`` if creation failed (the error is logged)
        """
        config = KubernetesReplicationControllerConf()
        service_labels = {'service_name': options.name}

        config.set_name(options.name)
        config.set_labels(ZOE_LABELS)
        config.set_labels(service_labels)
        config.set_replicas(options.get_replicas())

        config.set_spec_selector(ZOE_LABELS)
        config.set_spec_selector(service_labels)

        config.set_temp_meta_labels(ZOE_LABELS)
        config.set_temp_meta_labels(service_labels)

        config.set_spec_container_image(image)
        config.set_spec_container_name(options.name)

        if options.environment:
            config.set_spec_container_env(options.environment)

        if options.ports:
            config.set_spec_container_ports(options.ports)

        config.set_spec_container_mem_limit(options.get_memory_limit())

        # A cores limit of 0 means no CPU limit was requested.
        if options.get_cores_limit() != 0:
            config.set_spec_container_core_limit(options.get_cores_limit())

        volume_binds = list(options.get_volume_binds())
        if volume_binds:
            config.set_spec_container_volumes(volume_binds, options.name)

        info = {}
        try:
            pykube.ReplicationController(self.api, config.get_json()).create()
            log.info('Created ReplicationController on Kubernetes cluster')
            info = self.inspect_replication_controller(options.name)
        except Exception as ex:  # pylint: disable=broad-except
            log.error(ex)

        return info

    def inspect_replication_controller(self, name):
        """Get information about a specific replication controller.

        :returns: a dict with ``backend_id``, ``ip_address``, ``replicas``,
            ``readyReplicas``, ``state`` and ``running``; or None if the
            controller does not exist or could not be read (other errors are
            logged)
        """
        try:
            rc_info = pykube.ReplicationController.objects(self.api).get_by_name(name).obj

            no_replicas = rc_info['spec']['replicas']
            # 'status' / 'readyReplicas' may be absent (e.g. no pod ready yet).
            ready_replicas = (rc_info.get('status') or {}).get('readyReplicas', 0)

            info = {
                "backend_id": rc_info['metadata']['uid'],
                'ip_address': '0.0.0.0',
                'replicas': no_replicas,
                'readyReplicas': ready_replicas,
            }

            if ready_replicas <= 0:
                info['state'] = 'undefined'
                info['running'] = False
            elif ready_replicas <= no_replicas:
                info['state'] = 'running'
                info['running'] = True
            else:
                info['state'] = 'undefined'
                info['running'] = True

        except pykube.exceptions.ObjectDoesNotExist:
            return None
        except Exception as ex:  # pylint: disable=broad-except
            log.error(ex)
            return None

        return info

    def replication_controller_list(self):
        """Get list of replication controller.

        :returns: one ``inspect_replication_controller`` result (possibly
            None) per Zoe-labelled controller; if listing fails the error is
            logged and the entries gathered so far are returned
        """
        rclist = []
        try:
            repcon_list = pykube.ReplicationController.objects(self.api).filter(selector=ZOE_LABELS).iterator()
            for rep in repcon_list:
                rclist.append(self.inspect_replication_controller(rep.name))
        except Exception as ex:  # pylint: disable=broad-except
            log.error(ex)
        return rclist

    def replication_controller_event(self):
        """Get event stream of the replication controller (Zoe-labelled ones only)."""
        rc_stream = pykube.ReplicationController.objects(self.api).filter(selector=ZOE_LABELS).watch()
        return rc_stream

    def spawn_service(self, options: DockerContainerOptions):
        """Create and start a new Service object.

        The Service is labelled with ZOE_LABELS plus ``service_name`` and
        selects the pods carrying the same labels. Errors are logged, not raised.
        """
        config = KubernetesServiceConf()
        service_labels = {'service_name': options.name}

        config.set_name(options.name)
        config.set_labels(ZOE_LABELS)
        config.set_labels(service_labels)

        if options.ports:
            config.set_ports(options.ports)

        config.set_selectors(ZOE_LABELS)
        config.set_selectors(service_labels)

        try:
            pykube.Service(self.api, config.get_json()).create()
            log.info('created service on Kubernetes cluster')
        except Exception as ex:  # pylint: disable=broad-except
            log.error(ex)

    def inspect_service(self, name) -> Dict[str, Any]:
        """Get information of a specific service.

        :returns: a dict with ``service_name``, ``port_forwarding`` (a list of
            ``{'port', 'nodePort'}`` dicts, ``nodePort`` None when not
            assigned) and ``clusterIP`` when present. If the Service cannot be
            read the error is logged and ``port_forwarding`` stays empty.
        """
        # Built before the lookup so a failed lookup still returns a dict
        # instead of raising UnboundLocalError.
        info = {
            'service_name': name,
            'port_forwarding': []
        }
        try:
            srv_info = pykube.Service.objects(self.api).get_by_name(name).obj
            spec = srv_info['spec']

            if 'clusterIP' in spec:
                info['clusterIP'] = spec['clusterIP']

            info['port_forwarding'] = [
                {'port': prt['port'], 'nodePort': prt.get('nodePort')}
                for prt in spec.get('ports', [])
            ]
        except Exception as ex:  # pylint: disable=broad-except
            log.error(ex)

        return info

    def terminate(self, name):
        """Terminate a service.

        Deletes the Service, then the ReplicationController named ``name``, then
        the pods in the "default" namespace labelled with ZOE_LABELS and
        ``service_name=name``. Every step is attempted even if an earlier one
        fails; failures are logged.
        """
        deleted_all = True

        for kind, api_class in (('Service', pykube.Service),
                                ('ReplicationController', pykube.ReplicationController)):
            try:
                api_class(self.api, {'apiVersion': 'v1', 'kind': kind, 'metadata': {'name': name}}).delete()
            except Exception as ex:  # pylint: disable=broad-except
                log.error(ex)
                deleted_all = False

        # Copy instead of mutating ZOE_LABELS: the module-level dict is also the
        # selector used to list and watch every Zoe controller.
        pod_selector = dict(ZOE_LABELS, service_name=name)
        try:
            pods = pykube.Pod.objects(self.api).filter(namespace="default", selector=pod_selector).iterator()
            for pod in pods:
                pod.delete()
        except Exception as ex:  # pylint: disable=broad-except
            log.error(ex)
            deleted_all = False

        if deleted_all:
            log.info('Service deleted on Kubernetes cluster')

    def info(self) -> ClusterStats: #pylint: disable=too-many-locals
        """Retrieve Kubernetes cluster statistics.

        Node capacity comes from each node's ``status.allocatable``. Reserved
        cores/memory are the sums of the resource *requests* of every container
        of every pod (all namespaces) that has not finished, charged to the
        node the pod is bound to.
        """
        pl_status = ClusterStats()
        node_dict = {}

        # Get basic information from nodes
        for node in pykube.Node.objects(self.api).iterator():
            nss = NodeStats(node.name)
            allocatable = node.obj['status']['allocatable']
            nss.cores_total = self._parse_cpu(allocatable['cpu'])
            nss.memory_total = self._parse_memory(allocatable['memory'])
            nss.labels = node.obj['metadata']['labels']
            node_dict[node.name] = nss

        # Accumulate the requests of pods still holding resources onto their nodes
        for pod in pykube.Pod.objects(self.api).filter(namespace=pykube.all).iterator():
            if (pod.obj.get('status') or {}).get('phase') in ('Succeeded', 'Failed'):
                continue
            # node_dict is keyed by node name, which is what spec.nodeName holds;
            # it is missing for pods that are not scheduled yet.
            nss = node_dict.get(pod.obj['spec'].get('nodeName'))
            if nss is None:
                continue
            for container in pod.obj['spec']['containers']:
                nss.container_count = nss.container_count + 1
                requests = (container.get('resources') or {}).get('requests') or {}
                if 'memory' in requests:
                    nss.memory_reserved = nss.memory_reserved + self._parse_memory(requests['memory'])
                if 'cpu' in requests:
                    # cpu may be expressed in millicores ("100m") or cores ("0.1")
                    nss.cores_reserved = round(nss.cores_reserved + self._parse_cpu(requests['cpu']), 3)

        nodes = list(node_dict.values())
        pl_status.nodes.extend(nodes)
        pl_status.container_count = sum(nss.container_count for nss in nodes)
        pl_status.memory_total = sum(nss.memory_total for nss in nodes)
        pl_status.cores_total = sum(nss.cores_total for nss in nodes)
        pl_status.timestamp = time.time()

        return pl_status