entrypoint.py 12.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the entrypoint for the commandline Zoe client
"""

20
import datetime
21 22 23 24
import json
import logging
import os
import sys
25
import time
26
from argparse import ArgumentParser, Namespace, FileType, RawDescriptionHelpFormatter
Daniele Venzano's avatar
Daniele Venzano committed
27
from typing import Tuple
28

29 30
from tabulate import tabulate

31
from zoe_cmd import utils
32
from zoe_lib.info import ZoeInfoAPI
Daniele Venzano's avatar
Daniele Venzano committed
33
from zoe_lib.services import ZoeServiceAPI
34
from zoe_lib.statistics import ZoeStatisticsAPI
35
from zoe_lib.exceptions import ZoeAPIException, InvalidApplicationDescription
36
from zoe_lib.executions import ZoeExecutionsAPI
37
from zoe_lib.applications import app_validate
38 39


40 41 42 43 44 45 46 47 48 49 50 51 52 53
def _log_stream_stdout(service_id, timestamps):
    service_api = ZoeServiceAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
    try:
        for line in service_api.get_logs(service_id):
            if timestamps:
                print(line[0], line[1])
            else:
                print(line[1])
    except KeyboardInterrupt:
        print('CTRL-C detected, exiting...')
        return 'interrupt'
    return 'stream_end'


54 55 56 57 58 59 60 61 62 63
def info_cmd(args_):
    """Queries the info endpoint."""
    info_api = ZoeInfoAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
    info = info_api.info()
    print("Zoe version: ", info['version'])
    print("Zoe API version: ", info['api_version'])
    print("ZApp format version: ", info['application_format_version'])
    print("Deployment name: ", info['deployment_name'])


64
def app_validate_cmd(args):
65
    """Validate an application description."""
66
    app_descr = json.load(args.jsonfile)
67
    try:
68 69 70
        app_validate(app_descr)
    except InvalidApplicationDescription as e:
        print(e)
71
    else:
72
        print("Static validation OK")
73 74 75


def app_get_cmd(args):
76
    """Extract an application description from an execution."""
77
    exec_api = ZoeExecutionsAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
78
    execution = exec_api.get(args.id)
79
    if execution is None:
80
        print("no such execution")
81
    else:
82
        json.dump(execution['description'], sys.stdout, sort_keys=True, indent=4)
83 84


Daniele Venzano's avatar
Daniele Venzano committed
85
def exec_list_cmd(args):
86
    """List executions"""
87
    exec_api = ZoeExecutionsAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
Daniele Venzano's avatar
Daniele Venzano committed
88 89 90 91 92 93 94 95 96 97 98 99 100
    filter_names = [
        'status',
        'name',
        'user_id',
        'limit',
        'earlier_than_submit',
        'earlier_than_start',
        'earlier_than_end',
        'later_than_submit',
        'later_than_start',
        'later_than_end'
    ]
    filters = {}
Daniele Venzano's avatar
Daniele Venzano committed
101 102 103
    for key, value in vars(args).items():
        if key in filter_names:
            filters[key] = value
Daniele Venzano's avatar
Daniele Venzano committed
104
    data = exec_api.list(**filters)
105 106 107
    tabular_data = [[e['id'], e['name'], e['user_id'], e['status']] for e in sorted(data.values(), key=lambda x: x['id'])]
    headers = ['ID', 'Name', 'User ID', 'Status']
    print(tabulate(tabular_data, headers))
108 109 110


def exec_start_cmd(args):
111
    """Submit an execution."""
112
    app_descr = json.load(args.jsonfile)
113
    app_validate(app_descr)
114
    exec_api = ZoeExecutionsAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
    exec_id = exec_api.start(args.name, app_descr)
    if not args.synchronous:
        print("Application scheduled successfully with ID {}, use the exec-get command to check its status".format(exec_id))
    else:
        print("Application scheduled successfully with ID {}, waiting for status change".format(exec_id))
        old_status = 'submitted'
        while True:
            execution = exec_api.get(exec_id)
            current_status = execution['status']
            if old_status != current_status:
                print('Execution is now {}'.format(current_status))
                old_status = current_status
            if current_status == 'running':
                break
            time.sleep(1)
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
        monitor_service_id = None
        service_api = ZoeServiceAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
        for service_id in execution['services']:
            service = service_api.get(service_id)
            if service['description']['monitor']:
                monitor_service_id = service['id']
                break

        print('\n>------ start of log streaming -------<\n')
        why_stop = _log_stream_stdout(monitor_service_id, False)
        print('\n>------ end of log streaming -------<\n')
        if why_stop == 'stream_end':
            print('Execution finished')
            exit(0)
        elif why_stop == 'interrupt':
            print('Do not worry, your execution ({}) is still running.'.format(exec_id))
            exit(1)
147 148 149


def exec_get_cmd(args):
150
    """Gather information about an execution."""
151
    exec_api = ZoeExecutionsAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
Daniele Venzano's avatar
Daniele Venzano committed
152
    cont_api = ZoeServiceAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
153
    execution = exec_api.get(args.id)
154 155 156 157
    if execution is None:
        print('Execution not found')
    else:
        print('Execution {} (ID: {})'.format(execution['name'], execution['id']))
158
        print('Application name: {}'.format(execution['description']['name']))
159
        print('Status: {}'.format(execution['status']))
160
        if execution['status'] == 'error':
161
            print('Last error: {}'.format(execution['error_message']))
162
        print()
163 164 165 166 167 168 169 170 171 172 173
        print('Time submit: {}'.format(datetime.datetime.fromtimestamp(execution['time_submit'])))

        if execution['time_start'] is None:
            print('Time start: {}'.format('not yet'))
        else:
            print('Time start: {}'.format(datetime.datetime.fromtimestamp(execution['time_start'])))

        if execution['time_end'] is None:
            print('Time end: {}'.format('not yet'))
        else:
            print('Time end: {}'.format(datetime.datetime.fromtimestamp(execution['time_end'])))
174
        print()
175

176 177 178
        endpoints = exec_api.endpoints(execution['id'])
        if len(endpoints) > 0:
            print('Exposed endpoints:')
179 180
            for endpoint in endpoints:
                print(' - {}: {}'.format(endpoint[0], endpoint[1]))
181 182 183 184
        else:
            print('This ZApp does not expose any endpoint')

        print()
185
        tabular_data = []
186
        for c_id in execution['services']:
187
            service = cont_api.get(c_id)
188 189 190 191
            service_data = [service['id'], service['name'], service['status'], service['backend_status'], service['error_message'] if service['error_message'] is not None else '']
            tabular_data.append(service_data)
        headers = ['ID', 'Name', 'Zoe status', 'Backend status', 'Error message']
        print(tabulate(tabular_data, headers))
192 193 194


def exec_kill_cmd(args):
195
    """Kill an execution."""
196 197 198 199
    exec_api = ZoeExecutionsAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
    exec_api.terminate(args.id)


200
def exec_rm_cmd(args):
201
    """Delete an execution and kill it if necessary."""
202 203 204 205
    exec_api = ZoeExecutionsAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
    exec_api.delete(args.id)


206 207 208 209 210
def logs_cmd(args):
    """Retrieves and streams the logs of a service."""
    _log_stream_stdout(args.service_id, args.timestamps)


211 212 213 214 215 216 217
def stats_cmd(args_):
    """Prints statistics on Zoe internals."""
    stats_api = ZoeStatisticsAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
    sched = stats_api.scheduler()
    print('Scheduler queue length: {}'.format(sched['queue_length']))
    print('Termination threads count: {}'.format(sched['termination_threads_count']))

218 219 220 221 222 223
ENV_HELP_TEXT = '''To use this tool you need also to define three environment variables:
ZOE_URL: point to the URL of the Zoe Scheduler (ex.: http://localhost:5000/
ZOE_USER: the username used for authentication
ZOE_PASS: the password used for authentication'''


Daniele Venzano's avatar
Daniele Venzano committed
224
def process_arguments() -> Tuple[ArgumentParser, Namespace]:
225
    """Parse command line arguments."""
226 227 228 229 230
    parser = ArgumentParser(description="Zoe command-line client", epilog=ENV_HELP_TEXT, formatter_class=RawDescriptionHelpFormatter)
    parser.add_argument('--debug', action='store_true', help='Enable debug output')

    subparser = parser.add_subparsers()

231 232 233
    argparser_info = subparser.add_parser('info', help="Queries the API for supported versions")
    argparser_info.set_defaults(func=info_cmd)

234 235 236
    argparser_app_validate = subparser.add_parser('app-validate', help='Validate an application description')
    argparser_app_validate.add_argument('jsonfile', type=FileType("r"), help='Application description')
    argparser_app_validate.set_defaults(func=app_validate_cmd)
237

Francesco Pace's avatar
Francesco Pace committed
238
    argparser_exec_start = subparser.add_parser('start', help="Start an application")
239
    argparser_exec_start.add_argument('-s', '--synchronous', action='store_true', help="Do not detach, wait for execution to finish, print main service log")
240
    argparser_exec_start.add_argument('name', help="Name of the execution")
241
    argparser_exec_start.add_argument('jsonfile', type=FileType("r"), help='Application description')
242 243
    argparser_exec_start.set_defaults(func=exec_start_cmd)

244
    argparser_app_list = subparser.add_parser('exec-ls', help="List all executions for the calling user")
Daniele Venzano's avatar
Daniele Venzano committed
245 246
    argparser_app_list.add_argument('--limit', type=int, help='Limit the number of executions')
    argparser_app_list.add_argument('--name', help='Show only executions with this name')
247
    argparser_app_list.add_argument('--user_id', help='Show only executions belonging to this user')
Daniele Venzano's avatar
Daniele Venzano committed
248 249
    argparser_app_list.add_argument('--status', choices=["submitted", "scheduled", "starting", "error", "running", "cleaning up", "terminated"], help='Show only executions with this status')
    argparser_app_list.add_argument('--earlier-than-submit', help='Show only executions submitted earlier than this timestamp (seconds since UTC epoch)')
250 251 252 253 254
    argparser_app_list.add_argument('--earlier-than-start', help='Show only executions started earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--earlier-than-end', help='Show only executions ended earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-submit', help='Show only executions submitted later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-start', help='Show only executions started later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-end', help='Show only executions ended later than this timestamp (seconds since UTC epoch)')
255 256
    argparser_app_list.set_defaults(func=exec_list_cmd)

257 258 259 260
    argparser_execution_get = subparser.add_parser('exec-get', help="Get execution status")
    argparser_execution_get.add_argument('id', type=int, help="Execution id")
    argparser_execution_get.set_defaults(func=exec_get_cmd)

261
    argparser_app_get = subparser.add_parser('exec-app-get', help="Retrieve an already defined application description")
262
    argparser_app_get.add_argument('id', help='The ID of the application')
263
    argparser_app_get.set_defaults(func=app_get_cmd)
264 265 266 267 268

    argparser_execution_kill = subparser.add_parser('terminate', help="Terminates an execution")
    argparser_execution_kill.add_argument('id', type=int, help="Execution id")
    argparser_execution_kill.set_defaults(func=exec_kill_cmd)

Daniele Venzano's avatar
Daniele Venzano committed
269 270 271
    argparser_execution_rm = subparser.add_parser('exec-rm', help="Deletes an execution")
    argparser_execution_rm.add_argument('id', type=int, help="Execution id")
    argparser_execution_rm.set_defaults(func=exec_rm_cmd)
272

273 274 275 276 277
    argparser_logs = subparser.add_parser('logs', help="Streams the service logs")
    argparser_logs.add_argument('service_id', type=int, help="Service id")
    argparser_logs.add_argument('-t', '--timestamps', action='store_true', help="Prefix timestamps for each line")
    argparser_logs.set_defaults(func=logs_cmd)

278 279 280
    argparser_stats = subparser.add_parser('stats', help="Prints all available statistics")
    argparser_stats.set_defaults(func=stats_cmd)

281 282 283 284
    return parser, parser.parse_args()


def zoe():
285
    """Main entrypoint."""
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
    parser, args = process_arguments()
    if args.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    logging.getLogger("requests").setLevel(logging.WARNING)

    if not hasattr(args, "func"):
        parser.print_help()
        return

    if 'ZOE_URL' not in os.environ or 'ZOE_USER' not in os.environ or 'ZOE_PASS' not in os.environ:
        parser.print_help()
        return

    try:
        args.func(args)
    except ZoeAPIException as e:
        print(e.message)
    sys.exit(0)