[GITLAB] - UPGRADE TO v12 on Wednesday the 18th of December at 11.30AM

Commit 35caaf19 authored by Daniele Venzano's avatar Daniele Venzano

Refactor code

The database is no longer a singleton
The swarm manager is a singleton managed at package level
parent 51327582
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="BashSupportProjectSettings">
<option name="supportBash4" value="true" />
</component>
</project>
\ No newline at end of file
......@@ -6,6 +6,7 @@
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TemplatesService">
<option name="TEMPLATE_CONFIGURATION" value="Jinja2" />
<option name="TEMPLATE_FOLDERS">
<list>
<option value="$MODULE_DIR$/caaas/templates" />
......
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectCodeStyleSettingsManager">
<option name="PER_PROJECT_SETTINGS">
<value />
</option>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default (1)" />
</component>
</project>
\ No newline at end of file
......@@ -3,6 +3,7 @@
<words>
<w>caaas</w>
<w>jinja</w>
<w>venza</w>
</words>
</dictionary>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/caaas/sql.py" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/images/notebook/files/start-notebook.sh" charset="UTF-8" />
<file url="PROJECT" charset="UTF-8" />
</component>
......
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="SpellCheckingInspection" enabled="true" level="TYPO" enabled_by_default="true">
<option name="processCode" value="false" />
<option name="processLiterals" value="true" />
<option name="processComments" value="true" />
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="PROJECT_PROFILE" value="Project Default" />
<option name="USE_PROJECT_PROFILE" value="true" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
This diff is collapsed.
from configparser import ConfigParser
from jinja2 import Template
from hashlib import md5
from os import system
from time import sleep
from urllib.parse import urlparse
from caaas import init_db, get_db
from caaas import CAaaState
OUTFILE = "/tmp/caaas-proxy.conf"
......@@ -27,14 +26,8 @@ ENTRY_TEMPLATE = """
"""
def read_config():
conf = ConfigParser()
conf.read('caaas.ini')
return conf
def get_proxy_entries():
db = get_db()
db = CAaaState()
return db.get_all_proxy()
......@@ -72,9 +65,6 @@ def commit_and_reload(generated_file):
if __name__ == "__main__":
conf = read_config()
init_db(conf['db']['user'], conf['db']['pass'], conf['db']['server'], conf['db']['db'])
print("CAaaS Apache proxy synchronizer starting")
while True:
......
[docker]
swarm-manager = 127.0.0.1:2380
swarm-manager = tcp://127.0.0.1:2380
[db]
user = caaas
......
from caaas.sql import CAaaSSQL
from caaas.sql import CAaaState
from caaas.cluster_description import SparkClusterDescription
from flask import Flask
app = Flask(__name__)
_db = None
from caaas.swarm_manager import SwarmManager
sm = SwarmManager()
sm.connect()
def init_db(user, passw, server, dbname):
global _db
_db = CAaaSSQL()
_db.connect(user, passw, server, dbname)
def get_db():
"""
:rtype: CAaaSSQL
"""
return _db
from caaas.swarm import swarm
import caaas.web
import caaas.api
from flask import jsonify
import time
from caaas import app, swarm, get_db
from caaas import app, sm, CAaaState
STATS_CACHING_EXPIRATION = 1 # seconds
@app.route("/api/status")
def api_status():
if time.time() - sm.last_update_timestamp > STATS_CACHING_EXPIRATION:
sm.update_status()
data = {
'num_containers': int(swarm.status.num_containers),
'num_nodes': int(swarm.status.num_nodes)
'num_containers': int(sm.status.num_containers),
'num_nodes': int(sm.status.num_nodes)
}
return jsonify(**data)
@app.route("/api/full-status")
def api_full_status():
db = get_db()
db = CAaaState()
data = {}
user_list = db.get_all_users()
for user_id, username in user_list:
......@@ -27,8 +32,8 @@ def api_full_status():
@app.route("/api/<username>/status")
def api_user_status(username):
db = get_db()
user_id = get_db().get_user_id(username)
db = CAaaState()
user_id = db.get_user_id(username)
cluster_list = db.get_clusters(user_id)
for clid in cluster_list:
cluster_list[clid]["is_notebook"] = cluster_list[clid]["name"] == "notebook"
......@@ -38,14 +43,14 @@ def api_user_status(username):
@app.route("/api/<username>/cluster/<cluster_id>/terminate")
def api_terminate_cluster(username, cluster_id):
db = get_db()
user_id = get_db().get_user_id(username)
db = CAaaState()
user_id = db.get_user_id(username)
cluster_list = db.get_clusters(user_id)
ret = {}
if cluster_list[cluster_id]["user_id"] != user_id:
ret["status"] = "unauthorized"
else:
if swarm.terminate_cluster(cluster_id):
if sm.terminate_cluster(cluster_id):
ret["status"] = "ok"
else:
ret["status"] = "error"
......
......@@ -3,3 +3,8 @@ class SparkClusterDescription:
self.num_workers = 2
self.executor_ram_size = "4g"
self.worker_cores = "2"
def for_spark_notebook(self):
self.num_workers = 2
self.worker_cores = "2"
self.executor_ram_size = "4g"
import mysql.connector
from hashlib import md5
import mysql.connector.errors
from utils.config import get_database_config
class CAaaSSQL:
class CAaaState:
def __init__(self):
self.cnx = None
def connect(self, user, passw, server, dbname):
assert self.cnx is None
self.cnx = mysql.connector.connect(user=user, password=passw, host=server, database=dbname, buffered=True)
def _reconnect(self):
if self.cnx is not None:
self.cnx.disconnect()
db_config = get_database_config()
self.cnx = mysql.connector.connect(**db_config)
def _get_cursor(self, dictionary=False):
try:
cursor = self.cnx.cursor(dictionary=dictionary)
except (mysql.connector.errors.OperationalError, AttributeError):
self._reconnect()
cursor = self.cnx.cursor(dictionary=dictionary)
return cursor
def _close_cursor(self, cursor):
self.cnx.commit()
cursor.close()
def _check_user(self, username):
cursor = self.cnx.cursor(dictionary=True)
cursor = self._get_cursor(dictionary=True)
q = "SELECT id FROM users WHERE username=%s"
cursor.execute(q, (username,))
if cursor.rowcount == 0:
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return self._create_user(username)
else:
row = cursor.fetchone()
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return row["id"]
def _create_user(self, username):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "INSERT INTO users (username) VALUES (%s)"
cursor.execute(q, (username,))
user_id = cursor.lastrowid
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return user_id
def get_user_id(self, username):
return self._check_user(username)
def get_all_users(self):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "SELECT id, username FROM users"
user_list = []
cursor.execute(q)
for row in cursor:
user_list.append(row)
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return user_list
def count_clusters(self, user_id=None):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
if user_id is None:
q = "SELECT COUNT(*) FROM clusters"
cursor.execute(q)
......@@ -58,12 +70,11 @@ class CAaaSSQL:
q = "SELECT COUNT(*) FROM clusters WHERE user_id=%s"
cursor.execute(q, (user_id,))
row = cursor.fetchone()
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return row[0]
def count_containers(self, user_id=None, cluster_id=None):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
if user_id is None and cluster_id is None:
q = "SELECT COUNT(*) FROM containers"
cursor.execute(q)
......@@ -78,12 +89,11 @@ class CAaaSSQL:
cursor.execute(q, (user_id, cluster_id))
row = cursor.fetchone()
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return row[0]
def get_notebook(self, user_id):
cursor = self.cnx.cursor(dictionary=True)
cursor = self._get_cursor(dictionary=True)
q = "SELECT * FROM notebooks WHERE user_id=%s"
cursor.execute(q, (user_id,))
if cursor.rowcount == 0:
......@@ -92,8 +102,7 @@ class CAaaSSQL:
return None
else:
row = cursor.fetchone()
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return row
def has_notebook(self, user_id):
......@@ -101,75 +110,67 @@ class CAaaSSQL:
return ret is not None
def get_url_proxy(self, proxy_id):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "SELECT url FROM proxy WHERE proxy_id=%s"
cursor.execute(q, (proxy_id,))
if cursor.rowcount == 0:
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return None
else:
row = cursor.fetchone()
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return row[0]
def get_all_proxy(self):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "SELECT proxy_id, url, proxy_type, container_id FROM proxy"
cursor.execute(q)
proxy_list = []
for proxy_id, url, proxy_type, container_id in cursor:
proxy_list.append((proxy_id, url, proxy_type, container_id))
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return proxy_list
def new_cluster(self, user_id, name):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "INSERT INTO clusters (user_id, name) VALUES (%s, %s)"
cursor.execute(q, (user_id, name))
cluster_id = cursor.lastrowid
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return cluster_id
def set_master_address(self, cluster_id, address):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "UPDATE clusters SET master_address=%s WHERE clusters.id=%s"
print(address, cluster_id)
cursor.execute(q, (address, cluster_id))
self.cnx.commit()
self._close_cursor(cursor)
cursor.close()
def new_container(self, cluster_id, user_id, docker_id, ip_address, contents):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "INSERT INTO containers (user_id, cluster_id, docker_id, ip_address, contents) VALUES (%s, %s, %s, %s, %s)"
cursor.execute(q, (user_id, cluster_id, docker_id, ip_address, contents))
cont_id = cursor.lastrowid
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return cont_id
def new_proxy_entry(self, proxy_id, cluster_id, address, proxy_type, container_id):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "INSERT INTO proxy (proxy_id, url, cluster_id, proxy_type, container_id) VALUES (%s, %s, %s, %s, %s)"
cursor.execute(q, (proxy_id, address, cluster_id, proxy_type, container_id))
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return proxy_id
def new_notebook(self, cluster_id, address, user_id, container_id):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "INSERT INTO notebooks (cluster_id, address, user_id, container_id) VALUES (%s, %s, %s, %s)"
cursor.execute(q, (cluster_id, address, user_id, container_id))
nb_id = cursor.lastrowid
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return nb_id
def get_clusters(self, user_id=None):
cursor = self.cnx.cursor(dictionary=True)
cursor = self._get_cursor(dictionary=True)
res = {}
if user_id is None:
q = "SELECT id, user_id, master_address, name FROM clusters"
......@@ -183,12 +184,11 @@ class CAaaSSQL:
"master_address": row["master_address"],
"name": row["name"]
}
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return res
def get_containers(self, user_id=None, cluster_id=None):
cursor = self.cnx.cursor(dictionary=True)
cursor = self._get_cursor(dictionary=True)
res = {}
if user_id is None and cluster_id is None:
q = "SELECT id, docker_id, cluster_id, user_id, ip_address, contents FROM containers"
......@@ -211,34 +211,29 @@ class CAaaSSQL:
"ip_address": row["ip_address"],
"contents": row["contents"],
}
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
return res
def remove_proxy(self, container_id):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "DELETE FROM proxy WHERE container_id=%s"
cursor.execute(q, (container_id,))
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
def remove_notebook(self, container_id):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "DELETE FROM notebooks WHERE container_id=%s"
cursor.execute(q, (container_id,))
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
def remove_container(self, container_id):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "DELETE FROM containers WHERE id=%s"
cursor.execute(q, (container_id,))
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
def remove_cluster(self, cluster_id):
cursor = self.cnx.cursor()
cursor = self._get_cursor()
q = "DELETE FROM clusters WHERE id=%s"
cursor.execute(q, (cluster_id,))
self.cnx.commit()
cursor.close()
self._close_cursor(cursor)
from docker import Client
from docker import errors as docker_errors
from docker.utils import create_host_config
from threading import Thread
import time
from uuid import uuid4 as uuid
from caaas import get_db
from caaas.cluster_description import SparkClusterDescription
from caaas import CAaaState
from caaas import SparkClusterDescription
from utils import config
REGISTRY = "10.0.0.2:5000"
MASTER_IMAGE = REGISTRY + "/venza/spark-master:1.4.1"
......@@ -27,35 +26,25 @@ class SwarmStatus:
self.num_containers = 0
class Swarm:
class SwarmManager:
def __init__(self):
self.status = SwarmStatus()
self.cli = None
self.manager = None
def connect(self, swarm_manager):
self.manager = swarm_manager
self.cli = Client(base_url="tcp://" + self.manager)
def start_update_thread(self):
assert self.manager is not None
th = Thread(target=self._thread_cb)
th.start()
self.last_update_timestamp = 0
def _thread_cb(self):
print("Stats update thread started")
while True:
self.update_status()
time.sleep(1)
def connect(self):
manager = config.get_swarm_manager_address()
self.cli = Client(base_url=manager)
def update_status(self):
assert self.cli is not None
info = self.cli.info()
self.status.num_containers = info["Containers"]
self.status.num_nodes = info["DriverStatus"][3][1]
self.last_update_timestamp = time.time()
def get_notebook(self, user_id):
db = get_db()
db = CAaaState()
nb = db.get_notebook(user_id)
if nb is None:
self.start_cluster_with_notebook(user_id)
......@@ -64,13 +53,11 @@ class Swarm:
def start_cluster_with_notebook(self, user_id):
cluster_descr = SparkClusterDescription()
cluster_descr.num_workers = 2
cluster_descr.worker_cores = "2"
cluster_descr.executor_ram_size = "4g"
cluster_descr.for_spark_notebook()
self._create_new_spark_cluster(user_id, "notebook", cluster_descr, with_notebook=True)
def _create_new_spark_cluster(self, user_id, name, cluster_descr, with_notebook):
db = get_db()
db = CAaaState()
try:
cluster_id = db.new_cluster(user_id, name)
master_info = self._spawn_spark_master(cluster_id, user_id, cluster_descr)
......@@ -84,7 +71,7 @@ class Swarm:
# FIXME: should rollback all changes to DB
def _spawn_spark_master(self, cluster_id, user_id, cluster_descr):
db = get_db()
db = CAaaState()
options = {
"environment": {},
}
......@@ -95,7 +82,7 @@ class Swarm:
return info
def _spawn_spark_worker(self, cluster_id, user_id, cluster_descr, master_info):
db = get_db()
db = CAaaState()
options = {
"environment": {
"SPARK_MASTER_IP": master_info["docker_ip"],
......@@ -109,7 +96,7 @@ class Swarm:
return info
def _spawn_spark_notebook(self, cluster_id, user_id, cluster_descr, master_info):
db = get_db()
db = CAaaState()
proxy_id = get_uuid()
options = {
"environment": {
......@@ -140,7 +127,7 @@ class Swarm:
return info
def _terminate_container(self, container_id, docker_id, contents):
db = get_db()
db = CAaaState()
db.remove_proxy(container_id)
if contents == "spark-notebook":
db.remove_notebook(container_id)
......@@ -148,11 +135,10 @@ class Swarm:
db.remove_container(container_id)
def terminate_cluster(self, cluster_id):
db = get_db()
db = CAaaState()
cont_list = db.get_containers(cluster_id=cluster_id)
for cid, cinfo in cont_list.items():
self._terminate_container(cid, cinfo["docker_id"], cinfo["contents"])
db.remove_cluster(cluster_id)
return True
swarm = Swarm()
from flask import render_template
from caaas import app, swarm, get_db
from caaas import app, sm, CAaaState
@app.route("/web/")
......@@ -10,6 +10,8 @@ def index():
@app.route("/web/<username>")
def web_index(username):
state = CAaaState()
state.get_user_id(username) # creates the user if it does not exists
template_vars = {
"user": username
}
......@@ -18,7 +20,6 @@ def web_index(username):
@app.route("/web/<username>/status")
def web_user_status(username):
user_id = get_db().get_user_id(username)
template_vars = {
"user": username
}
......@@ -27,10 +28,11 @@ def web_user_status(username):
@app.route("/web/<username>/spark-notebook")
def web_notebook(username):
user_id = get_db().get_user_id(username)
state = CAaaState()
user_id = state.get_user_id(username)
template_vars = {
"user": username,
"notebook_address": swarm.get_notebook(user_id)
"notebook_address": sm.get_notebook(user_id)
}
return render_template('notebook.html', **template_vars)
......
from configparser import ConfigParser
from caaas import swarm, app, init_db
def read_config():
conf = ConfigParser()
conf.read('caaas.ini')
return conf
from caaas import app
def main():
conf = read_config()
init_db(conf['db']['user'], conf['db']['pass'], conf['db']['server'], conf['db']['db'])
swarm.connect(conf['docker']['swarm-manager'])
swarm.start_update_thread()
app.debug = True
app.run(host="0.0.0.0")
......
__author__ = 'venzano'
from configparser import ConfigParser
import os
MAIN_PATH = os.path.split(os.path.abspath(os.path.join(__file__, "..")))[0]
conf = ConfigParser()
conf.read(os.path.join(MAIN_PATH, 'caaas.ini'))
def get_swarm_manager_address():
return conf['docker']['swarm-manager']
def get_database_config():
db_config = {
'user': conf['db']['user'],
'password': conf['db']['pass'],
'host': conf['db']['server'],
'database': conf['db']['db'],
'buffered': True
}
return db_config
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment