Commit f51f24f6 authored by Daniele Venzano

Merge branch 'devel/fixes' into 'master'

Fix several bugs

See merge request !66
parents 998b9919 d7a92187
......@@ -83,11 +83,9 @@ Authentication:
Scheduler options:
* ``scheduler-class = <ZoeSimpleScheduler | ZoeElasticScheduler>`` : Scheduler class to use for scheduling ZApps (default: elastic scheduler)
* ``scheduler-class = <ZoeElasticScheduler>`` : Scheduler class to use for scheduling ZApps (default: elastic scheduler)
* ``scheduler-policy = <FIFO | SIZE>`` : Scheduler policy to use for scheduling ZApps (default: FIFO)
The default scheduler options enable the traditional Zoe scheduler that was available in previous releases.
ZApp shop:
* ``zapp-shop-path = /var/lib/zoe-apps`` : Path where ZApp folders are stored
......
sonar.projectKey=zoe
sonar.projectName=zoe
sonar.projectVersion=1.0
sonar.host.url=http://your-sonarqube-server-address
sonar.sources=.
sonar.modules=zoe_api,zoe_master,zoe_lib
sonar.exclusions=zoe_api/web/static/**
sonar.sourceEncoding=UTF-8
......@@ -243,6 +243,10 @@ div.running {
border-color: green;
}
td.running {
border: 1px solid green;
}
div.node_detail {
width: 48%;
float: left;
......
......@@ -86,6 +86,7 @@
<th>Zoe status</th>
<th>Backend status</th>
<th>Host</th>
<th>Labels</th>
<th>Output logs</th>
<th>Errors</th>
</tr>
......@@ -98,6 +99,7 @@
<td>{{ s['status'] }}</td>
<td>{{ s['backend_status'] }}</td>
<td>{{ s['backend_host'] }}</td>
<td>{% for l in s['labels'] %}{{ l }} {% endfor %}</td>
{% if s['status'] != 'created' %}
<td><a href="{{ reverse_url("service_logs", s['id']) }}">open</a></td>
{% else %}
......
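As a quick check of the new Labels column, here is a minimal sketch of how that Jinja2 loop renders, assuming the jinja2 package and a service record carrying a plain list of label strings (the sample data is illustrative, not taken from Zoe):
from jinja2 import Template
# Same loop as the added table cell: emit each label followed by a space.
cell = Template("<td>{% for l in s['labels'] %}{{ l }} {% endfor %}</td>")
print(cell.render(s={'labels': ['gpu', 'ssd']}))  # <td>gpu ssd </td>
print(cell.render(s={'labels': []}))              # <td></td> -- an empty list leaves the cell blank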
......@@ -209,6 +209,7 @@
</div>
<h3 class="section"><a name="service-distrib">Service distribution</a></h3>
<p>Services marked with a green border are essential; elastic ones have no border.</p>
<table class="service-distrib">
<thead>
<tr>
......@@ -222,7 +223,7 @@
<td class="cell-host">{{ node.name }}</td>
{% for service in services_per_node[node.name] %}
{% if service.backend_status == "started" %}
<td><a href="{{ reverse_url('execution_inspect', service['execution_id']) }}">{{ service['name'] }}</a> (M: <script>format_bytes({{ node.service_stats[service['id']]['mem_limit'] }});</script> C: {{ '%0.2f'|format(node.service_stats[service['id']]['core_limit']|float) }})</td>
<td class="{{ 'running' if service.essential }}"><a href="{{ reverse_url('execution_inspect', service['execution_id']) }}">{{ service['name'] }}</a> (M: <script>format_bytes({{ node.service_stats[service['id']]['mem_limit'] }});</script> C: {{ '%0.2f'|format(node.service_stats[service['id']]['core_limit']|float) }})</td>
{% endif %}
{% endfor %}
</tr>
......
......@@ -25,6 +25,7 @@
{% endif %}</li>
<li>Suggested CPU core allocation limit: {{ service["resources"]["cores"]["min"] if service["resources"]["cores"]["min"] != None else "No limit" }}</li>
</ul></li>
<li>Image: <code>{{ service["image"] }}</code></li>
{% endfor %}
</ul>
......
......@@ -98,7 +98,7 @@ def load_configuration(test_conf=None):
argparser.add_argument('--proxy-path', help='Proxy base path', default='127.0.0.1')
# Scheduler
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeSimpleScheduler', 'ZoeElasticScheduler'], default='ZoeElasticScheduler')
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeElasticScheduler'], default='ZoeElasticScheduler')
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
argparser.add_argument('--backend', choices=['Swarm', 'Kubernetes', 'DockerEngine'], default='DockerEngine', help='Which backend to enable')
......
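A minimal standard-library sketch of what dropping ZoeSimpleScheduler from the choices list means for anyone launching the master; the two arguments below mirror the definitions above, in isolation from the real load_configuration():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--scheduler-class', choices=['ZoeElasticScheduler'], default='ZoeElasticScheduler')
parser.add_argument('--scheduler-policy', choices=['FIFO', 'SIZE'], default='FIFO')
args = parser.parse_args([])
print(args.scheduler_class, args.scheduler_policy)  # ZoeElasticScheduler FIFO
# Passing the removed value now fails fast instead of selecting the old scheduler:
# parser.parse_args(['--scheduler-class', 'ZoeSimpleScheduler'])  -> "invalid choice" error, exit status 2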
......@@ -155,7 +155,7 @@ class DockerClient:
except KeyError:
info['host'] = 'N/A'
if container.status == 'running' or container.status == 'restarting':
if container.status == 'running' or container.status == 'restarting' or container.status == 'removing':
info["state"] = Service.BACKEND_START_STATUS
info["running"] = True
elif container.status == 'paused' or container.status == 'exited' or container.status == 'dead':
......
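To make the branch above easier to follow on its own, here is an illustrative sketch of the status mapping with 'removing' folded into the running branch; the helper name and the returned strings are placeholders, not Zoe's Service constants:
def backend_status(container_status: str) -> str:
    """Map a Docker container status string to a coarse backend state (placeholder return values)."""
    if container_status in ('running', 'restarting', 'removing'):
        return 'started'   # stands in for Service.BACKEND_START_STATUS
    if container_status in ('paused', 'exited', 'dead'):
        return 'died'      # stands in for the corresponding die status (branch body elided in the diff)
    return 'undefined'     # any other Docker status, e.g. 'created'
assert backend_status('removing') == 'started'   # the new case added by this commit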
......@@ -213,7 +213,7 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
image = {
'id': dk_image.attrs['Id'],
'size': dk_image.attrs['Size'],
'names': dk_image.tags
'names': dk_image.tags # type: list
}
for name in image['names']:
if name[-7:] == ':latest': # also add the image under its name without ':latest', to mimic Docker's image lookup algorithm
......@@ -225,7 +225,11 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
def update_service(self, service, cores=None, memory=None):
"""Update a service reservation."""
conf = self._get_config(service.backend_host)
engine = DockerClient(conf)
try:
engine = DockerClient(conf)
except ZoeException as e:
log.error(str(e))
return
if service.backend_id is not None:
info = engine.info()
if cores is not None and cores > info['NCPU']:
......
......@@ -75,7 +75,7 @@ class DockerStateSynchronizer(threading.Thread):
self.host_stats[host_config.name].memory_total = info['MemTotal']
self.host_stats[host_config.name].labels = host_config.labels
if info['Labels'] is not None:
self.host_stats[host_config.name].labels += set(info['Labels'])
self.host_stats[host_config.name].labels = self.host_stats[host_config.name].labels.union(set(info['Labels']))
self.host_stats[host_config.name].memory_allocated = sum([cont['memory_soft_limit'] for cont in container_list if cont['memory_soft_limit'] != info['MemTotal']])
self.host_stats[host_config.name].cores_allocated = sum([cont['cpu_quota'] / cont['cpu_period'] for cont in container_list if cont['cpu_period'] != 0])
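One detail worth keeping in mind for the labels line above: set.union returns a new set instead of updating the receiver, so the result has to be assigned back (or |= used) for the extra labels to stick. A quick illustration, assuming the labels attribute is a set:
labels = {'gpu'}
labels.union({'ssd'})             # returns {'gpu', 'ssd'} but discards it; labels is unchanged
labels = labels.union({'ssd'})    # assign the result back ...
labels |= {'fast-disk'}           # ... or update in place with |=
print(sorted(labels))             # ['fast-disk', 'gpu', 'ssd']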
......@@ -102,7 +102,7 @@ class DockerStateSynchronizer(threading.Thread):
sleep_time = CHECK_INTERVAL - (time.time() - time_start)
if sleep_time <= 0:
log.warning('synchro thread for host {} is late of {:.2f} seconds'.format(host_config.name, sleep_time * -1))
log.warning('synchro thread for host {} is late by {:.2f} seconds'.format(host_config.name, sleep_time * -1))
sleep_time = 0
if self.stop.wait(timeout=sleep_time):
break
......
......@@ -16,5 +16,4 @@
"""The Zoe schedulers"""
from .base_scheduler import ZoeBaseScheduler
from .simple_scheduler import ZoeSimpleScheduler
from .elastic_scheduler import ZoeElasticScheduler
......@@ -18,7 +18,6 @@ The Elastic scheduler is the implementation of the scheduling algorithm presente
https://arxiv.org/abs/1611.09528
"""
from collections import namedtuple
import logging
import threading
import time
......@@ -26,7 +25,7 @@ import time
from zoe_lib.state import Execution, SQLManager, Service # pylint: disable=unused-import
from zoe_master.exceptions import ZoeException
from zoe_master.backends.interface import terminate_execution, start_elastic, start_essential, update_service_resource_limits
from zoe_master.backends.interface import terminate_execution, terminate_service, start_elastic, start_essential, update_service_resource_limits
from zoe_master.scheduler.simulated_platform import SimulatedPlatform
from zoe_master.exceptions import UnsupportedSchedulerPolicyError
from zoe_master.stats import NodeStats # pylint: disable=unused-import
......@@ -34,7 +33,6 @@ from zoe_master.metrics.base import StatsManager # pylint: disable=unused-impor
log = logging.getLogger(__name__)
ExecutionProgress = namedtuple('ExecutionProgress', ['last_time_scheduled', 'progress_sequence'])
SELF_TRIGGER_TIMEOUT = 60 # the scheduler will trigger itself periodically in case platform resources have changed outside its control
......@@ -53,6 +51,13 @@ def catch_exceptions_and_retry(func):
return wrapper
class ExecutionProgress:
"""Additional data for tracking execution sizes while in the queue."""
def __init__(self):
self.last_time_scheduled = 0
self.progress_sequence = []
class ZoeElasticScheduler:
"""The Scheduler class for size-based scheduling. Policy can be "FIFO" or "SIZE"."""
def __init__(self, state: SQLManager, policy, metrics: StatsManager):
......@@ -75,7 +80,7 @@ class ZoeElasticScheduler:
self.queue_running.append(execution)
else:
self.queue.append(execution)
self.additional_exec_state[execution.id] = ExecutionProgress(0, [])
self.additional_exec_state[execution.id] = ExecutionProgress()
self.loop_th.start()
self.core_limit_th.start()
......@@ -89,7 +94,7 @@ class ZoeElasticScheduler:
:param execution: The execution
:return:
"""
exec_data = ExecutionProgress(0, [])
exec_data = ExecutionProgress()
self.additional_exec_state[execution.id] = exec_data
self.queue.append(execution)
self.trigger()
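One practical effect of replacing the ExecutionProgress namedtuple with a plain class: namedtuple instances are immutable, so the in-place assignment to last_time_scheduled further down in the scheduling loop would raise AttributeError on the old tuple. A minimal illustration:
from collections import namedtuple
TupleProgress = namedtuple('TupleProgress', ['last_time_scheduled', 'progress_sequence'])
tp = TupleProgress(0, [])
# tp.last_time_scheduled = 42   # AttributeError: can't set attribute
class PlainProgress:
    """Same two fields, but with freely assignable attributes."""
    def __init__(self):
        self.last_time_scheduled = 0
        self.progress_sequence = []
pp = PlainProgress()
pp.last_time_scheduled = 42     # fine: plain instance attributes are mutable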
......@@ -154,10 +159,9 @@ class ZoeElasticScheduler:
remaining_execution_time = (1 - progress) * execution.size
execution.size = remaining_execution_time * execution.services_count
def _pop_all_with_same_size(self):
def _pop_all(self):
out_list = []
while len(self.queue) > 0:
execution = self.queue.pop(0) # type: Execution
for execution in self.queue: # type: Execution
ret = execution.termination_lock.acquire(blocking=False)
if ret and execution.status != Execution.TERMINATED_STATUS:
out_list.append(execution)
......@@ -166,6 +170,12 @@ class ZoeElasticScheduler:
return out_list
def _requeue(self, execution: Execution):
execution.termination_lock.release()
if execution not in self.queue: # make sure the execution is in the queue
log.warning("Execution {} re-queued, but it was not in the queue".format(execution.id))
self.queue.append(execution)
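_pop_all only selects executions whose termination lock it can take without blocking, and _requeue releases that lock again before putting the execution back in the queue. A tiny standard-library sketch of the non-blocking acquire semantics this relies on:
import threading
termination_lock = threading.Lock()
print(termination_lock.acquire(blocking=False))   # True: the lock was free and is now held
print(termination_lock.acquire(blocking=False))   # False: already held, returns immediately instead of waiting
termination_lock.release()                        # release once per successful acquire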
@catch_exceptions_and_retry
def loop_start_th(self): # pylint: disable=too-many-locals
"""The Scheduler thread loop."""
......@@ -195,23 +205,17 @@ class ZoeElasticScheduler:
if self.policy == "SIZE":
self.queue.sort(key=lambda execution: execution.size)
log.debug('--> Queue dump after sorting')
for j in self.queue:
log.debug(str(j))
log.debug('--> End dump')
jobs_to_attempt_scheduling = self._pop_all_with_same_size()
jobs_to_attempt_scheduling = self._pop_all()
log.debug('Scheduler inner loop, jobs to attempt scheduling:')
for job in jobs_to_attempt_scheduling:
log.debug("-> {}".format(job))
log.debug("-> {} ({})".format(job, job.size))
try:
platform_state = self.metrics.current_stats
except ZoeException:
log.error('Cannot retrieve platform state, cannot schedule')
for job in jobs_to_attempt_scheduling:
job.termination_lock.release()
self.queue = jobs_to_attempt_scheduling + self.queue
self._requeue(job)
break
cluster_status_snapshot = SimulatedPlatform(platform_state)
......@@ -256,7 +260,7 @@ class ZoeElasticScheduler:
jobs_to_attempt_scheduling.remove(job)
continue # throw away the execution
elif ret == "requeue":
self.queue.insert(0, job)
self._requeue(job)
continue
elif ret == "ok":
job.set_running()
......@@ -269,15 +273,14 @@ class ZoeElasticScheduler:
log.debug('execution {}: all services are active'.format(job.id))
job.termination_lock.release()
jobs_to_attempt_scheduling.remove(job)
self.queue.remove(job)
self.queue_running.append(job)
self.additional_exec_state[job.id].last_time_scheduled = time.time()
self.core_limit_recalc_trigger.set()
for job in jobs_to_attempt_scheduling:
job.termination_lock.release()
# self.queue.insert(0, job)
self.queue = jobs_to_attempt_scheduling + self.queue
self._requeue(job)
if len(self.queue) == 0:
log.debug('empty queue, exiting inner loop')
......@@ -353,6 +356,7 @@ class ZoeElasticScheduler:
for service in execution.services:
if not service.essential and service.backend_status == service.BACKEND_DIE_STATUS:
log.info("Elastic service {} ({}) of execution {} died, rescheduling".format(service.id, service.name, execution.id))
terminate_service(service)
service.restarted()
self.queue_running.remove(execution)
self.queue.append(execution)
......
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Scheduler."""
import logging
import threading
from zoe_lib.state import Execution
from zoe_master.backends.interface import start_all, terminate_execution
from zoe_master.scheduler.base_scheduler import ZoeBaseScheduler
from zoe_master.exceptions import UnsupportedSchedulerPolicyError
log = logging.getLogger(__name__)
class ZoeSimpleScheduler(ZoeBaseScheduler):
"""The Scheduler class."""
def __init__(self, state, policy, metrics):
super().__init__(state)
self.metrics = metrics
if policy != 'FIFO':
raise UnsupportedSchedulerPolicyError
self.fifo_queue = []
self.trigger_semaphore = threading.Semaphore(0)
self.async_threads = []
self.loop_quit = False
self.loop_th = threading.Thread(target=self.loop_start_th, name='scheduler')
self.loop_th.start()
log.warning('This scheduler class is deprecated and will be removed in version 2018.03, please switch to the elastic scheduler.')
def trigger(self):
"""Trigger a scheduler run."""
self.trigger_semaphore.release()
def incoming(self, execution: Execution):
"""
This method adds the execution to the end of the FIFO queue and triggers the scheduler.
:param execution: The execution
:return:
"""
self.fifo_queue.append(execution)
self.trigger()
def terminate(self, execution: Execution) -> None:
"""
Inform the master that an execution has been terminated. This can be done asynchronously.
:param execution: the terminated execution
:return: None
"""
def async_termination():
"""Actual termination run in a thread."""
terminate_execution(execution)
self.trigger()
try:
self.fifo_queue.remove(execution)
except ValueError:
pass
th = threading.Thread(target=async_termination, name='termination_{}'.format(execution.id))
th.start()
self.async_threads.append(th)
def loop_start_th(self):
"""The Scheduler thread loop."""
auto_trigger_base = 60 # seconds
auto_trigger = auto_trigger_base
while True:
ret = self.trigger_semaphore.acquire(timeout=1)
if not ret: # Semaphore timeout, do some thread cleanup
counter = len(self.async_threads)
while counter > 0:
if len(self.async_threads) == 0:
break
th = self.async_threads.pop(0)
th.join(0.1)
if th.isAlive(): # join failed
log.debug('Thread {} join failed'.format(th.name))
self.async_threads.append(th)
counter -= 1
auto_trigger -= 1
if auto_trigger == 0:
auto_trigger = auto_trigger_base
self.trigger()
continue
if self.loop_quit:
break
log.debug("Scheduler start loop has been triggered")
if len(self.fifo_queue) == 0:
continue
e = self.fifo_queue[0]
assert isinstance(e, Execution)
e.set_starting()
self.fifo_queue.pop(0) # remove the execution from the queue
ret = start_all(e)
if ret == 'requeue':
self.fifo_queue.append(e)
elif ret == 'fatal':
continue # throw away the execution
else:
e.set_running()
def quit(self):
"""Stop the scheduler thread."""
self.loop_quit = True
self.trigger()
self.loop_th.join()
def stats(self):
"""Scheduler statistics."""
return {
'queue_length': len(self.fifo_queue),
'running_length': 0,
'termination_threads_count': len(self.async_threads),
'queue': [s.id for s in self.fifo_queue],
'running_queue': [],
'platform_stats': {}
}