interface.py 5.01 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Copyright (c) 2017, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The high-level interface that Zoe uses to talk to the configured container backend."""

import logging
19
from typing import List
20
21
22
23
24
25

from zoe_lib.config import get_conf
from zoe_lib.state import Execution, Service

from zoe_master.backends.base import BaseBackend
from zoe_master.backends.old_swarm.backend import OldSwarmBackend
26
from zoe_master.backends.old_swarm_new_api.backend import OldSwarmNewAPIBackend
27
from zoe_master.exceptions import ZoeStartExecutionFatalException, ZoeStartExecutionRetryException
28
29
30
31
32

# Module-level logger, named after this module, shared by the functions below.
log = logging.getLogger(__name__)


def _get_backend() -> BaseBackend:
    """Return the right backend instance by reading the global configuration.

    The configuration is read once and the same object is used both to pick
    the backend and to construct it.

    Returns:
        A newly constructed ``OldSwarmBackend`` or ``OldSwarmNewAPIBackend``,
        depending on the ``backend`` option of the configuration.

    Raises:
        AssertionError: if the configured backend name is not one of the
            names handled here.
    """
    conf = get_conf()
    backend_name = conf.backend
    if backend_name == 'OldSwarm':
        return OldSwarmBackend(conf)
    if backend_name == 'OldSwarmNewAPI':
        return OldSwarmNewAPIBackend(conf)

    log.error('Unknown backend selected')
    # Raise explicitly instead of using ``assert False``: assert statements are
    # removed when Python runs with -O, which would make this function return
    # None and push the failure to whichever caller first uses the backend.
    raise AssertionError('Unknown backend selected: {!r}'.format(backend_name))


def initialize_backend(state):
    """Look up the configured backend and call its ``init`` method with *state*."""
    _get_backend().init(state)


def shutdown_backend():
    """Look up the configured backend and call its ``shutdown`` method."""
    _get_backend().shutdown()


56
57
def service_list_to_containers(execution: Execution, service_list: List[Service]) -> str:
    """Try to start a subset of the services of an execution on the configured backend.

    Services are spawned one at a time, in ascending ``startup_order``. The
    first failure stops the loop: the error message is recorded on the
    execution, the execution is terminated and its final state is set
    according to the kind of failure.

    Returns:
        ``"ok"`` when every service in the list was spawned, ``"requeue"``
        after a temporary failure (the execution is set back to scheduled)
        and ``"fatal"`` after a fatal or unexpected failure (the execution
        is set to error).
    """
    container_backend = _get_backend()

    pending = list(service_list)
    pending.sort(key=lambda svc: svc.startup_order)

    for svc in pending:
        svc.set_starting()
        try:
            container_backend.spawn_service(execution, svc)
        except ZoeStartExecutionRetryException as err:
            retry_msg = 'Temporary failure starting service {} of execution {}: {}'.format(svc.id, execution.id, err.message)
            log.warning(retry_msg)
            execution.set_error_message(err.message)
            terminate_execution(execution)
            # Put the execution back in the scheduled state so it can be retried.
            execution.set_scheduled()
            return "requeue"
        except ZoeStartExecutionFatalException as err:
            fatal_msg = 'Fatal error trying to start service {} of execution {}: {}'.format(svc.id, execution.id, err.message)
            log.error(fatal_msg)
            execution.set_error_message(err.message)
            terminate_execution(execution)
            execution.set_error()
            return "fatal"
        except Exception as err:
            # Anything else is unexpected: log the traceback and treat it as fatal.
            unexpected_msg = 'Fatal error trying to start service {} of execution {}'.format(svc.id, execution.id)
            log.error(unexpected_msg)
            log.exception('BUG, this error should have been caught earlier')
            execution.set_error_message(str(err))
            terminate_execution(execution)
            execution.set_error()
            return "fatal"

        # Only reached when the spawn succeeded, since every handler above returns.
        execution.set_running()

    return "ok"
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111


def start_all(execution: Execution) -> str:
    """Start every service of an execution.

    The execution is marked as starting and all of its services are handed to
    ``service_list_to_containers``, whose result string is returned unchanged.

    If an error occurs some containers may have been created and need to be cleaned up.
    """
    debug_msg = 'starting all services for execution {}'.format(execution.id)
    log.debug(debug_msg)
    execution.set_starting()
    all_services = execution.services
    return service_list_to_containers(execution, all_services)


def start_essential(execution) -> str:
    """Mark the execution as starting and start only its essential services.

    Returns the result string produced by ``service_list_to_containers``.
    """
    debug_msg = 'starting essential services for execution {}'.format(execution.id)
    log.debug(debug_msg)
    execution.set_starting()
    essential = execution.essential_services
    return service_list_to_containers(execution, essential)


def start_elastic(execution) -> str:
    """Start the elastic services of the execution that are in the runnable status.

    Returns the result string produced by ``service_list_to_containers``.
    """
    runnable = []
    for svc in execution.elastic_services:
        if svc.status == Service.RUNNABLE_STATUS:
            runnable.append(svc)
    return service_list_to_containers(execution, runnable)
112
113
114
115
116
117
118
119
120
121
122
123
124
125


def terminate_execution(execution: Execution) -> None:
    """Terminate the services of an execution and mark the execution as terminated.

    The execution is first flagged as cleaning up. Each service that has a
    backend ID is then terminated through the configured backend and set
    inactive; services whose ``backend_id`` is ``None`` are skipped.
    """
    execution.set_cleaning_up()
    container_backend = _get_backend()
    for svc in execution.services:
        assert isinstance(svc, Service)
        if svc.backend_id is None:
            continue
        svc.set_terminating()
        container_backend.terminate_service(svc)
        svc.set_inactive()
        log.debug('Service {} terminated'.format(svc.name))
    execution.set_terminated()
126
127
128
129
130
131


def get_platform_state():
    """Return the platform state reported by the configured container backend.

    Platform state includes information on free/reserved resources for each node and is used for advanced scheduling.
    """
    platform_state = _get_backend().platform_state()
    return platform_state