# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the entrypoint for the commandline Zoe client
"""

from datetime import datetime, timezone
import json
import logging
import os
import sys
import time
from argparse import ArgumentParser, Namespace, FileType, RawDescriptionHelpFormatter
from typing import Tuple

from tabulate import tabulate

from zoe_cmd import utils
from zoe_cmd.api_lib import ZoeAPI
from zoe_lib.exceptions import ZoeAPIException
34
35


36
def _log_stream_stdout(service_id, timestamps, api: ZoeAPI):
37
    try:
38
        for line in api.services.get_logs(service_id):
39
40
41
42
43
44
45
46
47
48
            if timestamps:
                print(line[0], line[1])
            else:
                print(line[1])
    except KeyboardInterrupt:
        print('CTRL-C detected, exiting...')
        return 'interrupt'
    return 'stream_end'


49
def info_cmd(api: ZoeAPI, args_):
    """Query the info endpoint and print the version details it returns.

    :param api: API client used to reach the info endpoint
    :param args_: parsed command-line arguments, unused by this command
    """
    info = api.info.info()
    print("Zoe version: ", info['version'])
    print("Zoe API version: ", info['api_version'])
    print("ZApp format version: ", info['application_format_version'])
    print("Deployment name: ", info['deployment_name'])


58
def app_get_cmd(api: ZoeAPI, args):
    """Extract an application description from an execution.

    The description is written to standard output as JSON with sorted keys and a
    4-space indent.

    :param api: API client used to fetch the execution
    :param args: parsed command-line arguments; ``args.id`` is the execution ID
    """
    execution = api.executions.get(args.id)
    if execution is None:
        print("no such execution")
        return
    json.dump(execution['description'], sys.stdout, sort_keys=True, indent=4)
65
66


67
def exec_list_cmd(api: ZoeAPI, args):
    """List the executions of the calling user as a table.

    Only the filters actually given on the command line are forwarded to the API;
    the list is always restricted to the authenticated user. Nothing is printed
    when no execution matches.

    :param api: API client; ``api.auth_user['id']`` identifies the calling user
    :param args: parsed command-line arguments carrying the optional filters
    """
    # The user record is only of interest when debugging, keep it out of stdout.
    logging.getLogger(__name__).debug('Listing executions for user %s', api.auth_user)
    filter_names = (
        'status',
        'name',
        'limit',
        'earlier_than_submit',
        'earlier_than_start',
        'earlier_than_end',
        'later_than_submit',
        'later_than_start',
        'later_than_end',
    )
    filters = {
        'user_id': api.auth_user['id']
    }
    for key in filter_names:
        value = getattr(args, key, None)
        # argparse leaves options that were not given at None: those are not filters.
        if value is not None:
            filters[key] = value
    data = api.executions.list(**filters)
    if not data:
        return
    tabular_data = [[e['id'], e['name'], e['user_id'], e['status']] for e in sorted(data, key=lambda x: x['id'])]
    headers = ['ID', 'Name', 'User ID', 'Status']
    print(tabulate(tabular_data, headers))
93
94


95
def exec_start_cmd(api: ZoeAPI, args):
    """Submit an execution and, in synchronous mode, follow it on the terminal.

    In synchronous mode the execution status is polled once per second until the
    execution is running, then the log of its monitor service is streamed to
    standard output. The process exits with status 0 when the log stream ends and
    with status 1 when the user interrupts it or the execution ends up in error.

    :param api: API client used to submit and query the execution
    :param args: parsed command-line arguments; ``args.jsonfile`` is the open
        application description, ``args.name`` the execution name and
        ``args.synchronous`` selects the synchronous mode
    """
    app_descr = json.load(args.jsonfile)
    exec_id = api.executions.start(args.name, app_descr)
    if not args.synchronous:
        print("Application scheduled successfully with ID {}, use the exec-get command to check its status".format(exec_id))
        return

    print("Application scheduled successfully with ID {}, waiting for status change".format(exec_id))
    # NOTE(review): assumed to be the states from which an execution never becomes
    # 'running' (names taken from the exec-ls --status choices) -- confirm.
    dead_end_states = ('error', 'cleaning up', 'terminated')
    old_status = 'submitted'
    while True:
        execution = api.executions.get(exec_id)
        current_status = execution['status']
        if old_status != current_status:
            print('Execution is now {}'.format(current_status))
            old_status = current_status
        if current_status == 'running':
            break
        if current_status in dead_end_states:
            # Waiting any longer for 'running' would loop forever.
            if current_status == 'error':
                print('Last error: {}'.format(execution.get('error_message')))
                sys.exit(1)
            print('Execution finished before its log could be streamed')
            sys.exit(0)
        time.sleep(1)

    monitor_service_id = None
    for service_id in execution['services']:
        service = api.services.get(service_id)
        if service['description']['monitor']:
            monitor_service_id = service['id']
            break

    if monitor_service_id is None:
        print('No monitor service found for execution {}, cannot stream its log'.format(exec_id))
        return

    print('\n>------ start of log streaming -------<\n')
    why_stop = _log_stream_stdout(monitor_service_id, False, api)
    print('\n>------ end of log streaming -------<\n')
    if why_stop == 'stream_end':
        print('Execution finished')
        sys.exit(0)
    elif why_stop == 'interrupt':
        print('Do not worry, your execution ({}) is still running.'.format(exec_id))
        sys.exit(1)
129
130


131
def exec_get_cmd(api: ZoeAPI, args):
    """Print the status, timings, endpoints and services of an execution.

    :param api: API client used to fetch the execution, its endpoints and services
    :param args: parsed command-line arguments; ``args.id`` is the execution ID
    """
    execution = api.executions.get(args.id)
    if execution is None:
        print('Execution not found')
        return

    def local_time(timestamp):
        """Convert an epoch timestamp, read as UTC, to local time; 'not yet' when unset."""
        if timestamp is None:
            return 'not yet'
        return datetime.fromtimestamp(timestamp, timezone.utc).astimezone()

    print('Execution {} (ID: {})'.format(execution['name'], execution['id']))
    print('Application name: {}'.format(execution['description']['name']))
    print('Status: {}'.format(execution['status']))
    if execution['status'] == 'error':
        print('Last error: {}'.format(execution['error_message']))
    print()
    print('Time submit: {}'.format(local_time(execution['time_submit'])))
    print('Time start: {}'.format(local_time(execution['time_start'])))
    print('Time end: {}'.format(local_time(execution['time_end'])))
    print()

    endpoints = api.executions.endpoints(execution['id'])
    if endpoints:
        print('Exposed endpoints:')
        for endpoint in endpoints:
            # Only the first two fields of each endpoint entry are shown.
            print(' - {}: {}'.format(endpoint[0], endpoint[1]))
    else:
        print('This ZApp does not expose any endpoint')

    print()
    tabular_data = []
    for c_id in execution['services']:
        service = api.services.get(c_id)
        tabular_data.append([
            service['id'],
            service['name'],
            'true' if service['essential'] else 'false',
            service['status'],
            service['backend_status'],
            service['error_message'] if service['error_message'] is not None else '',
        ])
    headers = ['ID', 'Name', 'Essential', 'Zoe status', 'Backend status', 'Error message']
    print(tabulate(tabular_data, headers))
172
173


174
def exec_kill_cmd(api: ZoeAPI, args):
    """Kill an execution.

    :param api: API client used to request the termination
    :param args: parsed command-line arguments; ``args.id`` is the execution ID
    """
    api.executions.terminate(args.id)
177
178


179
def logs_cmd(api: ZoeAPI, args):
    """Retrieve and stream the logs of a service to standard output.

    :param api: API client used to fetch the log
    :param args: parsed command-line arguments; ``args.service_id`` selects the
        service and ``args.timestamps`` enables the timestamp prefix
    """
    _log_stream_stdout(args.service_id, args.timestamps, api)
182
183


184
def stats_cmd(api: ZoeAPI, args_):
    """Print the scheduler statistics exposed by the API.

    :param api: API client used to query the scheduler statistics
    :param args_: parsed command-line arguments, unused by this command
    """
    sched = api.statistics.scheduler()
    print('Scheduler queue length: {}'.format(sched['queue_length']))
    print('Scheduler running queue length: {}'.format(sched['running_length']))
    print('Termination threads count: {}'.format(sched['termination_threads_count']))

191

192
# Epilog of the command-line help: explains how to provide the credentials.
# The option named here must match the one declared in process_arguments().
ENV_HELP_TEXT = '''To authenticate with Zoe you need to define three environment variables:
ZOE_URL: point to the URL of the Zoe Scheduler (ex.: http://localhost:5000/)
ZOE_USER: the username used for authentication
ZOE_PASS: the password used for authentication

or create a ~/.zoerc file (another location can be specified with --auth-file) like this:
url = xxx
user = yyy
pass = zzz

Environment variable will override the values specified in the configuration file.
'''
204
205


Daniele Venzano's avatar
Daniele Venzano committed
206
def process_arguments() -> Tuple[ArgumentParser, Namespace]:
    """Parse command line arguments.

    Every sub-command registers its handler as the ``func`` default, so the
    returned namespace has a ``func`` attribute only when a sub-command was given.

    :return: the parser itself (needed by the caller to print the help) and the
        namespace with the parsed arguments
    """
    parser = ArgumentParser(description="Zoe command-line client", epilog=ENV_HELP_TEXT, formatter_class=RawDescriptionHelpFormatter)
    parser.add_argument('--debug', action='store_true', help='Enable debug output')
    parser.add_argument('--auth-file', type=str, help='Path to the file containing the authentication settings (default: ~/.zoerc)', default=os.path.join(os.path.expanduser('~'), '.zoerc'))

    subparser = parser.add_subparsers()

    # info
    argparser_info = subparser.add_parser('info', help="Queries the API for supported versions")
    argparser_info.set_defaults(func=info_cmd)

    # start
    argparser_exec_start = subparser.add_parser('start', help="Start an application")
    argparser_exec_start.add_argument('-s', '--synchronous', action='store_true', help="Do not detach, wait for execution to finish, print main service log")
    argparser_exec_start.add_argument('name', help="Name of the execution")
    argparser_exec_start.add_argument('jsonfile', type=FileType("r"), help='Application description')
    argparser_exec_start.set_defaults(func=exec_start_cmd)

    # exec-ls
    argparser_app_list = subparser.add_parser('exec-ls', help="List all executions for the calling user")
    argparser_app_list.add_argument('--limit', type=int, help='Limit the number of executions')
    argparser_app_list.add_argument('--name', help='Show only executions with this name')
    argparser_app_list.add_argument('--status', choices=["submitted", "scheduled", "starting", "error", "running", "cleaning up", "terminated"], help='Show only executions with this status')
    argparser_app_list.add_argument('--earlier-than-submit', help='Show only executions submitted earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--earlier-than-start', help='Show only executions started earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--earlier-than-end', help='Show only executions ended earlier than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-submit', help='Show only executions submitted later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-start', help='Show only executions started later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.add_argument('--later-than-end', help='Show only executions ended later than this timestamp (seconds since UTC epoch)')
    argparser_app_list.set_defaults(func=exec_list_cmd)

    # exec-get
    argparser_execution_get = subparser.add_parser('exec-get', help="Get execution status")
    argparser_execution_get.add_argument('id', type=int, help="Execution id")
    argparser_execution_get.set_defaults(func=exec_get_cmd)

    # exec-app-get: the ID is an execution ID, parsed like in exec-get and terminate
    argparser_app_get = subparser.add_parser('exec-app-get', help="Retrieve an already defined application description")
    argparser_app_get.add_argument('id', type=int, help='The ID of the application')
    argparser_app_get.set_defaults(func=app_get_cmd)

    # terminate
    argparser_execution_kill = subparser.add_parser('terminate', help="Terminates an execution")
    argparser_execution_kill.add_argument('id', type=int, help="Execution id")
    argparser_execution_kill.set_defaults(func=exec_kill_cmd)

    # logs
    argparser_logs = subparser.add_parser('logs', help="Streams the service logs")
    argparser_logs.add_argument('service_id', type=int, help="Service id")
    argparser_logs.add_argument('-t', '--timestamps', action='store_true', help="Prefix timestamps for each line")
    argparser_logs.set_defaults(func=logs_cmd)

    # stats
    argparser_stats = subparser.add_parser('stats', help="Prints all available statistics")
    argparser_stats.set_defaults(func=stats_cmd)

    return parser, parser.parse_args()


def zoe():
    """Main entrypoint.

    Parses the command line, configures logging, reads the credentials and runs
    the selected sub-command. The process exits with status 0 on success and with
    status 1 when the credentials cannot be read, the API reports an error or the
    user presses CTRL-C. Without a sub-command the help is printed instead.
    """
    parser, args = process_arguments()
    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
    # Keep the HTTP library quiet unless it has something important to say.
    logging.getLogger("requests").setLevel(logging.WARNING)

    # "func" is set only by the sub-command parsers.
    if not hasattr(args, "func"):
        parser.print_help()
        return

    auth = utils.read_auth(args)
    if auth is None:
        sys.exit(1)

    try:
        api = ZoeAPI(auth['url'], auth['user'], auth['pass'])
        args.func(api, args)
    except ZoeAPIException as e:
        print(e.message)
        # A failed API call must not look like a success to the calling shell.
        sys.exit(1)
    except KeyboardInterrupt:
        print('CTRL-C pressed, exiting...')
        sys.exit(1)
    sys.exit(0)