Commit 3c7707c8 authored by Daniele Venzano

Do the core reallocation much less frequently for performance reasons

parent 6ca4ed86
...@@ -125,7 +125,6 @@ def service_list_to_containers(execution: Execution, service_list: List[Service] ...@@ -125,7 +125,6 @@ def service_list_to_containers(execution: Execution, service_list: List[Service]
else: else:
log.debug('Service {} started'.format(instance.name)) log.debug('Service {} started'.format(instance.name))
service.set_active(backend_id, ip_address, ports) service.set_active(backend_id, ip_address, ports)
_adjust_core_limits(service.backend_host)
return "ok" return "ok"
...@@ -165,7 +164,6 @@ def terminate_service(service: Service) -> None: ...@@ -165,7 +164,6 @@ def terminate_service(service: Service) -> None:
backend.terminate_service(service) backend.terminate_service(service)
service.set_inactive() service.set_inactive()
log.debug('Service {} terminated'.format(service.name)) log.debug('Service {} terminated'.format(service.name))
_adjust_core_limits(service.backend_host)
elif service.status == Service.CREATED_STATUS or service.status == Service.RUNNABLE_STATUS: elif service.status == Service.CREATED_STATUS or service.status == Service.RUNNABLE_STATUS:
service.set_inactive() service.set_inactive()
else: else:
...@@ -176,7 +174,6 @@ def terminate_service(service: Service) -> None: ...@@ -176,7 +174,6 @@ def terminate_service(service: Service) -> None:
backend.terminate_service(service) backend.terminate_service(service)
service.set_inactive() service.set_inactive()
log.debug('Service {} terminated'.format(service.name)) log.debug('Service {} terminated'.format(service.name))
_adjust_core_limits(service.backend_host)
def terminate_execution(execution: Execution) -> None: def terminate_execution(execution: Execution) -> None:
...@@ -237,23 +234,3 @@ def list_available_images(node_name): ...@@ -237,23 +234,3 @@ def list_available_images(node_name):
"""List the images available on the specified node.""" """List the images available on the specified node."""
backend = _get_backend() backend = _get_backend()
return backend.list_available_images(node_name) return backend.list_available_images(node_name)
def _adjust_core_limits(node_name):
node = node_state(node_name, False)
if len(node.services) == 0:
return
new_core_allocations = {}
core_sum = 0
for service in node.services: # type: Service
new_core_allocations[service.id] = service.resource_reservation.cores.min
core_sum += service.resource_reservation.cores.min
if core_sum < node.cores_total:
cores_free = node.cores_total - core_sum
cores_to_add = cores_free / len(node.services)
else:
cores_to_add = 0
for service in node.services: # type: Service
update_service_resource_limits(service, cores=new_core_allocations[service.id] + cores_to_add)
...@@ -23,12 +23,13 @@ import logging ...@@ -23,12 +23,13 @@ import logging
import threading import threading
import time import time
from zoe_lib.state import Execution, SQLManager from zoe_lib.state import Execution, SQLManager, Service
from zoe_master.exceptions import ZoeException from zoe_master.exceptions import ZoeException
from zoe_master.backends.interface import terminate_execution, get_platform_state, start_elastic, start_essential from zoe_master.backends.interface import terminate_execution, get_platform_state, start_elastic, start_essential, update_service_resource_limits
from zoe_master.scheduler.simulated_platform import SimulatedPlatform from zoe_master.scheduler.simulated_platform import SimulatedPlatform
from zoe_master.exceptions import UnsupportedSchedulerPolicyError from zoe_master.exceptions import UnsupportedSchedulerPolicyError
from zoe_master.stats import NodeStats
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -88,6 +89,7 @@ class ZoeElasticScheduler: ...@@ -88,6 +89,7 @@ class ZoeElasticScheduler:
log.error('Error in termination thread: {}'.format(ex)) log.error('Error in termination thread: {}'.format(ex))
return return
self.trigger() self.trigger()
self._adjust_core_limits()
log.debug('Execution {} terminated successfully'.format(e.id)) log.debug('Execution {} terminated successfully'.format(e.id))
try: try:
...@@ -255,6 +257,8 @@ class ZoeElasticScheduler: ...@@ -255,6 +257,8 @@ class ZoeElasticScheduler:
jobs_to_attempt_scheduling.remove(job) jobs_to_attempt_scheduling.remove(job)
self.queue_running.append(job) self.queue_running.append(job)
self._adjust_core_limits()
for job in jobs_to_attempt_scheduling: for job in jobs_to_attempt_scheduling:
job.termination_lock.release() job.termination_lock.release()
# self.queue.insert(0, job) # self.queue.insert(0, job)
...@@ -288,3 +292,23 @@ class ZoeElasticScheduler: ...@@ -288,3 +292,23 @@ class ZoeElasticScheduler:
'queue': [s.id for s in queue], 'queue': [s.id for s in queue],
'running_queue': [s.id for s in self.queue_running] 'running_queue': [s.id for s in self.queue_running]
} }
def _adjust_core_limits(self):
stats = get_platform_state(self.state)
for node in stats.nodes: # type: NodeStats
if len(node.services) == 0:
continue
new_core_allocations = {}
core_sum = 0
for service in node.services: # type: Service
new_core_allocations[service.id] = service.resource_reservation.cores.min
core_sum += service.resource_reservation.cores.min
if core_sum < node.cores_total:
cores_free = node.cores_total - core_sum
cores_to_add = cores_free / len(node.services)
else:
cores_to_add = 0
for service in node.services: # type: Service
update_service_resource_limits(service, cores=new_core_allocations[service.id] + cores_to_add)
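
For reference, a minimal standalone sketch of the fair-share arithmetic that the new _adjust_core_limits() applies to each node: every service keeps its minimum core reservation and any cores left free on the node are split evenly among the services running there. Plain dictionaries stand in for Zoe's NodeStats and Service objects; the helper name, service IDs and core counts below are made up for illustration.

def fair_share_cores(cores_total, min_reservations):
    """Return per-service core limits: each service gets its minimum
    reservation plus an even share of the node's leftover cores."""
    if not min_reservations:
        return {}
    core_sum = sum(min_reservations.values())
    if core_sum < cores_total:
        cores_to_add = (cores_total - core_sum) / len(min_reservations)
    else:
        cores_to_add = 0
    return {sid: minimum + cores_to_add for sid, minimum in min_reservations.items()}

# A 16-core node running three services that reserve 2, 2 and 4 cores has
# 8 cores free, so each service is topped up by 8 / 3 ≈ 2.67 cores.
print(fair_share_cores(16, {'svc-a': 2, 'svc-b': 2, 'svc-c': 4}))
# {'svc-a': 4.67, 'svc-b': 4.67, 'svc-c': 6.67} (approximately)

Before this commit that recomputation ran on every single service start and termination; it now runs once per terminated execution and once per scheduling pass.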