Commit 188fe8be authored by Daniele Venzano

Implement the DYNSIZE scheduler policy, which automatically calculates an execution size and dynamically shrinks it while the ZApp is waiting in the queue
parent b08d5cb7
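
For context, a minimal sketch of what the new policy does (simplified names and free functions, not the actual Zoe API; the authoritative change is the diff below): the initial size comes from the ZApp's minimum reservation, and it is then reduced linearly while the execution sits in the queue.

```python
import time

# Hard-coded decay factor from the commit, marked "to be tuned":
# size units removed per second spent waiting in the queue.
DECAY_RATE = 32 * 1024 ** 2


def initial_size(min_cores, min_memory_bytes):
    """DYNSIZE starting point: minimum reserved cores times minimum reserved memory."""
    return min_cores * min_memory_bytes


def shrink_while_queued(size, last_time_scheduled):
    """Shrink the size proportionally to the time elapsed since the execution was (re)queued."""
    if size <= 0 or last_time_scheduled == 0:
        return max(size, 0)
    return size - (time.time() - last_time_scheduled) * DECAY_RATE
```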
......@@ -98,7 +98,7 @@ def load_configuration(test_conf=None):
     # Scheduler
     argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeElasticScheduler'], default='ZoeElasticScheduler')
-    argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
+    argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE', 'DYNSIZE'], default='FIFO')
     argparser.add_argument('--placement-policy', help='Placement policy', choices=['waterfill', 'random', 'average'], default='average')
     argparser.add_argument('--backend', choices=['Kubernetes', 'DockerEngine'], default='DockerEngine', help='Which backend to enable')
......
......@@ -67,10 +67,13 @@ class Execution(BaseRecord):
         self._status = d['status']
         self.error_message = d['error_message']
-        try:
-            self.size = self.description['size']
-        except KeyError:
-            self.size = self.description['priority']  # zapp format v2
+        if d['size'] is not None:
+            self.size = float(d['size'])
+        else:
+            try:
+                self.size = self.description['size']
+            except KeyError:
+                self.size = self.description['priority']  # zapp format v2
         self.termination_lock = threading.Lock()
......@@ -86,7 +89,8 @@ class Execution(BaseRecord):
             'time_end': None if self.time_end is None else (self.time_end - datetime.datetime(1970, 1, 1)) / datetime.timedelta(seconds=1),
             'status': self._status,
             'error_message': self.error_message,
-            'services': [s.id for s in self.services]
+            'services': [s.id for s in self.services],
+            'size': self.size
         }

     def __eq__(self, other):
......@@ -130,6 +134,11 @@ class Execution(BaseRecord):
         self.error_message = message
         self.sql_manager.executions.update(self.id, error_message=self.error_message)

+    def set_size(self, new_size):
+        """Changes the size of the execution, for policies that calculate the size automatically."""
+        self.size = new_size
+        self.sql_manager.executions.update(self.id, size=new_size)
+
     @property
     def is_active(self):
         """
......@@ -225,6 +234,7 @@ class ExecutionTable(BaseTable):
             user_id TEXT NOT NULL,
             description JSON NOT NULL,
             status TEXT NOT NULL,
+            size NUMERIC NOT NULL,
             execution_manager_id TEXT NULL,
             time_submit TIMESTAMP NOT NULL,
             time_start TIMESTAMP NULL,
......@@ -236,7 +246,7 @@ class ExecutionTable(BaseTable):
"""Create a new execution in the state."""
status = Execution.SUBMIT_STATUS
time_submit = datetime.datetime.utcnow()
query = self.cursor.mogrify('INSERT INTO execution (id, name, user_id, description, status, time_submit) VALUES (DEFAULT, %s,%s,%s,%s,%s) RETURNING id', (name, user_id, description, status, time_submit))
query = self.cursor.mogrify('INSERT INTO execution (id, name, user_id, description, status, size, time_submit) VALUES (DEFAULT, %s,%s,%s,%s,%s,%s) RETURNING id', (name, user_id, description, status, description['size'], time_submit))
self.cursor.execute(query)
self.sql_manager.commit()
return self.cursor.fetchone()[0]
......
......@@ -69,6 +69,9 @@ def _digest_application_description(state: SQLManager, execution: Execution):
         counter += 1

     assert counter == total_count

+    if get_conf().scheduler_policy == 'DYNSIZE':
+        execution.set_size(execution.total_reservations.cores.min * execution.total_reservations.memory.min)
+
     return True
......
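
To give a sense of scale for the automatic size computed above (illustrative numbers only, not values from the commit):

```python
# Hypothetical minimum reservation of a ZApp: 4 cores and 16 GiB of memory.
min_cores = 4
min_memory = 16 * 1024 ** 3      # bytes
size = min_cores * min_memory    # initial DYNSIZE size: 68719476736
```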
......@@ -61,7 +61,7 @@ class ExecutionProgress:
 class ZoeElasticScheduler:
     """The Scheduler class for size-based scheduling. Policy can be "FIFO" or "SIZE"."""
     def __init__(self, state: SQLManager, policy, metrics: StatsManager):
-        if policy != 'FIFO' and policy != 'SIZE':
+        if policy != 'FIFO' and policy != 'SIZE' and policy != 'DYNSIZE':
             raise UnsupportedSchedulerPolicyError
         self.metrics = metrics
         self.trigger_semaphore = threading.Semaphore(0)
......@@ -148,16 +148,19 @@ class ZoeElasticScheduler:
             counter -= 1

     def _refresh_execution_sizes(self):
-        for execution in self.queue:  # type: Execution
-            exec_data = self.additional_exec_state[execution.id]
-            if exec_data.last_time_scheduled == 0:
-                progress = 0
-            else:
-                last_progress = (time.time() - exec_data.last_time_scheduled) / ((execution.services_count / execution.running_services_count) * execution.size)
-                exec_data.progress_sequence.append(last_progress)
-                progress = sum(exec_data.progress_sequence)
-            remaining_execution_time = (1 - progress) * execution.size
-            execution.size = remaining_execution_time * execution.services_count
+        if self.policy == "FIFO":
+            return
+        elif self.policy == "SIZE":
+            return
+        elif self.policy == "DYNSIZE":
+            for execution in self.queue:  # type: Execution
+                exec_data = self.additional_exec_state[execution.id]
+                if execution.size == 0 or exec_data.last_time_scheduled == 0:
+                    continue
+                elif execution.size < 0:
+                    execution.size = 0
+                new_size = execution.size - (time.time() - exec_data.last_time_scheduled) * (32 * 1024 ** 2)  # to be tuned
+                execution.set_size(new_size)

     def _pop_all(self):
         out_list = []
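
For the magnitude of the hard-coded decay factor in the DYNSIZE branch above, some back-of-the-envelope arithmetic (illustrative, not part of the commit):

```python
# A single _refresh_execution_sizes pass running 60 seconds after the execution
# was (re)queued shaves this much off the size:
decay = 60 * (32 * 1024 ** 2)    # 2013265920 size units

# At that rate, the 68719476736 size from the example above would need
# 68719476736 / (32 * 1024 ** 2) = 2048 seconds (~34 minutes) of waiting to reach zero,
# ignoring the over-subtraction that accumulates across repeated scheduler passes.
```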
......@@ -172,8 +175,9 @@ class ZoeElasticScheduler:
     def _requeue(self, execution: Execution):
         execution.termination_lock.release()
-        if execution not in self.queue:  # make sure the execution is in the queue
-            log.warning("Execution {} wants to be re-queued, but it was not in the queue".format(execution.id))
+        self.additional_exec_state[execution.id].last_time_scheduled = time.time()
+        if execution not in self.queue:  # sanity check: the execution should be in the queue
+            log.warning("Execution {} wants to be re-queued, but it is not in the queue".format(execution.id))

     @catch_exceptions_and_retry
     def loop_start_th(self):  # pylint: disable=too-many-locals
......@@ -201,7 +205,7 @@ class ZoeElasticScheduler:
             while True:  # Inner loop will run until no new executions can be started or the queue is empty
                 self._refresh_execution_sizes()

-                if self.policy == "SIZE":
+                if self.policy == "SIZE" or self.policy == "DYNSIZE":
                     self.queue.sort(key=lambda execution: execution.size)

                 jobs_to_attempt_scheduling = self._pop_all()
......@@ -218,7 +222,6 @@ class ZoeElasticScheduler:
                         break
                 cluster_status_snapshot = SimulatedPlatform(platform_state)
-                log.debug(str(cluster_status_snapshot))

                 jobs_to_launch = []
                 free_resources = cluster_status_snapshot.aggregated_free_memory()
......@@ -274,7 +277,6 @@ class ZoeElasticScheduler:
                         jobs_to_attempt_scheduling.remove(job)
                         self.queue.remove(job)
                         self.queue_running.append(job)
-                        self.additional_exec_state[job.id].last_time_scheduled = time.time()

                 self.core_limit_recalc_trigger.set()
......
......@@ -29,6 +29,7 @@ class SimulatedNode:
         self.name = real_node.name
         self.labels = real_node.labels
         self.images = list_available_images(self.name)
+        log.debug('Node {}: m {} | c {} | l {}'.format(self.name, self.node_free_memory() / (1024 ** 2), self.node_free_cores(), list(self.labels)))

     def service_fits(self, service: Service) -> bool:
         """Checks whether a service can fit in this node"""
......@@ -100,7 +101,7 @@ class SimulatedNode:
         return free

     def __repr__(self):
-        out = 'SN {} | m {} | c {}'.format(self.name, self.node_free_memory(), self.node_free_cores())
+        out = 'SN {} | m {} | c {}'.format(self.name, self.node_free_memory() / (1024 ** 2), self.node_free_cores())
         return out
......