Commit 7dc288c7 authored by Daniele Venzano's avatar Daniele Venzano

Reduce the number of concurrent threads trying to update core limits

parent d5e71b67
......@@ -255,4 +255,4 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
cpu_quota = int(cores * 100000)
engine.update(service.backend_id, cpu_quota=cpu_quota, mem_reservation=memory)
else:
log.error('Cannot update service {}, since it has no backend ID'.format(service.name))
log.error('Cannot update service {} ({}), since it has no backend ID'.format(service.name, service.id))
......@@ -28,7 +28,6 @@ from zoe_master.exceptions import ZoeException
log = logging.getLogger(__name__)
CHECK_INTERVAL = 10
THREAD_POOL_SIZE = 10
class DockerStateSynchronizer(threading.Thread):
......@@ -37,8 +36,8 @@ class DockerStateSynchronizer(threading.Thread):
def __init__(self, state: SQLManager) -> None:
super().__init__()
self.setName('checker')
self.stop = False
self.my_stop = False
self.stop = threading.Event()
self.my_stop = threading.Event()
self.state = state
self.setDaemon(True)
self.host_checkers = []
......@@ -50,10 +49,13 @@ class DockerStateSynchronizer(threading.Thread):
self.start()
def _host_subthread(self, host_config: DockerHostConfig):
log.info("Checker thread started")
log.info("Synchro thread for host {} started".format(host_config.name))
node_status = 'offline'
while not self.stop:
while True:
ret = self.stop.wait(timeout=CHECK_INTERVAL)
if ret:
break
try:
my_engine = DockerClient(host_config)
except ZoeException as e:
......@@ -77,8 +79,7 @@ class DockerStateSynchronizer(threading.Thread):
log.warning('Container {} on host {} has no corresponding service'.format(cont['name'], host_config.name))
continue
self._update_service_status(service, cont)
time.sleep(CHECK_INTERVAL)
log.info("Synchro thread for host {} stopped".format(host_config.name))
def _update_service_status(self, service: Service, container):
"""Update the service status."""
......@@ -90,7 +91,10 @@ class DockerStateSynchronizer(threading.Thread):
def run(self):
"""The thread loop."""
log.info("Checker thread started")
while not self.my_stop:
while True:
ret = self.my_stop.wait(timeout=CHECK_INTERVAL)
if ret:
break
to_remove = []
to_add = []
for th, conf in self.host_checkers:
......@@ -104,11 +108,12 @@ class DockerStateSynchronizer(threading.Thread):
self.host_checkers.remove(dead_th)
for new_th in to_add:
self.host_checkers.append(new_th)
time.sleep(CHECK_INTERVAL)
log.info("Checker thread stopped")
def quit(self):
"""Stops the thread."""
self.stop = True
self.stop.set()
for th, conf_ in self.host_checkers:
th.join()
self.my_stop = True
self.my_stop.set()
self.join()
......@@ -208,10 +208,6 @@ def preload_image(image_name):
def update_service_resource_limits(service, cores=None, memory=None):
"""Update a service reservation."""
backend = _get_backend()
if cores is not None:
log.debug('Setting core limit to {} for service {}'.format(cores, service.id))
if memory is not None:
log.debug('Setting memory limit to {} for service {}'.format(memory, service.id))
backend.update_service(service, cores, memory)
......
......@@ -110,3 +110,4 @@ class APIManager:
"""Cleanly close the ZMQ resources."""
self.zmq_s.close()
self.context.term()
log.info('ZeroMQ API server stopped')
......@@ -39,21 +39,21 @@ class StatsManager(threading.Thread):
super().__init__(name='metrics', daemon=True)
self.state = state
self.deployment_name = get_conf().deployment_name
self.stop = False
self.stop = threading.Event()
self.current_platform_stats = None
def quit(self):
"""Terminates the sender thread."""
self.stop = True
self.stop.set()
self.join()
def run(self):
"""The thread loop."""
while not self.stop:
while True:
time_start = time.time()
self.current_platform_stats = get_platform_state(self.state, with_usage_stats=True)
sleep_time = METRIC_INTERVAL - (time.time() - time_start)
if sleep_time > 0 and not self.stop:
time.sleep(sleep_time)
if sleep_time > 0 and self.stop.wait(timeout=sleep_time):
break
......@@ -50,7 +50,8 @@ class ZoeElasticScheduler:
self.async_threads = []
self.loop_quit = False
self.loop_th = threading.Thread(target=self._thread_wrapper, name='scheduler')
self.loop_th.start()
self.core_limit_recalc_trigger = threading.Event()
self.core_limit_th = threading.Thread(target=self._adjust_core_limits, name='adjust_core_limits')
self.state = state
for execution in self.state.execution_list(status='running'):
if execution.all_services_running:
......@@ -58,6 +59,8 @@ class ZoeElasticScheduler:
else:
self.queue.append(execution)
self.additional_exec_state[execution.id] = ExecutionProgress(0, [])
self.loop_th.start()
self.core_limit_th.start()
def trigger(self):
"""Trigger a scheduler run."""
......@@ -89,7 +92,6 @@ class ZoeElasticScheduler:
log.error('Error in termination thread: {}'.format(ex))
return
self.trigger()
self._adjust_core_limits()
log.debug('Execution {} terminated successfully'.format(e.id))
try:
......@@ -98,12 +100,14 @@ class ZoeElasticScheduler:
try:
self.queue_running.remove(execution)
except ValueError:
log.error('Terminating execution {} that is not in any queue'.format(execution.id))
log.error('Cannot terminate execution {}, it is not in any queue'.format(execution.id))
return
try:
del self.additional_exec_state[execution.id]
except KeyError:
pass
self.core_limit_recalc_trigger.set()
th = threading.Thread(target=async_termination, name='termination_{}'.format(execution.id), args=(execution,))
th.start()
......@@ -172,6 +176,7 @@ class ZoeElasticScheduler:
if len(self.queue) == 0:
log.debug("Scheduler loop has been triggered, but the queue is empty")
self.core_limit_recalc_trigger.set()
continue
log.debug("Scheduler loop has been triggered")
......@@ -257,7 +262,7 @@ class ZoeElasticScheduler:
jobs_to_attempt_scheduling.remove(job)
self.queue_running.append(job)
self._adjust_core_limits()
self.core_limit_recalc_trigger.set()
for job in jobs_to_attempt_scheduling:
job.termination_lock.release()
......@@ -276,7 +281,9 @@ class ZoeElasticScheduler:
"""Stop the scheduler thread."""
self.loop_quit = True
self.trigger()
self.core_limit_recalc_trigger.set()
self.loop_th.join()
self.core_limit_th.join()
def stats(self):
"""Scheduler statistics."""
......@@ -294,21 +301,30 @@ class ZoeElasticScheduler:
}
def _adjust_core_limits(self):
stats = get_platform_state(self.state)
for node in stats.nodes: # type: NodeStats
if len(node.services) == 0:
continue
new_core_allocations = {}
core_sum = 0
for service in node.services: # type: Service
new_core_allocations[service.id] = service.resource_reservation.cores.min
core_sum += service.resource_reservation.cores.min
if core_sum < node.cores_total:
cores_free = node.cores_total - core_sum
cores_to_add = cores_free / len(node.services)
else:
cores_to_add = 0
for service in node.services: # type: Service
update_service_resource_limits(service, cores=new_core_allocations[service.id] + cores_to_add)
while not self.loop_quit:
self.core_limit_recalc_trigger.wait()
if self.loop_quit:
break
log.debug('Updating core limits')
time_start = time.time()
stats = get_platform_state(self.state)
for node in stats.nodes: # type: NodeStats
if len(node.services) == 0:
continue
new_core_allocations = {}
core_sum = 0
for service in node.services: # type: Service
new_core_allocations[service.id] = service.resource_reservation.cores.min
core_sum += service.resource_reservation.cores.min
if core_sum < node.cores_total:
cores_free = node.cores_total - core_sum
cores_to_add = cores_free / len(node.services)
else:
cores_to_add = 0
for service in node.services: # type: Service
update_service_resource_limits(service, cores=new_core_allocations[service.id] + cores_to_add)
log.debug('Update core limits took {:.2f}s'.format(time.time() - time_start))
self.core_limit_recalc_trigger.clear()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment