Commit a47a16f6 authored by hxquangnhat's avatar hxquangnhat
Browse files

fixed issue#54

parent 2ef53fc7
...@@ -16,3 +16,4 @@ Developer documentation ...@@ -16,3 +16,4 @@ Developer documentation
scheduler scheduler
backend backend
stats stats
kube-backend
.. _kube_backend: .. _kube-backend:
Kubernetes backend for Zoe Kubernetes backend for Zoe
========================== ==========================
...@@ -23,11 +23,11 @@ How it works ...@@ -23,11 +23,11 @@ How it works
2. Zoe: 2. Zoe:
* Zapp Description: * Zapp Description:
* Add new field: ``replicas``, if users doesn't specify this value, the default value for each service would be ``1``. * Add new field: ``replicas``, if users doesn't specify this value, the default value for each service would be ``1``.
* In field ``require_resources``, the ``cores`` field can be float. * In field ``require_resources``, the ``cores`` field can be float.
* Idea: * Idea:
* Create each **replication controller** per each service of a Zapp. Replication Controller assures to have at least a number of **replicas** (pod) always running. * Create each **replication controller** per each service of a Zapp. Replication Controller assures to have at least a number of **replicas** (pod) always running.
* Create a Kubernetes **service** per each **replication controller**, which has the same **labels** and **label selectors** with the associated **replication controller**. The service would help the zapp service be exposed to the network by exposing the same port of the service on all kubernetes nodes. * Create a Kubernetes **service** per each **replication controller**, which has the same **labels** and **label selectors** with the associated **replication controller**. The service would help the zapp service be exposed to the network by exposing the same port of the service on all kubernetes nodes.
...@@ -37,5 +37,5 @@ References ...@@ -37,5 +37,5 @@ References
* Kubernetes: https://kubernetes.io/ * Kubernetes: https://kubernetes.io/
* Kubernetes Replication Controller : https://kubernetes.io/docs/user-guide/replication-controller/ * Kubernetes Replication Controller : https://kubernetes.io/docs/user-guide/replication-controller/
* Kubernetes Service: https://kubernetes.io/docs/user-guide/services/ * Kubernetes Service: https://kubernetes.io/docs/user-guide/services/
* Kubernetes Limit and Request: https://kubernetes.io/docs/user-guide/compute-resources/ * Kubernetes Limit and Request: https://kubernetes.io/docs/user-guide/compute-resources/
#!/usr/bin/env bash #!/usr/bin/env bash
pylint *.py zoe_* set -e
doc8 docs/
pylint --ignore old_swarm *.py zoe_*
doc8 docs/
...@@ -21,14 +21,13 @@ import threading ...@@ -21,14 +21,13 @@ import threading
import zoe_api.exceptions import zoe_api.exceptions
import zoe_api.master_api import zoe_api.master_api
from zoe_api.proxy.apache import ApacheProxy
#from zoe_api.proxy.nginx import NginxProxy
import zoe_lib.applications import zoe_lib.applications
import zoe_lib.exceptions import zoe_lib.exceptions
import zoe_lib.state import zoe_lib.state
from zoe_lib.config import get_conf from zoe_lib.config import get_conf
from zoe_api.proxy.apache import ApacheProxy
from zoe_api.proxy.nginx import NginxProxy
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -78,10 +77,10 @@ class APIEndpoint: ...@@ -78,10 +77,10 @@ class APIEndpoint:
if get_conf().deployment_name != 'test': if get_conf().deployment_name != 'test':
if get_conf().proxy_type == 'apache': if get_conf().proxy_type == 'apache':
proxy = zoe_api.proxy.apache.ApacheProxy(self) proxy = ApacheProxy(self)
else: #else:
proxy = zoe_api.proxy.nginx.NginxProxy(self) # proxy = NginxProxy(self)
threading.Thread(target=proxy.proxify,args=(uid, role, new_id)).start() threading.Thread(target=proxy.proxify, args=(uid, role, new_id)).start()
return new_id return new_id
...@@ -98,9 +97,9 @@ class APIEndpoint: ...@@ -98,9 +97,9 @@ class APIEndpoint:
if e.is_active: if e.is_active:
if get_conf().deployment_name != 'test': if get_conf().deployment_name != 'test':
if get_conf().proxy_type == 'apache': if get_conf().proxy_type == 'apache':
proxy = zoe_api.proxy.apache.ApacheProxy(self) proxy = ApacheProxy(self)
else: #else:
proxy = zoe_api.proxy.nginx.NginxProxy(self) # proxy = NginxProxy(self)
proxy.unproxify(uid, role, exec_id) proxy.unproxify(uid, role, exec_id)
return self.master.execution_terminate(exec_id) return self.master.execution_terminate(exec_id)
else: else:
......
...@@ -15,17 +15,18 @@ ...@@ -15,17 +15,18 @@
""" Store adapters to read/write data to from/to PostgresSQL. """ """ Store adapters to read/write data to from/to PostgresSQL. """
import datetime, time
import zoe_lib.state import zoe_lib.state
from zoe_lib.config import get_conf from zoe_lib.config import get_conf
from oauth2.store import AccessTokenStore, AuthCodeStore, ClientStore from oauth2.store import AccessTokenStore, ClientStore
from oauth2.datatype import AccessToken, AuthorizationCode, Client from oauth2.datatype import AccessToken, Client
from oauth2.error import AccessTokenNotFound, AuthCodeNotFound, ClientNotFoundError from oauth2.error import AccessTokenNotFound, ClientNotFoundError
class AccessTokenStore(AccessTokenStore): class AccessTokenStorePg(AccessTokenStore):
""" AccessTokenStore for postgresql """
def fetch_by_refresh_token(self, refresh_token): def fetch_by_refresh_token(self, refresh_token):
""" get accesstoken from refreshtoken """
sql = zoe_lib.state.SQLManager(get_conf()) sql = zoe_lib.state.SQLManager(get_conf())
data = sql.fetch_by_refresh_token(refresh_token) data = sql.fetch_by_refresh_token(refresh_token)
...@@ -51,18 +52,21 @@ class AccessTokenStore(AccessTokenStore): ...@@ -51,18 +52,21 @@ class AccessTokenStore(AccessTokenStore):
return res return res
def get_client_id_by_refresh_token(self, refresh_token): def get_client_id_by_refresh_token(self, refresh_token):
""" get clientID from refreshtoken """
sql = zoe_lib.state.SQLManager(get_conf()) sql = zoe_lib.state.SQLManager(get_conf())
data = sql.get_client_id_by_refresh_token(refresh_token) data = sql.get_client_id_by_refresh_token(refresh_token)
return data return data
def get_client_id_by_access_token(self, access_token): def get_client_id_by_access_token(self, access_token):
""" get clientID from accesstoken """
sql = zoe_lib.state.SQLManager(get_conf()) sql = zoe_lib.state.SQLManager(get_conf())
data = sql.get_client_id_by_access_token(access_token) data = sql.get_client_id_by_access_token(access_token)
return data return data
def fetch_existing_token_of_user(self, client_id, grant_type, user_id): def fetch_existing_token_of_user(self, client_id, grant_type, user_id):
""" get accesstoken from userid """
sql = zoe_lib.state.SQLManager(get_conf()) sql = zoe_lib.state.SQLManager(get_conf())
data = sql.fetch_existing_token_of_user(client_id, grant_type, user_id) data = sql.fetch_existing_token_of_user(client_id, grant_type, user_id)
...@@ -80,23 +84,26 @@ class AccessTokenStore(AccessTokenStore): ...@@ -80,23 +84,26 @@ class AccessTokenStore(AccessTokenStore):
user_id=data["user_id"]) user_id=data["user_id"])
def save_token(self, access_token): def save_token(self, access_token):
""" save accesstoken """
sql = zoe_lib.state.SQLManager(get_conf()) sql = zoe_lib.state.SQLManager(get_conf())
sql.save_token(access_token.client_id, sql.save_token(access_token.client_id,
access_token.grant_type, access_token.grant_type,
access_token.token, access_token.token,
access_token.data, access_token.data,
access_token.expires_at, access_token.expires_at,
access_token.refresh_token, access_token.refresh_token,
access_token.refresh_expires_at, access_token.refresh_expires_at,
access_token.scopes, access_token.scopes,
access_token.user_id) access_token.user_id)
return True return True
class ClientStore(ClientStore): class ClientStorePg(ClientStore):
""" ClientStore for postgres """
def save_client(self, identifier, secret, role, redirect_uris, authorized_grants, authorized_response_types): def save_client(self, identifier, secret, role, redirect_uris, authorized_grants, authorized_response_types):
""" save client to db """
sql = zoe_lib.state.SQLManager(get_conf()) sql = zoe_lib.state.SQLManager(get_conf())
sql.save_client(identifier, sql.save_client(identifier,
secret, secret,
...@@ -107,10 +114,11 @@ class ClientStore(ClientStore): ...@@ -107,10 +114,11 @@ class ClientStore(ClientStore):
return True return True
def fetch_by_client_id(self, client_id): def fetch_by_client_id(self, client_id):
""" get client by clientid """
sql = zoe_lib.state.SQLManager(get_conf()) sql = zoe_lib.state.SQLManager(get_conf())
client_data = sql.fetch_by_client_id(client_id) client_data = sql.fetch_by_client_id(client_id)
client_data_grants= client_data["authorized_grants"].split(':') client_data_grants = client_data["authorized_grants"].split(':')
if client_data is None: if client_data is None:
raise ClientNotFoundError raise ClientNotFoundError
...@@ -122,6 +130,7 @@ class ClientStore(ClientStore): ...@@ -122,6 +130,7 @@ class ClientStore(ClientStore):
authorized_response_types=client_data["authorized_response_types"]) authorized_response_types=client_data["authorized_response_types"])
def get_role_by_client_id(self, client_id): def get_role_by_client_id(self, client_id):
""" get client role by clientid """
sql = zoe_lib.state.SQLManager(get_conf()) sql = zoe_lib.state.SQLManager(get_conf())
client_data = sql.fetch_by_client_id(client_id) client_data = sql.fetch_by_client_id(client_id)
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
""" Token generator for oauth2.""" """ Token generator for oauth2."""
import base64
import hashlib import hashlib
import os import os
import uuid import uuid
...@@ -64,7 +63,6 @@ class TokenGenerator(object): ...@@ -64,7 +63,6 @@ class TokenGenerator(object):
""" """
raise NotImplementedError raise NotImplementedError
class URandomTokenGenerator(TokenGenerator): class URandomTokenGenerator(TokenGenerator):
""" """
Create a token using ``os.urandom()``. Create a token using ``os.urandom()``.
...@@ -85,7 +83,6 @@ class URandomTokenGenerator(TokenGenerator): ...@@ -85,7 +83,6 @@ class URandomTokenGenerator(TokenGenerator):
return hash_gen.hexdigest()[:self.token_length] return hash_gen.hexdigest()[:self.token_length]
class Uuid4(TokenGenerator): class Uuid4(TokenGenerator):
""" """
Generate a token using uuid4. Generate a token using uuid4.
...@@ -96,4 +93,3 @@ class Uuid4(TokenGenerator): ...@@ -96,4 +93,3 @@ class Uuid4(TokenGenerator):
:rtype: str :rtype: str
""" """
return str(uuid.uuid4()) return str(uuid.uuid4())
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
"""Proxifying using Apache2 Container.""" """Proxifying using Apache2 Container."""
import docker
import time import time
import logging import logging
import random import random
import docker
import zoe_api.proxy.base import zoe_api.proxy.base
import zoe_api.api_endpoint import zoe_api.api_endpoint
...@@ -33,78 +33,77 @@ class ApacheProxy(zoe_api.proxy.base.BaseProxy): ...@@ -33,78 +33,77 @@ class ApacheProxy(zoe_api.proxy.base.BaseProxy):
def __init__(self, apiEndpoint): def __init__(self, apiEndpoint):
self.api_endpoint = apiEndpoint self.api_endpoint = apiEndpoint
"""Proxify function.""" def proxify(self, uid, role, execution_id): #pylint: disable=too-many-locals
def proxify(self, uid, role, id): """Proxify function."""
try: try:
length_service = 0
#Wait until all the services get created and started to be able to get the backend_id #Wait until all the services get created and started to be able to get the backend_id
while self.api_endpoint.execution_by_id(uid, role, id).status != 'running': while self.api_endpoint.execution_by_id(uid, role, execution_id).status != 'running':
log.info('Waiting for all services get started...') log.info('Waiting for all services get started...')
length_service = len(self.api_endpoint.execution_by_id(uid, role, id).services)
time.sleep(1) time.sleep(1)
exe = self.api_endpoint.execution_by_id(uid, role, id) exe = self.api_endpoint.execution_by_id(uid, role, execution_id)
l = len(exe.services) lth = len(exe.services)
while l != 0: while lth != 0:
exe = self.api_endpoint.execution_by_id(uid, role, id) exe = self.api_endpoint.execution_by_id(uid, role, execution_id)
l = len(exe.services) lth = len(exe.services)
for srv in exe.services: for srv in exe.services:
if srv.backend_id == None: if srv.backend_id is None:
time.sleep(1) time.sleep(1)
else: else:
l = l - 1 lth = lth - 1
#Start proxifying by adding entry to use proxypass and proxypassreverse in apache2 config file #Start proxifying by adding entry to use proxypass and proxypassreverse in apache2 config file
for srv in exe.services: for srv in exe.services:
ip, p = None, None ip, port = None, None
if get_conf().backend == 'OldSwarm': if get_conf().backend == 'OldSwarm':
swarm = SwarmClient(get_conf()) swarm = SwarmClient(get_conf())
s_info = swarm.inspect_container(srv.backend_id) s_info = swarm.inspect_container(srv.backend_id)
portList = s_info['ports'] port_list = s_info['ports']
for k,v in portList.items(): for key, val in port_list.items():
exposedPort = k.split('/tcp')[0] exposed_port = key.split('/tcp')[0]
if v != None: if val != None:
ip = v[0] ip = val[0]
p = v[1] port = val[1]
base_path = '/zoe/' + uid + '/' + str(id) + '/' + srv.name + '/' + exposedPort base_path = '/zoe/' + uid + '/' + str(execution_id) + '/' + srv.name + '/' + exposed_port
original_path = str(ip) + ':' + str(p) + base_path original_path = str(ip) + ':' + str(port) + base_path
if ip is not None and p is not None: if ip is not None and port is not None:
log.info('Proxifying %s', srv.name + ' port ' + exposedPort) log.info('Proxifying %s', srv.name + ' port ' + exposed_port)
self.dispatch_to_docker(base_path, original_path) self.dispatch_to_docker(base_path, original_path)
else: else:
kube = KubernetesClient(get_conf()) kube = KubernetesClient(get_conf())
s_info = kube.inspect_service(srv.dns_name) s_info = kube.inspect_service(srv.dns_name)
kubeNodes = kube.info().nodes kube_nodes = kube.info().nodes
hostIP = random.choice(kubeNodes).name host_ip = random.choice(kube_nodes).name
while 'nodePort' not in s_info['port_forwarding'][0]: while 'nodePort' not in s_info['port_forwarding'][0]:
log.info('Waiting for service get started before proxifying...') log.info('Waiting for service get started before proxifying...')
s_info = kube.inspect_service(srv.dns_name) s_info = kube.inspect_service(srv.dns_name)
time.sleep(0.5) time.sleep(0.5)
ip = hostIP ip = host_ip
p = s_info['port_forwarding'][0]['nodePort'] port = s_info['port_forwarding'][0]['nodePort']
exposedPort = s_info['port_forwarding'][0]['port'] exposed_port = s_info['port_forwarding'][0]['port']
base_path = '/zoe/' + uid + '/' + str(id) + '/' + srv.name + '/' + str(exposedPort) base_path = '/zoe/' + uid + '/' + str(execution_id) + '/' + srv.name + '/' + str(exposed_port)
original_path = str(ip) + ':' + str(p) + base_path original_path = str(ip) + ':' + str(port) + base_path
if ip is not None and p is not None: if ip is not None and port is not None:
log.info('Proxifying %s', srv.name + ' port ' + str(exposedPort)) log.info('Proxifying %s', srv.name + ' port ' + str(exposed_port))
self.dispatch_to_docker(base_path, original_path) self.dispatch_to_docker(base_path, original_path)
except Exception as ex: except Exception as ex:
log.error(ex) log.error(ex)
#The apache2 server is running inside a container
#Adding new entries with the proxy path and the ip:port of the application to the apache2 config file
def dispatch_to_docker(self, base_path, original_path): def dispatch_to_docker(self, base_path, original_path):
"""
The apache2 server is running inside a container
Adding new entries with the proxy path and the ip:port of the application to the apache2 config file
"""
proxy = ['ProxyPass ' + base_path + '/api/kernels/ ws://' + original_path + '/api/kernels/', proxy = ['ProxyPass ' + base_path + '/api/kernels/ ws://' + original_path + '/api/kernels/',
'ProxyPassReverse ' + base_path + '/api/kernels/ ws://' + original_path + '/api/kernels/', 'ProxyPassReverse ' + base_path + '/api/kernels/ ws://' + original_path + '/api/kernels/',
'ProxyPass ' + base_path + '/terminals/websocket/ ws://' + original_path + '/terminals/websocket/', 'ProxyPass ' + base_path + '/terminals/websocket/ ws://' + original_path + '/terminals/websocket/',
...@@ -116,24 +115,24 @@ class ApacheProxy(zoe_api.proxy.base.BaseProxy): ...@@ -116,24 +115,24 @@ class ApacheProxy(zoe_api.proxy.base.BaseProxy):
docker_client = docker.Client(base_url=get_conf().proxy_docker_sock) docker_client = docker.Client(base_url=get_conf().proxy_docker_sock)
delCommand = "sed -i '$ d' " + get_conf().proxy_config_file # /etc/apache2/sites-available/all.conf" del_command = "sed -i '$ d' " + get_conf().proxy_config_file # /etc/apache2/sites-available/all.conf"
delID = docker_client.exec_create(get_conf().proxy_container, delCommand) del_id = docker_client.exec_create(get_conf().proxy_container, del_command)
docker_client.exec_start(delID) docker_client.exec_start(del_id)
for s in proxy: for entry in proxy:
command = 'bash -c "echo ' + "'" + s + "'" + ' >> /etc/apache2/sites-available/all.conf"' command = 'bash -c "echo ' + "'" + entry + "'" + ' >> /etc/apache2/sites-available/all.conf"'
id = docker_client.exec_create(get_conf().proxy_container, command) execution_id = docker_client.exec_create(get_conf().proxy_container, command)
docker_client.exec_start(id) docker_client.exec_start(execution_id)
reloadCommand = 'service apache2 reload' reload_command = 'service apache2 reload'
reloadID = docker_client.exec_create(get_conf().proxy_container, reloadCommand) reload_id = docker_client.exec_create(get_conf().proxy_container, reload_command)
docker_client.exec_start(reloadID) docker_client.exec_start(reload_id)
#Simply remove the added entries at the apache2 config file when terminating applcations #Simply remove the added entries at the apache2 config file when terminating applcations
def unproxify(self, uid, role, id): def unproxify(self, uid, role, execution_id):
log.info('Unproxifying for user %s - execution %s', uid, str(id)) log.info('Unproxifying for user %s - execution %s', uid, str(execution_id))
pattern = '/zoe\/' + uid + '\/' + str(id) + '/d' pattern = '/zoe\/' + uid + '\/' + str(execution_id) + '/d' #pylint: disable=anomalous-backslash-in-string
docker_client = docker.Client(base_url=get_conf().proxy_docker_sock) docker_client = docker.Client(base_url=get_conf().proxy_docker_sock)
delCommand = 'sed -i "' + pattern + '" ' + get_conf().proxy_config_file # /etc/apache2/sites-available/all.conf' del_command = 'sed -i "' + pattern + '" ' + get_conf().proxy_config_file # /etc/apache2/sites-available/all.conf'
delID = docker_client.exec_create(get_conf().proxy_container, delCommand) del_id = docker_client.exec_create(get_conf().proxy_container, del_command)
docker_client.exec_start(delID) docker_client.exec_start(del_id)
...@@ -19,9 +19,10 @@ ...@@ -19,9 +19,10 @@
class BaseProxy: class BaseProxy:
"""Base proxy class.""" """Base proxy class."""
def proxify(self, uid, role, id): def proxify(self, uid, role, execution_id):
"""The methods that needs to be overridden by implementations.""" """The methods that needs to be overridden by implementations."""
raise NotImplementedError raise NotImplementedError
def unproxify(self, uid, role, id): def unproxify(self, uid, role, execution_id):
"""The methods that needs to be overridden by implementations."""
raise NotImplementedError raise NotImplementedError
# Copyright (c) 2016, Quang-Nhat Hoang-Xuan
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base authenticator class."""
import docker
import time
import logging
import zoe_api.proxy.base
import zoe_api.api_endpoint
from zoe_lib.config import get_conf
log = logging.getLogger(__name__)
class NginxProxy(zoe_api.proxy.base.BaseProxy):
"""Nginx proxy class."""
def __init__(self, apiEndpoint):
return {}
def proxify(self, uid, role, id):
return {}
def unproxify(self, uid, role, id):
return {}
...@@ -34,7 +34,7 @@ class ExecutionAPI(RequestHandler): ...@@ -34,7 +34,7 @@ class ExecutionAPI(RequestHandler):
"""Set up the headers for enabling CORS.""" """Set up the headers for enabling CORS."""
manage_cors_headers(self) manage_cors_headers(self)
def options(self, execution_id): def options(self, execution_id): # pylint: disable=unused-argument
"""Needed for CORS.""" """Needed for CORS."""
self.set_status(204) self.set_status(204)
self.finish() self.finish()
...@@ -121,8 +121,8 @@ class ExecutionCollectionAPI(RequestHandler): ...@@ -121,8 +121,8 @@ class ExecutionCollectionAPI(RequestHandler):
if self.request.body: if self.request.body:
filt_dict = tornado.escape.json_decode(self.request.body) filt_dict = tornado.escape.json_decode(self.request.body)
except ValueError: except ValueError:
raise zoe_api.exceptions.ZoeRestAPIExecution('Error decoding JSON data') raise zoe_api.exceptions.ZoeRestAPIException('Error decoding JSON data')
if 'status' in filt_dict: if 'status' in filt_dict:
execs = self.api_endpoint.execution_list(uid, role, status=filt_dict['status'])