entrypoint.py 3.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#!/usr/bin/python3

# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

18 19
"""Zoe Master main entrypoint."""

20
import logging
21
import os
22

23
import zoe_lib.config
24
from zoe_lib.state import SQLManager
Daniele Venzano's avatar
Daniele Venzano committed
25
import zoe_master.backends.interface
26
import zoe_master.scheduler
27
from zoe_master.exceptions import ZoeException
28
from zoe_master.gelf_listener import GELFListener
29
from zoe_master.master_api import APIManager
30
from zoe_master.metrics.base import StatsManager
31
from zoe_master.preprocessing import restart_resubmit_scheduler
32 33

# Module-level logger, registered under the fixed name "main" (not __name__).
log = logging.getLogger("main")
Daniele Venzano's avatar
Daniele Venzano committed
34
# Record layout handed to logging.basicConfig() in main():
# timestamp, level, "thread->logger" and the message itself.
LOG_FORMAT = '%(asctime)-15s %(levelname)s %(threadName)s->%(name)s: %(message)s'
35 36


37
def _check_configuration_sanity():
    """Verify that the configured workspace directory is present on disk.

    The directory checked is ``workspace_base_path`` joined with
    ``workspace_deployment_path``, both read from the loaded configuration.

    :return: 0 when the directory exists, 1 (after logging an error) otherwise
    """
    conf = zoe_lib.config.get_conf()
    workspace_dir = os.path.join(conf.workspace_base_path, conf.workspace_deployment_path)
    if os.path.exists(workspace_dir):
        return 0
    log.error('Workspace base directory does not exist: {}'.format(workspace_dir))
    return 1


44
def main(test_conf=None):
    """
    The entrypoint for the zoe-master script.

    Loads the configuration, sets up logging, checks the workspace directory,
    then creates the state manager, the back-end, the metrics manager, the
    scheduler and the API server before running the API server loop. When the
    loop ends, for whatever reason, every started component is asked to quit.

    :param test_conf: passed unchanged to zoe_lib.config.load_configuration()
    :return: int, 0 on a clean shutdown (including a keyboard interrupt),
             non-zero when the configuration check fails, the back-end cannot
             be initialized or the API loop ends with an unexpected exception
    """
    zoe_lib.config.load_configuration(test_conf)
    args = zoe_lib.config.get_conf()

    log_args = {
        'level': logging.DEBUG if args.debug else logging.INFO,
        'format': LOG_FORMAT
    }
    # The special value "stderr" keeps basicConfig's default stream handler;
    # any other value is used as the log file name.
    if args.log_file != "stderr":
        log_args['filename'] = args.log_file
    logging.basicConfig(**log_args)
    # Only warnings and above from the "kazoo" logger.
    logging.getLogger("kazoo").setLevel(logging.WARNING)

    ret = _check_configuration_sanity()
    if ret != 0:
        return ret

    log.info("Initializing DB manager")
    state = SQLManager(args)

    try:
        zoe_master.backends.interface.initialize_backend(state)
    except ZoeException as e:
        log.error('Cannot initialize backend: {}'.format(e.message))
        return 1

    metrics = StatsManager(state)
    metrics.start()

    log.info("Initializing scheduler")
    # The scheduler class is looked up by name in zoe_master.scheduler, using
    # the scheduler_class configuration option.
    scheduler = getattr(zoe_master.scheduler, args.scheduler_class)(state, args.scheduler_policy, metrics)

    # NOTE(review): presumably re-queues work left over from a previous run;
    # confirm against zoe_master.preprocessing.
    restart_resubmit_scheduler(state, scheduler)

    log.info("Starting ZMQ API server...")
    api_server = APIManager(metrics, scheduler, state)

    # A gelf_listener setting of 0 disables the GELF listener.
    if zoe_lib.config.get_conf().gelf_listener != 0:
        gelf_listener = GELFListener()
    else:
        gelf_listener = None

    exit_code = 0
    try:
        api_server.loop()
    except KeyboardInterrupt:
        # Interactive interruption is treated as a normal shutdown request.
        pass
    except Exception:
        log.exception('Fatal error in API loop')
        # Report the failure to the caller instead of claiming success.
        exit_code = 1
    finally:
        log.info('Terminating scheduler thread')
        scheduler.quit()
        log.info('Terminating api server thread')
        api_server.quit()
        log.info('Terminating back-end threads')
        zoe_master.backends.interface.shutdown_backend()
        log.info('Terminating metric thread')
        metrics.quit()
        if gelf_listener is not None:
            log.info('Terminating GELF listener thread')
            gelf_listener.quit()
    return exit_code