pantry-track/application/administration/administration_database.py
2025-08-02 18:59:14 -05:00

764 lines
25 KiB
Python

from application import postsqldb
import psycopg2
import config
def getUser(conn, payload, convert=False):
"""_summary_
Args:
conn (_type_): _description_
payload (tuple): (username, password)
convert (bool, optional): _description_. Defaults to False.
Raises:
DatabaseError: _description_
Returns:
_type_: _description_
"""
user = ()
try:
with conn.cursor() as cur:
sql = f"SELECT * FROM logins WHERE username=%s;"
cur.execute(sql, (payload[0],))
rows = cur.fetchone()
if rows and rows[2] == payload[1] and convert:
user = tupleDictionaryFactory(cur.description, rows)
elif rows and rows[2] == payload[1] and not convert:
user = rows
except Exception as error:
raise DatabaseError(error, payload, sql)
return user
def selectLoginsTuple(payload, convert=True, conn=None):
user = ()
self_conn = False
with open("application/administration/sql/selectLoginsUser.sql", "r") as file:
sql = file.read()
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
user = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
user = rows
if self_conn:
conn.close()
return user
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def selectSitesTuple(payload, convert=True, conn=None):
record = []
self_conn = False
sql = f"SELECT * FROM sites WHERE id=%s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
record = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
record = rows
if self_conn:
conn.close()
return record
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def selectSiteTupleByName(payload, convert=True, conn=None):
""" payload (tuple): (site_name,) """
site = ()
self_conn = False
select_site_sql = f"SELECT * FROM sites WHERE site_name = %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(select_site_sql, payload)
rows = cur.fetchone()
if rows and convert:
site = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
site = rows
if self_conn:
conn.commit()
conn.close()
return site
except Exception as error:
raise postsqldb.DatabaseError(error, payload, select_site_sql)
def selectSitesTuples(convert=True, conn=None):
sites = []
self_conn = False
sql = f"SELECT * FROM sites;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall()
if rows and convert:
sites = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
sites = rows
if self_conn:
conn.close()
return sites
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def selectRolesTuple(payload, convert=True, conn=None):
role = []
self_conn = False
sql = f"SELECT roles.*, row_to_json(sites.*) as site FROM roles LEFT JOIN sites ON sites.id = roles.site_id WHERE roles.id=%s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
role = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
role = rows
if self_conn:
conn.close()
return role
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def selectRolesTupleBySite(payload, convert=True, conn=None):
""" payload (tuple): (site_id,) """
roles = ()
self_conn = False
select_roles_sql = f"SELECT * FROM roles WHERE site_id = %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(select_roles_sql, payload)
rows = cur.fetchall()
if rows and convert:
roles = [postsqldb.tupleDictionaryFactory(cur.description, role) for role in rows]
elif rows and not convert:
roles = rows
if self_conn:
conn.close()
return roles
except Exception as error:
raise postsqldb.DatabaseError(error, payload, select_roles_sql)
def paginateSitesTuples(payload, convert=True, conn=None):
""" payload (tuple): (limit, offset) """
recordsets = []
count = 0
self_conn = False
sql = f"SELECT * FROM sites LIMIT %s OFFSET %s;"
sql_count = f"SELECT COUNT(*) FROM sites;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordsets = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
recordsets = rows
cur.execute(sql_count)
count = cur.fetchone()[0]
if self_conn:
conn.close()
return recordsets, count
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def paginateRolesTuples(payload, convert=True, conn=None):
""" payload (tuple): (limit, offset) """
recordset = []
self_conn = False
sql = f"SELECT roles.*, row_to_json(sites.*) as site FROM roles LEFT JOIN sites ON sites.id = roles.site_id LIMIT %s OFFSET %s;"
sql_count = f"SELECT COUNT(*) FROM roles;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
recordset = rows
cur.execute(sql_count)
count = cur.fetchone()[0]
if self_conn:
conn.close()
return recordset, count
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def paginateLoginsTuples(payload, convert=True, conn=None):
""" payload (tuple): (limit, offset) """
recordset = []
self_conn = False
sql = f"SELECT * FROM logins LIMIT %s OFFSET %s;"
sql_count = f"SELECT COUNT(*) FROM logins;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
recordset = rows
cur.execute(sql_count)
count = cur.fetchone()[0]
if self_conn:
conn.close()
return recordset, count
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertSitesTuple(payload, convert=True, conn=None):
""" payload (tuple): (site_name[str], site_description[str], creation_date[timestamp], site_owner_id[int],
flags[dict], default_zone[str], default_auto_issue_location[str], default_primary_location[str]) """
site_tuple = ()
self_conn = False
with open(f"application/administration/sql/insertSitesTuple.sql", "r+") as file:
sql = file.read()
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
site_tuple = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
site_tuple = rows
if self_conn:
conn.commit()
conn.close()
return site_tuple
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertRolesTuple(payload, convert=True, conn=None):
""" payload (tuple): (role_name[str], role_description[str], site_id[int], flags[jsonb]) """
role_tuple = ()
self_conn = False
with open(f"application/administration/sql/insertRolesTuple.sql", "r+") as file:
sql = file.read()
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
role_tuple = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
role_tuple = rows
if self_conn:
conn.commit()
conn.close()
return role_tuple
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertLoginsTuple(payload, convert=True, conn=None):
"""payload (tuple): (username, password, email, favorites, unseen_pantry_items, unseen_groups, unseen_shopping_lists,
unseen_recipes, seen_pantry_items, seen_groups, seen_shopping_lists, seen_recipes,
sites, site_roles, system_admin, flags, row_type)"""
login = ()
self_conn = False
with open(f"application/administration/sql/insertLoginsTupleFull.sql", "r+") as file:
sql = file.read()
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
login = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
login = rows
if self_conn:
conn.commit()
conn.close()
return login
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertZonesTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (name[str],) """
zone = ()
self_conn = False
with open(f"application/administration/sql/insertZonesTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
zone = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
zone = rows
if self_conn:
conn.commit()
conn.close()
return zone
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertLocationsTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (uuid[str], name[str], zone_id[int], items[jsonb]) """
location = ()
self_conn = False
with open(f"application/administration/sql/insertLocationsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
location = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
location = rows
if self_conn:
conn.commit()
conn.close()
return location
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertVendorsTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (vendor_name[str], vendor_address[str], creation_date[timestamp], created_by[int], phone_number[str]) """
vendor = ()
self_conn = False
with open(f"application/administration/sql/insertVendorsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
vendor = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
vendor = rows
if self_conn:
conn.commit()
conn.close()
return vendor
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertBrandsTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (brand_name[str], ) """
brand = ()
self_conn = False
with open(f"application/administration/sql/insertBrandsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
brand = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
brand = rows
if self_conn:
conn.commit()
conn.close()
return brand
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateAddLoginSitesRoles(payload, convert=True, conn=None):
""" payload (tuple): (site_id, role_id, login_id) """
sql = f"UPDATE logins SET sites = sites || %s, site_roles = site_roles || %s WHERE id=%s RETURNING *;"
login = ()
self_conn = False
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
login = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
login = rows
if self_conn:
conn.commit()
conn.close()
return login
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateSitesTuple(payload, convert=True, conn=None):
""" payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}} """
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE sites SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateUsersSites(payload, convert=True, conn=None):
""" payload: {'site_id',} """
self_conn = False
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
select_sql = f"SELECT logins.id FROM logins WHERE sites @> ARRAY[%s];"
with conn.cursor() as cur:
cur.execute(select_sql, (payload['site_id'], ))
user = tuple([row[0] for row in cur.fetchall()])
update_sql = f"UPDATE logins SET sites = array_remove(sites, %s) WHERE id = %s;"
with conn.cursor() as cur:
for user_id in user:
cur.execute(update_sql, (payload['site_id'], user_id))
if self_conn:
conn.commit()
conn.close()
except Exception as error:
raise error
def updateUsersRoles(payload, convert=True, conn=None):
""" payload: {'role_id',} """
self_conn = False
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
select_sql = f"SELECT logins.id FROM logins WHERE site_roles @> ARRAY[%s];"
with conn.cursor() as cur:
cur.execute(select_sql, (payload['role_id'], ))
users = tuple([row[0] for row in cur.fetchall()])
update_sql = f"UPDATE logins SET site_roles = array_remove(site_roles, %s) WHERE id = %s;"
with conn.cursor() as cur:
for user_id in users:
cur.execute(update_sql, (payload['role_id'], user_id))
if self_conn:
conn.commit()
conn.close()
except Exception as error:
raise error
def updateRolesTuple(payload, convert=True, conn=None):
""" payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}"""
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE roles SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateLoginsTuple(payload, convert=True, conn=None):
""" payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}} """
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE logins SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def createTable(site, table, conn=None):
self_conn = False
with open(f"application/administration/sql/CREATE/{table}.sql", 'r') as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql)
if self_conn:
conn.commit()
conn.close()
except Exception as error:
raise postsqldb.DatabaseError(error, sql, table)
def dropTable(site, table, conn=None):
self_conn = False
with open(f"application/administration/sql/DROP/{table}.sql", 'r') as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql)
if self_conn:
conn.commit()
conn.close()
except Exception as error:
raise postsqldb.DatabaseError(error, sql, table)
def deleteSitesTuple(payload, convert=True, conn=None):
"""payload (tuple): (tuple_id, )"""
deleted = ()
self_conn = False
sql = f"WITH deleted_rows AS (DELETE FROM sites WHERE id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
deleted = [postsqldb.tupleDictionaryFactory(cur.description, r) for r in rows]
elif rows and not convert:
deleted = rows
if self_conn:
conn.commit()
conn.close()
return deleted
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def deleteRolesTuple(payload, convert=True, conn=None):
"""payload (tuple): (tuple_id, )"""
deleted = ()
self_conn = False
sql = f"WITH deleted_rows AS (DELETE FROM roles WHERE id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
deleted = [postsqldb.tupleDictionaryFactory(cur.description, r) for r in rows]
elif rows and not convert:
deleted = rows
if self_conn:
conn.commit()
conn.close()
return deleted
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)