entrypoint.py 11.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the entrypoint for the commandline Zoe client
"""

20
from datetime import datetime, timezone
21 22 23 24
import json
import logging
import os
import sys
25
import time
26
from argparse import ArgumentParser, Namespace, FileType, RawDescriptionHelpFormatter
Daniele Venzano's avatar
Daniele Venzano committed
27
from typing import Tuple
28

29 30
from tabulate import tabulate

31
from zoe_cmd import utils
32
from zoe_cmd.api_lib import ZoeAPI
33
from zoe_lib.exceptions import ZoeAPIException
34 35


36
def _log_stream_stdout(service_id, timestamps, api: ZoeAPI):
37
    try:
38
        for line in api.services.get_logs(service_id):
39 40 41 42 43 44 45 46 47 48
            if timestamps:
                print(line[0], line[1])
            else:
                print(line[1])
    except KeyboardInterrupt:
        print('CTRL-C detected, exiting...')
        return 'interrupt'
    return 'stream_end'


49
def info_cmd(api: ZoeAPI, args_):
50
    """Queries the info endpoint."""
51
    info = api.info.info()
52 53 54 55 56 57
    print("Zoe version: ", info['version'])
    print("Zoe API version: ", info['api_version'])
    print("ZApp format version: ", info['application_format_version'])
    print("Deployment name: ", info['deployment_name'])


58
def app_get_cmd(api: ZoeAPI, args):
59
    """Extract an application description from an execution."""
60
    execution = api.executions.get(args.id)
61
    if execution is None:
62
        print("no such execution")
63
    else:
64
        json.dump(execution['description'], sys.stdout, sort_keys=True, indent=4)
65 66


67
def exec_list_cmd(api: ZoeAPI, args):
68
    """List executions"""
69
    print(api.auth_user)
Daniele Venzano's avatar
Daniele Venzano committed
70 71 72 73 74 75 76 77 78 79 80
    filter_names = [
        'status',
        'name',
        'limit',
        'earlier_than_submit',
        'earlier_than_start',
        'earlier_than_end',
        'later_than_submit',
        'later_than_start',
        'later_than_end'
    ]
81
    filters = {
82
        'user_id': api.auth_user['id']
83
    }
Daniele Venzano's avatar
Daniele Venzano committed
84 85 86
    for key, value in vars(args).items():
        if key in filter_names:
            filters[key] = value
87
    data = api.executions.list(**filters)
88 89
    if len(data) == 0:
        return
90
    tabular_data = [[e['id'], e['name'], e['user_id'], e['status']] for e in sorted(data, key=lambda x: x['id'])]
91 92
    headers = ['ID', 'Name', 'User ID', 'Status']
    print(tabulate(tabular_data, headers))
93 94


95
def exec_start_cmd(api: ZoeAPI, args):
96
    """Submit an execution."""
97
    app_descr = json.load(args.jsonfile)
98
    exec_id = api.executions.start(args.name, app_descr)
99
    if not args.synchronous:
100
        print("Execution created successfully with ID {}, use the exec-get command to check its status".format(exec_id))
101
    else:
102
        print("Execution created successfully with ID {}, waiting for status change".format(exec_id))
103 104
        old_status = 'submitted'
        while True:
105
            execution = api.executions.get(exec_id)
106 107 108 109 110 111
            current_status = execution['status']
            if old_status != current_status:
                print('Execution is now {}'.format(current_status))
                old_status = current_status
            if current_status == 'running':
                break
112 113 114 115
            time.sleep(0.2)
        if args.running:
            print('Execution running')
            exit(0)
116 117
        monitor_service_id = None
        for service_id in execution['services']:
118
            service = api.services.get(service_id)
119 120 121 122 123
            if service['description']['monitor']:
                monitor_service_id = service['id']
                break

        print('\n>------ start of log streaming -------<\n')
124
        why_stop = _log_stream_stdout(monitor_service_id, False, api)
125 126 127 128 129 130 131
        print('\n>------ end of log streaming -------<\n')
        if why_stop == 'stream_end':
            print('Execution finished')
            exit(0)
        elif why_stop == 'interrupt':
            print('Do not worry, your execution ({}) is still running.'.format(exec_id))
            exit(1)
132 133


134
def exec_get_cmd(api: ZoeAPI, args):
135
    """Gather information about an execution."""
136
    execution = api.executions.get(args.id)
137 138 139 140
    if execution is None:
        print('Execution not found')
    else:
        print('Execution {} (ID: {})'.format(execution['name'], execution['id']))
141
        print('Application name: {}'.format(execution['description']['name']))
142
        print('Status: {}'.format(execution['status']))
143
        if execution['status'] == 'error':
144
            print('Last error: {}'.format(execution['error_message']))
145
        print()
146
        print('Time submit: {}'.format(datetime.fromtimestamp(execution['time_submit'], timezone.utc).astimezone()))
147 148 149 150

        if execution['time_start'] is None:
            print('Time start: {}'.format('not yet'))
        else:
151
            print('Time start: {}'.format(datetime.fromtimestamp(execution['time_start'], timezone.utc).astimezone()))
152 153 154 155

        if execution['time_end'] is None:
            print('Time end: {}'.format('not yet'))
        else:
156
            print('Time end: {}'.format(datetime.fromtimestamp(execution['time_end'], timezone.utc).astimezone()))
157
        print()
158

159
        endpoints = api.executions.endpoints(execution['id'])
160
        if endpoints is not None and len(endpoints) > 0:
161
            print('Exposed endpoints:')
162 163
            for endpoint in endpoints:
                print(' - {}: {}'.format(endpoint[0], endpoint[1]))
164 165 166 167
        else:
            print('This ZApp does not expose any endpoint')

        print()
168
        tabular_data = []
169
        for c_id in execution['services']:
170
            service = api.services.get(c_id)
171
            service_data = [service['id'], service['name'], 'true' if service['essential'] else 'false', service['status'], service['backend_status'], service['error_message'] if service['error_message'] is not None else '']
172
            tabular_data.append(service_data)
173
        headers = ['ID', 'Name', 'Essential', 'Zoe status', 'Backend status', 'Error message']
174
        print(tabulate(tabular_data, headers))
175 176


177
def exec_kill_cmd(api: ZoeAPI, args):
178
    """Kill an execution."""
179
    api.executions.terminate(args.id)
180 181


182
def logs_cmd(api: ZoeAPI, args):
183
    """Retrieves and streams the logs of a service."""
184
    _log_stream_stdout(args.service_id, args.timestamps, api)
185 186


187
def stats_cmd(api: ZoeAPI, args_):
188
    """Prints statistics on Zoe internals."""
189
    sched = api.statistics.scheduler()
190
    print('Scheduler queue length: {}'.format(sched['queue_length']))
191
    print('Scheduler running queue length: {}'.format(sched['running_length']))
192 193
    print('Termination threads count: {}'.format(sched['termination_threads_count']))

194

195
ENV_HELP_TEXT = '''To authenticate with Zoe you need to define three environment variables:
196 197
ZOE_URL: point to the URL of the Zoe Scheduler (ex.: http://localhost:5000/
ZOE_USER: the username used for authentication
198 199
ZOE_PASS: the password used for authentication

200
or create a ~/.zoerc file (another location can be specified with --api-file) like this:
201 202 203 204 205 206
url = xxx
user = yyy
pass = zzz

Environment variable will override the values specified in the configuration file.
'''
207 208


Daniele Venzano's avatar
Daniele Venzano committed
209
def process_arguments() -> Tuple[ArgumentParser, Namespace]:
210
    """Parse command line arguments."""
211 212
    parser = ArgumentParser(description="Zoe command-line client", epilog=ENV_HELP_TEXT, formatter_class=RawDescriptionHelpFormatter)
    parser.add_argument('--debug', action='store_true', help='Enable debug output')
213
    parser.add_argument('--auth-file', type=str, help='Enable debug output', default=os.path.join(os.getenv('HOME', ''), '.zoerc'))
214 215 216

    subparser = parser.add_subparsers()

217
    # info
218 219 220
    argparser_info = subparser.add_parser('info', help="Queries the API for supported versions")
    argparser_info.set_defaults(func=info_cmd)

Francesco Pace's avatar
Francesco Pace committed
221
    argparser_exec_start = subparser.add_parser('start', help="Start an application")
222
    argparser_exec_start.add_argument('-s', '--synchronous', action='store_true', help="Do not detach, wait for execution to finish, print main service log")
223
    argparser_exec_start.add_argument('-r', '--running', action='store_true', help="Do not detach, wait for execution to start (state: running)")
224
    argparser_exec_start.add_argument('name', help="Name of the execution")
225
    argparser_exec_start.add_argument('jsonfile', type=FileType("r"), help='Application description')
226 227
    argparser_exec_start.set_defaults(func=exec_start_cmd)

228
    argparser_app_list = subparser.add_parser('exec-ls', help="List all executions for the calling user")
Daniele Venzano's avatar
Daniele Venzano committed
229 230
    argparser_app_list.add_argument('--limit', type=int, help='Limit the number of executions')
    argparser_app_list.add_argument('--name', help='Show only executions with this name')
231
    argparser_app_list.add_argument('--status', choices=["submitted", "queued", "starting", "error", "running", "cleaning up", "terminated"], help='Show only executions with this status')
Daniele Venzano's avatar
Daniele Venzano committed
232
    argparser_app_list.add_argument('--earlier-than-submit', help='Show only executions submitted earlier than this timestamp (seconds since UTC epoch)')
233 234 235 236 237
    argparser_app_list.add_argument('--earlier-than-start', help='Show only executions started earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--earlier-than-end', help='Show only executions ended earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-submit', help='Show only executions submitted later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-start', help='Show only executions started later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-end', help='Show only executions ended later than this timestamp (seconds since UTC epoch)')
238 239
    argparser_app_list.set_defaults(func=exec_list_cmd)

240 241 242 243
    argparser_execution_get = subparser.add_parser('exec-get', help="Get execution status")
    argparser_execution_get.add_argument('id', type=int, help="Execution id")
    argparser_execution_get.set_defaults(func=exec_get_cmd)

244
    argparser_app_get = subparser.add_parser('exec-app-get', help="Retrieve an already defined application description")
245
    argparser_app_get.add_argument('id', help='The ID of the application')
246
    argparser_app_get.set_defaults(func=app_get_cmd)
247 248 249 250 251

    argparser_execution_kill = subparser.add_parser('terminate', help="Terminates an execution")
    argparser_execution_kill.add_argument('id', type=int, help="Execution id")
    argparser_execution_kill.set_defaults(func=exec_kill_cmd)

252 253 254 255 256
    argparser_logs = subparser.add_parser('logs', help="Streams the service logs")
    argparser_logs.add_argument('service_id', type=int, help="Service id")
    argparser_logs.add_argument('-t', '--timestamps', action='store_true', help="Prefix timestamps for each line")
    argparser_logs.set_defaults(func=logs_cmd)

257 258 259
    argparser_stats = subparser.add_parser('stats', help="Prints all available statistics")
    argparser_stats.set_defaults(func=stats_cmd)

260 261 262 263
    return parser, parser.parse_args()


def zoe():
264
    """Main entrypoint."""
265 266 267 268 269 270 271 272 273 274 275
    parser, args = process_arguments()
    if args.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    logging.getLogger("requests").setLevel(logging.WARNING)

    if not hasattr(args, "func"):
        parser.print_help()
        return

276 277 278
    auth = utils.read_auth(args)
    if auth is None:
        sys.exit(1)
279 280

    try:
281 282
        api = ZoeAPI(auth['url'], auth['user'], auth['pass'])
        args.func(api, args)
283 284
    except ZoeAPIException as e:
        print(e.message)
285 286
    except KeyboardInterrupt:
        print('CTRL-C pressed, exiting...')
287
    sys.exit(0)