Commit ec91058e authored by Daniele Venzano's avatar Daniele Venzano

Move core automatic limit setting to a per-service trigger

parent 524ef4d9
......@@ -56,6 +56,10 @@ class BaseBackend:
"""Get the platform state. This method should fill-in a new ClusterStats object at each call, with fresh statistics on the available nodes and resource availability. This information will be used for taking scheduling decisions."""
raise NotImplementedError
def node_state(self, node_name: str, get_usage_stats: bool):
    """Get the state of a single node.

    :param node_name: name of the node to query
    :param get_usage_stats: if True, the backend should also gather live resource-usage statistics
    :return: implementation-defined node state object; concrete backends must override this
    """
    raise NotImplementedError
def preload_image(self, image_name: str) -> None:
    """Make a service image available.

    :param image_name: identifier of the image to pre-fetch on the backend
    :raises NotImplementedError: always; concrete backends must override this
    """
    raise NotImplementedError
......
......@@ -93,8 +93,7 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
th_list = []
for host_conf in self.docker_config: # type: DockerHostConfig
node_stats = NodeStats(host_conf.name)
th = threading.Thread(target=self._update_node_stats, args=(host_conf, node_stats, usage_stats), name='stats_host_{}'.format(host_conf.name), daemon=True)
th = threading.Thread(target=self._update_node_state, args=(host_conf, node_stats, usage_stats), name='stats_host_{}'.format(host_conf.name), daemon=True)
th.start()
th_list.append((th, node_stats))
......@@ -105,7 +104,20 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
log.debug('Time for platform stats: {:.2f}s'.format(time.time() - time_start))
return platform_stats
def _update_node_stats(self, host_conf: DockerHostConfig, node_stats: NodeStats, get_usage_stats: bool):
def node_state(self, node_name: str, get_usage_stats: bool):
    """Get the state of a single node.

    :param node_name: name of the Docker host to query
    :param get_usage_stats: whether to also gather live usage statistics
    :return: a NodeStats object, or None if no configured host matches node_name
    """
    # Bug fix: the original reused ``host_conf`` as the loop variable, so when
    # no host matched, it still held the *last* host after the loop and the
    # ``is None`` check never fired — stats for the wrong node were returned.
    host_conf = None
    for candidate in self.docker_config:
        if candidate.name == node_name:
            host_conf = candidate
            break
    if host_conf is None:
        return None
    node_stats = NodeStats(host_conf.name)
    self._update_node_state(host_conf, node_stats, get_usage_stats)
    return node_stats
def _update_node_state(self, host_conf: DockerHostConfig, node_stats: NodeStats, get_usage_stats: bool):
node_stats.labels = host_conf.labels
try:
my_engine = DockerClient(host_conf)
......
......@@ -125,6 +125,7 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
else:
log.debug('Service {} started'.format(instance.name))
service.set_active(backend_id, ip_address, ports)
_adjust_core_limits(service.backend_host)
return "ok"
......@@ -164,6 +165,7 @@ def terminate_service(service: Service) -> None:
backend.terminate_service(service)
service.set_inactive()
log.debug('Service {} terminated'.format(service.name))
_adjust_core_limits(service.backend_host)
elif service.status == Service.CREATED_STATUS or service.status == Service.RUNNABLE_STATUS:
service.set_inactive()
else:
......@@ -174,6 +176,7 @@ def terminate_service(service: Service) -> None:
backend.terminate_service(service)
service.set_inactive()
log.debug('Service {} terminated'.format(service.name))
_adjust_core_limits(service.backend_host)
def terminate_execution(execution: Execution) -> None:
......@@ -221,7 +224,36 @@ def node_list():
return backend.node_list()
def node_state(node_name: str, get_usage_stats: bool) -> NodeStats:
    """Get the state of a single node.

    :param node_name: name of the node to query
    :param get_usage_stats: whether to gather live usage statistics as well
    :return: a NodeStats object with its running services attached, or None
             if the backend does not know the node
    """
    backend = _get_backend()
    state = SQLManager(get_conf())
    node = backend.node_state(node_name, get_usage_stats)
    # Robustness: the backend returns None for an unknown node name; the
    # original dereferenced it unconditionally and raised AttributeError.
    if node is None:
        return None
    node.services = state.service_list(backend_host=node_name, backend_status=Service.BACKEND_START_STATUS)
    return node
def list_available_images(node_name):
    """List the images available on the specified node."""
    return _get_backend().list_available_images(node_name)
def _adjust_core_limits(node_name):
    """Redistribute spare cores on a node evenly among its active services.

    Each service is guaranteed its reserved minimum; any cores left over on
    the node are split equally (as a float share) across all services.

    :param node_name: name of the node whose service core limits to rebalance
    """
    node = node_state(node_name, False)
    # Robustness: node_state can return None for an unknown node; also skip
    # nodes with nothing running on them.
    if node is None or len(node.services) == 0:
        return
    new_core_allocations = {}
    core_sum = 0
    for service in node.services:  # type: Service
        new_core_allocations[service.id] = service.resource_reservation.cores.min
        core_sum += service.resource_reservation.cores.min
    if core_sum < node.cores_total:
        # Spread the unreserved cores evenly; fractional shares are allowed.
        cores_free = node.cores_total - core_sum
        cores_to_add = cores_free / len(node.services)
    else:
        cores_to_add = 0
    for service in node.services:  # type: Service
        update_service_resource_limits(service, cores=new_core_allocations[service.id] + cores_to_add)
......@@ -87,7 +87,10 @@ class APIManager:
elif message['command'] == 'scheduler_stats':
try:
data = self.scheduler.stats()
data['platform_stats'] = self.metrics.current_platform_stats.serialize()
if self.metrics.current_platform_stats is None:
data['platform_stats'] = None
else:
data['platform_stats'] = self.metrics.current_platform_stats.serialize()
except ZoeException as e:
log.error(str(e))
self._reply_error(str(e))
......
......@@ -23,13 +23,12 @@ import logging
import threading
import time
from zoe_lib.state import Execution, SQLManager, Service
from zoe_lib.state import Execution, SQLManager
from zoe_master.exceptions import ZoeException
from zoe_master.backends.interface import terminate_execution, get_platform_state, start_elastic, start_essential, update_service_resource_limits
from zoe_master.backends.interface import terminate_execution, get_platform_state, start_elastic, start_essential
from zoe_master.scheduler.simulated_platform import SimulatedPlatform
from zoe_master.exceptions import UnsupportedSchedulerPolicyError
from zoe_master.stats import NodeStats
log = logging.getLogger(__name__)
......@@ -89,7 +88,6 @@ class ZoeElasticScheduler:
log.error('Error in termination thread: {}'.format(ex))
return
self.trigger()
self._adjust_core_limits()
log.debug('Execution {} terminated successfully'.format(e.id))
try:
......@@ -257,8 +255,6 @@ class ZoeElasticScheduler:
jobs_to_attempt_scheduling.remove(job)
self.queue_running.append(job)
self._adjust_core_limits()
for job in jobs_to_attempt_scheduling:
job.termination_lock.release()
# self.queue.insert(0, job)
......@@ -292,23 +288,3 @@ class ZoeElasticScheduler:
'queue': [s.id for s in queue],
'running_queue': [s.id for s in self.queue_running]
}
def _adjust_core_limits(self):
    """Rebalance per-service core limits across every node of the platform.

    For each node with running services, each service keeps its reserved
    minimum core count and any remaining node capacity is split equally
    among the services on that node.
    """
    stats = get_platform_state(self.state)
    for node in stats.nodes:  # type: NodeStats
        # Nothing to rebalance on an idle node.
        if len(node.services) == 0:
            continue
        new_core_allocations = {}
        core_sum = 0
        for service in node.services:  # type: Service
            # Start every service from its guaranteed reservation minimum.
            new_core_allocations[service.id] = service.resource_reservation.cores.min
            core_sum += service.resource_reservation.cores.min
        if core_sum < node.cores_total:
            # Divide the spare cores evenly; the share may be fractional.
            cores_free = node.cores_total - core_sum
            cores_to_add = cores_free / len(node.services)
        else:
            cores_to_add = 0
        for service in node.services:  # type: Service
            update_service_resource_limits(service, cores=new_core_allocations[service.id] + cores_to_add)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment