# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the entrypoint for the commandline Zoe client
"""

from datetime import datetime, timezone
import json
import logging
import os
import sys
import time
from argparse import ArgumentParser, Namespace, FileType, RawDescriptionHelpFormatter
from typing import Tuple

from tabulate import tabulate

from zoe_cmd import utils
from zoe_cmd.api_lib import ZoeAPI
from zoe_lib.exceptions import ZoeAPIException


36
def _log_stream_stdout(service_id, timestamps, api: ZoeAPI):
    """Print the log of a service, line by line, on standard output.

    Every item yielded by ``api.services.get_logs(service_id)`` is indexed:
    element 1 is always printed, element 0 is printed in front of it only
    when ``timestamps`` is true (presumably element 0 is the timestamp and
    element 1 the log text -- confirm against the API library).

    Returns ``'stream_end'`` when the log iterator is exhausted and
    ``'interrupt'`` when the user stops the stream with CTRL-C.
    """
    try:
        for entry in api.services.get_logs(service_id):
            fields = (entry[0], entry[1]) if timestamps else (entry[1],)
            print(*fields)
    except KeyboardInterrupt:
        print('CTRL-C detected, exiting...')
        outcome = 'interrupt'
    else:
        outcome = 'stream_end'
    return outcome


49
def info_cmd(api: ZoeAPI, args_):
    """Query the info endpoint and print the values it reports.

    ``args_`` is accepted so that every sub-command handler shares the same
    signature; it is not used here.
    """
    info = api.info.info()
    # (label, key in the info dictionary), printed in this order
    report = (
        ("Zoe version: ", 'version'),
        ("Zoe API version: ", 'api_version'),
        ("ZApp format version: ", 'application_format_version'),
        ("Deployment name: ", 'deployment_name'),
    )
    for label, key in report:
        print(label, info[key])


58
def app_get_cmd(api: ZoeAPI, args):
    """Print the application description stored in an execution.

    The execution is looked up by ``args.id``; its ``'description'`` entry is
    written to standard output as indented JSON with sorted keys. A short
    message is printed instead when the lookup returns ``None``.
    """
    execution = api.executions.get(args.id)
    if execution is None:
        print("no such execution")
        return
    json.dump(execution['description'], sys.stdout, sort_keys=True, indent=4)
65 66


67
def exec_list_cmd(api: ZoeAPI, args):
    """List the executions of the authenticated user as a table.

    Every attribute of ``args`` whose name is one of the supported filters is
    forwarded as a keyword argument to ``api.executions.list``, together with
    the ID of the authenticated user. Nothing is printed after the user line
    when the API returns no executions.
    """
    # NOTE(review): this looks like a leftover debug print -- confirm it is wanted.
    print(api.auth_user)

    supported_filters = (
        'status',
        'name',
        'limit',
        'earlier_than_submit',
        'earlier_than_start',
        'earlier_than_end',
        'later_than_submit',
        'later_than_start',
        'later_than_end',
    )
    filters = {'user_id': api.auth_user['id']}
    filters.update((option, value) for option, value in vars(args).items() if option in supported_filters)

    data = api.executions.list(**filters)
    if len(data) == 0:
        return

    # One row per execution, ordered by execution ID
    rows = []
    for execution in sorted(data.values(), key=lambda item: item['id']):
        rows.append([execution['id'], execution['name'], execution['user_id'], execution['status']])
    print(tabulate(rows, ['ID', 'Name', 'User ID', 'Status']))
93 94


95
def exec_start_cmd(api: ZoeAPI, args):
    """Submit an execution.

    The application description is read as JSON from the open file
    ``args.jsonfile`` and submitted under the name ``args.name``.

    Without ``args.synchronous`` the function returns right after the
    submission. With it, the execution status is polled once per second until
    it becomes ``running``; the log of the first service whose description is
    flagged as ``monitor`` is then streamed to standard output and the process
    exits:

    * status 0 when the log stream ends, or when the execution is already
      being cleaned up / terminated before it was ever seen running;
    * status 1 when the execution goes into ``error``, when no monitor
      service exists, or when the user interrupts the stream with CTRL-C.
    """
    app_descr = json.load(args.jsonfile)
    exec_id = api.executions.start(args.name, app_descr)
    if not args.synchronous:
        print("Application scheduled successfully with ID {}, use the exec-get command to check its status".format(exec_id))
        return

    print("Application scheduled successfully with ID {}, waiting for status change".format(exec_id))
    # Status names are the ones accepted by the exec-ls --status filter.
    # NOTE(review): assumes an execution in one of these states never becomes
    # 'running' afterwards -- confirm against the scheduler state machine.
    ended_states = ('cleaning up', 'terminated')
    old_status = 'submitted'
    while True:
        execution = api.executions.get(exec_id)
        current_status = execution['status']
        if old_status != current_status:
            print('Execution is now {}'.format(current_status))
            old_status = current_status
        if current_status == 'running':
            break
        # Without these checks the loop would spin forever when the execution
        # fails or ends before a poll observes it in the 'running' state.
        if current_status == 'error':
            print('Last error: {}'.format(execution['error_message']))
            sys.exit(1)
        if current_status in ended_states:
            print('Execution finished before its log could be streamed')
            sys.exit(0)
        time.sleep(1)

    monitor_service_id = None
    for service_id in execution['services']:
        service = api.services.get(service_id)
        if service['description']['monitor']:
            monitor_service_id = service['id']
            break

    if monitor_service_id is None:
        # Nothing to stream from: do not ask the API for the logs of service None
        print('No monitor service found, cannot stream logs. Your execution ({}) is still running.'.format(exec_id))
        sys.exit(1)

    print('\n>------ start of log streaming -------<\n')
    why_stop = _log_stream_stdout(monitor_service_id, False, api)
    print('\n>------ end of log streaming -------<\n')
    # sys.exit() rather than the exit() helper, which is injected by the site
    # module for interactive use and is not guaranteed to exist.
    if why_stop == 'stream_end':
        print('Execution finished')
        sys.exit(0)
    elif why_stop == 'interrupt':
        print('Do not worry, your execution ({}) is still running.'.format(exec_id))
        sys.exit(1)
129 130


131
def exec_get_cmd(api: ZoeAPI, args):
    """Print a report about the execution identified by ``args.id``.

    The report contains name, application name, status (plus the last error
    message when the status is ``error``), submit/start/end times converted to
    the local timezone, the exposed endpoints and a table of the services
    belonging to the execution.
    """
    execution = api.executions.get(args.id)
    if execution is None:
        print('Execution not found')
        return

    def local_time(epoch):
        """Convert a UTC epoch timestamp into a local, timezone-aware datetime."""
        return datetime.fromtimestamp(epoch, timezone.utc).astimezone()

    print('Execution {} (ID: {})'.format(execution['name'], execution['id']))
    print('Application name: {}'.format(execution['description']['name']))
    print('Status: {}'.format(execution['status']))
    if execution['status'] == 'error':
        print('Last error: {}'.format(execution['error_message']))
    print()
    print('Time submit: {}'.format(local_time(execution['time_submit'])))

    # Start and end times are None until the corresponding event has happened
    for template, key in (('Time start: {}', 'time_start'), ('Time end: {}', 'time_end')):
        print()
        moment = execution[key]
        print(template.format('not yet' if moment is None else local_time(moment)))
    print()
    print()

    endpoints = api.executions.endpoints(execution['id'])
    if endpoints is None or len(endpoints) == 0:
        print('This ZApp does not expose any endpoint')
    else:
        print('Exposed endpoints:')
        for endpoint in endpoints:
            print(' - {}: {}'.format(endpoint[0], endpoint[1]))

    print()

    def service_row(service):
        """Build the table row describing one service."""
        return [
            service['id'],
            service['name'],
            'true' if service['essential'] else 'false',
            service['status'],
            service['backend_status'],
            '' if service['error_message'] is None else service['error_message'],
        ]

    rows = [service_row(api.services.get(service_id)) for service_id in execution['services']]
    print(tabulate(rows, ['ID', 'Name', 'Essential', 'Zoe status', 'Backend status', 'Error message']))
172 173


174
def exec_kill_cmd(api: ZoeAPI, args):
    """Ask the API to terminate the execution identified by ``args.id``."""
    execution_id = args.id
    api.executions.terminate(execution_id)
177 178


179
def logs_cmd(api: ZoeAPI, args):
    """Stream the log of the service ``args.service_id`` to standard output.

    Lines are prefixed with their first field when ``args.timestamps`` is set.
    The reason why the stream stopped is not reported to the caller.
    """
    _log_stream_stdout(service_id=args.service_id, timestamps=args.timestamps, api=api)
182 183


184
def stats_cmd(api: ZoeAPI, args_):
    """Print the scheduler statistics returned by the API.

    ``args_`` is accepted so that every sub-command handler shares the same
    signature; it is not used here.
    """
    sched = api.statistics.scheduler()
    # (output template, key in the scheduler statistics), printed in this order
    report = (
        ('Scheduler queue length: {}', 'queue_length'),
        ('Scheduler running queue length: {}', 'running_length'),
        ('Termination threads count: {}', 'termination_threads_count'),
    )
    for template, key in report:
        print(template.format(sched[key]))

191

192
# Epilog of the command-line help: explains how credentials are supplied.
# The option named here must match the one defined in process_arguments().
ENV_HELP_TEXT = '''To authenticate with Zoe you need to define three environment variables:
ZOE_URL: point to the URL of the Zoe Scheduler (ex.: http://localhost:5000/)
ZOE_USER: the username used for authentication
ZOE_PASS: the password used for authentication

or create a ~/.zoerc file (another location can be specified with --auth-file) like this:
url = xxx
user = yyy
pass = zzz

Environment variables will override the values specified in the configuration file.
'''
204 205


Daniele Venzano's avatar
Daniele Venzano committed
206
def process_arguments() -> Tuple[ArgumentParser, Namespace]:
    """Parse command line arguments.

    Builds the top-level parser and one sub-parser per command; each
    sub-parser stores its handler function in the ``func`` attribute of the
    parsed namespace. When no command is given the namespace has no ``func``
    attribute.

    Returns the parser itself (so the caller can print the help) together with
    the namespace obtained by parsing ``sys.argv``.
    """
    parser = ArgumentParser(description="Zoe command-line client", epilog=ENV_HELP_TEXT, formatter_class=RawDescriptionHelpFormatter)
    parser.add_argument('--debug', action='store_true', help='Enable debug output')
    # The help text used to be a copy of the --debug one
    parser.add_argument('--auth-file', type=str, help='Path of the file containing the Zoe URL and credentials (default: ~/.zoerc)', default=os.path.join(os.getenv('HOME', ''), '.zoerc'))

    subparser = parser.add_subparsers()

    # info
    argparser_info = subparser.add_parser('info', help="Queries the API for supported versions")
    argparser_info.set_defaults(func=info_cmd)

    # start
    argparser_exec_start = subparser.add_parser('start', help="Start an application")
    argparser_exec_start.add_argument('-s', '--synchronous', action='store_true', help="Do not detach, wait for execution to finish, print main service log")
    argparser_exec_start.add_argument('name', help="Name of the execution")
    argparser_exec_start.add_argument('jsonfile', type=FileType("r"), help='Application description')
    argparser_exec_start.set_defaults(func=exec_start_cmd)

    # exec-ls
    argparser_app_list = subparser.add_parser('exec-ls', help="List all executions for the calling user")
    argparser_app_list.add_argument('--limit', type=int, help='Limit the number of executions')
    argparser_app_list.add_argument('--name', help='Show only executions with this name')
    argparser_app_list.add_argument('--status', choices=["submitted", "scheduled", "starting", "error", "running", "cleaning up", "terminated"], help='Show only executions with this status')
    argparser_app_list.add_argument('--earlier-than-submit', help='Show only executions submitted earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--earlier-than-start', help='Show only executions started earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--earlier-than-end', help='Show only executions ended earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-submit', help='Show only executions submitted later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-start', help='Show only executions started later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-end', help='Show only executions ended later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.set_defaults(func=exec_list_cmd)

    # exec-get
    argparser_execution_get = subparser.add_parser('exec-get', help="Get execution status")
    argparser_execution_get.add_argument('id', type=int, help="Execution id")
    argparser_execution_get.set_defaults(func=exec_get_cmd)

    # exec-app-get
    argparser_app_get = subparser.add_parser('exec-app-get', help="Retrieve an already defined application description")
    argparser_app_get.add_argument('id', help='The ID of the application')
    argparser_app_get.set_defaults(func=app_get_cmd)

    # terminate
    argparser_execution_kill = subparser.add_parser('terminate', help="Terminates an execution")
    argparser_execution_kill.add_argument('id', type=int, help="Execution id")
    argparser_execution_kill.set_defaults(func=exec_kill_cmd)

    # logs
    argparser_logs = subparser.add_parser('logs', help="Streams the service logs")
    argparser_logs.add_argument('service_id', type=int, help="Service id")
    argparser_logs.add_argument('-t', '--timestamps', action='store_true', help="Prefix timestamps for each line")
    argparser_logs.set_defaults(func=logs_cmd)

    # stats
    argparser_stats = subparser.add_parser('stats', help="Prints all available statistics")
    argparser_stats.set_defaults(func=stats_cmd)

    return parser, parser.parse_args()


def zoe():
    """Main entrypoint.

    Parses the command line, configures logging, loads the credentials and
    runs the handler of the selected command.

    Prints the help and returns when no command was given. Otherwise the
    process exits with status 1 when the credentials cannot be read or the
    API reports an error, and with status 0 when the command completes or is
    interrupted with CTRL-C.
    """
    parser, args = process_arguments()
    if args.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    # Keep the HTTP library quiet unless something is actually wrong
    logging.getLogger("requests").setLevel(logging.WARNING)

    if not hasattr(args, "func"):
        # No sub-command on the command line
        parser.print_help()
        return

    auth = utils.read_auth(args)
    if auth is None:
        sys.exit(1)

    try:
        api = ZoeAPI(auth['url'], auth['user'], auth['pass'])
        args.func(api, args)
    except ZoeAPIException as e:
        print(e.message)
        # A failed API call must not look like a success to the calling shell
        sys.exit(1)
    except KeyboardInterrupt:
        print('CTRL-C pressed, exiting...')
    sys.exit(0)