Commit 17ae15ce authored by Daniele Venzano's avatar Daniele Venzano

Move application descriptions into executions

Users directly submit executions passing an application description instead of having to define an application as a separate step.
parent bd1ac8cd
......@@ -25,16 +25,15 @@ from argparse import ArgumentParser, Namespace, FileType, RawDescriptionHelpForm
from pprint import pprint
from zoe_cmd import utils
from zoe_lib.applications import ZoeApplicationAPI
from zoe_lib.containers import ZoeContainerAPI
from zoe_lib.exceptions import ZoeAPIException
from zoe_lib.executions import ZoeExecutionsAPI
from zoe_lib.predefined_apps import hadoop, spark, lab_spark, test_sleep
from zoe_lib.predefined_apps import hadoop, spark, lab_spark, test_sleep, copier
from zoe_lib.query import ZoeQueryAPI
from zoe_lib.users import ZoeUserAPI
PREDEFINED_APPS = {}
for mod in [hadoop, spark, lab_spark, test_sleep]:
for mod in [hadoop, spark, lab_spark, test_sleep, copier]:
for app_name, val in mod.__dict__.items():
if callable(val) and "_app" in app_name:
PREDEFINED_APPS[app_name[:-4]] = val
......@@ -109,41 +108,14 @@ def pre_app_export_cmd(args):
print()
def app_new_cmd(args):
app_descr = json.load(args.jsonfile)
api = ZoeApplicationAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
try:
app_id = api.create(app_descr)
except ZoeAPIException as e:
print("Invalid application description: %s" % e.message)
return
app = api.get(app_id)
print("Application {} added with ID: {}".format(app['name'], app_id))
def app_get_cmd(args):
api_query = ZoeQueryAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
data = api_query.query('application', name=args.name)
data = api_query.query('execution', name=args.name)
if len(data) == 0:
print("no such application")
print("no such execution")
else:
for app in data:
pprint(app)
def app_rm_cmd(args):
api_app = ZoeApplicationAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
print('Deleting app {}'.format(args.app_id))
api_app.delete(args.app_id)
def app_list_cmd(_):
api_query = ZoeQueryAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
api_user = ZoeUserAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
data = api_query.query('application')
for app in data:
user = api_user.get(app['owner'])
print('{} (User: {}, ID: {})'.format(app['name'], user['name'], app['id']))
execution = data[0]
json.dump(execution['application'], sys.stdout, sort_keys=True, indent=4)
def exec_list_cmd(_):
......@@ -156,13 +128,13 @@ def exec_list_cmd(_):
def exec_start_cmd(args):
app_descr = json.load(args.jsonfile)
exec_api = ZoeExecutionsAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
ret = exec_api.execution_start(args.name, args.app_name)
ret = exec_api.execution_start(args.name, app_descr)
print("Application scheduled successfully with ID {}, use the exec-get command to check its status".format(ret))
def exec_get_cmd(args):
app_api = ZoeApplicationAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
exec_api = ZoeExecutionsAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
cont_api = ZoeContainerAPI(utils.zoe_url(), utils.zoe_user(), utils.zoe_pass())
execution = exec_api.execution_get(args.id)
......@@ -174,7 +146,7 @@ def exec_get_cmd(args):
print('Time started: {}'.format(execution['time_started']))
print('Time scheduled: {}'.format(execution['time_scheduled']))
print('Time finished: {}'.format(execution['time_finished']))
app = app_api.get(execution['application_id'])
app = execution['application']
print('Application name: {}'.format(app['name']))
for c_id in execution['containers']:
c = cont_api.get(c_id)
......@@ -246,32 +218,21 @@ def process_arguments() -> Namespace:
argparser_pre_app_export.add_argument('app_name', help='Predefined application name (use pre-app-list to see what is available')
argparser_pre_app_export.set_defaults(func=pre_app_export_cmd)
argparser_app_new = subparser.add_parser('app-new', help="Upload a JSON application description")
argparser_app_new.add_argument('jsonfile', type=FileType("r"), help='Application description')
argparser_app_new.set_defaults(func=app_new_cmd)
argparser_app_get = subparser.add_parser('app-get', help="Retrieve an already defined application description")
argparser_app_get.add_argument('name', help='The name of the application')
argparser_app_get.set_defaults(func=app_get_cmd)
argparser_app_rm = subparser.add_parser('app-rm', help="Delete an application")
argparser_app_rm.add_argument('app_id', help="Application ID (will fail if there are running executions)")
argparser_app_rm.set_defaults(func=app_rm_cmd)
argparser_app_list = subparser.add_parser('app-ls', help="List all applications defined by the calling user")
argparser_app_list.set_defaults(func=app_list_cmd)
argparser_exec_start = subparser.add_parser('start', help="Start a previously registered application")
argparser_exec_start.add_argument('app_name', help="Name of the application to start")
argparser_exec_start.add_argument('name', help="Name of the execution")
argparser_exec_start.add_argument('jsonfile', type=FileType("r"), help='Application description')
argparser_exec_start.set_defaults(func=exec_start_cmd)
argparser_app_list = subparser.add_parser('exec-ls', help="List all executions for the calling user")
argparser_app_list.set_defaults(func=exec_list_cmd)
argparser_execution_get = subparser.add_parser('exec-get', help="Get execution status")
argparser_execution_get.add_argument('id', type=int, help="Execution id")
argparser_execution_get.set_defaults(func=exec_get_cmd)
argparser_app_list = subparser.add_parser('exec-ls', help="List all executions for the calling user")
argparser_app_list.set_defaults(func=exec_list_cmd)
argparser_app_get = subparser.add_parser('exec-app-get', help="Retrieve an already defined application description")
argparser_app_get.add_argument('name', help='The name of the application')
argparser_app_get.set_defaults(func=app_get_cmd)
argparser_execution_kill = subparser.add_parser('terminate', help="Terminates an execution")
argparser_execution_kill.add_argument('id', type=int, help="Execution id")
......
......@@ -14,146 +14,114 @@
# limitations under the License.
"""
This module all application-related API calls for Zoe clients.
This module contains code to validate application descriptions.
"""
import logging
from zoe_lib import ZoeAPIBase
from zoe_lib.exceptions import ZoeAPIException, InvalidApplicationDescription
from zoe_lib.exceptions import InvalidApplicationDescription
log = logging.getLogger(__name__)
class ZoeApplicationAPI(ZoeAPIBase):
"""
The application API.
"""
def get(self, application_id: int) -> dict:
"""
Return an Application object
:param application_id: the identifier of the application
:return: the application dict
"""
data, status_code = self._rest_get('/application/' + str(application_id))
if status_code == 200:
return data
else:
raise ZoeAPIException(data['message'])
def create(self, description: dict) -> int:
"""
Create a new application and commit it to the database.
:param description: the application description
:return: the new application ID
"""
self._app_check(description)
data, status_code = self._rest_post('/application', description)
if status_code != 201:
raise ZoeAPIException(data['message'])
return data['application_id']
def delete(self, application_id: int):
"""
If the application does not exists an error will be logged.
:param application_id: the application to delete
"""
data, status_code = self._rest_delete('/application/' + str(application_id))
if status_code != 204:
raise ZoeAPIException(data['message'])
def _app_check(self, data):
required_keys = ['name', 'will_end', 'priority', 'requires_binary', 'version']
for k in required_keys:
if k not in data:
raise InvalidApplicationDescription(msg="Missing required key: %s" % k)
try:
int(data["version"])
except ValueError:
raise InvalidApplicationDescription(msg="version field should be an int")
try:
bool(data['will_end'])
except ValueError:
raise InvalidApplicationDescription(msg="will_end field must be a boolean")
try:
bool(data['requires_binary'])
except ValueError:
raise InvalidApplicationDescription(msg="requires_binary field must be a boolean")
try:
priority = int(data['priority'])
except ValueError:
raise InvalidApplicationDescription(msg="priority field must be an int")
if priority < 0 or priority > 1024:
raise InvalidApplicationDescription(msg="priority must be between 0 and 1024")
for p in data['processes']:
self._process_check(p)
found_monitor = False
for p in data['processes']:
if p['monitor']:
found_monitor = True
break
if not found_monitor:
raise InvalidApplicationDescription(msg="at least one process should have monitor set to True")
def _process_check(self, data):
required_keys = ['name', 'docker_image', 'monitor', 'ports', 'required_resources']
for k in required_keys:
if k not in data:
raise InvalidApplicationDescription(msg="Missing required key: %s" % k)
try:
bool(data['monitor'])
except ValueError:
raise InvalidApplicationDescription(msg="monitor field should be a boolean")
if not hasattr(data['ports'], '__iter__'):
raise InvalidApplicationDescription(msg='ports should be a list')
for pp in data['ports']:
self._port_check(pp)
if not isinstance(data['required_resources'], dict):
raise InvalidApplicationDescription(msg="required_resources should be a dictionary")
if 'memory' not in data['required_resources']:
raise InvalidApplicationDescription(msg="Missing required key: required_resources -> memory")
try:
int(data['required_resources']['memory'])
except ValueError:
raise InvalidApplicationDescription(msg="required_resources -> memory field should be an int")
if 'environment' in data:
if not hasattr(data['environment'], '__iter__'):
raise InvalidApplicationDescription(msg='environment should be an iterable')
for e in data['environment']:
if len(e) != 2:
raise InvalidApplicationDescription(msg='environment variable should have a name and a value')
if not isinstance(e[0], str):
raise InvalidApplicationDescription(msg='environment variable names must be strings: {}'.format(e[0]))
if not isinstance(e[1], str):
raise InvalidApplicationDescription(msg='environment variable values must be strings: {}'.format(e[1]))
def _port_check(self, data):
required_keys = ['name', 'protocol', 'port_number', 'is_main_endpoint']
for k in required_keys:
if k not in data:
raise InvalidApplicationDescription(msg="Missing required key: %s" % k)
try:
int(data['port_number'])
except ValueError:
raise InvalidApplicationDescription(msg="port_number field should be an integer")
try:
bool(data['is_main_endpoint'])
except ValueError:
raise InvalidApplicationDescription(msg="is_main_endpoint field should be a boolean")
def app_validate(data):
required_keys = ['name', 'will_end', 'priority', 'requires_binary', 'version']
for k in required_keys:
if k not in data:
raise InvalidApplicationDescription(msg="Missing required key: %s" % k)
try:
int(data["version"])
except ValueError:
raise InvalidApplicationDescription(msg="version field should be an int")
try:
bool(data['will_end'])
except ValueError:
raise InvalidApplicationDescription(msg="will_end field must be a boolean")
try:
bool(data['requires_binary'])
except ValueError:
raise InvalidApplicationDescription(msg="requires_binary field must be a boolean")
try:
priority = int(data['priority'])
except ValueError:
raise InvalidApplicationDescription(msg="priority field must be an int")
if priority < 0 or priority > 1024:
raise InvalidApplicationDescription(msg="priority must be between 0 and 1024")
for p in data['processes']:
_process_check(p)
found_monitor = False
for p in data['processes']:
if p['monitor']:
found_monitor = True
break
if not found_monitor:
raise InvalidApplicationDescription(msg="at least one process should have monitor set to True")
def _process_check(data):
required_keys = ['name', 'docker_image', 'monitor', 'ports', 'required_resources']
for k in required_keys:
if k not in data:
raise InvalidApplicationDescription(msg="Missing required key: %s" % k)
try:
bool(data['monitor'])
except ValueError:
raise InvalidApplicationDescription(msg="monitor field should be a boolean")
if not hasattr(data['ports'], '__iter__'):
raise InvalidApplicationDescription(msg='ports should be a list')
for pp in data['ports']:
_port_check(pp)
if not isinstance(data['required_resources'], dict):
raise InvalidApplicationDescription(msg="required_resources should be a dictionary")
if 'memory' not in data['required_resources']:
raise InvalidApplicationDescription(msg="Missing required key: required_resources -> memory")
try:
int(data['required_resources']['memory'])
except ValueError:
raise InvalidApplicationDescription(msg="required_resources -> memory field should be an int")
if 'environment' in data:
if not hasattr(data['environment'], '__iter__'):
raise InvalidApplicationDescription(msg='environment should be an iterable')
for e in data['environment']:
if len(e) != 2:
raise InvalidApplicationDescription(msg='environment variable should have a name and a value')
if not isinstance(e[0], str):
raise InvalidApplicationDescription(msg='environment variable names must be strings: {}'.format(e[0]))
if not isinstance(e[1], str):
raise InvalidApplicationDescription(msg='environment variable values must be strings: {}'.format(e[1]))
if 'volumes' in data:
if not hasattr(data['volumes'], '__iter__'):
raise InvalidApplicationDescription(msg='volumes should be an iterable')
for v in data['volumes']:
if len(v) != 3:
raise InvalidApplicationDescription(msg='volume description should have three components')
if not isinstance(v[2], bool):
raise InvalidApplicationDescription(msg='readonly volume item (third) must be a boolean: {}'.format(v[2]))
def _port_check(data):
required_keys = ['name', 'protocol', 'port_number', 'is_main_endpoint']
for k in required_keys:
if k not in data:
raise InvalidApplicationDescription(msg="Missing required key: %s" % k)
try:
int(data['port_number'])
except ValueError:
raise InvalidApplicationDescription(msg="port_number field should be an integer")
try:
bool(data['is_main_endpoint'])
except ValueError:
raise InvalidApplicationDescription(msg="is_main_endpoint field should be a boolean")
......@@ -68,21 +68,16 @@ class ZoeExecutionsAPI(ZoeAPIBase):
else:
return None
def execution_start(self, name: str, application_name: str) -> int:
def execution_start(self, name: str, application_description: dict) -> int:
"""
Submit an application to the scheduler to start a new execution.
:param name: user-provided name of the execution
:param application_name: the application to start
:param application_description: the application to start
:return: the new Execution object, or None in case of error
"""
api_query = ZoeQueryAPI(self.url, self.user, self.password)
data = api_query.query('application', name=application_name)
if len(data) == 0:
raise ZoeAPIException('No such application')
app = data[0]
execution = {
"application_id": app['id'],
"application": application_description,
'name': name
}
data, status_code = self._rest_post('/execution', execution)
......
......@@ -18,7 +18,6 @@ from flask import Flask
from flask_restful import Api
from zoe_scheduler.rest_api.user import UserAPI, UserCollectionAPI
from zoe_scheduler.rest_api.application import ApplicationAPI, ApplicationCollectionAPI
from zoe_scheduler.rest_api.execution import ExecutionAPI, ExecutionCollectionAPI
from zoe_scheduler.rest_api.container import ContainerAPI
from zoe_scheduler.rest_api.query import QueryAPI
......@@ -38,8 +37,6 @@ def init(state, platform) -> Flask:
api.add_resource(UserAPI, API_PATH + '/user/<int:user_id>', resource_class_kwargs=args)
api.add_resource(UserCollectionAPI, API_PATH + '/user', resource_class_kwargs=args)
api.add_resource(ApplicationAPI, API_PATH + '/application/<int:application_id>', resource_class_kwargs=args)
api.add_resource(ApplicationCollectionAPI, API_PATH + '/application', resource_class_kwargs=args)
api.add_resource(ExecutionAPI, API_PATH + '/execution/<int:execution_id>', resource_class_kwargs=args)
api.add_resource(ExecutionCollectionAPI, API_PATH + '/execution', resource_class_kwargs=args)
api.add_resource(ContainerAPI, API_PATH + '/container/<int:container_id>', resource_class_kwargs=args)
......
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from werkzeug.exceptions import BadRequest
from flask_restful import Resource, request
from zoe_lib.exceptions import ZoeException, ZoeRestAPIException
from zoe_scheduler.config import singletons
from zoe_scheduler.state.manager import StateManager
from zoe_scheduler.platform_manager import PlatformManager
from zoe_scheduler.rest_api.utils import catch_exceptions
from zoe_scheduler.rest_api.auth.authentication import authenticate
from zoe_scheduler.rest_api.auth.authorization import is_authorized
from zoe_scheduler.state.application import Application
class ApplicationAPI(Resource):
"""
:type state: StateManager
:type platform: PlatformManager
"""
def __init__(self, **kwargs):
self.state = kwargs['state']
self.platform = kwargs['platform']
@catch_exceptions
def get(self, application_id: int):
start = time.time()
calling_user = authenticate(request, self.state)
app = self.state.get_one('application', id=application_id)
if app is None:
raise ZoeRestAPIException('No such application', 404)
is_authorized(calling_user, app, 'get')
ret = app.to_dict(checkpoint=False)
singletons['metric'].metric_api_call(start, 'application', 'get', calling_user)
return ret
@catch_exceptions
def delete(self, application_id: int):
start = time.time()
calling_user = authenticate(request, self.state)
app = self.state.get_one('application', id=application_id)
if app is None:
return
assert isinstance(app, Application)
is_authorized(calling_user, app, 'delete')
if self.state.app_has_active_executions(app.id):
raise ZoeRestAPIException('Application has active executions, cannot delete')
for e in app.executions:
self.state.delete('execution', e.id)
self.state.delete('application', app.id)
self.state.state_updated()
singletons['metric'].metric_api_call(start, 'application', 'delete', calling_user)
return '', 204
class ApplicationCollectionAPI(Resource):
"""
:type state: StateManager
:type platform: PlatformManager
"""
def __init__(self, **kwargs):
self.state = kwargs['state']
self.platform = kwargs['platform']
@catch_exceptions
def post(self):
start = time.time()
calling_user = authenticate(request, self.state)
try:
data = request.get_json()
except BadRequest:
raise ZoeRestAPIException('Error decoding JSON data')
app = Application(self.state)
data['user_id'] = calling_user.id
try:
app.from_dict(data, checkpoint=False)
except ZoeException as e:
raise ZoeRestAPIException(str(e))
is_authorized(calling_user, app, 'create')
app.id = self.state.gen_id()
self.state.new('application', app)
self.state.state_updated()
singletons['metric'].metric_api_call(start, 'application', 'post', calling_user)
return {'application_id': app.id}, 201
......@@ -68,7 +68,7 @@ class ExecutionAPI(Resource):
is_authorized(calling_user, e, 'delete')
if e.status == "running" or e.status == "scheduled":
if e.is_active():
self.platform.execution_terminate(e, reason='terminated')
self.state.state_updated()
......@@ -110,7 +110,7 @@ class ExecutionCollectionAPI(Resource):
@catch_exceptions
def post(self):
"""
Starts an execution, given an application_id. Takes a JSON object like this: { "application_id": 4 }
Starts an execution, given an application description. Takes a JSON object.
:return: the new execution_id
"""
start_time = time.time()
......@@ -122,14 +122,12 @@ class ExecutionCollectionAPI(Resource):
raise ZoeRestAPIException('Error decoding JSON data')
execution = Execution(self.state)
data['user_id'] = calling_user.id
try:
execution.from_dict(data, checkpoint=False)
except ZoeException as e:
raise ZoeRestAPIException(e.value)
# if not zoe_sched_singleton.validate(execution.application):
# return error('admission control refused this application description')
is_authorized(calling_user, execution, 'create')
check_quota(calling_user, self.state)
......
......@@ -14,19 +14,10 @@
# limitations under the License.
from zoe_lib.exceptions import InvalidApplicationDescription
from zoe_scheduler.state.base import BaseState
class Application(BaseState):
api_out_attrs = ['name', 'version', 'will_end', 'priority', 'requires_binary']
api_in_attrs = ['name', 'version', 'will_end', 'priority', 'requires_binary']
def __init__(self, state):
super().__init__(state)
self.user = None
self.executions = []
class Application:
def __init__(self):
self.name = ''
self.version = 0
......@@ -35,38 +26,41 @@ class Application(BaseState):
self.requires_binary = False
self.processes = []
def to_dict(self, checkpoint):
d = super().to_dict(checkpoint)
d['processes'] = []
def to_dict(self):
d = {
'name': self.name,
'version': self.version,
'will_end': self.will_end,
'priority': self.priority,
'requires_binary': self.requires_binary,
'processes': []
}
for p in self.processes:
d['processes'].append(p.to_dict())
if checkpoint:
d['user_id'] = self.user.id
return d
def from_dict(self, data, checkpoint):
super().from_dict(data, checkpoint)
def from_dict(self, data):
try:
self.version = int(self.version)
self.version = int(data['version'])
except ValueError:
raise InvalidApplicationDescription(msg="version field should be an int")
except KeyError:
raise InvalidApplicationDescription(msg="Missing required key: version")
try:
self.will_end = bool(self.will_end)
self.will_end = bool(data['will_end'])
except ValueError:
raise InvalidApplicationDescription(msg="will_end field must be a boolean")
try:
self.requires_binary = bool(self.requires_binary)
self.requires_binary = bool(data['requires_binary'])
except ValueError:
raise InvalidApplicationDescription(msg="requires_binary field must be a boolean")
try:
self.priority = int(self.priority)
self.priority = int(data['priority'])
except ValueError:
raise InvalidApplicationDescription("priority field must be an int")
if self.priority < 0 or self.priority > 1024:
......@@ -85,16 +79,6 @@ class Application(BaseState):
if not found_monitor:
raise InvalidApplicationDescription("at least one process should have monitor set to True")
user = self.state_manger.get_one('user', id=data['user_id'])
if user is None:
raise InvalidApplicationDescription('Deserialized application points to a non-existent user')
self.user = user
user.applications.append(self)
@property
def owner(self):
return self.user
def total_memory(self) -> int:
memory = 0
for p in self.processes:
......@@ -104,10 +88,6 @@ class Application(BaseState):
def container_count(self) -> int:
return len(self.processes)
def add_execution(self, execution):
execution.application = self
self.executions.append(execution)
class ProcessEndpoint:
def __init__(self):
......@@ -161,6 +141,7 @@ class Process:
self.ports = [] # A list of ProcessEndpoints
self.required_resources = {}
self.environment = [] # Environment variables to pass to Docker
self.volumes = [] # list of volumes to mount. Each volume is a three tuple: host path, container path, readonly boolean
self.command = None # Commandline to pass to the Docker container
def to_dict(self):
......@@ -171,7 +152,8 @@ class Process:
'required_resources': self.required_resources,
'environment': self.environment,
'command': self.command,
'ports': []
'ports': [],
'volumes': []
}
for p in self.ports:
d['ports'].append(p.to_dict())
......@@ -226,3 +208,13 @@ class Process:
if 'command' in data:
self.command = data['command']
if 'volumes' in data:
if not hasattr(data['volumes'], '__iter__'):
raise InvalidApplicationDescription(msg='volumes should be an iterable')
self.volumes = data['volumes'].copy()
for v in self.volumes:
if len(v) != 3:
raise InvalidApplicationDescription(msg='volume description should have three components')
if not isinstance(v[2], bool):
raise InvalidApplicationDescription(msg='readonly volume item (third) must be a boolean: {}'.format(v[2]))
......@@ -55,4 +55,4 @@ class Container(BaseState):
@property
def owner(self):
return self.execution.application.user
return self.execution.user
......@@ -19,8 +19,9 @@ from io import BytesIO
import dateutil.parser