interface.py 8.41 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The high-level interface that Zoe uses to talk to the configured container backend."""

import logging
19
import time
20
from typing import List, Union
21
22

from zoe_lib.config import get_conf
Daniele Venzano's avatar
Daniele Venzano committed
23
from zoe_lib.state import Execution, Service  # pylint: disable=unused-import
24
25

from zoe_master.backends.base import BaseBackend
26
27
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException, ZoeException
Daniele Venzano's avatar
Daniele Venzano committed
28
from zoe_master.stats import ClusterStats  # pylint: disable=unused-import
29
30
31
32
33

# Optional backend: when its module cannot be imported the name is bound to
# None instead, so the rest of this module can test for availability.
try:
    from zoe_master.backends.kubernetes.backend import KubernetesBackend
except ImportError:
    KubernetesBackend = None
34

35
36
37
38
39
# Optional backend: when its module cannot be imported the name is bound to
# None instead, so the rest of this module can test for availability.
try:
    from zoe_master.backends.docker.backend import DockerEngineBackend
except ImportError:
    DockerEngineBackend = None

40
41
42
# Module-level logger, named after this module.
log = logging.getLogger(__name__)


43
def _get_backend() -> Union[BaseBackend, None]:
    """Build the backend object selected by the ``backend`` option of the global configuration.

    A new backend object is created on every call.

    Note that 'Swarm' passes the sanity assertion below, but no backend class is
    associated to it here, so it takes the "unknown backend" path.

    :return: a new backend instance, or ``None`` if no backend class matches the configured name
    :raises ZoeException: if the class of the selected backend is not available (it is ``None``)
    """
    selected = get_conf().backend
    assert selected in ['Kubernetes', 'Swarm', 'DockerEngine']

    # backend name -> (backend class or None when unavailable, error text used when it is None)
    candidates = {
        'Kubernetes': (KubernetesBackend, 'The Kubernetes backend requires the pykube module'),
        'DockerEngine': (DockerEngineBackend, 'The Docker Engine backend requires docker python version >= 2.0.2'),
    }

    if selected not in candidates:
        log.error('Unknown backend selected')
        return None

    backend_class, unavailable_message = candidates[selected]
    if backend_class is None:
        raise ZoeException(unavailable_message)
    return backend_class(get_conf())
58
59
60
61
62
63
64
65
66
67
68
69
70
71


def initialize_backend(state):
    """Instantiate the configured backend and call its ``init`` method with ``state``."""
    _get_backend().init(state)


def shutdown_backend():
    """Instantiate the configured backend and call its ``shutdown`` method."""
    _get_backend().shutdown()


72
def service_list_to_containers(execution: Execution, service_list: List[Service], placement=None) -> str:
    """Try to start a subset of the services of an execution on the configured backend.

    Services are spawned one at a time, in ascending ``startup_order``. The first
    failure terminates the whole execution and ends the function.

    :param execution: the execution the services belong to
    :param service_list: the services to start
    :param placement: optional container indexed by service ID, giving the backend host to assign to each service
    :return: 'ok' when every service was started, 'requeue' after a temporary failure
             (the execution is set back to queued) and 'fatal' after a fatal failure
             (the execution is set in error)
    """
    backend = _get_backend()

    startup_sequence = sorted(service_list, key=lambda svc: svc.startup_order)

    # Substitution values handed to every ServiceInstance created below
    substitutions = {
        'execution_id': execution.id,
        'execution_name': execution.name,
        'user_name': execution.owner.username,
        'deployment_name': get_conf().deployment_name,
    }
    # DNS names are registered for all the services of the execution, not only for the ones started here
    substitutions.update(('dns_name#' + svc.name, svc.dns_name) for svc in execution.services)

    for service in startup_sequence:
        # 'self' always points to the service currently being started
        substitutions['dns_name#self'] = service.dns_name
        if placement is not None:
            service.assign_backend_host(placement[service.id])
        service.set_starting()
        instance = ServiceInstance(execution, service, substitutions)

        try:
            backend_id, ip_address, ports = backend.spawn_service(instance)
        except ZoeStartExecutionRetryException as ex:
            reason = ex.message
            log.warning('Temporary failure starting service {} of execution {}: {}'.format(service.id, execution.id, reason))
            service.set_error(reason)
            terminate_execution(execution, reason=reason)
            # terminate_execution() marks the execution as terminated, put it back in the queue
            execution.set_queued()
            return "requeue"
        except ZoeStartExecutionFatalException as ex:
            reason = ex.message
            log.error('Fatal error trying to start service {} of execution {}: {}'.format(service.id, execution.id, reason))
            service.set_error(reason)
            terminate_execution(execution, reason=reason)
            execution.set_error()
            return "fatal"
        except Exception as ex:
            log.error('Fatal error trying to start service {} of execution {}'.format(service.id, execution.id))
            log.exception('BUG, this error should have been caught earlier')
            terminate_execution(execution, reason=str(ex))
            execution.set_error()
            return "fatal"

        # Reached only when spawn_service() succeeded, every handler above returns
        log.debug('Service {} started'.format(instance.name))
        service.set_active(backend_id, ip_address, ports)

    return "ok"
119
120
121
122
123
124
125
126
127
128
129
130


def start_all(execution: Execution) -> str:
    """Start all the services of an execution.

    The execution is set to starting, then every one of its services is passed to
    service_list_to_containers(), without any placement.

    :return: the value returned by service_list_to_containers()
    """
    log.debug('starting all services for execution {}'.format(execution.id))
    execution.set_starting()
    outcome = service_list_to_containers(execution, execution.services)
    return outcome


131
def start_essential(execution: Execution, placement) -> str:
    """Set the execution to starting and start its essential services using the given placement.

    :return: the value returned by service_list_to_containers()
    """
    log.debug('starting essential services for execution {}'.format(execution.id))
    execution.set_starting()
    outcome = service_list_to_containers(execution, execution.essential_services, placement)
    return outcome
137
138


139
def start_elastic(execution: Execution, placement) -> str:
    """Start the elastic services of the execution that are in runnable status.

    :return: the value returned by service_list_to_containers()
    """
    runnable = []
    for candidate in execution.elastic_services:
        if candidate.status == Service.RUNNABLE_STATUS:
            runnable.append(candidate)
    return service_list_to_containers(execution, runnable, placement)
143
144


145
146
def _stop_and_deactivate(backend, service) -> None:
    """Mark the service as terminating, terminate it on the backend and mark it inactive."""
    service.set_terminating()
    backend.terminate_service(service)
    service.set_inactive()
    log.debug('Service {} terminated'.format(service.name))


def terminate_service(service: Service) -> None:
    """Terminate a single service, according to its current status.

    * inactive: nothing is done, unless ``service.is_dead()`` is false, in which
      case the service is terminated on the backend and set inactive again
    * error: the service is terminated on the backend, its status is left untouched
    * active, terminating, starting: the service is terminated on the backend and set inactive
    * created, runnable: the service is just set inactive
    * any other status is logged as a bug
    """
    backend = _get_backend()
    status = service.status

    if status == Service.INACTIVE_STATUS:
        if not service.is_dead():
            log.warning('Service {} is inactive for Zoe, but running for the back-end, terminating and resetting state'.format(service.name))
            _stop_and_deactivate(backend, service)
        return

    if status == Service.ERROR_STATUS:
        backend.terminate_service(service)
        log.debug('Service {} terminated'.format(service.name))
    elif status in (Service.ACTIVE_STATUS, Service.TERMINATING_STATUS, Service.STARTING_STATUS):
        _stop_and_deactivate(backend, service)
    elif status in (Service.CREATED_STATUS, Service.RUNNABLE_STATUS):
        service.set_inactive()
    else:
        log.error('BUG: don\'t know how to terminate a service in status {}'.format(status))
167
168


169
def terminate_execution(execution: Execution, reason: Union[None, str]=None) -> None:
    """Terminate all the services of an execution, then set the execution as terminated.

    :param execution: the execution to terminate
    :param reason: optional value passed as-is to ``execution.set_terminated()``
    """
    for svc in execution.services:  # type: Service
        terminate_service(svc)

    execution.set_terminated(reason)
174
175


176
177
def get_platform_state() -> ClusterStats:
    """Query the container backend for the state of the platform and return it.

    Platform state includes information on free/reserved resources for each node.
    """
    return _get_backend().platform_state()
180
181
182
183
184
185
186
187
188
189
190
191


def preload_image(image_name):
    """Ask the backend to preload a service image and log how long it took.

    Backends raising ``NotImplementedError`` are considered as not supporting
    preloading: a warning is logged and nothing else happens.
    """
    backend = _get_backend()
    log.debug('Preloading image {}'.format(image_name))
    started_at = time.time()
    try:
        backend.preload_image(image_name)
        elapsed = time.time() - started_at
        log.info('Image {} preloaded in {:.2f}s'.format(image_name, elapsed))
    except NotImplementedError:
        log.warning('Backend {} does not support image preloading'.format(get_conf().backend))
192
193
194
195
196


def update_service_resource_limits(service, cores=None, memory=None):
    """Forward new core and memory values for a service to the backend.

    Nothing is sent to the backend for services having 'gpu' among their labels.
    """
    backend = _get_backend()
    if 'gpu' in service.labels:  # see https://github.com/NVIDIA/nvidia-docker/issues/515
        return
    backend.update_service(service, cores, memory)
199
200
201
202
203
204
205
206
207
208
209
210


def node_list():
    """Return the node list reported by the configured back-end."""
    return _get_backend().node_list()


def list_available_images(node_name):
    """Return the images that the configured back-end reports as available on ``node_name``."""
    return _get_backend().list_available_images(node_name)