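"""
zoectl: command-line client for Zoe - Container Analytics as a Service.

Each subcommand maps to one of the *_cmd functions below. A few illustrative
invocations (the email address, IDs, names and file name are placeholders):

    python zoectl.py setup-db
    python zoectl.py user-new user@example.com
    python zoectl.py app-spark-cluster-new --user-id 1 --name my-cluster
    python zoectl.py app-spark-new --user-id 1 --name my-app --file app.zip
    python zoectl.py run 1 --name first-run --cmd wordcount.py
    python zoectl.py log-get 1

Run "python zoectl.py --help" for the full list of subcommands and options.
"""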
from argparse import ArgumentParser, Namespace
import logging
from zipfile import is_zipfile

from zoe_client import ZoeClient
from common.state import create_tables


def status_cmd(_):
    """Show the platform status."""
    client = ZoeClient()
    status_report = client.platform_status()
    print(status_report)


def setup_db_cmd(_):
    """Create the tables in the database."""
    create_tables()


def user_new_cmd(args):
    """Create a new user and print its ID."""
    client = ZoeClient()
    user = client.user_new(args.email)
    print("New user ID: {}".format(user.id))


def user_get_cmd(args):
    """Look up an existing user by email address and print its ID."""
    client = ZoeClient()
    user = client.user_get(args.email)
    print("User ID: {}".format(user.id))


def spark_cluster_new_cmd(args):
    """Register a new empty Spark cluster application."""
    client = ZoeClient()
    application_id = client.spark_application_new(args.user_id, args.worker_count, args.executor_memory, args.executor_cores, args.name)
    print("Spark application added with ID: {}".format(application_id))


def spark_notebook_new_cmd(args):
    """Register a new Spark notebook application."""
    client = ZoeClient()
    application_id = client.spark_notebook_application_new(args.user_id, args.worker_count, args.executor_memory, args.executor_cores, args.name)
    print("Spark notebook application added with ID: {}".format(application_id))


def spark_submit_new_cmd(args):
    """Register a new Spark submit application from a zip archive."""
    if not is_zipfile(args.file):
        print("Error: the file specified is not a zip archive")
        return
    client = ZoeClient()
    application_id = client.spark_submit_application_new(args.user_id, args.worker_count, args.executor_memory, args.executor_cores, args.name, args.file)
    print("Spark application added with ID: {}".format(application_id))


def run_spark_cmd(args):
    """Execute a previously registered Spark application."""
    client = ZoeClient()
    application = client.application_get(args.id)
    if application is None:
        print("Error: application {} does not exist".format(args.id))
        return
    ret = client.execution_spark_new(application.id, args.name, args.cmd, args.spark_opts)

    if ret:
        print("Application scheduled successfully, use the app-inspect command to check its status")
    else:
        print("Admission control refused to run the application specified")


def app_rm_cmd(args):
    """Delete an application, terminating its active executions if --force is given."""
    client = ZoeClient()
    application = client.application_get(args.id)
    if application is None:
        print("Error: application {} does not exist".format(args.id))
        return
    if args.force:
        # Terminate any execution that is still running before removing the application
        for eid in application.executions:
            e = client.execution_get(eid)
            if e.status == "running":
                print("Terminating execution {}".format(e.name))
                client.execution_terminate(e.id)

    client.application_remove(application.id)


def app_inspect_cmd(args):
    """Print details about an application and its active executions."""
    client = ZoeClient()
    application = client.application_get(args.id)
    if application is None:
        print("Error: application {} does not exist".format(args.id))
        return
    app_report = client.application_status(application.id)
    print(app_report)


def app_list_cmd(args):
    """List all applications belonging to a user."""
    client = ZoeClient()
    applications = client.application_list(args.id)
    if len(applications) > 0:
        print("{:4} {:20} {:25}".format("ID", "Name", "Type"))
    for app in applications:
        print("{:4} {:20} {:25}".format(app.id, app.name, app.type))


def exec_kill_cmd(args):
    """Terminate an execution."""
    client = ZoeClient()
    execution = client.execution_get(args.id)
    if execution is None:
        print("Error: execution {} does not exist".format(args.id))
        return
    client.execution_terminate(execution.id)


def log_get_cmd(args):
    """Retrieve and print the logs of a running container."""
    client = ZoeClient()
    log = client.log_get(args.id)
    if log is None:
        print("Error: No log found for container ID {}".format(args.id))
        return
    print(log)


def process_arguments() -> Namespace:
    """Define the command-line interface and parse the arguments."""
    argparser = ArgumentParser(description="Zoe - Container Analytics as a Service command-line client")
    argparser.add_argument('-d', '--debug', action='store_true', default=False, help='Enable debug output')
    subparser = argparser.add_subparsers(title='subcommands', description='valid subcommands')

    argparser_status = subparser.add_parser('status', help="Show the platform status")
    argparser_status.set_defaults(func=status_cmd)

    argparser_user_new = subparser.add_parser('user-new', help="Create a new user")
    argparser_user_new.add_argument('email', help="User email address")
    argparser_user_new.set_defaults(func=user_new_cmd)

    argparser_user_get = subparser.add_parser('user-get', help="Get the user id for an existing user")
    argparser_user_get.add_argument('email', help="User email address")
    argparser_user_get.set_defaults(func=user_get_cmd)

    argparser_setup_db = subparser.add_parser('setup-db', help="Create the tables in the database")
    argparser_setup_db.set_defaults(func=setup_db_cmd)

    argparser_spark_cluster_create = subparser.add_parser('app-spark-cluster-new', help="Setup a new empty Spark cluster")
    argparser_spark_cluster_create.add_argument('--user-id', type=int, required=True, help='Application owner')
    argparser_spark_cluster_create.add_argument('--name', required=True, help='Application name')
    argparser_spark_cluster_create.add_argument('--worker-count', type=int, default=2, help='Number of workers')
    argparser_spark_cluster_create.add_argument('--executor-memory', default='2g', help='Maximum memory available per worker (the system assumes only one executor per worker)')
    argparser_spark_cluster_create.add_argument('--executor-cores', default='2', type=int, help='Number of cores to assign to each executor')
    argparser_spark_cluster_create.set_defaults(func=spark_cluster_new_cmd)

    argparser_spark_nb_create = subparser.add_parser('app-spark-notebook-new', help="Setup a new Spark Notebook application")
    argparser_spark_nb_create.add_argument('--user-id', type=int, required=True, help='Notebook owner')
    argparser_spark_nb_create.add_argument('--name', required=True, help='Notebook name')
    argparser_spark_nb_create.add_argument('--worker-count', type=int, default=2, help='Number of workers')
    argparser_spark_nb_create.add_argument('--executor-memory', default='2g', help='Maximum memory available per worker (the system assumes only one executor per worker)')
    argparser_spark_nb_create.add_argument('--executor-cores', default='2', type=int, help='Number of cores to assign to each executor')
    argparser_spark_nb_create.set_defaults(func=spark_notebook_new_cmd)

    argparser_spark_submit_create = subparser.add_parser('app-spark-new', help="Setup a new Spark submit application")
    argparser_spark_submit_create.add_argument('--user-id', type=int, required=True, help='Application owner')
    argparser_spark_submit_create.add_argument('--name', required=True, help='Application name')
    argparser_spark_submit_create.add_argument('--worker-count', type=int, default=2, help='Number of workers')
    argparser_spark_submit_create.add_argument('--executor-memory', default='2g', help='Maximum memory available per worker (the system assumes only one executor per worker)')
    argparser_spark_submit_create.add_argument('--executor-cores', default='2', type=int, help='Number of cores to assign to each executor')
    argparser_spark_submit_create.add_argument('--file', required=True, help='zip archive containing the application files')
    argparser_spark_submit_create.set_defaults(func=spark_submit_new_cmd)

    argparser_app_rm = subparser.add_parser('app-rm', help="Delete an application")
    argparser_app_rm.add_argument('id', type=int, help="Application id")
    argparser_app_rm.add_argument('-f', '--force', action="store_true", help="Also terminate all active executions, if any")
    argparser_app_rm.set_defaults(func=app_rm_cmd)

    argparser_app_inspect = subparser.add_parser('app-inspect', help="Gather details about an application and its active executions")
    argparser_app_inspect.add_argument('id', type=int, help="Application id")
    argparser_app_inspect.set_defaults(func=app_inspect_cmd)

    argparser_app_ls = subparser.add_parser('app-ls', help="List all applications for a user")
    argparser_app_ls.add_argument('id', type=int, help="User id")
    argparser_app_ls.set_defaults(func=app_list_cmd)

    argparser_spark_app_run = subparser.add_parser('run', help="Execute a previously registered Spark application")
    argparser_spark_app_run.add_argument('id', type=int, help="Application id")
    argparser_spark_app_run.add_argument('--name', required=True, help='Execution name')
    argparser_spark_app_run.add_argument('--cmd', help="Command line to pass to spark-submit")
    argparser_spark_app_run.add_argument('--spark-opts', help="Optional Spark options to pass to spark-submit")
    argparser_spark_app_run.set_defaults(func=run_spark_cmd)

    argparser_execution_kill = subparser.add_parser('execution-kill', help="Terminate an execution")
    argparser_execution_kill.add_argument('id', type=int, help="Execution id")
    argparser_execution_kill.set_defaults(func=exec_kill_cmd)

    argparser_log_get = subparser.add_parser('log-get', help="Retrieve the logs of a running container")
    argparser_log_get.add_argument('id', type=int, help="Container id")
    argparser_log_get.set_defaults(func=log_get_cmd)

    return argparser.parse_args()


def main():
    args = process_arguments()
    if args.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    # argparse does not force a subcommand to be given, so fail gracefully if none was
    if not hasattr(args, "func"):
        print("Error: please specify a subcommand, use --help for a list")
        return
    args.func(args)


if __name__ == "__main__":
    main()