Commit 87185e02 authored by Daniele Venzano

Use consistent naming for all objects created in docker (containers and networks)

parent 4491c995
......@@ -20,7 +20,7 @@ def hadoop_namenode_service(image):
:rtype: dict
"""
service = {
'name': "hdfs-namenode",
'name': "namenode",
'docker_image': image,
'monitor': True,
'required_resources': {"memory": 2 * 1024 * 1024 * 1024}, # 2 GB
......@@ -34,7 +34,7 @@ def hadoop_namenode_service(image):
}
],
'environment': [
["NAMENODE_HOST", "hdfs-namenode-{execution_id}"]
["NAMENODE_HOST", "namenode-{execution_name}-{user_name}-{deployment_name}-zoe"]
]
}
return service
......@@ -49,13 +49,13 @@ def hadoop_datanode_service(count, image):
ret = []
for i in range(count):
service = {
'name': "hdfs-datanode-{}".format(i),
'name': "datanode{}".format(i),
'docker_image': image,
'monitor': False,
'required_resources': {"memory": 1 * 1024 * 1024 * 1024}, # 1 GB
'ports': [],
'environment': [
["NAMENODE_HOST", "hdfs-namenode-{execution_id}"]
["NAMENODE_HOST", "namenode-{execution_name}-{user_name}-{deployment_name}-zoe"]
]
}
ret.append(service)
......
......@@ -47,12 +47,12 @@ def spark_jupyter_notebook_service(mem_limit, worker_mem_limit, image):
}
],
'environment': [
["SPARK_MASTER", "spark://{name_prefix}-spark-master-{execution_id}.{name_prefix}-usernet-{user_id}:7077"],
["SPARK_MASTER", "spark://spark-master-{execution_name}-{user_name}-{deployment_name}-zoe.{user_name}-{deployment_name}-zoe:7077"],
["SPARK_EXECUTOR_RAM", str(executor_ram)],
["SPARK_DRIVER_RAM", str(driver_ram)],
["NB_USER", "{user_name}"],
["NAMENODE_HOST", "hdfs-namenode.hdfs"]
["NB_USER", "{user_name}"]
],
'networks': []
}
return service
......
......@@ -22,7 +22,7 @@ def openmpi_worker_service(counter, workspace_volume, worker_memory):
:rtype: dict
"""
service = {
'name': "openmpi-worker-{}".format(counter),
'name': "mpiworker{}".format(counter),
'docker_image': '192.168.45.252:5000/zoerepo/openmpi-worker',
'monitor': False,
'required_resources': {"memory": worker_memory},
......@@ -43,7 +43,7 @@ def openmpi_mpirun_service(workspace_volume, mpirun_commandline, worker_memory):
:rtype: dict
"""
service = {
'name': "openmpi-mpirun",
'name': "mpirun",
'docker_image': '192.168.45.252:5000/zoerepo/openmpi-worker',
'monitor': True,
'required_resources': {"memory": worker_memory},
......
......@@ -35,7 +35,7 @@ def spark_master_service(mem_limit, image):
}
],
'environment': [
["SPARK_MASTER_IP", "{name_prefix}-spark-master-{execution_id}.{name_prefix}-usernet-{user_id}"],
["SPARK_MASTER_IP", "spark-master-{execution_name}-{user_name}-{deployment_name}-zoe.{user_name}-{deployment_name}-zoe"],
],
'networks': []
}
......@@ -60,7 +60,7 @@ def spark_worker_service(count, mem_limit, cores, image):
ret = []
for i in range(count):
service = {
'name': "spark-worker-{}".format(i),
'name': "spark-worker{}".format(i),
'docker_image': image,
'monitor': False,
'required_resources': {"memory": mem_limit},
......@@ -76,8 +76,8 @@ def spark_worker_service(count, mem_limit, cores, image):
'environment': [
["SPARK_WORKER_CORES", str(cores)],
["SPARK_WORKER_RAM", str(worker_ram)],
["SPARK_MASTER_IP", "{name_prefix}-spark-master-{execution_id}.{name_prefix}-usernet-{user_id}"],
["SPARK_LOCAL_IP", "{name_prefix}-spark-worker-" + str(i) + "-{execution_id}.{name_prefix}-usernet-{user_id}"]
["SPARK_MASTER_IP", "spark-master-{execution_name}-{user_name}-{deployment_name}-zoe.{user_name}-{deployment_name}-zoe"],
["SPARK_LOCAL_IP", "spark-worker" + str(i) + "-{execution_name}-{user_name}-{deployment_name}-zoe.{user_name}-{deployment_name}-zoe"]
],
'networks': []
}
......@@ -110,7 +110,7 @@ def spark_submit_service(mem_limit, worker_mem_limit, image, command, spark_opti
}
],
'environment': [
["SPARK_MASTER_IP", "{name_prefix}-spark-master-{execution_id}.{name_prefix}-usernet-{user_id}"],
["SPARK_MASTER_IP", "spark-master-{execution_name}-{user_name}-{deployment_name}-zoe.{user_name}-{deployment_name}-zoe"],
["SPARK_OPTIONS", spark_options],
["SPARK_EXECUTOR_RAM", str(executor_ram)],
["APPLICATION_URL", "{application_binary}"]
......
......@@ -43,13 +43,14 @@ class ZoeServiceAPI(ZoeAPIBase):
else:
raise ZoeAPIException('error retrieving service {}'.format(container_id))
def died(self, container_id: int):
def died(self, service_name, execution_name):
"""
Inform the master that a service died. Used by the observer process.
:param container_id: Zoe ID of the service that died
:param service_name: name of the service that died
:param execution_name: name of the execution the service that died belongs to
:return:
"""
data, status_code = self._rest_delete('/service/{}'.format(container_id))
data, status_code = self._rest_delete('/service/{}/{}'.format(execution_name, service_name))
if status_code != 204:
raise ZoeAPIException(data['message'])
......@@ -250,23 +250,26 @@ class SwarmClient:
log.exception('cannot disconnect service {} from network {}'.format(container_id, network_id))
def list(self, only_label=None) -> list:
if only_label is None:
filters = {}
else:
filters = {
'label': only_label
}
ret = self.cli.containers(all=True, filters=filters)
ret = self.cli.containers(all=True)
conts = []
for c in ret:
aux = c['Names'][0].split('/') # Swarm returns container names in the form /host/name
conts.append({
'id': c['Id'],
'host': aux[1],
'name': aux[2],
'labels': c['Labels'],
'status': c['Status']
})
match = True
for k, v in only_label.items():
if k not in c['Labels']:
match = False
break
if c['Labels'][k] != v:
match = False
break
if match:
aux = c['Names'][0].split('/') # Swarm returns container names in the form /host/name
conts.append({
'id': c['Id'],
'host': aux[1],
'name': aux[2],
'labels': c['Labels'],
'status': c['Status']
})
return conts
......
......@@ -22,7 +22,6 @@ from zoe_master.config import get_conf, singletons
from zoe_master.scheduler import ZoeScheduler
from zoe_master.state import execution as execution_module, application as application_module, service as service_module
from zoe_master.state.manager import StateManager
from zoe_master.stats import ContainerStats
from zoe_master.stats import SwarmStats, SchedulerStats
log = logging.getLogger(__name__)
......@@ -60,31 +59,29 @@ class PlatformManager:
def _spawn_service(self, execution: execution_module.Execution, service_description: application_module.ServiceDescription) -> bool:
copts = DockerContainerOptions()
copts.gelf_log_address = get_conf().gelf_address
copts.name = get_conf().deployment_name + '-' + service_description.name + "-{}".format(execution.owner.name)
copts.name = service_description.name + "-" + execution.name + "-" + execution.owner.name + "-" + get_conf().deployment_name + "-zoe"
copts.set_memory_limit(service_description.required_resources['memory'])
copts.network_name = '{}-usernet-{}'.format(get_conf().deployment_name, execution.owner.id)
copts.network_name = '{}-{}-zoe'.format(execution.owner.name, get_conf().deployment_name)
container_id = self.state_manager.gen_id()
copts.labels = {
'zoe.{}'.format(get_conf().deployment_name): '',
'zoe.execution.name': execution.name,
'zoe.service.name': service_description.name,
'zoe.owner': execution.owner.name,
'zoe.prefix': get_conf().deployment_name,
'zoe.deployment_name': get_conf().deployment_name,
'zoe.type': 'app_service'
}
if service_description.monitor:
copts.labels['zoe.monitor'] = ''
copts.labels['zoe.monitor'] = 'true'
else:
copts.labels['zoe.normal'] = ''
copts.labels['zoe.monitor'] = 'false'
copts.restart = not service_description.monitor # Monitor containers should not restart
# Generate a dictionary containing the current cluster status (before the new container is spawned)
# This information is used to substitute template strings in the environment variables
subst_dict = {
"execution_id": str(execution.id),
"user_id": str(execution.owner.id),
"execution_name": execution.name,
'user_name': execution.owner.name,
'name_prefix': get_conf().deployment_name
'deployment_name': get_conf().deployment_name
}
for env_name, env_value in service_description.environment:
try:
......@@ -143,13 +140,12 @@ class PlatformManager:
def start_gateway_container(self, user):
copts = DockerContainerOptions()
copts.name = '{}-gateway-{}'.format(get_conf().deployment_name, user.id)
copts.network_name = '{}-usernet-{}'.format(get_conf().deployment_name, user.id)
copts.name = 'gateway-{}-{}-zoe'.format(user.name, get_conf().deployment_name)
copts.network_name = '{}-{}-zoe'.format(user.name, get_conf().deployment_name)
copts.ports.append(1080)
copts.labels = {
'zoe.{}'.format(get_conf().deployment_name): '',
'zoe.owner': user.name,
'zoe.prefix': get_conf().deployment_name,
'zoe.deployment': get_conf().deployment_name,
'zoe.type': 'gateway'
}
copts.restart = True
......@@ -162,7 +158,6 @@ class PlatformManager:
raise ZoeException('Cannot create user gateway container')
user.gateway_docker_id = cont_info['docker_id']
user.set_gateway_urls(cont_info)
self.swarm.connect_to_network(user.gateway_docker_id, 'eeef9754c16790a29d5210c5d9ad8e66614ee8a6229b6dc6f779019d46cec792')
def kill_gateway_container(self, user):
self.swarm.terminate_container(user.gateway_docker_id, delete=True)
......@@ -171,7 +166,7 @@ class PlatformManager:
def create_user_network(self, user):
log.info('Creating a new network for user {}'.format(user.id))
net_name = '{}-usernet-{}'.format(get_conf().deployment_name, user.id)
net_name = '{}-{}-zoe'.format(user.name, get_conf().deployment_name)
net_id = self.swarm.network_create(net_name)
user.network_id = net_id
......@@ -194,8 +189,8 @@ class PlatformManager:
def check_state_swarm_consistency(self):
state_changed = False
users = self.state_manager.get('user')
networks = self.swarm.network_list('{}-usernet-'.format(get_conf().deployment_name))
gateways = self.swarm.list(['zoe.{}.gateway'.format(get_conf().deployment_name)])
networks = self.swarm.network_list('{}-zoe'.format(get_conf().deployment_name))
gateways = self.swarm.list({'zoe.type': 'gateway', 'zoe.deployment': get_conf().deployment_name})
users_no_network = []
users_no_gateway = []
......@@ -222,17 +217,17 @@ class PlatformManager:
duplicate_check = set()
for n in networks:
try:
uid = int(n['name'][len('{}-usernet-'.format(get_conf().deployment_name)):])
username = n['name'].split('-')[0]
except ValueError:
log.error('network {} does not belong to Zoe, bug?'.format(n['name']))
networks_to_delete.append(n['id'])
continue
if uid in duplicate_check:
if username in duplicate_check:
log.warning('state inconsistency: found two networks for the same user')
networks_to_delete.append(n['id'])
continue
duplicate_check.add(uid)
user = self.state_manager.get_one('user', id=uid)
duplicate_check.add(username)
user = self.state_manager.get_one('user', name=username)
if user is not None and user in users_no_network:
user.network_id = n['id']
users_no_network.remove(user)
......@@ -240,17 +235,17 @@ class PlatformManager:
state_changed = True
continue
elif user is None:
log.error('state inconsistency: found a network for user {} who no longer exists'.format(uid))
log.error('state inconsistency: found a network for user {} who no longer exists'.format(username))
networks_to_delete.append(n['id'])
for g in gateways:
try:
uid = int(g['name'][len('{}-gateway-'.format(get_conf().deployment_name)):])
username = g['name'].split('-')[1]
except ValueError:
log.error('container {} does not belong to Zoe, bug?'.format(g['name']))
gateways_to_delete.append(g['id'])
continue
user = self.state_manager.get_one('user', id=uid)
user = self.state_manager.get_one('user', name=username)
if user is not None and user in users_no_gateway:
user.gateway_docker_id = g['id']
users_no_gateway.remove(user)
......@@ -260,7 +255,7 @@ class PlatformManager:
state_changed = True
continue
elif user is None:
log.error('state inconsistency: found a gateway for user {} who no longer exists'.format(uid))
log.error('state inconsistency: found a gateway for user {} who no longer exists'.format(username))
gateways_to_delete.append(g['id'])
# Fix all inconsistencies found
......@@ -279,7 +274,7 @@ class PlatformManager:
self.start_gateway_container(u)
# ### Check executions and container consistency
swarm_containers = self.swarm.list(only_label='zoe.{}'.format(get_conf().deployment_name))
swarm_containers = self.swarm.list(only_label={'zoe.deployment': get_conf().deployment_name})
conts_state_to_delete = []
for c_id, c in self.state_manager.services.items():
if c.docker_id not in [x['id'] for x in swarm_containers]:
......
......@@ -63,9 +63,8 @@ class UserAPI(Resource):
is_authorized(calling_user, user, 'delete')
apps = self.state.get('application', owner=user)
if len(apps) > 0:
raise ZoeRestAPIException('User has {} applications defined, cannot delete'.format(len(apps)))
if self.state.user_has_active_executions(user_id):
raise ZoeRestAPIException('User has running executions, cannot delete')
self.platform.kill_gateway_container(user)
self.platform.remove_user_network(user)
......
......@@ -33,12 +33,13 @@ def guest_check_thread(args):
while True:
try:
zoe_containers = swarm.list('zoe.{}'.format(get_conf().deployment_name))
zoe_containers = swarm.list({'zoe.deployment_name': get_conf().deployment_name})
for c in zoe_containers:
if 'Exited' in c['status']:
zoe_name = c['labels']['zoe.service.name']
exec_name = c['labels']['zoe.execution.name']
try:
container_died(zoe_name)
container_died(zoe_name, exec_name)
except ZoeAPIException:
log.warning('Container ' + c['name'] + ' has died, but Zoe does not know anything about it, deleting')
swarm.terminate_container(c['id'], delete=True)
......
......@@ -22,12 +22,12 @@ def check_guests(swarm):
for guest in guests:
my_execs = [e for e in execs if e['owner'] == guest['name']]
for my_exec in my_execs:
if len(my_exec['containers']) == 0:
if len(my_exec['services']) == 0:
continue
my_exec_since_started = datetime.datetime.now() - dateutil.parser.parse(my_exec['time_started'])
my_exec_since_started = my_exec_since_started.total_seconds()
terminate = False
for c in my_exec['containers']:
for c in my_exec['services']:
c = cont_api.get(c)
for port in c['ports']:
if port['name'] == 'Spark application web interface':
......
......@@ -13,26 +13,26 @@ def main_callback(event):
return
try:
if event['Actor']['Attributes']['zoe.prefix'] != get_conf().deployment_name:
if event['Actor']['Attributes']['zoe.deployment_name'] != get_conf().deployment_name:
return
except KeyError:
return
if event['Action'] == "die":
try:
zoe_id = event['Actor']['Attributes']['zoe.container.id']
zoe_id = int(zoe_id)
container_died(zoe_id)
service_name = event['Actor']['Attributes']['zoe.service.name']
execution_name = event['Actor']['Attributes']['zoe.execution.name']
container_died(service_name, execution_name)
except KeyError:
return
def container_died(zoe_id: int):
def container_died(service_name, execution_name):
log.debug('A container died')
# tell the master via the rest api
cont_api = ZoeServiceAPI(get_conf().scheduler_url, 'zoeadmin', get_conf().zoeadmin_password)
cont_api = ZoeServiceAPI(get_conf().master_url, 'zoeadmin', get_conf().zoeadmin_password)
try:
cont_api.died(zoe_id)
cont_api.died(service_name, execution_name)
except ZoeAPIException as e:
if e.message != "No such service":
log.exception('Error reporting a dead service')
......@@ -70,9 +70,9 @@ def home_guest():
template_vars['gateway_ip'] = user['gateway_urls'][0].split('/')[2].split(':')[0]
exec_api = ZoeExecutionsAPI(get_conf().master_url, guest_identifier, guest_password)
app_descr = spark_jupyter_notebook_lab_app()
execution = query_api.query('execution', name='guest-lab-{}'.format(guest_identifier))
execution = query_api.query('execution', name='aml-lab')
if len(execution) == 0 or execution[0]['status'] == 'terminated' or execution[0]['status'] == 'finished':
exec_api.execution_start('guest-lab-{}'.format(guest_identifier), app_descr)
exec_api.execution_start('aml-lab', app_descr)
template_vars['execution_status'] = 'submitted'
return render_template('home_guest.html', **template_vars)
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment