This commit is contained in:
Jadowyne Ulve 2025-08-03 19:00:37 -05:00
commit 7c5c11a9aa
250 changed files with 9274 additions and 10320 deletions

54
.gitignore vendored
View File

@ -3,56 +3,4 @@ sites
static/css/uikit-rtl.css
static/css/uikit-rtl.min.css
static/css/uikit.css
static/css/uikit.min.css
__pycache__/api_admin.cpython-312.pyc
__pycache__/api_admin.cpython-313.pyc
__pycache__/api.cpython-312.pyc
__pycache__/api.cpython-313.pyc
__pycache__/config.cpython-312.pyc
__pycache__/config.cpython-313.pyc
__pycache__/database_admin.cpython-312.pyc
__pycache__/database_admin.cpython-313.pyc
__pycache__/database.cpython-312.pyc
__pycache__/database.cpython-313.pyc
__pycache__/external_API.cpython-312.pyc
__pycache__/external_API.cpython-313.pyc
__pycache__/group_api.cpython-312.pyc
__pycache__/group_api.cpython-313.pyc
__pycache__/main.cpython-312.pyc
__pycache__/main.cpython-313.pyc
__pycache__/manage.cpython-312.pyc
__pycache__/manage.cpython-313.pyc
__pycache__/MyDataclasses.cpython-312.pyc
__pycache__/MyDataclasses.cpython-313.pyc
__pycache__/postsqldb.cpython-312.pyc
__pycache__/postsqldb.cpython-313.pyc
__pycache__/process.cpython-312.pyc
__pycache__/process.cpython-313.pyc
__pycache__/receipts_API.cpython-312.pyc
__pycache__/receipts_API.cpython-313.pyc
__pycache__/shopping_list_API.cpython-312.pyc
__pycache__/shopping_list_API.cpython-313.pyc
__pycache__/user_api.cpython-312.pyc
__pycache__/user_api.cpython-313.pyc
__pycache__/webpush.cpython-312.pyc
__pycache__/webpush.cpython-313.pyc
__pycache__/workshop_api.cpython-312.pyc
__pycache__/workshop_api.cpython-313.pyc
application/recipes/__pycache__/__init__.cpython-312.pyc
application/recipes/__pycache__/__init__.cpython-313.pyc
application/recipes/__pycache__/database_recipes.cpython-312.pyc
application/recipes/__pycache__/database_recipes.cpython-313.pyc
application/recipes/__pycache__/recipes_api.cpython-312.pyc
application/recipes/__pycache__/recipes_api.cpython-313.pyc
application/__pycache__/__init__.cpython-312.pyc
application/__pycache__/__init__.cpython-313.pyc
application/__pycache__/postsqldb.cpython-312.pyc
application/__pycache__/postsqldb.cpython-313.pyc
application/items/__pycache__/__init__.cpython-312.pyc
application/items/__pycache__/__init__.cpython-313.pyc
application/items/__pycache__/database_items.cpython-312.pyc
application/items/__pycache__/database_items.cpython-313.pyc
application/items/__pycache__/items_API.cpython-312.pyc
application/items/__pycache__/items_API.cpython-313.pyc
application/items/__pycache__/items_processes.cpython-312.pyc
application/items/__pycache__/items_processes.cpython-313.pyc
static/css/uikit.min.css

View File

@ -358,6 +358,10 @@ class SitePayload:
self.default_auto_issue_location,
self.default_primary_location
)
def get_dictionary(self):
return self.__dict__
#DONE
@dataclass

View File

@ -1,5 +1,17 @@
## postgresql and python learning
## PantryTrack
I am attempting to understand how to connect and execute commands through python against a remote postgresql to update my systems
This is currently a passion project that I have started to learn and develop a few different things:
- Barcode Scanners and how they work
- Posgresql Databases
- SQL, JAVASCRIPT, CSS, HTML
- Database Schema management and design
This is a test
PantryTrack is am inventory system that you add items to, use those items to build recipes and shopping lists.
You then can manually create receipts and or use a barcode scanner to set up a kiosk to scan in all the things you
purchase into a receipt.
You then edit and receive that receipt into the system.
There is also the ability to use a kiosk like interface to set up a scan in and out system, where as you use things
the system will remove those items by scanning them.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

370
api.py
View File

@ -1,370 +0,0 @@
from flask import Blueprint, request, render_template, redirect, session, url_for, send_file, jsonify, Response
import psycopg2, math, json, datetime, main, copy, requests, process, database
from config import config, sites_config
from main import unfoldCostLayers
database_api= Blueprint('database_api', __name__)
@database_api.route("/changeSite", methods=["POST"])
def changeSite():
if request.method == "POST":
site = request.json['site']
session['selected_site'] = site
return jsonify({'error': False, 'message': 'Site Changed!'})
@database_api.route("/getGroups")
def paginate_groups():
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
site_name = session['selected_site']
offset = (page - 1) * limit
groups = []
count = 0
database_config = config()
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
sql = f"SELECT * FROM {site_name}_groups LIMIT %s OFFSET %s;"
count = f"SELECT COUNT(*) FROM {site_name}_groups"
cur.execute(sql, (limit, offset))
groups = cur.fetchall()
cur.execute(count)
count = cur.fetchone()[0]
sql_item = f"SELECT {site_name}_items.barcode, {site_name}_items.item_name, {site_name}_logistics_info.quantity_on_hand FROM {site_name}_items LEFT JOIN {site_name}_logistics_info ON {site_name}_items.logistics_info_id = {site_name}_logistics_info.id WHERE {site_name}_items.id = %s; "
new_groups = []
for group in groups:
qty = 0
group = list(group)
items = []
print(group[3])
for item_id in group[3]:
cur.execute(sql_item, (item_id,))
item_row = list(cur.fetchone())
cur.execute(f"SELECT quantity_on_hand FROM {site_name}_item_locations WHERE part_id=%s;", (item_id, ))
item_locations = cur.fetchall()[0]
qty += float(sum(item_locations))
item_row[2] = sum(item_locations)
items.append(item_row)
group[3] = items
group.append(qty)
new_groups.append(group)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
return jsonify({'groups': new_groups, "end": math.ceil(count/limit)})
@database_api.route("/getVendors")
def get_vendors():
database_config = config()
site_name = session['selected_site']
vendors = []
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
sql = f"SELECT * FROM {site_name}_vendors;"
cur.execute(sql)
vendors = cur.fetchall()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
return jsonify(vendors=vendors)
@database_api.route("/addGroup")
def addGroup():
name = str(request.args.get('name', ""))
description = str(request.args.get('description', ""))
group_type = str(request.args.get('type', ""))
site_name = session['selected_site']
state = "FAILED"
database_config = config()
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
sql = f"INSERT INTO {site_name}_groups (name, description, included_items, group_type) VALUES (%s, %s, %s, %s);"
cur.execute(sql, (name, description, json.dumps({}), group_type))
state = "SUCCESS"
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
conn.rollback()
return jsonify({'state': state})
@database_api.route("/getGroup")
def get_group():
id = int(request.args.get('id', 1))
database_config = config()
site_name = session['selected_site']
group = []
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
sql = f"SELECT * FROM {site_name}_groups WHERE id=%s;"
cur.execute(sql, (id, ))
group = list(cur.fetchone())
sql_item = f"SELECT {site_name}_items.id, {site_name}_items.barcode, {site_name}_items.item_name, {site_name}_logistics_info.quantity_on_hand FROM {site_name}_items LEFT JOIN {site_name}_logistics_info ON {site_name}_items.logistics_info_id = {site_name}_logistics_info.id WHERE {site_name}_items.id = %s;"
qty = 0
group = list(group)
items = []
print(group[3])
for item_id in group[3]:
cur.execute(sql_item, (item_id,))
item_row = cur.fetchone()
qty += float(item_row[3])
items.append(item_row)
group[3] = items
group.append(qty)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
return jsonify(group=group)
@database_api.route("/updateGroup", methods=["POST"])
def update_group():
if request.method == "POST":
site_name = session['selected_site']
group_id = request.get_json()['id']
items = request.get_json()['items']
name = request.get_json()['name']
description = request.get_json()['description']
group_type = request.get_json()['group_type']
data = (name, description, items, group_type, group_id)
database_config = config()
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
# Start by updating the group -> included items with the up to date list
sql = f"UPDATE {site_name}_groups SET name = %s, description = %s, included_items = %s, group_type = %s WHERE id=%s;"
cur.execute(sql, data)
update_item_sql = f"UPDATE {site_name}_item_info SET groups = %s WHERE id = %s;"
select_item_sql = f"SELECT {site_name}_item_info.id, {site_name}_item_info.groups FROM {site_name}_items LEFT JOIN {site_name}_item_info ON {site_name}_items.item_info_id = {site_name}_item_info.id WHERE {site_name}_items.id = %s;"
# Now we will fetch each item row one by one and check if the group id is already inside of its groups array
for item_id in items:
cur.execute(select_item_sql, (item_id, ))
item = cur.fetchone()
print(item)
item_groups: set = set(item[1])
# Condition check, adds it if it doesnt exist.
if group_id not in item_groups:
item_groups.add(group_id)
cur.execute(update_item_sql, (list(item_groups), item[0]))
# Now we fetch all items that have the group id in its groups array
fetch_items_with_group = f"SELECT {site_name}_items.id, groups, {site_name}_item_info.id FROM {site_name}_item_info LEFT JOIN {site_name}_items ON {site_name}_items.item_info_id = {site_name}_item_info.id WHERE groups @> ARRAY[%s];"
cur.execute(fetch_items_with_group, (group_id, ))
group_items = cur.fetchall()
print(items)
# We will then check each item id against the groups new included_items list to see if the item should be in there
for item_id, group, info_id in group_items:
# If it is not we remove the group form the items list and update the item
if item_id not in items:
groups: list = list(group)
groups.remove(group_id)
cur.execute(update_item_sql, (list(groups), info_id))
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
conn.rollback()
return jsonify({"state": "SUCCESS"})
return jsonify({"state": "FAILED"})
@database_api.route("/addList")
def addList():
name = str(request.args.get('name', ""))
description = str(request.args.get('description', ""))
list_type = str(request.args.get('type', ""))
site_name = session['selected_site']
print(name, description, list_type)
state = "FAILED"
#if name or description or group_type == "":
# print("this is empty")
# return jsonify({'state': state})
timestamp = datetime.datetime.now()
data = (name, description, [], json.dumps({}), [], [], 0, timestamp, list_type)
database_config = config()
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
sql = f"INSERT INTO {site_name}_shopping_lists (name, description, pantry_items, custom_items, recipes, groups, author, creation_date, type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);"
cur.execute(sql, data)
state = "SUCCESS"
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
conn.rollback()
return jsonify({'state': state})
@database_api.route("/getLists")
def paginate_lists():
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
site_name = session['selected_site']
offset = (page - 1) * limit
lists = []
count = 0
database_config = config()
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
sql = f"SELECT * FROM {site_name}_shopping_lists LIMIT %s OFFSET %s;"
count = f"SELECT COUNT(*) FROM {site_name}_shopping_lists;"
cur.execute(sql, (limit, offset))
temp_lists = list(cur.fetchall())
cur.execute(count)
count = cur.fetchone()[0]
for shopping_list in temp_lists:
shopping_list: list = list(shopping_list)
pantry_items = shopping_list[3]
custom_items = shopping_list[4]
list_length = len(custom_items)
sqlfile = open(f"sites/{site_name}/sql/unique/shopping_lists_safetystock_count.sql", "r+")
sql = "\n".join(sqlfile.readlines())
sqlfile.close()
print(sql)
if shopping_list[10] == 'calculated':
print(shopping_list[0])
cur.execute(sql, (shopping_list[0], ))
list_length += cur.fetchone()[0]
else:
list_length += len(pantry_items)
shopping_list.append(list_length)
lists.append(shopping_list)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
return jsonify({'lists': lists, 'end': math.ceil(count/limit)})
@database_api.route("/getListView")
def get_list_view():
id = int(request.args.get('id', 1))
site_name = session['selected_site']
shopping_list = []
database_config = config()
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
sql = f"SELECT * FROM {site_name}_shopping_lists WHERE id=%s;"
cur.execute(sql, (id, ))
shopping_list = list(cur.fetchone())
if shopping_list[10] == "calculated":
sqlfile = open(f"sites/{site_name}/sql/unique/shopping_lists_safetystock.sql", "r+")
sql = "\n".join(sqlfile.readlines())
sqlfile.close()
else:
sqlfile = open(f"sites/{site_name}/sql/unique/shopping_lists_safetystock_uncalculated.sql", "r+")
sql = "\n".join(sqlfile.readlines())
sqlfile.close()
cur.execute(sql, (id, ))
shopping_list[3] = list(cur.fetchall())
print(shopping_list[4])
except (Exception, psycopg2.DatabaseError) as error:
print(error)
return jsonify(shopping_list=shopping_list)
@database_api.route("/getList")
def get_list():
id = int(request.args.get('id', 1))
database_config = config()
site_name = session['selected_site']
shopping_list = []
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
sql = f"SELECT * FROM {site_name}_shopping_lists WHERE id=%s;"
cur.execute(sql, (id, ))
shopping_list = list(cur.fetchone())
itemSQL = f"SELECT {site_name}_items.id, {site_name}_items.barcode, {site_name}_items.item_name, {site_name}_items.links, {site_name}_item_info.uom FROM {site_name}_items LEFT JOIN {site_name}_item_info ON {site_name}_items.item_info_id = {site_name}_item_info.id WHERE {site_name}_item_info.shopping_lists @> ARRAY[%s];"
cur.execute(itemSQL, (id, ))
shopping_list[3] = list(cur.fetchall())
print(shopping_list)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
return jsonify(shopping_list=shopping_list)
@database_api.route("/updateList", methods=["POST"])
def update_list():
if request.method == "POST":
site_name = session['selected_site']
list_id = request.get_json()['id']
items = request.get_json()['items']
print(items)
custom_items = request.get_json()['custom']
name = request.get_json()['name']
description = request.get_json()['description']
list_type = request.get_json()['list_type']
quantities = request.get_json()['quantities']
data = (name, description, items, json.dumps(custom_items), list_type, json.dumps(quantities), list_id)
database_config = config()
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
# Start by updating the group -> included items with the up to date list
sql = f"UPDATE {site_name}_shopping_lists SET name = %s, description = %s, pantry_items = %s, custom_items = %s, type = %s, quantities = %s WHERE id=%s;"
cur.execute(sql, data)
update_item_sql = f"UPDATE {site_name}_item_info SET shopping_lists = %s WHERE id = %s;"
select_item_sql = f"SELECT {site_name}_item_info.id, {site_name}_item_info.shopping_lists FROM {site_name}_items LEFT JOIN {site_name}_item_info ON {site_name}_items.item_info_id = {site_name}_item_info.id WHERE {site_name}_items.id = %s;"
# Now we will fetch each item row one by one and check if the group id is already inside of its groups array
for item_id in items:
cur.execute(select_item_sql, (item_id, ))
item = cur.fetchone()
print(item)
shopping_lists: set = set(item[1])
# Condition check, adds it if it doesnt exist.
if list_id not in shopping_lists:
shopping_lists.add(list_id)
cur.execute(update_item_sql, (list(shopping_lists), item[0]))
# Now we fetch all items that have the group id in its groups array
fetch_items_with_list = f"SELECT {site_name}_items.id, {site_name}_item_info.shopping_lists, {site_name}_item_info.id FROM {site_name}_item_info LEFT JOIN {site_name}_items ON {site_name}_items.item_info_id = {site_name}_item_info.id WHERE {site_name}_item_info.shopping_lists @> ARRAY[%s];"
cur.execute(fetch_items_with_list, (list_id, ))
list_items = cur.fetchall()
print(items)
# We will then check each item id against the groups new included_items list to see if the item should be in there
for item_id, shopping_list, info_id in list_items:
# If it is not we remove the group form the items list and update the item
if item_id not in items:
shopping_lists: list = list(shopping_list)
shopping_lists.remove(list_id)
cur.execute(update_item_sql, (list(shopping_lists), info_id))
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
conn.rollback()
return jsonify({"state": "SUCCESS"})
return jsonify({"state": "FAILED"})

View File

@ -1,263 +0,0 @@
from flask import Blueprint, request, render_template, redirect, session, url_for, send_file, jsonify, Response
import psycopg2, math, json, datetime, main, copy, requests
from config import config, sites_config
from main import unfoldCostLayers, get_sites, get_roles, create_site_secondary, getUser
from manage import create
from user_api import login_required
import postsqldb, process, hashlib, database_admin
admin_api = Blueprint('admin_api', __name__)
@admin_api.route('/admin')
def admin_index():
sites = [site[1] for site in main.get_sites(session['user']['sites'])]
return render_template("admin/index.html",
current_site=session['selected_site'],
sites=sites)
@admin_api.route('/admin/site/<id>')
@login_required
def adminSites(id):
if id == "new":
new_site = postsqldb.SitesTable.Payload(
"",
"",
session['user_id']
)
return render_template("admin/site.html", site=new_site.get_dictionary())
else:
database_config = config()
with psycopg2.connect(**database_config) as conn:
site = postsqldb.SitesTable.select_tuple(conn, (id,))
return render_template('admin/site.html', site=site)
@admin_api.route('/admin/role/<id>')
@login_required
def adminRoles(id):
database_config = config()
with psycopg2.connect(**database_config) as conn:
sites = postsqldb.SitesTable.selectTuples(conn)
if id == "new":
new_role = postsqldb.RolesTable.Payload(
"",
"",
0
)
return render_template("admin/role.html", role=new_role.get_dictionary(), sites=sites)
else:
role = postsqldb.RolesTable.select_tuple(conn, (id,))
return render_template('admin/role.html', role=role, sites=sites)
@admin_api.route('/admin/user/<id>')
@login_required
def adminUser(id):
database_config = config()
with psycopg2.connect(**database_config) as conn:
if id == "new":
new_user = postsqldb.LoginsTable.Payload("", "", "", "")
return render_template("admin/user.html", user=new_user.get_dictionary())
else:
user = database_admin.selectLoginsUser(int(id))
return render_template('admin/user.html', user=user)
@admin_api.route('/admin/getSites', methods=['GET'])
@login_required
def getSites():
if request.method == "GET":
records = []
count = 0
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
offset = (page - 1) * limit
database_config = config()
with psycopg2.connect(**database_config) as conn:
records, count = postsqldb.SitesTable.paginateTuples(conn, (limit, offset))
return jsonify({'sites': records, "end": math.ceil(count/limit), 'error':False, 'message': 'Sites Loaded Successfully!'})
return jsonify({'sites': records, "end": math.ceil(count/limit), 'error':True, 'message': 'There was a problem loading Sites!'})
@admin_api.route('/admin/getRoles', methods=['GET'])
@login_required
def getRoles():
if request.method == "GET":
records = []
count = 0
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
offset = (page - 1) * limit
database_config = config()
with psycopg2.connect(**database_config) as conn:
records, count = postsqldb.RolesTable.paginate_tuples(conn, (limit, offset))
return jsonify({'roles': records, "end": math.ceil(count/limit), 'error':False, 'message': 'Roles Loaded Successfully!'})
return jsonify({'roles': records, "end": math.ceil(count/limit), 'error':True, 'message': 'There was a problem loading Roles!'})
@admin_api.route('/admin/getLogins', methods=['GET'])
@login_required
def getLogins():
if request.method == "GET":
records = []
count = 0
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
offset = (page - 1) * limit
database_config = config()
with psycopg2.connect(**database_config) as conn:
records, count = postsqldb.LoginsTable.paginate_tuples(conn, (limit, offset))
return jsonify({'logins': records, "end": math.ceil(count/limit), 'error':False, 'message': 'logins Loaded Successfully!'})
return jsonify({'logins': records, "end": math.ceil(count/limit), 'error':True, 'message': 'There was a problem loading logins!'})
@admin_api.route('/admin/site/postDeleteSite', methods=["POST"])
def postDeleteSite():
if request.method == "POST":
site_id = request.get_json()['site_id']
database_config = config()
user_id = session['user_id']
try:
with psycopg2.connect(**database_config) as conn:
user = postsqldb.LoginsTable.select_tuple(conn, (user_id,))
admin_user = (user['username'], user['password'], user['email'], user['row_type'])
site = postsqldb.SitesTable.select_tuple(conn, (site_id,))
site = postsqldb.SitesTable.Manager(
site['site_name'],
admin_user,
site['default_zone'],
site['default_primary_location'],
site['site_description']
)
process.deleteSite(site_manager=site)
except Exception as error:
conn.rollback()
return jsonify({'error': True, 'message': error})
return jsonify({'error': False, 'message': f""})
return jsonify({'error': True, 'message': f""})
@admin_api.route('/admin/site/postAddSite', methods=["POST"])
def postAddSite():
if request.method == "POST":
payload = request.get_json()['payload']
database_config = config()
site_name = session['selected_site']
user_id = session['user_id']
print(payload)
try:
with psycopg2.connect(**database_config) as conn:
user = postsqldb.LoginsTable.select_tuple(conn, (user_id,))
admin_user = (user['username'], user['password'], user['email'], user['row_type'])
site = postsqldb.SitesTable.Manager(
payload['site_name'],
admin_user,
payload['default_zone'],
payload['default_primary_location'],
payload['site_description']
)
process.addSite(site_manager=site)
except Exception as error:
conn.rollback()
return jsonify({'error': True, 'message': error})
return jsonify({'error': False, 'message': f"Zone added to {site_name}."})
return jsonify({'error': True, 'message': f"These was an error with adding this Zone to {site_name}."})
@admin_api.route('/admin/site/postEditSite', methods=["POST"])
def postEditSite():
if request.method == "POST":
payload = request.get_json()['payload']
database_config = config()
try:
with psycopg2.connect(**database_config) as conn:
postsqldb.SitesTable.update_tuple(conn, payload)
except Exception as error:
conn.rollback()
return jsonify({'error': True, 'message': error})
return jsonify({'error': False, 'message': f"Site updated."})
return jsonify({'error': True, 'message': f"These was an error with updating Site."})
@admin_api.route('/admin/role/postAddRole', methods=["POST"])
def postAddRole():
if request.method == "POST":
payload = request.get_json()['payload']
database_config = config()
print(payload)
try:
with psycopg2.connect(**database_config) as conn:
role = postsqldb.RolesTable.Payload(
payload['role_name'],
payload['role_description'],
payload['site_id']
)
postsqldb.RolesTable.insert_tuple(conn, role.payload())
except Exception as error:
conn.rollback()
return jsonify({'error': True, 'message': error})
return jsonify({'error': False, 'message': f"Role added."})
return jsonify({'error': True, 'message': f"These was an error with adding this Role."})
@admin_api.route('/admin/role/postEditRole', methods=["POST"])
def postEditRole():
if request.method == "POST":
payload = request.get_json()['payload']
database_config = config()
print(payload)
try:
with psycopg2.connect(**database_config) as conn:
postsqldb.RolesTable.update_tuple(conn, payload)
except Exception as error:
conn.rollback()
return jsonify({'error': True, 'message': error})
return jsonify({'error': False, 'message': f"Role updated."})
return jsonify({'error': True, 'message': f"These was an error with updating this Role."})
@admin_api.route('/admin/user/postAddLogin', methods=["POST"])
def postAddLogin():
if request.method == "POST":
payload = request.get_json()['payload']
database_config = config()
user = []
try:
with psycopg2.connect(**database_config) as conn:
user = postsqldb.LoginsTable.Payload(
payload['username'],
hashlib.sha256(payload['password'].encode()).hexdigest(),
payload['email'],
payload['row_type']
)
user = postsqldb.LoginsTable.insert_tuple(conn, user.payload())
except postsqldb.DatabaseError as error:
conn.rollback()
return jsonify({'user': user, 'error': True, 'message': error})
return jsonify({'user': user, 'error': False, 'message': f"User added."})
return jsonify({'user': user, 'error': True, 'message': f"These was an error with adding this User."})
@admin_api.route('/admin/user/postEditLogin', methods=["POST"])
def postEditLogin():
if request.method == "POST":
payload = request.get_json()['payload']
database_config = config()
try:
with psycopg2.connect(**database_config) as conn:
postsqldb.LoginsTable.update_tuple(conn, payload)
except Exception as error:
conn.rollback()
return jsonify({'error': True, 'message': error})
return jsonify({'error': False, 'message': f"User was Added Successfully."})
return jsonify({'error': True, 'message': f"These was an error with adding this user."})
@admin_api.route('/admin/user/postEditLoginPassword', methods=["POST"])
def postEditLoginPassword():
if request.method == "POST":
payload = request.get_json()['payload']
database_config = config()
try:
with psycopg2.connect(**database_config) as conn:
user = postsqldb.LoginsTable.select_tuple(conn, (payload['id'],))
if hashlib.sha256(payload['current_password'].encode()).hexdigest() != user['password']:
return jsonify({'error': True, 'message': "The provided current password is incorrect"})
payload['update']['password'] = hashlib.sha256(payload['update']['password'].encode()).hexdigest()
postsqldb.LoginsTable.update_tuple(conn, payload)
except Exception as error:
conn.rollback()
return jsonify({'error': True, 'message': error})
return jsonify({'error': False, 'message': f"Password was changed successfully."})
return jsonify({'error': True, 'message': f"These was an error with updating this Users password."})

Binary file not shown.

Binary file not shown.

View File

View File

@ -1,62 +1,81 @@
from flask import Blueprint, request, render_template, redirect, session, url_for, jsonify
import hashlib, psycopg2, process, MyDataclasses
from config import config, sites_config, setFirstSetupDone
from authlib.integrations.flask_client import OAuth
import hashlib, psycopg2
from config import config, sites_config
from functools import wraps
from manage import create
from main import create_site, getUser, setSystemAdmin
import postsqldb
import requests
from application.access_module import access_database
from outh import oauth
access_api = Blueprint('access_api', __name__, template_folder="templates", static_folder="static")
login_app = Blueprint('login', __name__)
def update_session_user():
database_config = config()
with psycopg2.connect(**database_config) as conn:
user = postsqldb.LoginsTable.get_washed_tuple(conn, (session['user_id'],))
session['user'] = user
user = access_database.selectLoginsTupleByID((session['user_id'],))
user = access_database.washUserDictionary(user)
session['user'] = user
print(user)
def login_required(func):
@wraps(func)
def wrapper(*args, **kwargs):
if 'user' not in session or session['user'] == None:
return redirect(url_for('login.login'))
return redirect(url_for('access_api.login'))
return func(*args, **kwargs)
return wrapper
@login_app.route('/setup', methods=['GET', 'POST'])
def first_time_setup():
if request.method == "POST":
database_address = request.form['database_address']
database_port = request.form['database_port']
database_name = request.form['database_name']
database_user = request.form['database_user']
database_password = request.form['database_address']
site_manager = MyDataclasses.SiteManager(
site_name=request.form['site_name'],
admin_user=(request.form['username'], hashlib.sha256(request.form['password'].encode()).hexdigest(), request.form['email']),
default_zone=request.form['site_default_zone'],
default_location=request.form['site_default_location'],
description=request.form['site_description']
)
process.addSite(site_manager)
setFirstSetupDone()
return redirect("/login")
return render_template("setup.html")
@login_app.route('/logout', methods=['GET'])
@access_api.route('/logout', methods=['GET'])
@login_required
def logout():
if 'user' in session.keys():
session['user'] = None
return redirect('/login')
return redirect('/access/login')
@login_app.route('/login', methods=['POST', 'GET'])
@access_api.route('/auth')
def auth():
token = oauth.authentik.authorize_access_token()
access_token = token['access_token']
userinfo_endpoint="https://auth.treehousefullofstars.com/application/o/userinfo/"
headers = {
'Authorization': f'Bearer {access_token}',
}
response = requests.get(userinfo_endpoint, headers=headers)
if response.status_code != 200:
print("Failed to fetch user info:", response.status_code, response.text)
return redirect('/access/login')
external_user = response.json()
user = access_database.selectUserByEmail((external_user['email'],))
if user['login_type'] == "External":
payload = {
'id': user['id'],
'update': {
'username': external_user['preferred_username'],
'profile_pic_url': external_user['picture']
}
}
user = access_database.updateLoginsTuple(payload)
user = access_database.washUserDictionary(user)
session['user_id'] = user['id']
session['user'] = user
return redirect('/')
return redirect('/access/login')
@access_api.route('/login/oidc')
def oidc_login():
redirect_uri = url_for('access_api.auth', _external=True)
return oauth.authentik.authorize_redirect(redirect_uri)
@access_api.route('/login', methods=['POST', 'GET'])
def login():
session.clear()
instance_config = sites_config()
@ -83,6 +102,7 @@ def login():
if user and user[2] == password:
session['user_id'] = user[0]
session['user'] = {'id': user[0], 'username': user[1], 'sites': user[13], 'site_roles': user[14], 'system_admin': user[15], 'flags': user[16]}
session['login_type'] = 'Internal'
return jsonify({'error': False, 'message': 'Logged In Sucessfully!'})
else:
return jsonify({'error': True, 'message': 'Username or Password was incorrect!'})
@ -91,9 +111,15 @@ def login():
if 'user' not in session.keys():
session['user'] = None
return render_template("other/login.html")
return render_template("login.html")
@login_app.route('/signup', methods=['POST', 'GET'])
@access_api.route('/dashboard')
def dashboard():
if 'user' not in session:
return redirect('/')
return f"Hello, {session['user']['name']}! <a href='/logout'>Logout</a>"
@access_api.route('/signup', methods=['POST', 'GET'])
def signup():
instance_config = sites_config()
if not instance_config['signup_enabled']:

View File

@ -0,0 +1,103 @@
import psycopg2
import config
from application import postsqldb
def washUserDictionary(user):
return {
'id': user['id'],
'username': user['username'],
'sites': user['sites'],
'site_roles': user['site_roles'],
'system_admin': user['system_admin'],
'flags': user['flags'],
'profile_pic_url': user['profile_pic_url'],
'login_type': user['login_type']
}
def selectLoginsTupleByID(payload, convert=True, conn=None):
""" payload = (id,)"""
self_conn = False
user = ()
sql = f"SELECT * FROM logins WHERE id=%s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
user = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
user = rows
if self_conn:
conn.commit()
conn.close()
return user
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def selectUserByEmail(payload, convert=True, conn=None):
""" payload = (email,)"""
self_conn = False
user = ()
sql = f"SELECT * FROM logins WHERE email=%s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
user = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
user = rows
if self_conn:
conn.commit()
conn.close()
return user
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateLoginsTuple(payload, convert=True, conn=None):
""" payload = {'id': user_id, 'update': {...}}"""
self_conn = False
user = ()
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE logins SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
user = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
user = rows
if self_conn:
conn.commit()
conn.close()
return user
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)

View File

@ -8,7 +8,7 @@ async function loginUser() {
let username = document.getElementById('login_username').value
let password = document.getElementById('login_password').value
const response = await fetch(`/login`, {
const response = await fetch(`/access/login`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
@ -97,7 +97,7 @@ async function signupUser() {
let user_email = document.getElementById('signup_email').value
let password = document.getElementById('signup_password').value
let username = document.getElementById('signup_username').value
const response = await fetch(`/signup`, {
const response = await fetch(`/access/signup`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',

View File

@ -32,7 +32,6 @@
<li><a href="#">Login</a></li>
<li><a href="#">Sign Up</a></li>
</ul>
<div class="uk-switcher">
<div class="uk-grid-small" uk-grid>
<div class="uk-width-1-1">
@ -52,9 +51,14 @@
</div>
</div>
<div class="uk-width-1-1">
<button onclick="loginUser()" class="uk-button uk-button-default uk-align-right">Login</button>
<a onclick="loginUser()" class="uk-button uk-button-primary uk-align-center">Login</a>
</div>
<p class="uk-text-meta uk-align-center uk-margin-remove-bottom">OR</p>
<div class="uk-width-1-1">
<a href="/access/login/oidc" class="uk-button uk-button-primary uk-align-center">Login with Authentik</a>
</div>
</div>
<!-- Sign up Form -->
<div class="uk-grid-small" uk-grid>
<div class="uk-width-1-1">
<div class="uk-margin">
@ -98,5 +102,5 @@
</div>
</div>
</body>
<script src="{{ url_for('static', filename='handlers/loginHandler.js') }}"></script>
<script src="{{ url_for('access_api.static', filename='js/loginHandler.js') }}"></script>
</html>

View File

View File

@ -0,0 +1,222 @@
# 3RD PARTY IMPORTS
from flask import (
Blueprint, request, render_template, session, jsonify, redirect
)
import math
import hashlib
# APPLICATION IMPORTS
from application.access_module import access_api
from application.administration import administration_database, administration_processes
from application import database_payloads, postsqldb
admin_api = Blueprint('admin_api', __name__, template_folder="templates", static_folder="static")
# ROOT TEMPLATE ROUTES
@admin_api.route('/')
@access_api.login_required
def admin_index():
sites = [site[1] for site in postsqldb.get_sites(session['user']['sites'])]
return render_template("admin_index.html", current_site=session['selected_site'], sites=sites)
@admin_api.route('/site/<id>')
@access_api.login_required
def adminSites(id):
if id == "new":
new_site_payload = database_payloads.SitePayload("", "", session['user_id'])
return render_template("site.html", site=new_site_payload.get_dictionary())
else:
site = administration_database.selectSitesTuple((id,))
return render_template('site.html', site=site)
@admin_api.route('/role/<id>')
@access_api.login_required
def adminRoles(id):
sites = administration_database.selectSitesTuples()
if id == "new":
new_role_payload = database_payloads.RolePayload("", "", 0)
return render_template("role.html", role=new_role_payload.get_dictionary(), sites=sites)
else:
role = administration_database.selectRolesTuple((id,))
return render_template('role.html', role=role, sites=sites)
@admin_api.route('/user/<id>')
@access_api.login_required
def adminUser(id):
if id == "new":
new_user_payload = database_payloads.LoginsPayload("", "", "", "")
return render_template("user.html", user=new_user_payload.get_dictionary())
else:
user = administration_database.selectLoginsTuple((int(id),))
return render_template('user.html', user=user)
@admin_api.route('/setup', methods=['GET', 'POST'])
def first_time_setup():
if request.method == "POST":
database_address = request.form['database_address']
database_port = request.form['database_port']
database_name = request.form['database_name']
database_user = request.form['database_user']
database_password = request.form['database_address']
payload = {
"site_name" : request.form['site_name'],
"admin_user": (request.form['username'], hashlib.sha256(request.form['password'].encode()).hexdigest(), request.form['email']),
"default_zone": request.form['site_default_zone'],
"default_primary_location": request.form['site_default_location'],
"site_description": request.form['site_description']
}
administration_processes.addSite(payload)
return redirect("/login")
return render_template("setup.html")
# API ROUTES
@admin_api.route('/api/getSites', methods=['GET'])
@access_api.login_required
def getSites():
if request.method == "GET":
records = []
count = 0
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
offset = (page - 1) * limit
records, count = administration_database.paginateSitesTuples((limit, offset))
return jsonify({'sites': records, "end": math.ceil(count/limit), 'error':False, 'message': 'Sites Loaded Successfully!'})
return jsonify({'sites': records, "end": math.ceil(count/limit), 'error':True, 'message': 'There was a problem loading Sites!'})
@admin_api.route('/api/getRoles', methods=['GET'])
@access_api.login_required
def getRoles():
if request.method == "GET":
records = []
count = 0
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
offset = (page - 1) * limit
records, count = administration_database.paginateRolesTuples((limit, offset))
return jsonify({'roles': records, "end": math.ceil(count/limit), 'error':False, 'message': 'Roles Loaded Successfully!'})
return jsonify({'roles': records, "end": math.ceil(count/limit), 'error':True, 'message': 'There was a problem loading Roles!'})
@admin_api.route('/api/getLogins', methods=['GET'])
@access_api.login_required
def getLogins():
if request.method == "GET":
records = []
count = 0
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
offset = (page - 1) * limit
records, count = administration_database.paginateLoginsTuples((limit, offset))
return jsonify({'logins': records, "end": math.ceil(count/limit), 'error':False, 'message': 'logins Loaded Successfully!'})
return jsonify({'logins': records, "end": math.ceil(count/limit), 'error':True, 'message': 'There was a problem loading logins!'})
@admin_api.route('/api/site/postDeleteSite', methods=["POST"])
@access_api.login_required
def postDeleteSite():
if request.method == "POST":
site_id = request.get_json()['site_id']
user_id = session['user_id']
site = administration_database.selectSitesTuple((site_id,))
user = administration_database.selectLoginsTuple((user_id,))
if user['id'] != site['site_owner_id']:
return jsonify({'error': True, 'message': f"You must be the owner of this site to delete."})
try:
administration_processes.deleteSite(site, user)
except Exception as err:
print(err)
return jsonify({'error': False, 'message': f""})
return jsonify({'error': True, 'message': f""})
@admin_api.route('/api/site/postAddSite', methods=["POST"])
@access_api.login_required
def postAddSite():
if request.method == "POST":
payload = request.get_json()['payload']
site_name = session['selected_site']
user_id = session['user_id']
user = administration_database.selectLoginsTuple((user_id,))
payload['admin_user'] = (user['username'], user['password'], user['email'], user['row_type'])
administration_processes.addSite(payload)
return jsonify({'error': False, 'message': f"Zone added to {site_name}."})
return jsonify({'error': True, 'message': f"These was an error with adding this Zone to {site_name}."})
@admin_api.route('/api/site/postEditSite', methods=["POST"])
@access_api.login_required
def postEditSite():
if request.method == "POST":
payload = request.get_json()['payload']
administration_database.updateSitesTuple(payload)
return jsonify({'error': False, 'message': f"Site updated."})
return jsonify({'error': True, 'message': f"These was an error with updating Site."})
@admin_api.route('/api/role/postAddRole', methods=["POST"])
@access_api.login_required
def postAddRole():
if request.method == "POST":
payload = request.get_json()['payload']
print(payload)
role = database_payloads.RolePayload(
payload['role_name'],
payload['role_description'],
payload['site_id']
)
administration_database.insertRolesTuple(role.payload())
return jsonify({'error': False, 'message': f"Role added."})
return jsonify({'error': True, 'message': f"These was an error with adding this Role."})
@admin_api.route('/api/role/postEditRole', methods=["POST"])
@access_api.login_required
def postEditRole():
if request.method == "POST":
payload = request.get_json()['payload']
administration_database.updateRolesTuple(payload)
return jsonify({'error': False, 'message': f"Role updated."})
return jsonify({'error': True, 'message': f"These was an error with updating this Role."})
@admin_api.route('/api/user/postAddLogin', methods=["POST"])
@access_api.login_required
def postAddLogin():
if request.method == "POST":
payload = request.get_json()['payload']
user = database_payloads.LoginsPayload(
payload['username'],
hashlib.sha256(payload['password'].encode()).hexdigest(),
payload['email'],
payload['row_type']
)
user = administration_database.insertLoginsTuple(user.payload())
return jsonify({'user': user, 'error': False, 'message': f"User added."})
return jsonify({'user': user, 'error': True, 'message': f"These was an error with adding this User."})
@admin_api.route('/api/user/postEditLogin', methods=["POST"])
@access_api.login_required
def postEditLogin():
if request.method == "POST":
payload = request.get_json()['payload']
administration_database.updateLoginsTuple(payload)
return jsonify({'error': False, 'message': f"User was Added Successfully."})
return jsonify({'error': True, 'message': f"These was an error with adding this user."})
@admin_api.route('/api/user/postEditLoginPassword', methods=["POST"])
@access_api.login_required
def postEditLoginPassword():
if request.method == "POST":
payload = request.get_json()['payload']
user = administration_database.selectLoginsTuple((payload['id'],))
if hashlib.sha256(payload['current_password'].encode()).hexdigest() != user['password']:
return jsonify({'error': True, 'message': "The provided current password is incorrect"})
payload['update']['password'] = hashlib.sha256(payload['update']['password'].encode()).hexdigest()
administration_database.updateLoginsTuple(payload)
return jsonify({'error': False, 'message': f"Password was changed successfully."})
return jsonify({'error': True, 'message': f"These was an error with updating this Users password."})

View File

@ -0,0 +1,767 @@
# 3RD PARTY IMPORTS
import psycopg2
# APPLICATION IMPORTS
from application import postsqldb
import config
def getUser(conn, payload, convert=False):
"""_summary_
Args:
conn (_type_): _description_
payload (tuple): (username, password)
convert (bool, optional): _description_. Defaults to False.
Raises:
DatabaseError: _description_
Returns:
_type_: _description_
"""
user = ()
try:
with conn.cursor() as cur:
sql = f"SELECT * FROM logins WHERE username=%s;"
cur.execute(sql, (payload[0],))
rows = cur.fetchone()
if rows and rows[2] == payload[1] and convert:
user = tupleDictionaryFactory(cur.description, rows)
elif rows and rows[2] == payload[1] and not convert:
user = rows
except Exception as error:
raise DatabaseError(error, payload, sql)
return user
def selectLoginsTuple(payload, convert=True, conn=None):
user = ()
self_conn = False
with open("application/administration/sql/selectLoginsUser.sql", "r") as file:
sql = file.read()
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
user = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
user = rows
if self_conn:
conn.close()
return user
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def selectSitesTuple(payload, convert=True, conn=None):
record = []
self_conn = False
sql = f"SELECT * FROM sites WHERE id=%s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
record = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
record = rows
if self_conn:
conn.close()
return record
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def selectSiteTupleByName(payload, convert=True, conn=None):
""" payload (tuple): (site_name,) """
site = ()
self_conn = False
select_site_sql = f"SELECT * FROM sites WHERE site_name = %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(select_site_sql, payload)
rows = cur.fetchone()
if rows and convert:
site = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
site = rows
if self_conn:
conn.commit()
conn.close()
return site
except Exception as error:
raise postsqldb.DatabaseError(error, payload, select_site_sql)
def selectSitesTuples(convert=True, conn=None):
sites = []
self_conn = False
sql = f"SELECT * FROM sites;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall()
if rows and convert:
sites = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
sites = rows
if self_conn:
conn.close()
return sites
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def selectRolesTuple(payload, convert=True, conn=None):
role = []
self_conn = False
sql = f"SELECT roles.*, row_to_json(sites.*) as site FROM roles LEFT JOIN sites ON sites.id = roles.site_id WHERE roles.id=%s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
role = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
role = rows
if self_conn:
conn.close()
return role
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def selectRolesTupleBySite(payload, convert=True, conn=None):
""" payload (tuple): (site_id,) """
roles = ()
self_conn = False
select_roles_sql = f"SELECT * FROM roles WHERE site_id = %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(select_roles_sql, payload)
rows = cur.fetchall()
if rows and convert:
roles = [postsqldb.tupleDictionaryFactory(cur.description, role) for role in rows]
elif rows and not convert:
roles = rows
if self_conn:
conn.close()
return roles
except Exception as error:
raise postsqldb.DatabaseError(error, payload, select_roles_sql)
def paginateSitesTuples(payload, convert=True, conn=None):
""" payload (tuple): (limit, offset) """
recordsets = []
count = 0
self_conn = False
sql = f"SELECT * FROM sites LIMIT %s OFFSET %s;"
sql_count = f"SELECT COUNT(*) FROM sites;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordsets = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
recordsets = rows
cur.execute(sql_count)
count = cur.fetchone()[0]
if self_conn:
conn.close()
return recordsets, count
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def paginateRolesTuples(payload, convert=True, conn=None):
""" payload (tuple): (limit, offset) """
recordset = []
self_conn = False
sql = f"SELECT roles.*, row_to_json(sites.*) as site FROM roles LEFT JOIN sites ON sites.id = roles.site_id LIMIT %s OFFSET %s;"
sql_count = f"SELECT COUNT(*) FROM roles;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
recordset = rows
cur.execute(sql_count)
count = cur.fetchone()[0]
if self_conn:
conn.close()
return recordset, count
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def paginateLoginsTuples(payload, convert=True, conn=None):
""" payload (tuple): (limit, offset) """
recordset = []
self_conn = False
sql = f"SELECT * FROM logins LIMIT %s OFFSET %s;"
sql_count = f"SELECT COUNT(*) FROM logins;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
recordset = rows
cur.execute(sql_count)
count = cur.fetchone()[0]
if self_conn:
conn.close()
return recordset, count
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertSitesTuple(payload, convert=True, conn=None):
""" payload (tuple): (site_name[str], site_description[str], creation_date[timestamp], site_owner_id[int],
flags[dict], default_zone[str], default_auto_issue_location[str], default_primary_location[str]) """
site_tuple = ()
self_conn = False
with open(f"application/administration/sql/insertSitesTuple.sql", "r+") as file:
sql = file.read()
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
site_tuple = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
site_tuple = rows
if self_conn:
conn.commit()
conn.close()
return site_tuple
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertRolesTuple(payload, convert=True, conn=None):
""" payload (tuple): (role_name[str], role_description[str], site_id[int], flags[jsonb]) """
role_tuple = ()
self_conn = False
with open(f"application/administration/sql/insertRolesTuple.sql", "r+") as file:
sql = file.read()
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
role_tuple = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
role_tuple = rows
if self_conn:
conn.commit()
conn.close()
return role_tuple
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertLoginsTuple(payload, convert=True, conn=None):
"""payload (tuple): (username, password, email, favorites, unseen_pantry_items, unseen_groups, unseen_shopping_lists,
unseen_recipes, seen_pantry_items, seen_groups, seen_shopping_lists, seen_recipes,
sites, site_roles, system_admin, flags, row_type)"""
login = ()
self_conn = False
with open(f"application/administration/sql/insertLoginsTupleFull.sql", "r+") as file:
sql = file.read()
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
login = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
login = rows
if self_conn:
conn.commit()
conn.close()
return login
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertZonesTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (name[str],) """
zone = ()
self_conn = False
with open(f"application/administration/sql/insertZonesTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
zone = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
zone = rows
if self_conn:
conn.commit()
conn.close()
return zone
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertLocationsTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (uuid[str], name[str], zone_id[int], items[jsonb]) """
location = ()
self_conn = False
with open(f"application/administration/sql/insertLocationsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
location = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
location = rows
if self_conn:
conn.commit()
conn.close()
return location
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertVendorsTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (vendor_name[str], vendor_address[str], creation_date[timestamp], created_by[int], phone_number[str]) """
vendor = ()
self_conn = False
with open(f"application/administration/sql/insertVendorsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
vendor = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
vendor = rows
if self_conn:
conn.commit()
conn.close()
return vendor
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertBrandsTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (brand_name[str], ) """
brand = ()
self_conn = False
with open(f"application/administration/sql/insertBrandsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
brand = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
brand = rows
if self_conn:
conn.commit()
conn.close()
return brand
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateAddLoginSitesRoles(payload, convert=True, conn=None):
""" payload (tuple): (site_id, role_id, login_id) """
sql = f"UPDATE logins SET sites = sites || %s, site_roles = site_roles || %s WHERE id=%s RETURNING *;"
login = ()
self_conn = False
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
login = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
login = rows
if self_conn:
conn.commit()
conn.close()
return login
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateSitesTuple(payload, convert=True, conn=None):
""" payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}} """
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE sites SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateUsersSites(payload, convert=True, conn=None):
""" payload: {'site_id',} """
self_conn = False
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
select_sql = f"SELECT logins.id FROM logins WHERE sites @> ARRAY[%s];"
with conn.cursor() as cur:
cur.execute(select_sql, (payload['site_id'], ))
user = tuple([row[0] for row in cur.fetchall()])
update_sql = f"UPDATE logins SET sites = array_remove(sites, %s) WHERE id = %s;"
with conn.cursor() as cur:
for user_id in user:
cur.execute(update_sql, (payload['site_id'], user_id))
if self_conn:
conn.commit()
conn.close()
except Exception as error:
raise error
def updateUsersRoles(payload, convert=True, conn=None):
""" payload: {'role_id',} """
self_conn = False
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
select_sql = f"SELECT logins.id FROM logins WHERE site_roles @> ARRAY[%s];"
with conn.cursor() as cur:
cur.execute(select_sql, (payload['role_id'], ))
users = tuple([row[0] for row in cur.fetchall()])
update_sql = f"UPDATE logins SET site_roles = array_remove(site_roles, %s) WHERE id = %s;"
with conn.cursor() as cur:
for user_id in users:
cur.execute(update_sql, (payload['role_id'], user_id))
if self_conn:
conn.commit()
conn.close()
except Exception as error:
raise error
def updateRolesTuple(payload, convert=True, conn=None):
""" payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}"""
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE roles SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateLoginsTuple(payload, convert=True, conn=None):
""" payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}} """
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE logins SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def createTable(site, table, conn=None):
self_conn = False
with open(f"application/administration/sql/CREATE/{table}.sql", 'r') as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql)
if self_conn:
conn.commit()
conn.close()
except Exception as error:
raise postsqldb.DatabaseError(error, sql, table)
def dropTable(site, table, conn=None):
self_conn = False
with open(f"application/administration/sql/DROP/{table}.sql", 'r') as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql)
if self_conn:
conn.commit()
conn.close()
except Exception as error:
raise postsqldb.DatabaseError(error, sql, table)
def deleteSitesTuple(payload, convert=True, conn=None):
"""payload (tuple): (tuple_id, )"""
deleted = ()
self_conn = False
sql = f"WITH deleted_rows AS (DELETE FROM sites WHERE id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
deleted = [postsqldb.tupleDictionaryFactory(cur.description, r) for r in rows]
elif rows and not convert:
deleted = rows
if self_conn:
conn.commit()
conn.close()
return deleted
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def deleteRolesTuple(payload, convert=True, conn=None):
"""payload (tuple): (tuple_id, )"""
deleted = ()
self_conn = False
sql = f"WITH deleted_rows AS (DELETE FROM roles WHERE id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
deleted = [postsqldb.tupleDictionaryFactory(cur.description, r) for r in rows]
elif rows and not convert:
deleted = rows
if self_conn:
conn.commit()
conn.close()
return deleted
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)

View File

@ -0,0 +1,167 @@
# 3RD PARTY IMPORTS
import psycopg2
import datetime
# APPLICATION IMPORTS
import config
from application import postsqldb, database_payloads
from application.administration import administration_database
def dropSiteTables(conn, site_manager):
try:
for table in site_manager.drop_order:
administration_database.dropTable(site_manager.site_name, table, conn=conn)
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- INFO --- {table} DROPPED!\n")
except Exception as error:
raise error
def deleteSite(site, user, conn=None):
"""Uses a Site Manager to delete a site from the system.
Args:
site_manager (MyDataclasses.SiteManager):
Raises:
Exception:
"""
self_conn = False
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
try:
admin_user = (user['username'], user['password'], user['email'], user['row_type'])
site_manager = database_payloads.SiteManager(
site['site_name'],
admin_user,
site['default_zone'],
site['default_primary_location'],
site['site_description']
)
roles = administration_database.selectRolesTupleBySite((site['id'],), conn=conn)
administration_database.deleteRolesTuple([role['id'] for role in roles], conn=conn)
dropSiteTables(conn, site_manager)
for role in roles:
administration_database.updateUsersRoles({'role_id': role['id']}, conn=conn)
administration_database.updateUsersSites({'site_id': site['id']}, conn=conn)
site = administration_database.deleteSitesTuple((site['id'], ), conn=conn)
if self_conn:
conn.commit()
conn.close()
except Exception as error:
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- ERROR --- {error}\n")
conn.rollback()
conn.close()
def addAdminUser(conn, site_manager, convert=True):
admin_user = ()
try:
sql = f"INSERT INTO logins (username, password, email, row_type) VALUES (%s, %s, %s, %s) ON CONFLICT (username) DO UPDATE SET username = excluded.username RETURNING *;"
with conn.cursor() as cur:
cur.execute(sql, site_manager.admin_user)
rows = cur.fetchone()
if rows and convert:
admin_user = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
admin_user = rows
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- INFO --- Admin User Created!\n")
except Exception as error:
raise error
return admin_user
def setupSiteTables(conn, site_manager):
try:
for table in site_manager.create_order:
administration_database.createTable(site_manager.site_name, table, conn=conn)
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- INFO --- {table} Created!\n")
except Exception as error:
raise error
def addSite(payload, conn=None):
"""uses a Site Manager to add a site to the system
Args:
site_manager (MyDataclasses.SiteManager):
"""
self_conn = False
site_manager = database_payloads.SiteManager(
payload['site_name'],
payload['admin_user'],
payload['default_zone'],
payload['default_primary_location'],
payload['site_description']
)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
setupSiteTables(conn, site_manager)
admin_user = addAdminUser(conn, site_manager)
site = database_payloads.SitePayload(
site_name=site_manager.site_name,
site_description=site_manager.description,
site_owner_id=admin_user['id']
)
site = administration_database.insertSitesTuple(site.payload(), conn=conn)
role = database_payloads.RolePayload("Admin", f"Admin for {site['site_name']}", site['id'])
role = administration_database.insertRolesTuple(role.payload(), conn=conn)
admin_user = administration_database.updateAddLoginSitesRoles((site["id"], role["id"], admin_user["id"]), conn=conn)
default_zone = database_payloads.ZonesPayload(site_manager.default_zone)
default_zone = administration_database.insertZonesTuple(site["site_name"], default_zone.payload(), conn=conn)
uuid = f"{site_manager.default_zone}@{site_manager.default_location}"
default_location = database_payloads.LocationsPayload(uuid, site_manager.default_location, default_zone['id'])
default_location = administration_database.insertLocationsTuple(site['site_name'], default_location.payload(), conn=conn)
payload = {
'id': site['id'],
'update': {
'default_zone': default_zone['id'],
'default_auto_issue_location': default_location['id'],
'default_primary_location': default_location['id']
}
}
administration_database.updateSitesTuple(payload, conn=conn)
blank_vendor = database_payloads.VendorsPayload("None", admin_user['id'])
blank_brand = database_payloads.BrandsPayload("None")
blank_vendor = administration_database.insertVendorsTuple(site['site_name'], blank_vendor.payload(), conn=conn)
blank_brand = administration_database.insertBrandsTuple(site['site_name'], blank_brand.payload(), conn=conn)
if self_conn:
conn.commit()
conn.close()
except Exception as error:
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- ERROR --- {error}\n")
conn.rollback()
raise error

View File

@ -0,0 +1,4 @@
INSERT INTO %%site_name%%_brands
(name)
VALUES (%s)
RETURNING *;

View File

@ -0,0 +1,4 @@
INSERT INTO %%site_name%%_locations
(uuid, name, zone_id)
VALUES (%s, %s, %s)
RETURNING *;

View File

@ -0,0 +1,6 @@
INSERT INTO logins
(username, password, email, favorites, unseen_pantry_items, unseen_groups, unseen_shopping_lists,
unseen_recipes, seen_pantry_items, seen_groups, seen_shopping_lists, seen_recipes,
sites, site_roles, system_admin, flags, row_type)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING *;

View File

@ -0,0 +1,4 @@
INSERT INTO roles
(role_name, role_description, site_id, flags)
VALUES (%s, %s, %s, %s)
RETURNING *;

View File

@ -0,0 +1,5 @@
INSERT INTO sites
(site_name, site_description, creation_date, site_owner_id, flags, default_zone,
default_auto_issue_location, default_primary_location)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING *;

View File

@ -0,0 +1,4 @@
INSERT INTO %%site_name%%_vendors
(vendor_name, vendor_address, creation_date, created_by, phone_number)
VALUES (%s, %s, %s, %s, %s)
RETURNING *;

View File

@ -0,0 +1,4 @@
INSERT INTO %%site_name%%_zones
(name, description)
VALUES (%s, %s)
RETURNING *;

Some files were not shown because too many files have changed in this diff Show More