interface.py 6.64 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The high-level interface that Zoe uses to talk to the configured container backend."""

import logging
19
from typing import List
20
21
22
23
24

from zoe_lib.config import get_conf
from zoe_lib.state import Execution, Service

from zoe_master.backends.base import BaseBackend
25
26
from zoe_master.backends.service_instance import ServiceInstance
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException, ZoeException
27
from zoe_master.stats import ClusterStats  # pylint: disable=unused-import
28
29
30
31
32
33
34
35
36
37

try:
    from zoe_master.backends.swarm.backend import SwarmBackend
except ImportError as ex:
    SwarmBackend = None

try:
    from zoe_master.backends.kubernetes.backend import KubernetesBackend
except ImportError:
    KubernetesBackend = None
38

39
40
41
42
43
try:
    from zoe_master.backends.docker.backend import DockerEngineBackend
except ImportError:
    DockerEngineBackend = None

44
45
46
47
log = logging.getLogger(__name__)


def _get_backend() -> BaseBackend:
    """Return the right backend instance by reading the global configuration.

    The backend classes are imported optionally at module load time and are
    bound to None when their import fails, so each branch checks for that
    before instantiating the class.

    :return: a new instance of the backend named by the ``backend`` configuration option
    :raises ZoeException: if the selected backend could not be imported or if the
        configured backend name is not one of 'Kubernetes', 'Swarm' or 'DockerEngine'
    """
    conf = get_conf()
    backend_name = conf.backend

    if backend_name == 'Kubernetes':
        if KubernetesBackend is None:
            raise ZoeException('The Kubernetes backend requires the pykube module')
        return KubernetesBackend(conf)

    if backend_name == 'Swarm':
        if SwarmBackend is None:
            raise ZoeException('The Swarm backend requires docker python version >= 2.0.2')
        return SwarmBackend(conf)

    if backend_name == 'DockerEngine':
        if DockerEngineBackend is None:
            raise ZoeException('The Docker Engine backend requires docker python version >= 2.0.2')
        return DockerEngineBackend(conf)

    # An explicit raise is used instead of ``assert False``: asserts are removed
    # when Python runs with -O, which would make this function return None.
    log.error('Unknown backend selected')
    raise ZoeException('Unknown backend selected: {}'.format(backend_name))


def initialize_backend(state):
    """Look up the configured backend and call its ``init`` method with ``state``."""
    _get_backend().init(state)


def shutdown_backend():
    """Look up the configured backend and call its ``shutdown`` method."""
    _get_backend().shutdown()


79
def service_list_to_containers(execution: Execution, service_list: List[Service], placement=None) -> str:
    """Try to start a subset of the services of an execution on the configured backend.

    Services are started one at a time, in ascending ``startup_order``. The first
    failure stops the loop, terminates the whole execution and decides the
    return value.

    :param execution: the execution the services belong to
    :param service_list: the services to start
    :param placement: optional container indexed by service ID; when not None,
        ``placement[service.id]`` is passed to ``assign_backend_host`` before the service is started
    :return: 'ok' if every service was started, 'requeue' after a temporary failure,
        'fatal' after a fatal or unexpected failure
    """
    container_backend = _get_backend()

    startup_sequence = list(service_list)
    startup_sequence.sort(key=lambda item: item.startup_order)

    # Values made available for substitution to each ServiceInstance.
    substitutions = {
        'execution_id': execution.id,
        "execution_name": execution.name,
        'user_name': execution.user_id,
        'deployment_name': get_conf().deployment_name,
    }
    # DNS names of all the services of the execution, not only the ones being started now.
    substitutions.update({'dns_name#' + peer.name: peer.dns_name for peer in execution.services})

    for svc in startup_sequence:
        # The same dictionary is reused for every service, only the 'self' entry changes.
        substitutions['dns_name#self'] = svc.dns_name
        if placement is not None:
            svc.assign_backend_host(placement[svc.id])
        svc.set_starting()
        service_instance = ServiceInstance(execution, svc, substitutions)

        try:
            backend_id, ip_address = container_backend.spawn_service(service_instance)
        except ZoeStartExecutionRetryException as retry_err:
            reason = retry_err.message
            log.warning('Temporary failure starting service {} of execution {}: {}'.format(svc.id, execution.id, reason))
            svc.set_error(reason)
            execution.set_error_message(reason)
            terminate_execution(execution)
            # Put the execution back in the scheduled state so that it can be tried again.
            execution.set_scheduled()
            return "requeue"
        except ZoeStartExecutionFatalException as fatal_err:
            reason = fatal_err.message
            log.error('Fatal error trying to start service {} of execution {}: {}'.format(svc.id, execution.id, reason))
            svc.set_error(reason)
            execution.set_error_message(reason)
            terminate_execution(execution)
            execution.set_error()
            return "fatal"
        except Exception as unexpected:
            # Anything else is treated as a fatal error and logged with its traceback.
            log.error('Fatal error trying to start service {} of execution {}'.format(svc.id, execution.id))
            log.exception('BUG, this error should have been caught earlier')
            execution.set_error_message(str(unexpected))
            terminate_execution(execution)
            execution.set_error()
            return "fatal"

        # Every handler above returns, so this point is reached only on success.
        log.debug('Service {} started'.format(service_instance.name))
        svc.set_active(backend_id, ip_address)

    return "ok"
129
130
131
132
133
134
135
136
137
138
139
140


def start_all(execution: Execution) -> str:
    """Start all the services of an execution.

    The execution is marked as starting and all of its services are handed to
    service_list_to_containers(), whose result is returned unchanged.

    If an error occurs some containers may have been created and need to be cleaned up.
    """
    message = 'starting all services for execution {}'.format(execution.id)
    log.debug(message)
    execution.set_starting()
    outcome = service_list_to_containers(execution, execution.services)
    return outcome


141
def start_essential(execution: Execution, placement) -> str:
    """Mark the execution as starting and start its essential services.

    ``placement`` is forwarded as-is to service_list_to_containers(), whose
    result is returned unchanged.
    """
    message = 'starting essential services for execution {}'.format(execution.id)
    log.debug(message)
    execution.set_starting()
    outcome = service_list_to_containers(execution, execution.essential_services, placement)
    return outcome
147
148


149
def start_elastic(execution: Execution, placement) -> str:
    """Start the elastic services of the execution that are in the runnable status.

    ``placement`` is forwarded as-is to service_list_to_containers(), whose
    result is returned unchanged.
    """
    runnable = []
    for candidate in execution.elastic_services:
        if candidate.status == Service.RUNNABLE_STATUS:
            runnable.append(candidate)
    return service_list_to_containers(execution, runnable, placement)
153
154
155
156
157
158
159
160
161
162
163
164
165
166


def terminate_execution(execution: Execution) -> None:
    """Terminate all the services of an execution that have a backend ID.

    The execution is put in the cleaning-up state first and in the terminated
    state once all its services have been processed.
    """
    execution.set_cleaning_up()
    container_backend = _get_backend()
    for svc in execution.services:
        assert isinstance(svc, Service)
        if svc.backend_id is None:
            # Services without a backend ID are left untouched.
            continue
        svc.set_terminating()
        container_backend.terminate_service(svc)
        svc.set_inactive()
        log.debug('Service {} terminated'.format(svc.name))
    execution.set_terminated()
167
168


169
def get_platform_state() -> ClusterStats:
    """Query the configured container backend for the state of the platform.

    Platform state includes information on free/reserved resources for each
    node. This information is used for advanced scheduling.
    """
    return _get_backend().platform_state()