Commit bcf55503 authored by Daniele Venzano

Remove the Observer component, for now

parent 3f69ab39
#!/usr/bin/python3
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from zoe_observer.entrypoint import main
# Start the observer only when this file is executed as a script.
if __name__ == '__main__':
    main()
# Copyright (c) 2015, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from zoe_lib.configargparse import ArgumentParser, Namespace
from zoe_lib.info import ZoeInfoAPI
# Files handed to the argument parser as its default configuration files.
config_paths = [
    'zoe-observer.conf',
    '/etc/zoe/zoe-observer.conf'
]

# Module-wide configuration object: set by load_configuration(), read through get_conf().
_conf = None
def load_configuration(test_conf=None):
    """
    Fill in the module-level configuration object.

    When ``test_conf`` is supplied it is stored unchanged and nothing is parsed.
    Otherwise the options are parsed with an ArgumentParser that uses
    ``config_paths`` as default config files and ``ZOE_OBSERVER_`` as the
    environment variable prefix; the deployment name is then read from the
    master's info endpoint and attached to the parsed options.

    :param test_conf: ready-made configuration object that bypasses parsing
    :return: None
    """
    global _conf

    if test_conf is not None:
        _conf = test_conf
        return

    parser = ArgumentParser(
        description="Zoe Observer - Container Analytics as a Service Swarm Observer component",
        default_config_files=config_paths,
        auto_env_var_prefix="ZOE_OBSERVER_",
        args_for_setting_config_path=["--config"],
        args_for_writing_out_config_file=["--write-config"],
    )

    # Options are registered in this exact order.
    option_table = [
        ('--debug', dict(action='store_true', help='Enable debug output')),
        ('--swarm', dict(help='Swarm/Docker API endpoint (ex.: zk://zk1:2181,zk2:2181 or http://swarm:2380)', default='http://localhost:2375')),
        ('--master-url', dict(help='URL of the master\'s REST API', default='http://127.0.0.1:4850')),
        ('--zoeadmin-password', dict(help='Password used to login as the master Zoe administrator', default='changeme')),
        ('--spark-activity-timeout', dict(help='Number of seconds of inactivity (no jobs run) before a Spark cluster is terminated', type=int, default=18000)),
        ('--loop-time', dict(help='Time between consecutive check', type=int, default=300)),
    ]
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)

    opts = parser.parse_args()
    if opts.debug:
        parser.print_values()

    # The options are published before the master is contacted.
    _conf = opts

    master_info = ZoeInfoAPI(opts.master_url, 'zoeadmin', opts.zoeadmin_password).info()
    opts.deployment_name = master_info['deployment_name']
    # FIXME: check compatibility for API versions
def get_conf() -> Namespace:
    """
    Return the configuration object stored by load_configuration().

    :return: the module-level ``_conf`` value, which is ``None`` until load_configuration() has been called
    """
    return _conf
#!/usr/bin/python3
# Copyright (c) 2016, Daniele Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import threading
from zoe_lib.swarm_client import SwarmClient
from zoe_observer.config import load_configuration, get_conf
from zoe_observer.swarm_event_manager import container_died, main_callback
from zoe_observer.guest_inactivity import check_guests
from zoe_lib.exceptions import ZoeAPIException
# Logger used by the functions of this entrypoint module.
log = logging.getLogger("main")
# Layout of every log line; it includes the name of the emitting thread.
LOG_FORMAT = '%(asctime)-15s %(levelname)s %(name)s (%(threadName)s): %(message)s'
def guest_check_thread(args):
    """
    Periodically reconcile exited containers and check guest executions for inactivity.

    On every pass the containers labelled with this deployment name are listed,
    each one whose status contains ``Exited`` is reported through container_died(),
    and check_guests() is run. The loop never returns.

    :param args: configuration object handed to SwarmClient
    :return: None (runs forever)
    """
    swarm = SwarmClient(args)

    while True:
        try:
            zoe_containers = swarm.list({'zoe.deployment_name': get_conf().deployment_name})
            for c in zoe_containers:
                if 'Exited' not in c['status']:
                    continue
                zoe_id = c['labels']['zoe.service.id']
                try:
                    container_died(zoe_id)
                except ZoeAPIException:
                    # NOTE(review): container_died() appears to catch ZoeAPIException
                    # itself, confirm that this handler can actually fire.
                    log.warning('Container %s has died, but Zoe does not know anything about it, deleting', c['name'])
                    swarm.terminate_container(c['id'], delete=True)

            check_guests(swarm)
        except Exception:
            log.exception('Something bad happened')

        # Sleep outside the try block: a failing pass must still wait for the
        # configured interval instead of retrying immediately in a tight loop.
        time.sleep(get_conf().loop_time)
def swarm_events_thread(args):
    """
    Listen for Swarm events forever, dispatching each one to main_callback.

    Whenever the listener returns it is started again right away. When it raises,
    the error is logged and the loop pauses briefly before reconnecting, so an
    unreachable Swarm endpoint does not turn into a busy loop of tracebacks.

    :param args: configuration object handed to SwarmClient
    :return: None (runs forever)
    """
    retry_delay = 1  # seconds to wait after a failure before listening again
    swarm = SwarmClient(args)

    while True:
        try:
            swarm.event_listener(main_callback)
        except Exception:
            log.exception('Something bad happened')
            time.sleep(retry_delay)
def main():
    """
    The entrypoint for the zoe-observer script.

    Loads the configuration, configures logging, starts the guest-check loop in
    a daemon thread and then runs the Swarm event loop in the calling thread.
    :return: nothing under normal operation, the event loop does not terminate
    """
    load_configuration()
    args = get_conf()

    root_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(level=root_level, format=LOG_FORMAT)

    # Cap the verbosity of chatty third-party loggers.
    library_levels = (
        ('kazoo', logging.WARNING),
        ('requests', logging.WARNING),
        ('docker', logging.INFO),
    )
    for logger_name, level in library_levels:
        logging.getLogger(logger_name).setLevel(level)

    checker = threading.Thread(target=guest_check_thread, name="guest-check", args=[args], daemon=True)
    checker.start()

    swarm_events_thread(args)
import logging
import json
import datetime
import dateutil.parser
from zoe_lib.query import ZoeQueryAPI
from zoe_lib.executions import ZoeExecutionsAPI
from zoe_lib.services import ZoeServiceAPI
from zoe_observer.config import get_conf
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def _execution_is_idle(swarm, cont_api, my_exec, guest_name, seconds_since_started):
    """Return True as soon as one Spark web-interface port of the execution reports an idle time above the timeout."""
    for service_id in my_exec['services']:
        service = cont_api.get(service_id)
        for port in service['ports']:
            if port['name'] != 'Spark application web interface':
                continue
            idle_time = check_spark_job(swarm, service['docker_id'], seconds_since_started)
            if check_if_kill(idle_time):
                log.info('Execution {} for user {} has been idle for too long, terminating...'.format(my_exec['name'], guest_name))
                return True
            log.debug('Execution {} for user {} has been idle for {} seconds'.format(my_exec['name'], guest_name, idle_time))
    return False


def check_guests(swarm):
    """
    Terminate the executions of guest users whose Spark application has been idle for too long.

    Every user with the ``guest`` role is looked up through the master API. Each of
    their executions that has at least one service and a start time is inspected,
    and it is terminated as soon as one of its Spark web-interface services reports
    an idle time above the configured timeout.

    :param swarm: Swarm client, passed on to check_spark_job()
    :return: None
    """
    conf = get_conf()
    query_api = ZoeQueryAPI(conf.master_url, 'zoeadmin', conf.zoeadmin_password)
    exec_api = ZoeExecutionsAPI(conf.master_url, 'zoeadmin', conf.zoeadmin_password)
    cont_api = ZoeServiceAPI(conf.master_url, 'zoeadmin', conf.zoeadmin_password)

    guests = query_api.query('user', role='guest')
    execs = exec_api.list()

    for guest in guests:
        for my_exec in execs:
            if my_exec['owner'] != guest['name']:
                continue
            if not my_exec['services']:
                continue
            if not my_exec['time_started']:
                # Without a start time the age cannot be computed; skip this
                # execution instead of failing the whole pass in the date parser.
                continue

            # NOTE(review): a naive local now() is used here, so time_started is
            # presumably a naive local timestamp; check_spark_job() uses an aware
            # UTC now() instead. Confirm against the master API.
            since_started = datetime.datetime.now() - dateutil.parser.parse(my_exec['time_started'])

            if _execution_is_idle(swarm, cont_api, my_exec, guest['name'], since_started.total_seconds()):
                exec_api.terminate(my_exec['id'])
def check_if_kill(idle_seconds):
    """
    Tell whether an idle time exceeds the configured Spark activity timeout.

    :param idle_seconds: idle time in seconds
    :return: True when idle_seconds is strictly greater than ``spark_activity_timeout``, False otherwise
    """
    return idle_seconds > get_conf().spark_activity_timeout
def check_spark_job(swarm, docker_id, time_started):
    """
    Estimate for how many seconds the Spark application in a container has been idle.

    The job list of the ``pyspark-shell`` application is fetched by running
    ``curl`` against localhost:4040 inside the container.

    :param swarm: object whose ``cli`` attribute offers exec_create() and exec_start() (presumably a docker-py client, confirm)
    :param docker_id: ID of the container to run the command in
    :param time_started: fallback value returned when no usable job information is found (the caller passes the seconds elapsed since the execution started)
    :return: 0.0 while a job is reported as running, otherwise the seconds elapsed since the most recent job submission, or ``time_started`` when that cannot be determined
    """
    swarm_exec = swarm.cli.exec_create(docker_id, 'curl http://localhost:4040/api/v1/applications/pyspark-shell/jobs', stderr=False)
    output = swarm.cli.exec_start(swarm_exec['Id'])
    try:
        jobs = json.loads(output.decode('utf-8'))
    except ValueError:
        # Both JSON and UTF-8 decoding errors are ValueError subclasses.
        return time_started

    if not isinstance(jobs, list):
        # Anything other than a list of jobs carries no usable information.
        return time_started

    jobs = [job for job in jobs if isinstance(job, dict)]

    # A job that is still running means the cluster is in use, however long ago
    # it was submitted, so it must not count as inactivity.
    if any(job.get('status') == 'RUNNING' for job in jobs):
        return 0.0

    now = datetime.datetime.now(datetime.timezone.utc)
    ages = [now - dateutil.parser.parse(job['submissionTime']) for job in jobs if 'submissionTime' in job]
    if not ages:
        return time_started
    return min(ages).total_seconds()
import logging
from zoe_observer.config import get_conf
from zoe_lib.services import ZoeServiceAPI
from zoe_lib.exceptions import ZoeAPIException
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def main_callback(event):
    """
    Handle one Swarm event.

    Only ``container`` events whose ``zoe.deployment_name`` attribute matches the
    configured deployment are considered. A ``die`` action is forwarded to
    container_died() with the container's ``zoe.service.id`` attribute.

    :param event: event dictionary as delivered by the Swarm event listener
    :return: None
    """
    if event['Type'] != 'container':
        return

    try:
        foreign = event['Actor']['Attributes']['zoe.deployment_name'] != get_conf().deployment_name
    except KeyError:
        # Events without the deployment label do not belong to Zoe.
        return
    if foreign:
        return

    log.debug(event)

    if event['Action'] != "die":
        return

    try:
        container_died(event['Actor']['Attributes']['zoe.service.id'])
    except KeyError:
        return
def container_died(service_id):
    """
    Report to the Zoe master that the container of a service has died.

    An API error whose message is ``No such service`` is ignored; any other API
    error is logged with its traceback and not re-raised.

    :param service_id: Zoe ID of the service whose container died
    :return: None
    """
    log.debug('A container died')

    conf = get_conf()
    # The master is notified through its REST API.
    service_api = ZoeServiceAPI(conf.master_url, 'zoeadmin', conf.zoeadmin_password)
    try:
        service_api.died(service_id)
    except ZoeAPIException as err:
        if err.message == "No such service":
            return
        log.exception('Error reporting a dead service')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment