Commit 7ab7368b authored by Daniele Venzano

Implement core and memory quotas

Implement run time limit quota
parent 447992fb
......@@ -15,6 +15,7 @@
"""The real API, exposed as web pages or REST API."""
from datetime import timedelta, datetime
import logging
import os
from typing import List
......@@ -70,7 +71,7 @@ class APIEndpoint:
except zoe_lib.exceptions.InvalidApplicationDescription as e:
raise zoe_api.exceptions.ZoeRestAPIException('Invalid application description: {}'.format(e.message), status_code=400)
def _check_quota(self, user: zoe_lib.state.User, application_description): # pylint: disable=unused-argument
def _check_quota(self, user: zoe_lib.state.User, application_description):
"""Check quota for given user and execution."""
quota = self.sql.quota.select(only_one=True, **{'id': user.quota_id})
......@@ -79,10 +80,29 @@ class APIEndpoint:
running_execs += self.sql.executions.select(**{'status': 'scheduled', 'user_id': user.id})
running_execs += self.sql.executions.select(**{'status': 'image download', 'user_id': user.id})
running_execs += self.sql.executions.select(**{'status': 'submitted', 'user_id': user.id})
if len(running_execs) >= quota.concurrent_executions:
if quota.concurrent_executions != 0 and len(running_execs) >= quota.concurrent_executions:
raise zoe_api.exceptions.ZoeQuotaException('You cannot run more than {} executions at a time, quota exceeded.'.format(quota.concurrent_executions))
# TODO: implement core and memory quotas
if quota.cores != 0 and quota.memory != 0:
return
reserved_cores = 0
reserved_mem = 0
for e in running_execs:
for s in e.services:
reserved_cores += s.resource_reservation.cores.min
reserved_mem += s.resource_reservation.memory.min
new_exec_cores = 0
new_exec_memory = 0
for s in application_description['services']:
new_exec_cores += s['resources']['cores']['min'] * s['total_count']
new_exec_memory += s['resources']['memory']['min'] * s['total_count']
if quota.cores < reserved_cores + new_exec_cores:
raise zoe_api.exceptions.ZoeQuotaException('You requested {} cores more than your quota allows, quota exceeded.'.format((reserved_cores + new_exec_cores) - quota.cores))
if quota.memory < reserved_mem + new_exec_memory:
raise zoe_api.exceptions.ZoeQuotaException('You requested {}B memory more than your quota allows, quota exceeded.'.format((reserved_mem + new_exec_memory) - quota.memory))
def execution_start(self, user: zoe_lib.state.User, exec_name, application_description):
"""Start an execution."""
......@@ -231,7 +251,7 @@ class APIEndpoint:
users = self.sql.user.select(**filters)
return users
def user_new(self, user: zoe_lib.state.User, username: str, email: str, role_id: int, quota_id: int, auth_source: str, fs_uid: int) -> int:
def user_new(self, user: zoe_lib.state.User, username: str, email: str, role_id: int, quota_id: int, auth_source: str, fs_uid: int) -> int: # pylint: disable=too-many-arguments
"""Creates a new user."""
if not user.role.can_change_config:
raise zoe_api.exceptions.ZoeAuthException()
......@@ -350,7 +370,7 @@ class APIEndpoint:
if not user.role.can_change_config:
raise zoe_api.exceptions.ZoeAuthException()
role_id = self.sql.quota.insert(quota_data['name'], quota_data['concurrent_executions'], quota_data['memory'], quota_data['cores'])
role_id = self.sql.quota.insert(quota_data['name'], quota_data['concurrent_executions'], quota_data['memory'], quota_data['cores'], quota_data['runtime_limit'])
return role_id
def quota_list(self, user: zoe_lib.state.User, **filters) -> List[zoe_lib.state.Quota]:
......@@ -383,3 +403,21 @@ class APIEndpoint:
raise zoe_api.exceptions.ZoeRestAPIException('Cannot rename default quota')
self.sql.quota.update(quota_id, **quota_data)
def verify_runtime_limit(self):
    """Scan the active executions and kill all those that exceed the runtime_limit quota.

    Executions in the states 'running', 'starting', 'scheduled',
    'image download' and 'submitted' are examined. The limit is read from the
    quota of the execution owner and is interpreted as a number of hours; a
    value of 0 disables the check for that execution. The deadline is counted
    from the submission time (``time_submit``).
    """
    active_states = ('running', 'starting', 'scheduled', 'image download', 'submitted')
    running_execs = []
    for state in active_states:
        running_execs += self.sql.executions.select(**{'status': state})

    for e in running_execs:
        runtime_limit = e.owner.quota.runtime_limit
        if runtime_limit == 0:
            # 0 means "no limit" for this quota.
            continue
        # NOTE(review): time_submit is compared with naive UTC "now", so it is
        # assumed to be a naive UTC datetime -- confirm against the state layer.
        deadline = e.time_submit + timedelta(hours=runtime_limit)
        # The limit is exceeded only once the deadline lies in the past; the
        # previous '>' comparison terminated executions still within their limit.
        if deadline < datetime.utcnow():
            log.info('Automatically terminating execution %s that has exceeded the run time limit', e.id)
            self.execution_terminate(e.owner, e.id)
......@@ -19,7 +19,7 @@ import logging
import os
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import Application
import zoe_lib.config
......@@ -75,6 +75,9 @@ def zoe_web_main(test_conf=None) -> int:
http_server.bind(args.listen_port, args.listen_address)
http_server.start(num_processes=1)
pc = PeriodicCallback(api_endpoint.verify_runtime_limit, 300000)
pc.start()
try:
IOLoop.current().start()
except KeyboardInterrupt:
......
......@@ -436,9 +436,10 @@ def process_arguments() -> Tuple[ArgumentParser, Namespace]:
sub_parser = subparser.add_parser('quota-create', help="Create a new quota")
sub_parser.add_argument('name', help="Quota name")
sub_parser.add_argument('concurrent_executions', type=int, help="Maximum number of concurrent executions")
sub_parser.add_argument('memory', type=int, help="Maximum memory in bytes across all running executions")
sub_parser.add_argument('cores', type=int, help="Maximum number of cores across all running executions")
sub_parser.add_argument('concurrent_executions', type=int, help="Maximum number of concurrent executions (0 means no limit)")
sub_parser.add_argument('memory', type=int, help="Maximum memory in bytes across all running executions (0 means no limit)")
sub_parser.add_argument('cores', type=int, help="Maximum number of cores across all running executions (0 means no limit)")
sub_parser.add_argument('runtime_limit', type=int, help="Maximum number of hours an execution is allowed to run (0 means no limit)")
sub_parser.set_defaults(func=quota_create_cmd)
sub_parser = subparser.add_parser('quota-delete', help="Delete a quota")
......@@ -448,9 +449,10 @@ def process_arguments() -> Tuple[ArgumentParser, Namespace]:
sub_parser = subparser.add_parser('quota-update', help="Update an existing quota")
sub_parser.add_argument('id', type=int, help="ID of the quota to update")
sub_parser.add_argument('--name', help="Quota name")
sub_parser.add_argument('--concurrent_executions', type=int, help="Maximum number of concurrent executions")
sub_parser.add_argument('--memory', type=int, help="Maximum memory in bytes across all running executions")
sub_parser.add_argument('--cores', type=int, help="Maximum number of cores across all running executions")
sub_parser.add_argument('--concurrent_executions', type=int, help="Maximum number of concurrent executions (0 means no limit)")
sub_parser.add_argument('--memory', type=int, help="Maximum memory in bytes across all running executions (0 means no limit)")
sub_parser.add_argument('--cores', type=int, help="Maximum number of cores across all running executions (0 means no limit)")
sub_parser.add_argument('--runtime_limit', type=int, help="Maximum number of hours an execution is allowed to run (0 means no limit)")
sub_parser.set_defaults(func=quota_update_cmd)
# Roles
......
......@@ -32,6 +32,7 @@ class Quota(BaseRecord):
self.concurrent_executions = d['concurrent_executions']
self.memory = d['memory']
self.cores = d['cores']
self.runtime_limit = d['runtime_limit']
def serialize(self):
"""Generates a dictionary that can be serialized in JSON."""
......@@ -40,7 +41,8 @@ class Quota(BaseRecord):
'name': self.name,
'concurrent_executions': self.concurrent_executions,
'cores': self.cores,
'memory': self.memory
'memory': self.memory,
'runtime_limit': self.runtime_limit
}
def set_concurrent_executions(self, value):
......@@ -58,6 +60,11 @@ class Quota(BaseRecord):
self.cores = value
self.sql_manager.quota_update(self.id, cores=value)
def set_runtime_limit(self, value):
    """Store a new runtime limit on this object and forward it to the SQL manager."""
    # Update the in-memory attribute first, then pass the same value to
    # quota_update() keyed by this quota's id.
    self.runtime_limit = value
    changes = {'runtime_limit': value}
    self.sql_manager.quota_update(self.id, **changes)
class QuotaTable(BaseTable):
"""Abstraction for the quota table in the database."""
......@@ -71,9 +78,10 @@ class QuotaTable(BaseTable):
name TEXT NOT NULL,
concurrent_executions INT NOT NULL,
memory BIGINT NOT NULL,
cores INT NOT NULL
cores INT NOT NULL,
runtime_limit INT NOT NULL
)''')
self.cursor.execute('''INSERT INTO quota (id, name, concurrent_executions, memory, cores) VALUES (DEFAULT, 'default', 5, 34359738368, 20)''')
self.cursor.execute('''INSERT INTO quota (id, name, concurrent_executions, memory, cores, runtime_limit) VALUES (DEFAULT, 'default', 5, 34359738368, 20, 24)''')
def select(self, only_one=False, **kwargs):
"""
......@@ -106,9 +114,9 @@ class QuotaTable(BaseTable):
else:
return [Quota(x, self.sql_manager) for x in self.cursor]
def insert(self, name, concurrent_executions, memory, cores):
def insert(self, name, concurrent_executions, memory, cores, runtime_limit):
"""Adds a new quota to the state."""
query = self.cursor.mogrify('INSERT INTO quota (id, name, concurrent_executions, memory, cores) VALUES (DEFAULT, %s, %s, %s, %s) RETURNING id', (name, concurrent_executions, memory, cores))
query = self.cursor.mogrify('INSERT INTO quota (id, name, concurrent_executions, memory, cores, runtime_limit) VALUES (DEFAULT, %s, %s, %s, %s, %s) RETURNING id', (name, concurrent_executions, memory, cores, runtime_limit))
self.cursor.execute(query)
self.sql_manager.commit()
return self.cursor.fetchone()[0]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment