entrypoint.py 12.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the entrypoint for the commandline Zoe client
"""

20
import datetime
21 22 23 24
import json
import logging
import os
import sys
25
import time
26
from argparse import ArgumentParser, Namespace, FileType, RawDescriptionHelpFormatter
Daniele Venzano's avatar
Daniele Venzano committed
27
from typing import Tuple
28

29 30
from tabulate import tabulate

31
from zoe_cmd import utils
32
from zoe_lib.info import ZoeInfoAPI
Daniele Venzano's avatar
Daniele Venzano committed
33
from zoe_lib.services import ZoeServiceAPI
34
from zoe_lib.statistics import ZoeStatisticsAPI
35
from zoe_lib.exceptions import ZoeAPIException
36
from zoe_lib.executions import ZoeExecutionsAPI
37
from zoe_lib.applications import app_validate
38
from zoe_lib.version import ZOE_VERSION
39 40


41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
def _check_api_version(auth):
    """Checks if there is a version mismatch between server and client."""
    info_api = ZoeInfoAPI(auth['url'], auth['user'], auth['pass'])
    try:
        info_api.info()
        return True
    except ZoeAPIException:
        print('Error: this client can talk to ZOE v. {}, but server is reporting an error'.format(ZOE_VERSION,))
        print('Error: your client is too old (or too new) to speak with the configured server')
        print('Error: check the version this server is running at the bottom of this web page: {}'.format(auth['url']))
        return False


def _log_stream_stdout(service_id, timestamps, auth):
    service_api = ZoeServiceAPI(auth['url'], auth['user'], auth['pass'])
56 57 58 59 60 61 62 63 64 65 66 67
    try:
        for line in service_api.get_logs(service_id):
            if timestamps:
                print(line[0], line[1])
            else:
                print(line[1])
    except KeyboardInterrupt:
        print('CTRL-C detected, exiting...')
        return 'interrupt'
    return 'stream_end'


68
def info_cmd(auth, args_):
69
    """Queries the info endpoint."""
70
    info_api = ZoeInfoAPI(auth['url'], auth['user'], auth['pass'])
71 72 73 74 75 76 77
    info = info_api.info()
    print("Zoe version: ", info['version'])
    print("Zoe API version: ", info['api_version'])
    print("ZApp format version: ", info['application_format_version'])
    print("Deployment name: ", info['deployment_name'])


78
def app_get_cmd(auth, args):
79
    """Extract an application description from an execution."""
80
    exec_api = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass'])
81
    execution = exec_api.get(args.id)
82
    if execution is None:
83
        print("no such execution")
84
    else:
85
        json.dump(execution['description'], sys.stdout, sort_keys=True, indent=4)
86 87


88
def exec_list_cmd(auth, args):
89
    """List executions"""
90
    exec_api = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass'])
Daniele Venzano's avatar
Daniele Venzano committed
91 92 93 94 95 96 97 98 99 100 101
    filter_names = [
        'status',
        'name',
        'limit',
        'earlier_than_submit',
        'earlier_than_start',
        'earlier_than_end',
        'later_than_submit',
        'later_than_start',
        'later_than_end'
    ]
102 103 104
    filters = {
        'user_id': auth['user']
    }
Daniele Venzano's avatar
Daniele Venzano committed
105 106 107
    for key, value in vars(args).items():
        if key in filter_names:
            filters[key] = value
Daniele Venzano's avatar
Daniele Venzano committed
108
    data = exec_api.list(**filters)
109 110 111
    tabular_data = [[e['id'], e['name'], e['user_id'], e['status']] for e in sorted(data.values(), key=lambda x: x['id'])]
    headers = ['ID', 'Name', 'User ID', 'Status']
    print(tabulate(tabular_data, headers))
112 113


114
def exec_start_cmd(auth, args):
115
    """Submit an execution."""
116
    app_descr = json.load(args.jsonfile)
117
    app_validate(app_descr)
118
    exec_api = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass'])
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
    exec_id = exec_api.start(args.name, app_descr)
    if not args.synchronous:
        print("Application scheduled successfully with ID {}, use the exec-get command to check its status".format(exec_id))
    else:
        print("Application scheduled successfully with ID {}, waiting for status change".format(exec_id))
        old_status = 'submitted'
        while True:
            execution = exec_api.get(exec_id)
            current_status = execution['status']
            if old_status != current_status:
                print('Execution is now {}'.format(current_status))
                old_status = current_status
            if current_status == 'running':
                break
            time.sleep(1)
134
        monitor_service_id = None
135
        service_api = ZoeServiceAPI(auth['url'], auth['user'], auth['pass'])
136 137 138 139 140 141 142
        for service_id in execution['services']:
            service = service_api.get(service_id)
            if service['description']['monitor']:
                monitor_service_id = service['id']
                break

        print('\n>------ start of log streaming -------<\n')
143
        why_stop = _log_stream_stdout(monitor_service_id, False, auth)
144 145 146 147 148 149 150
        print('\n>------ end of log streaming -------<\n')
        if why_stop == 'stream_end':
            print('Execution finished')
            exit(0)
        elif why_stop == 'interrupt':
            print('Do not worry, your execution ({}) is still running.'.format(exec_id))
            exit(1)
151 152


153
def exec_get_cmd(auth, args):
154
    """Gather information about an execution."""
155 156
    exec_api = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass'])
    cont_api = ZoeServiceAPI(auth['url'], auth['user'], auth['pass'])
157
    execution = exec_api.get(args.id)
158 159 160 161
    if execution is None:
        print('Execution not found')
    else:
        print('Execution {} (ID: {})'.format(execution['name'], execution['id']))
162
        print('Application name: {}'.format(execution['description']['name']))
163
        print('Status: {}'.format(execution['status']))
164
        if execution['status'] == 'error':
165
            print('Last error: {}'.format(execution['error_message']))
166
        print()
167 168 169 170 171 172 173 174 175 176 177
        print('Time submit: {}'.format(datetime.datetime.fromtimestamp(execution['time_submit'])))

        if execution['time_start'] is None:
            print('Time start: {}'.format('not yet'))
        else:
            print('Time start: {}'.format(datetime.datetime.fromtimestamp(execution['time_start'])))

        if execution['time_end'] is None:
            print('Time end: {}'.format('not yet'))
        else:
            print('Time end: {}'.format(datetime.datetime.fromtimestamp(execution['time_end'])))
178
        print()
179

180
        endpoints = exec_api.endpoints(execution['id'])
181
        if endpoints is not None and len(endpoints) > 0:
182
            print('Exposed endpoints:')
183 184
            for endpoint in endpoints:
                print(' - {}: {}'.format(endpoint[0], endpoint[1]))
185 186 187 188
        else:
            print('This ZApp does not expose any endpoint')

        print()
189
        tabular_data = []
190
        for c_id in execution['services']:
191
            service = cont_api.get(c_id)
192 193 194 195
            service_data = [service['id'], service['name'], service['status'], service['backend_status'], service['error_message'] if service['error_message'] is not None else '']
            tabular_data.append(service_data)
        headers = ['ID', 'Name', 'Zoe status', 'Backend status', 'Error message']
        print(tabulate(tabular_data, headers))
196 197


198
def exec_kill_cmd(auth, args):
199
    """Kill an execution."""
200
    exec_api = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass'])
201 202 203
    exec_api.terminate(args.id)


204
def logs_cmd(auth, args):
205
    """Retrieves and streams the logs of a service."""
206
    _log_stream_stdout(args.service_id, args.timestamps, auth)
207 208


209
def stats_cmd(auth, args_):
210
    """Prints statistics on Zoe internals."""
211
    stats_api = ZoeStatisticsAPI(auth['url'], auth['user'], auth['pass'])
212 213 214 215
    sched = stats_api.scheduler()
    print('Scheduler queue length: {}'.format(sched['queue_length']))
    print('Termination threads count: {}'.format(sched['termination_threads_count']))

216
ENV_HELP_TEXT = '''To authenticate with Zoe you need to define three environment variables:
217 218
ZOE_URL: point to the URL of the Zoe Scheduler (ex.: http://localhost:5000/
ZOE_USER: the username used for authentication
219 220 221 222 223 224 225 226 227
ZOE_PASS: the password used for authentication

or create a ~/.zoerc file (another location can be specified with --auth-file) like this:
url = xxx
user = yyy
pass = zzz

Environment variable will override the values specified in the configuration file.
'''
228 229


Daniele Venzano's avatar
Daniele Venzano committed
230
def process_arguments() -> Tuple[ArgumentParser, Namespace]:
231
    """Parse command line arguments."""
232 233
    parser = ArgumentParser(description="Zoe command-line client", epilog=ENV_HELP_TEXT, formatter_class=RawDescriptionHelpFormatter)
    parser.add_argument('--debug', action='store_true', help='Enable debug output')
234
    parser.add_argument('--auth-file', type=str, help='Enable debug output', default=os.path.join(os.getenv('HOME', ''), '.zoerc'))
235 236 237

    subparser = parser.add_subparsers()

238
    # info
239 240 241
    argparser_info = subparser.add_parser('info', help="Queries the API for supported versions")
    argparser_info.set_defaults(func=info_cmd)

Francesco Pace's avatar
Francesco Pace committed
242
    argparser_exec_start = subparser.add_parser('start', help="Start an application")
243
    argparser_exec_start.add_argument('-s', '--synchronous', action='store_true', help="Do not detach, wait for execution to finish, print main service log")
244
    argparser_exec_start.add_argument('name', help="Name of the execution")
245
    argparser_exec_start.add_argument('jsonfile', type=FileType("r"), help='Application description')
246 247
    argparser_exec_start.set_defaults(func=exec_start_cmd)

248
    argparser_app_list = subparser.add_parser('exec-ls', help="List all executions for the calling user")
Daniele Venzano's avatar
Daniele Venzano committed
249 250 251 252
    argparser_app_list.add_argument('--limit', type=int, help='Limit the number of executions')
    argparser_app_list.add_argument('--name', help='Show only executions with this name')
    argparser_app_list.add_argument('--status', choices=["submitted", "scheduled", "starting", "error", "running", "cleaning up", "terminated"], help='Show only executions with this status')
    argparser_app_list.add_argument('--earlier-than-submit', help='Show only executions submitted earlier than this timestamp (seconds since UTC epoch)')
253 254 255 256 257
    argparser_app_list.add_argument('--earlier-than-start', help='Show only executions started earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--earlier-than-end', help='Show only executions ended earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-submit', help='Show only executions submitted later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-start', help='Show only executions started later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-end', help='Show only executions ended later than this timestamp (seconds since UTC epoch)')
258 259
    argparser_app_list.set_defaults(func=exec_list_cmd)

260 261 262 263
    argparser_execution_get = subparser.add_parser('exec-get', help="Get execution status")
    argparser_execution_get.add_argument('id', type=int, help="Execution id")
    argparser_execution_get.set_defaults(func=exec_get_cmd)

264
    argparser_app_get = subparser.add_parser('exec-app-get', help="Retrieve an already defined application description")
265
    argparser_app_get.add_argument('id', help='The ID of the application')
266
    argparser_app_get.set_defaults(func=app_get_cmd)
267 268 269 270 271

    argparser_execution_kill = subparser.add_parser('terminate', help="Terminates an execution")
    argparser_execution_kill.add_argument('id', type=int, help="Execution id")
    argparser_execution_kill.set_defaults(func=exec_kill_cmd)

272 273 274 275 276
    argparser_logs = subparser.add_parser('logs', help="Streams the service logs")
    argparser_logs.add_argument('service_id', type=int, help="Service id")
    argparser_logs.add_argument('-t', '--timestamps', action='store_true', help="Prefix timestamps for each line")
    argparser_logs.set_defaults(func=logs_cmd)

277 278 279
    argparser_stats = subparser.add_parser('stats', help="Prints all available statistics")
    argparser_stats.set_defaults(func=stats_cmd)

280 281 282 283
    return parser, parser.parse_args()


def zoe():
284
    """Main entrypoint."""
285 286 287 288 289 290 291 292 293 294 295
    parser, args = process_arguments()
    if args.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    logging.getLogger("requests").setLevel(logging.WARNING)

    if not hasattr(args, "func"):
        parser.print_help()
        return

296 297 298
    auth = utils.read_auth(args)
    if auth is None:
        sys.exit(1)
299 300

    try:
301 302 303 304
        ret = _check_api_version(auth)
        if not ret:
            sys.exit(0)
        args.func(auth, args)
305 306
    except ZoeAPIException as e:
        print(e.message)
307 308
    except KeyboardInterrupt:
        print('CTRL-C pressed, exiting...')
309
    sys.exit(0)