Commit dc955f8e authored by Daniele Venzano's avatar Daniele Venzano

Implement image-aware scheduling and pre-loading

This feature is implemented only on the DockerEnging back-end.
When enabled via the new configuration option --backend-image-management, Zoe will (1) when an execution is submitted, start threads to pull the image on the nodes and (2) schedule services on the hosts that have the image already loaded.
This has two big side-effects:
1. Zoe will not let Docker automatically pull the image at run time and will refuse to start a ZApp with unavailable images
2. ZApps will always run with the latest images: a pull is always done before a run
parent cc5f8d20
......@@ -95,7 +95,8 @@ def load_configuration(test_conf=None):
argparser.add_argument('--scheduler-class', help='Scheduler class to use for scheduling ZApps', choices=['ZoeSimpleScheduler', 'ZoeElasticScheduler'], default='ZoeSimpleScheduler')
argparser.add_argument('--scheduler-policy', help='Scheduler policy to use for scheduling ZApps', choices=['FIFO', 'SIZE'], default='FIFO')
argparser.add_argument('--backend', choices=['Swarm', 'Kubernetes', 'DockerEngine'], default='Swarm')
argparser.add_argument('--backend', choices=['Swarm', 'Kubernetes', 'DockerEngine'], default='Swarm', help='Which backend to enable')
argparser.add_argument('--backend-image-management', action='store_true', help='Enable image management (not implemented for all backends, check the documentation')
# Docker Swarm backend options
argparser.add_argument('--backend-swarm-url', help='Swarm/Docker API endpoint (ex.: zk://zk1:2181,zk2:2181 or http://swarm:2380)', default='http://localhost:2375')
......@@ -53,3 +53,7 @@ class BaseBackend:
def platform_state(self) -> ClusterStats:
"""Get the platform state. This method should fill-in a new ClusterStats object at each call, with fresh statistics on the available nodes and resource availability. This information will be used for taking scheduling decisions."""
raise NotImplementedError
def preload_image(self, image_name: str) -> None:
"""Make a service image available."""
raise NotImplementedError
......@@ -276,13 +276,19 @@ class DockerClient:
"""Retrieves container stats based on resource usage."""
cont = self.cli.containers.get(docker_id)
except (docker.errors.NotFound, docker.errors.APIError):
return None
except docker.errors.NotFound:
raise ZoeException('Container not found')
except docker.errors.APIError as e:
raise ZoeException('Docker API error: {}'.format(e))
return cont.stats(stream=stream)
except docker.errors.APIError:
return None
except docker.errors.APIError as e:
raise ZoeException('Docker API error: {}'.format(e))
except requests.exceptions.ReadTimeout:
raise ZoeException('Read timeout')
except ValueError:
raise ZoeException('Docker API decoding error')
def logs(self, docker_id: str, stream: bool, follow=None):
......@@ -302,3 +308,17 @@ class DockerClient:
return cont.logs(stdout=True, stderr=True, follow=follow, stream=stream, timestamps=True, tail='all')
except docker.errors.APIError:
return None
def list_images(self):
"""Retrieve the list of images available on this node."""
return self.cli.images.list()
except (docker.errors.NotFound, docker.errors.APIError):
return []
def pull_image(self, image_name):
"""Pulls an image in the docker engine."""
except docker.errors.APIError as e:
log.error('Cannot download image {}: {}'.format(image_name, e))
......@@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Zoe backend implementation for old-style stand-alone Docker Swarm."""
"""Zoe backend implementation for one or more Docker Engines."""
import logging
import threading
import time
from zoe_lib.state import Service
from zoe_lib.config import get_conf
......@@ -83,3 +85,21 @@ class DockerEngineBackend(zoe_master.backends.base.BaseBackend):
conf = self._get_config(service.backend_host)
engine = DockerClient(conf)
return engine.logs(service.backend_id, True, False)
def _real_preload(self, image_name, host_conf):
log.debug('Preloading image {} on host {}'.format(image_name,
time_start = time.time()
my_engine = DockerClient(host_conf)
log.debug('Image {} preloaded on host {} in {:.2f}s'.format(image_name,, time.time() - time_start))
def preload_image(self, image_name):
"""Pull an image from a Docker registry into each host. We shuffle the list to prevent the scheduler to find always the first host in the list."""
th_list = []
for backend_host_conf in self.docker_config:
th = threading.Thread(target=self._real_preload, name='dk_image_preload_{}'.format(, args=(image_name, backend_host_conf), daemon=True)
for th in th_list:
......@@ -70,8 +70,11 @@ class DockerStateSynchronizer(threading.Thread):
except ZoeException as e:
node_stats.status = 'offline''Node {} is offline'.format(
if node_stats.status == 'offline':'Node {} is back online'.format(
node_stats.status = 'online'
node_stats.labels = host_config.labels
......@@ -98,11 +101,16 @@ class DockerStateSynchronizer(threading.Thread):
self._update_node_stats(my_engine, node_stats)
self._update_node_stats(my_engine, node_stats)
except ZoeException as e:
node_stats.status = 'offline'
log.warning('Node {} is offline'.format(
def _update_node_stats(self, my_engine, node_stats):
def _update_node_stats(self, my_engine, node_stats: NodeStats):
container_list = my_engine.list()
info =
......@@ -126,6 +134,19 @@ class DockerStateSynchronizer(threading.Thread):
node_stats.cores_in_use = sum([self._get_core_usage(stat) for stat in stats.values()])
if get_conf().backend_image_management:
for dk_image in my_engine.list_images():
image = {
'id': dk_image.attrs['Id'],
'size': dk_image.attrs['Size'],
'names': dk_image.tags
for name in image['names']:
if name[-7:] == ':latest': # add an image with the name without 'latest' to fake Docker image lookup algorithm
def _get_core_usage(self, stat):
this_read_ts = datetime.strptime(stat['read'], '%Y-%m-%dT%H:%M:%S.%f')
......@@ -17,6 +17,7 @@
import logging
from typing import List
import time
from zoe_lib.config import get_conf
from zoe_lib.state import Execution, Service, SQLManager # pylint: disable=unused-import
......@@ -173,3 +174,15 @@ def get_platform_state(state: SQLManager) -> ClusterStats:
for node in platform_state.nodes: = state.service_list(, backend_status=Service.BACKEND_START_STATUS)
return platform_state
def preload_image(image_name):
"""Make a service image available on the cluster, according to the backend support."""
backend = _get_backend()
log.debug('Preloading image {}'.format(image_name))
time_start = time.time()
backend.preload_image(image_name)'Image {} preloaded in {:.2f}s'.format(image_name, time.time() - time_start))
except NotImplementedError:
log.warning('Backend {} does not support image preloading'.format(get_conf().backend))
......@@ -18,10 +18,12 @@
import logging
import os
import shutil
import threading
from zoe_lib.state import Execution, SQLManager
from zoe_lib.config import get_conf
from zoe_master.scheduler import ZoeBaseScheduler
from zoe_master.backends.interface import preload_image
log = logging.getLogger(__name__)
......@@ -56,6 +58,9 @@ def _digest_application_description(state: SQLManager, execution: Execution):
counter += 1
assert counter == total_count
if get_conf().backend_image_management:
threading.Thread(target=preload_image, args=(service_descr['image'],), name='image-downloader-{}'.format(service_descr['name']), daemon=True).start()
def execution_submit(state: SQLManager, scheduler: ZoeBaseScheduler, execution: Execution):
"""Submit a new execution to the scheduler."""
......@@ -3,6 +3,7 @@
import logging
from zoe_lib.state.sql_manager import Execution, Service
from zoe_lib.config import get_conf
from zoe_master.stats import ClusterStats, NodeStats
log = logging.getLogger(__name__)
......@@ -23,10 +24,15 @@ class SimulatedNode: = [] =
self.labels = real_node.labels
self.images = real_node.image_list
def service_fits(self, service: Service) -> bool:
"""Checks whether a service can fit in this node"""
return set(service.labels).issubset(self.labels) and service.resource_reservation.memory.min < self.node_free_memory() and service.resource_reservation.cores.min <= self.node_free_cores()
ret = set(service.labels).issubset(self.labels)
ret = ret and service.resource_reservation.memory.min < self.node_free_memory()
ret = ret and service.resource_reservation.cores.min <= self.node_free_cores()
ret = ret and self._image_is_available(service.image_name)
return ret
def service_why_unfit(self, service) -> str:
"""Generate an explanation of why the service does not fit this node."""
......@@ -36,6 +42,16 @@ class SimulatedNode:
return 'needs {} more cores'.format(self.node_free_cores() - service.resource_reservation.cores.min)
elif not set(service.labels).issubset(self.labels):
return 'service required labels {} to be defined on the node'.format(service.labels)
elif not self._image_is_available(service.image_name):
return 'image {} is not available on this node'.format(service.image_name)
def _image_is_available(self, image_name) -> bool:
if not get_conf().backend_image_management:
return True
for image in self.images:
if image_name in image['names']:
return True
return False
def service_add(self, service):
"""Add a service in this node."""
......@@ -43,6 +43,7 @@ class NodeStats(Stats):
self.status = None
self.error = '' = []
self.image_list = []
def serialize(self):
"""Convert the object into a dict."""
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment