Commit ac955cac authored by Daniele Venzano's avatar Daniele Venzano

Use Swarm statistics to inform the FIFO policy about available resources and...

Use Swarm statistics to inform the FIFO policy about available resources and append to the scheduling queue executions that fail to start.
parent b662812e
......@@ -16,6 +16,7 @@
import time
import logging
import humanfriendly
import requests.exceptions
try:
......@@ -86,9 +87,8 @@ class SwarmClient:
ns = SwarmNodeStats(info["DriverStatus"][idx + node][0])
ns.docker_endpoint = info["DriverStatus"][idx + node][1]
idx2 += 1
if 'Status' in info["DriverStatus"][idx + node + idx2][0]: # new docker version
ns.status = info["DriverStatus"][idx + node + idx2][1]
idx2 += 1
ns.status = info["DriverStatus"][idx + node + idx2][1]
idx2 += 1
ns.container_count = int(info["DriverStatus"][idx + node + idx2][1])
idx2 += 1
ns.cores_reserved = int(info["DriverStatus"][idx + node + idx2][1].split(' / ')[0])
......@@ -103,6 +103,9 @@ class SwarmClient:
idx2 += 1
ns.last_update = info["DriverStatus"][idx + node + idx2][1]
ns.memory_reserved = humanfriendly.parse_size(ns.memory_reserved)
ns.memory_total = humanfriendly.parse_size(ns.memory_total)
pl_status.nodes.append(ns)
idx += idx2
pl_status.timestamp = time.time()
......
......@@ -16,6 +16,7 @@
import logging
import queue
from zoe_lib.exceptions import ZoeException
from zoe_scheduler.state.application import Application
from zoe_scheduler.state.execution import Execution
from zoe_scheduler.scheduler_policies.base import BaseSchedulerPolicy
......@@ -77,4 +78,9 @@ class ZoeScheduler:
if execution is None:
return
log.debug("Found a runnable execution!")
self.platform.execution_start(execution)
try:
self.platform.execution_start(execution)
except ZoeException:
self.scheduler_policy.start_failed(execution)
else:
self.scheduler_policy.start_successful(execution)
......@@ -55,6 +55,20 @@ class BaseSchedulerPolicy:
"""
raise NotImplementedError
def start_successful(self, execution: Execution) -> None:
"""
Update the internal data structures to acknowledge the fact that an execution has been succesfully started.
:param execution: the execution that was successfully started
:return: None
"""
def start_failed(self, execution: Execution) -> None:
"""
The execution could not be started for a transient error and its startup should be retried again later.
:param execution: The execution that failed to start
:return: None
"""
def stats(self) -> SchedulerStats:
"""
Gather statistics about the scheduler policy
......
......@@ -19,16 +19,17 @@ from zoe_scheduler.stats import SchedulerStats
from zoe_scheduler.scheduler_policies.base import BaseSchedulerPolicy
from zoe_scheduler.state.application import Application
from zoe_scheduler.state.execution import Execution
from zoe_scheduler.config import singletons
class FIFOSchedulerPolicy(BaseSchedulerPolicy):
def __init__(self, platform):
super().__init__(platform)
self.waiting_list = []
self.running_list = []
self.starting_list = []
def admission_control(self, app: Application) -> bool:
swarm_stats = self.platform.swarm_stats()
swarm_stats = singletons['stats_manager'].swarm_stats
if app.total_memory() < swarm_stats.memory_total:
return True
else:
......@@ -38,22 +39,36 @@ class FIFOSchedulerPolicy(BaseSchedulerPolicy):
self.waiting_list.append(execution)
def execution_kill(self, execution):
for e in self.waiting_list:
if e == execution:
self.waiting_list.remove(e)
return
for e in self.running_list:
if e[0] == execution:
self.running_list.remove(e)
return
try:
self.waiting_list.remove(execution)
except ValueError:
pass
def runnable_get(self) -> Execution:
try:
execution = self.waiting_list.pop(0)
except IndexError:
self.starting_list.remove(execution)
except ValueError:
pass
def runnable_get(self) -> Execution:
if len(self.waiting_list) == 0:
return None
return execution
mem_reserved = sum([node.memory_reserved for node in singletons['stats_manager'].swarm_stats.nodes])
mem_available = singletons['stats_manager'].swarm_stats.memory_total - mem_reserved
candidate = self.waiting_list[0]
if mem_available >= candidate.application.total_memory():
self.waiting_list.pop(0)
self.starting_list.append(candidate)
return candidate
else:
return None
def start_successful(self, execution: Execution) -> None:
self.starting_list.remove(execution)
def start_failed(self, execution: Execution) -> None:
self.starting_list.remove(execution)
self.waiting_list.append(execution) # append or insert(0, ...) ?
def stats(self):
ret = SchedulerStats()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment