entrypoint.py 13 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the entrypoint for the commandline Zoe client
"""

20
from datetime import datetime, timezone
21 22 23 24
import json
import logging
import os
import sys
25
import time
26
from argparse import ArgumentParser, Namespace, FileType, RawDescriptionHelpFormatter
Daniele Venzano's avatar
Daniele Venzano committed
27
from typing import Tuple
28

29 30
from tabulate import tabulate

31
from zoe_cmd import utils
32
from zoe_lib.info import ZoeInfoAPI
Daniele Venzano's avatar
Daniele Venzano committed
33
from zoe_lib.services import ZoeServiceAPI
34
from zoe_lib.statistics import ZoeStatisticsAPI
35
from zoe_lib.exceptions import ZoeAPIException
36
from zoe_lib.executions import ZoeExecutionsAPI
37
from zoe_lib.applications import app_validate
38
from zoe_lib.version import ZOE_VERSION
39 40


41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
def _check_api_version(auth):
    """Probe the server's info endpoint to detect a client/server mismatch.

    ``auth`` is indexed with the keys 'url', 'user' and 'pass'.

    Returns True when the info request completes. When the request raises
    ZoeAPIException, three diagnostic lines are printed and False is returned.
    """
    client = ZoeInfoAPI(auth['url'], auth['user'], auth['pass'])
    try:
        client.info()
    except ZoeAPIException:
        diagnostics = (
            'Error: this client can talk to ZOE v. {}, but server is reporting an error'.format(ZOE_VERSION),
            'Error: your client is too old (or too new) to speak with the configured server',
            'Error: check the version this server is running at the bottom of this web page: {}'.format(auth['url']),
        )
        for text in diagnostics:
            print(text)
        return False
    return True


def _log_stream_stdout(service_id, timestamps, auth):
    """Print the log of a service to standard output.

    Every item produced by ``get_logs`` is indexed: element 1 is always
    printed, preceded by element 0 when ``timestamps`` is true.

    Returns 'stream_end' once the log iterator is exhausted, or 'interrupt'
    when a KeyboardInterrupt stops the streaming.
    """
    api = ZoeServiceAPI(auth['url'], auth['user'], auth['pass'])
    try:
        for entry in api.get_logs(service_id):
            if not timestamps:
                print(entry[1])
                continue
            print(entry[0], entry[1])
    except KeyboardInterrupt:
        print('CTRL-C detected, exiting...')
        return 'interrupt'
    return 'stream_end'


68
def info_cmd(auth, args_):
    """Print the version details returned by the info endpoint.

    ``args_`` is accepted for signature uniformity with the other commands
    and is not used.
    """
    server_info = ZoeInfoAPI(auth['url'], auth['user'], auth['pass']).info()
    # (label, key) pairs, printed in this order
    report = (
        ("Zoe version: ", 'version'),
        ("Zoe API version: ", 'api_version'),
        ("ZApp format version: ", 'application_format_version'),
        ("Deployment name: ", 'deployment_name'),
    )
    for label, key in report:
        print(label, server_info[key])


78
def app_get_cmd(auth, args):
    """Write the application description of an execution to stdout as JSON.

    Prints "no such execution" when the lookup of ``args.id`` returns None.
    """
    found = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass']).get(args.id)
    if found is None:
        print("no such execution")
        return
    json.dump(found['description'], sys.stdout, sort_keys=True, indent=4)
86 87


88
def exec_list_cmd(auth, args):
    """Print a table of executions selected by the command-line filters.

    The query is always restricted to ``auth['user']``; every attribute of
    ``args`` whose name is a known filter is forwarded to the API as-is.
    Nothing is printed when the API returns an empty collection.
    """
    exec_api = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass'])
    accepted = (
        'status',
        'name',
        'limit',
        'earlier_than_submit',
        'earlier_than_start',
        'earlier_than_end',
        'later_than_submit',
        'later_than_start',
        'later_than_end',
    )
    filters = {'user_id': auth['user']}
    filters.update((option, chosen) for option, chosen in vars(args).items() if option in accepted)

    executions = exec_api.list(**filters)
    if len(executions) == 0:
        return

    rows = []
    for entry in sorted(executions.values(), key=lambda item: item['id']):
        rows.append([entry['id'], entry['name'], entry['user_id'], entry['status']])
    print(tabulate(rows, ['ID', 'Name', 'User ID', 'Status']))
114 115


116
def exec_start_cmd(auth, args):
    """Submit an execution and, in synchronous mode, follow it until it ends.

    The application description is read as JSON from ``args.jsonfile`` and
    submitted under the name ``args.name``. Without ``args.synchronous`` the
    function returns right after printing the new execution ID.

    In synchronous mode the execution status is polled once per second until
    it becomes 'running'; then the log of the first service whose description
    has a true 'monitor' entry is streamed to standard output.

    Exit status in synchronous mode: 0 when the log stream ends; 1 when the
    streaming is interrupted, when the execution can no longer be retrieved,
    when it reaches 'error' or 'terminated' before running, or when none of
    its services is marked as monitor.
    """
    app_descr = json.load(args.jsonfile)
    exec_api = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass'])
    exec_id = exec_api.start(args.name, app_descr)
    if not args.synchronous:
        print("Application scheduled successfully with ID {}, use the exec-get command to check its status".format(exec_id))
        return

    print("Application scheduled successfully with ID {}, waiting for status change".format(exec_id))
    # Statuses assumed to be final (taken from the exec-ls --status choices):
    # an execution in one of them will not become 'running', so waiting for
    # that would loop forever.
    final_states = ('error', 'terminated')
    old_status = 'submitted'
    while True:
        execution = exec_api.get(exec_id)
        if execution is None:
            # The other commands treat None as "execution not found"
            print('Execution {} cannot be retrieved any more'.format(exec_id))
            sys.exit(1)
        current_status = execution['status']
        if old_status != current_status:
            print('Execution is now {}'.format(current_status))
            old_status = current_status
        if current_status == 'running':
            break
        if current_status in final_states:
            print('Execution {} ended before reaching the running state, use the exec-get command for details'.format(exec_id))
            sys.exit(1)
        time.sleep(1)

    # Pick the first service flagged as monitor: its log is the one to show
    monitor_service_id = None
    service_api = ZoeServiceAPI(auth['url'], auth['user'], auth['pass'])
    for service_id in execution['services']:
        service = service_api.get(service_id)
        if service['description']['monitor']:
            monitor_service_id = service['id']
            break

    if monitor_service_id is None:
        # Without this check None would be handed to the log API as a service ID
        print('Execution ({}) is running, but none of its services is marked as monitor: no log to stream'.format(exec_id))
        sys.exit(1)

    print('\n>------ start of log streaming -------<\n')
    why_stop = _log_stream_stdout(monitor_service_id, False, auth)
    print('\n>------ end of log streaming -------<\n')
    # sys.exit() rather than the exit() helper injected by the site module,
    # which is meant for interactive use and is not always defined
    if why_stop == 'stream_end':
        print('Execution finished')
        sys.exit(0)
    elif why_stop == 'interrupt':
        print('Do not worry, your execution ({}) is still running.'.format(exec_id))
        sys.exit(1)
152 153


154
def exec_get_cmd(auth, args):
    """Print a report about the execution identified by ``args.id``.

    The report contains name, status (with the last error when the status is
    'error'), submit/start/end times converted to the local time zone, the
    exposed endpoints and a table with one row per service. Prints
    'Execution not found' when the lookup returns None.
    """
    exec_api = ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass'])
    service_api = ZoeServiceAPI(auth['url'], auth['user'], auth['pass'])
    execution = exec_api.get(args.id)
    if execution is None:
        print('Execution not found')
        return

    def local_time(epoch_seconds):
        """Turn an epoch timestamp into an aware datetime in the local zone."""
        return datetime.fromtimestamp(epoch_seconds, timezone.utc).astimezone()

    print('Execution {} (ID: {})'.format(execution['name'], execution['id']))
    print('Application name: {}'.format(execution['description']['name']))
    print('Status: {}'.format(execution['status']))
    if execution['status'] == 'error':
        print('Last error: {}'.format(execution['error_message']))
    print()

    print('Time submit: {}'.format(local_time(execution['time_submit'])))
    # Start and end times are None until the corresponding event happens
    for label, key in (('Time start', 'time_start'), ('Time end', 'time_end')):
        moment = execution[key]
        print('{}: {}'.format(label, 'not yet' if moment is None else local_time(moment)))
    print()

    endpoints = exec_api.endpoints(execution['id'])
    if endpoints is None or len(endpoints) == 0:
        print('This ZApp does not expose any endpoint')
    else:
        print('Exposed endpoints:')
        for endpoint in endpoints:
            print(' - {}: {}'.format(endpoint[0], endpoint[1]))

    print()
    rows = []
    for service_id in execution['services']:
        service = service_api.get(service_id)
        row = [
            service['id'],
            service['name'],
            'true' if service['essential'] else 'false',
            service['status'],
            service['backend_status'],
        ]
        error_text = service['error_message']
        row.append('' if error_text is None else error_text)
        rows.append(row)
    print(tabulate(rows, ['ID', 'Name', 'Essential', 'Zoe status', 'Backend status', 'Error message']))
197 198


199
def exec_kill_cmd(auth, args):
    """Request the termination of the execution identified by ``args.id``."""
    ZoeExecutionsAPI(auth['url'], auth['user'], auth['pass']).terminate(args.id)


205
def logs_cmd(auth, args):
    """Stream the log of service ``args.service_id`` to standard output.

    ``args.timestamps`` selects whether each line is prefixed by its timestamp.
    """
    _log_stream_stdout(service_id=args.service_id, timestamps=args.timestamps, auth=auth)
208 209


210
def stats_cmd(auth, args_):
    """Print the scheduler statistics returned by the statistics API.

    ``args_`` is accepted for signature uniformity with the other commands
    and is not used.
    """
    scheduler_stats = ZoeStatisticsAPI(auth['url'], auth['user'], auth['pass']).scheduler()
    # (message template, key) pairs, printed in this order
    report = (
        ('Scheduler queue length: {}', 'queue_length'),
        ('Scheduler running queue length: {}', 'running_length'),
        ('Termination threads count: {}', 'termination_threads_count'),
    )
    for template, key in report:
        print(template.format(scheduler_stats[key]))

218
# Text used as the epilog of the command-line help: it explains the two ways
# of supplying the credentials (environment variables or configuration file).
ENV_HELP_TEXT = '''To authenticate with Zoe you need to define three environment variables:
ZOE_URL: point to the URL of the Zoe Scheduler (ex.: http://localhost:5000/)
ZOE_USER: the username used for authentication
ZOE_PASS: the password used for authentication

or create a ~/.zoerc file (another location can be specified with --auth-file) like this:
url = xxx
user = yyy
pass = zzz

Environment variables will override the values specified in the configuration file.
'''
230 231


Daniele Venzano's avatar
Daniele Venzano committed
232
def process_arguments() -> Tuple[ArgumentParser, Namespace]:
    """Build the command-line parser and parse the process arguments.

    Each sub-command stores its handler in the ``func`` default, so the caller
    can dispatch with ``args.func(auth, args)``. When no sub-command is given
    the returned namespace has no ``func`` attribute.

    Returns the parser itself together with the parsed namespace.
    """
    parser = ArgumentParser(description="Zoe command-line client", epilog=ENV_HELP_TEXT, formatter_class=RawDescriptionHelpFormatter)
    parser.add_argument('--debug', action='store_true', help='Enable debug output')
    # The help text used to be a copy of the --debug one
    parser.add_argument('--auth-file', type=str, help='Path of the authentication configuration file (defaults to .zoerc in the home directory)', default=os.path.join(os.getenv('HOME', ''), '.zoerc'))

    subparser = parser.add_subparsers()

    # info
    argparser_info = subparser.add_parser('info', help="Queries the API for supported versions")
    argparser_info.set_defaults(func=info_cmd)

    # start
    argparser_exec_start = subparser.add_parser('start', help="Start an application")
    argparser_exec_start.add_argument('-s', '--synchronous', action='store_true', help="Do not detach, wait for execution to finish, print main service log")
    argparser_exec_start.add_argument('name', help="Name of the execution")
    argparser_exec_start.add_argument('jsonfile', type=FileType("r"), help='Application description')
    argparser_exec_start.set_defaults(func=exec_start_cmd)

    # exec-ls
    argparser_app_list = subparser.add_parser('exec-ls', help="List all executions for the calling user")
    argparser_app_list.add_argument('--limit', type=int, help='Limit the number of executions')
    argparser_app_list.add_argument('--name', help='Show only executions with this name')
    argparser_app_list.add_argument('--status', choices=["submitted", "scheduled", "starting", "error", "running", "cleaning up", "terminated"], help='Show only executions with this status')
    # Six timestamp filters: --earlier-than-* first, then --later-than-*
    for relation in ('earlier', 'later'):
        for event, verb in (('submit', 'submitted'), ('start', 'started'), ('end', 'ended')):
            argparser_app_list.add_argument(
                '--{}-than-{}'.format(relation, event),
                help='Show only executions {} {} than this timestamp (seconds since UTC epoch)'.format(verb, relation)
            )
    argparser_app_list.set_defaults(func=exec_list_cmd)

    # exec-get
    argparser_execution_get = subparser.add_parser('exec-get', help="Get execution status")
    argparser_execution_get.add_argument('id', type=int, help="Execution id")
    argparser_execution_get.set_defaults(func=exec_get_cmd)

    # exec-app-get
    argparser_app_get = subparser.add_parser('exec-app-get', help="Retrieve an already defined application description")
    argparser_app_get.add_argument('id', help='The ID of the application')
    argparser_app_get.set_defaults(func=app_get_cmd)

    # terminate
    argparser_execution_kill = subparser.add_parser('terminate', help="Terminates an execution")
    argparser_execution_kill.add_argument('id', type=int, help="Execution id")
    argparser_execution_kill.set_defaults(func=exec_kill_cmd)

    # logs
    argparser_logs = subparser.add_parser('logs', help="Streams the service logs")
    argparser_logs.add_argument('service_id', type=int, help="Service id")
    argparser_logs.add_argument('-t', '--timestamps', action='store_true', help="Prefix timestamps for each line")
    argparser_logs.set_defaults(func=logs_cmd)

    # stats
    argparser_stats = subparser.add_parser('stats', help="Prints all available statistics")
    argparser_stats.set_defaults(func=stats_cmd)

    return parser, parser.parse_args()


def zoe():
    """Main entrypoint of the command-line client.

    Parses the command line, configures logging (DEBUG with ``--debug``, INFO
    otherwise), obtains the authentication data from ``utils.read_auth`` and
    runs the handler of the selected sub-command. When no sub-command is
    given the help is printed and the function returns.

    Exit status: 0 when the command completes or is stopped with CTRL-C;
    1 when the authentication data is missing, when the API version check
    fails or when the command raises ZoeAPIException.
    """
    parser, args = process_arguments()
    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
    logging.getLogger("requests").setLevel(logging.WARNING)

    if not hasattr(args, "func"):
        parser.print_help()
        return

    auth = utils.read_auth(args)
    if auth is None:
        sys.exit(1)

    try:
        if not _check_api_version(auth):
            # A failed version check is an error: report it to the caller
            # with a non-zero status, like the missing-auth case above
            sys.exit(1)
        args.func(auth, args)
    except ZoeAPIException as e:
        print(e.message)
        # API errors must not look like a success to calling scripts
        sys.exit(1)
    except KeyboardInterrupt:
        print('CTRL-C pressed, exiting...')
    sys.exit(0)