# entrypoint.py
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the entrypoint for the commandline Zoe client
"""

20
from datetime import datetime, timezone
21 22 23 24
import json
import logging
import os
import sys
25
import time
26
from argparse import ArgumentParser, Namespace, FileType, RawDescriptionHelpFormatter
Daniele Venzano's avatar
Daniele Venzano committed
27
from typing import Tuple
28

29 30
from tabulate import tabulate

31
from zoe_cmd import utils
32
from zoe_lib.info import ZoeInfoAPI
Daniele Venzano's avatar
Daniele Venzano committed
33
from zoe_lib.services import ZoeServiceAPI
34
from zoe_lib.statistics import ZoeStatisticsAPI
35
from zoe_lib.exceptions import ZoeAPIException
36
from zoe_lib.executions import ZoeExecutionsAPI
37
from zoe_lib.version import ZOE_VERSION
38 39


40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
def _check_api_version(auth):
    """Probe the info endpoint to find out whether this client can talk to the server.

    :param auth: mapping providing the 'url', 'user' and 'pass' entries
    :return: True when the info request succeeds; False, after printing a
             diagnostic on stdout, when it raises ZoeAPIException
    """
    api = ZoeInfoAPI(auth['url'], auth['user'], auth['pass'])
    try:
        api.info()
    except ZoeAPIException:
        diagnostics = (
            'Error: this client can talk to ZOE v. {}, but server is reporting an error'.format(ZOE_VERSION),
            'Error: your client is too old (or too new) to speak with the configured server',
            'Error: check the version this server is running at the bottom of this web page: {}'.format(auth['url']),
        )
        for text in diagnostics:
            print(text)
        return False
    return True


def _log_stream_stdout(service_id, timestamps, auth):
    """Print the log of a service on stdout until the stream ends or CTRL-C is pressed.

    :param service_id: identifier passed to the service API log request
    :param timestamps: when true, element 0 of each log item is printed before element 1
    :param auth: mapping providing the 'url', 'user' and 'pass' entries
    :return: 'stream_end' when the log iterator is exhausted, 'interrupt' on KeyboardInterrupt
    """
    api = ZoeServiceAPI(auth['url'], auth['user'], auth['pass'])
    try:
        for entry in api.get_logs(service_id):
            if not timestamps:
                print(entry[1])
                continue
            print(entry[0], entry[1])
    except KeyboardInterrupt:
        print('CTRL-C detected, exiting...')
        return 'interrupt'
    return 'stream_end'


67
def info_cmd(auth, args_):
    """Print the version and deployment details returned by the info endpoint."""
    details = ZoeInfoAPI(auth['url'], auth['user'], auth['pass']).info()
    # (label, key) pairs, printed in this order
    report = (
        ("Zoe version: ", 'version'),
        ("Zoe API version: ", 'api_version'),
        ("ZApp format version: ", 'application_format_version'),
        ("Deployment name: ", 'deployment_name'),
    )
    for label, key in report:
        print(label, details[key])


77
def app_get_cmd(auth, args):
    """Write the application description stored in an execution to stdout as JSON.

    Prints "no such execution" instead when the API returns None for ``args.id``.
    """
    api = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass'])
    found = api.get(args.id)
    if found is None:
        print("no such execution")
        return
    json.dump(found['description'], sys.stdout, sort_keys=True, indent=4)
85 86


87
def exec_list_cmd(auth, args):
    """Print a table of the executions selected by the command-line filters.

    The 'user_id' filter is always set to the authenticated user; every
    recognised filter attribute found on ``args`` is forwarded as-is to the
    executions API. Nothing is printed when the result is empty.
    """
    api = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass'])
    accepted = (
        'status',
        'name',
        'limit',
        'earlier_than_submit',
        'earlier_than_start',
        'earlier_than_end',
        'later_than_submit',
        'later_than_start',
        'later_than_end',
    )
    query = {'user_id': auth['user']}
    query.update({option: given for option, given in vars(args).items() if option in accepted})

    listing = api.list(**query)
    if len(listing) == 0:
        return

    rows = []
    for item in sorted(listing.values(), key=lambda entry: entry['id']):
        rows.append([item['id'], item['name'], item['user_id'], item['status']])
    print(tabulate(rows, ['ID', 'Name', 'User ID', 'Status']))
113 114


115
def exec_start_cmd(auth, args):
    """Submit an execution and, when --synchronous is given, follow it.

    Without ``args.synchronous`` the function returns right after the
    submission. Otherwise it polls the execution once per second until it is
    'running', then streams the log of the first service flagged as monitor
    and terminates the process: status 0 when the log stream ends, status 1
    when the user interrupts it.

    The wait also stops when the execution can no longer become 'running':
    the process exits with status 1 if the execution disappears or reports
    'error', and with status 0 if it is already 'cleaning up' or 'terminated'.

    :param auth: mapping providing the 'url', 'user' and 'pass' entries
    :param args: parsed arguments with 'name', 'jsonfile' and 'synchronous'
    """
    app_descr = json.load(args.jsonfile)
    exec_api = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass'])
    exec_id = exec_api.start(args.name, app_descr)
    if not args.synchronous:
        print("Application scheduled successfully with ID {}, use the exec-get command to check its status".format(exec_id))
        return

    print("Application scheduled successfully with ID {}, waiting for status change".format(exec_id))
    old_status = 'submitted'
    while True:
        execution = exec_api.get(exec_id)
        if execution is None:
            # The other commands treat a None result as "not found"
            print('Execution {} not found'.format(exec_id))
            sys.exit(1)
        current_status = execution['status']
        if old_status != current_status:
            print('Execution is now {}'.format(current_status))
            old_status = current_status
        if current_status == 'running':
            break
        # Status names below are the ones accepted by the exec-ls --status
        # filter. Without these checks the loop would spin forever on an
        # execution that failed or ended before being observed as running.
        if current_status == 'error':
            print('Last error: {}'.format(execution.get('error_message')))
            sys.exit(1)
        if current_status in ('cleaning up', 'terminated'):
            print('Execution ended before its log could be streamed')
            sys.exit(0)
        time.sleep(1)

    monitor_service_id = None
    service_api = ZoeServiceAPI(auth['url'], auth['user'], auth['pass'])
    for service_id in execution['services']:
        service = service_api.get(service_id)
        if service['description']['monitor']:
            monitor_service_id = service['id']
            break

    if monitor_service_id is None:
        # No service to follow: do not ask the API for the log of service None
        print('No monitor service found, cannot stream the log')
        print('Do not worry, your execution ({}) is still running.'.format(exec_id))
        sys.exit(1)

    print('\n>------ start of log streaming -------<\n')
    why_stop = _log_stream_stdout(monitor_service_id, False, auth)
    print('\n>------ end of log streaming -------<\n')
    # sys.exit() instead of the exit() helper, which is installed by the site
    # module for interactive use and is not guaranteed to be available.
    if why_stop == 'stream_end':
        print('Execution finished')
        sys.exit(0)
    elif why_stop == 'interrupt':
        print('Do not worry, your execution ({}) is still running.'.format(exec_id))
        sys.exit(1)
151 152


153
def exec_get_cmd(auth, args):
    """Print status, timings, exposed endpoints and a service table for one execution.

    Prints 'Execution not found' and returns when the API yields None for ``args.id``.
    """
    exec_api = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass'])
    cont_api = ZoeServiceAPI(auth['url'], auth['user'], auth['pass'])

    execution = exec_api.get(args.id)
    if execution is None:
        print('Execution not found')
        return

    def _to_local(epoch):
        """Convert an epoch timestamp into an aware datetime in the local timezone."""
        return datetime.fromtimestamp(epoch, timezone.utc).astimezone()

    print('Execution {} (ID: {})'.format(execution['name'], execution['id']))
    print('Application name: {}'.format(execution['description']['name']))
    print('Status: {}'.format(execution['status']))
    if execution['status'] == 'error':
        print('Last error: {}'.format(execution['error_message']))
    print()

    # The submit time is converted unconditionally; start and end may be None
    print('Time submit: {}'.format(_to_local(execution['time_submit'])))
    started = 'not yet' if execution['time_start'] is None else _to_local(execution['time_start'])
    print('Time start: {}'.format(started))
    ended = 'not yet' if execution['time_end'] is None else _to_local(execution['time_end'])
    print('Time end: {}'.format(ended))
    print()

    endpoints = exec_api.endpoints(execution['id'])
    if endpoints is None or len(endpoints) == 0:
        print('This ZApp does not expose any endpoint')
    else:
        print('Exposed endpoints:')
        for entry in endpoints:
            print(' - {}: {}'.format(entry[0], entry[1]))
    print()

    rows = []
    for service_id in execution['services']:
        svc = cont_api.get(service_id)
        rows.append([
            svc['id'],
            svc['name'],
            'true' if svc['essential'] else 'false',
            svc['status'],
            svc['backend_status'],
            '' if svc['error_message'] is None else svc['error_message'],
        ])
    print(tabulate(rows, ['ID', 'Name', 'Essential', 'Zoe status', 'Backend status', 'Error message']))
196 197


198
def exec_kill_cmd(auth, args):
    """Ask the executions API to terminate the execution identified by ``args.id``."""
    ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass']).terminate(args.id)


204
def logs_cmd(auth, args):
    """Stream the log of the service ``args.service_id`` to stdout.

    ``args.timestamps`` selects whether each line is prefixed by its timestamp.
    """
    target, with_timestamps = args.service_id, args.timestamps
    _log_stream_stdout(target, with_timestamps, auth)
207 208


209
def stats_cmd(auth, args_):
    """Print the scheduler counters returned by the statistics API."""
    scheduler_stats = ZoeStatisticsAPI(auth['url'], auth['user'], auth['pass']).scheduler()
    # (message template, key) pairs, printed in this order
    lines = (
        ('Scheduler queue length: {}', 'queue_length'),
        ('Scheduler running queue length: {}', 'running_length'),
        ('Termination threads count: {}', 'termination_threads_count'),
    )
    for template, key in lines:
        print(template.format(scheduler_stats[key]))

217
# Epilog shown at the bottom of the command-line help: explains how the
# credentials are supplied (environment variables or configuration file).
ENV_HELP_TEXT = '''To authenticate with Zoe you need to define three environment variables:
ZOE_URL: point to the URL of the Zoe Scheduler (ex.: http://localhost:5000/)
ZOE_USER: the username used for authentication
ZOE_PASS: the password used for authentication

or create a ~/.zoerc file (another location can be specified with --auth-file) like this:
url = xxx
user = yyy
pass = zzz

Environment variables will override the values specified in the configuration file.
'''
229 230


Daniele Venzano's avatar
Daniele Venzano committed
231
def process_arguments() -> Tuple[ArgumentParser, Namespace]:
    """Build the command-line parser and parse the process arguments.

    Every sub-command stores its handler in the ``func`` attribute of the
    parsed namespace; the attribute is absent when no sub-command is given.

    :return: the parser (so the caller can print the help) and the parsed arguments
    """
    parser = ArgumentParser(description="Zoe command-line client", epilog=ENV_HELP_TEXT, formatter_class=RawDescriptionHelpFormatter)
    parser.add_argument('--debug', action='store_true', help='Enable debug output')
    # The help text used to be a copy of the --debug one
    parser.add_argument('--auth-file', type=str, help='Location of the authentication file (default: .zoerc in the home directory)', default=os.path.join(os.getenv('HOME', ''), '.zoerc'))

    subparser = parser.add_subparsers()

    # info
    argparser_info = subparser.add_parser('info', help="Queries the API for supported versions")
    argparser_info.set_defaults(func=info_cmd)

    # start
    argparser_exec_start = subparser.add_parser('start', help="Start an application")
    argparser_exec_start.add_argument('-s', '--synchronous', action='store_true', help="Do not detach, wait for execution to finish, print main service log")
    argparser_exec_start.add_argument('name', help="Name of the execution")
    argparser_exec_start.add_argument('jsonfile', type=FileType("r"), help='Application description')
    argparser_exec_start.set_defaults(func=exec_start_cmd)

    # exec-ls
    argparser_app_list = subparser.add_parser('exec-ls', help="List all executions for the calling user")
    argparser_app_list.add_argument('--limit', type=int, help='Limit the number of executions')
    argparser_app_list.add_argument('--name', help='Show only executions with this name')
    argparser_app_list.add_argument('--status', choices=["submitted", "scheduled", "starting", "error", "running", "cleaning up", "terminated"], help='Show only executions with this status')
    argparser_app_list.add_argument('--earlier-than-submit', help='Show only executions submitted earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--earlier-than-start', help='Show only executions started earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--earlier-than-end', help='Show only executions ended earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-submit', help='Show only executions submitted later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-start', help='Show only executions started later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-end', help='Show only executions ended later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.set_defaults(func=exec_list_cmd)

    # exec-get
    argparser_execution_get = subparser.add_parser('exec-get', help="Get execution status")
    argparser_execution_get.add_argument('id', type=int, help="Execution id")
    argparser_execution_get.set_defaults(func=exec_get_cmd)

    # exec-app-get: the handler looks up an execution, so the ID is an execution ID
    argparser_app_get = subparser.add_parser('exec-app-get', help="Retrieve an already defined application description")
    argparser_app_get.add_argument('id', help='The ID of the execution holding the application description')
    argparser_app_get.set_defaults(func=app_get_cmd)

    # terminate
    argparser_execution_kill = subparser.add_parser('terminate', help="Terminates an execution")
    argparser_execution_kill.add_argument('id', type=int, help="Execution id")
    argparser_execution_kill.set_defaults(func=exec_kill_cmd)

    # logs
    argparser_logs = subparser.add_parser('logs', help="Streams the service logs")
    argparser_logs.add_argument('service_id', type=int, help="Service id")
    argparser_logs.add_argument('-t', '--timestamps', action='store_true', help="Prefix timestamps for each line")
    argparser_logs.set_defaults(func=logs_cmd)

    # stats
    argparser_stats = subparser.add_parser('stats', help="Prints all available statistics")
    argparser_stats.set_defaults(func=stats_cmd)

    return parser, parser.parse_args()


def zoe():
    """Main entrypoint: parse the arguments, authenticate and run the chosen command.

    Prints the help and returns when no sub-command was given. Otherwise the
    process is terminated through sys.exit(): status 1 when the credentials
    cannot be read, when the API version check fails or when the command
    raises ZoeAPIException; status 0 otherwise (including on CTRL-C).
    """
    parser, args = process_arguments()
    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
    logging.getLogger("requests").setLevel(logging.WARNING)

    if not hasattr(args, "func"):
        parser.print_help()
        return

    auth = utils.read_auth(args)
    if auth is None:
        sys.exit(1)

    exit_code = 0
    try:
        if not _check_api_version(auth):
            # A failed check is an error: report it through the exit status.
            # SystemExit is not caught by the handlers below.
            sys.exit(1)
        args.func(auth, args)
    except ZoeAPIException as e:
        print(e.message)
        # Used to fall through to a success exit status
        exit_code = 1
    except KeyboardInterrupt:
        print('CTRL-C pressed, exiting...')
    sys.exit(exit_code)