pantry-track/application/site_management/site_management_database.py
Jadowyne Ulve fe34ddafe2 Finished up with migratione and clean up
of site_management module
2025-08-02 20:58:00 -05:00

489 lines
16 KiB
Python

# 3RD PARTY IMPORTS
import psycopg2
# APPLICATION IMPORTS
import config
from application import postsqldb
def paginateZonesTuples(site, payload, convert=True, conn=None):
""" payload (tuple): (limit, offset) """
recordset = ()
count = 0
self_conn = False
sql = f"SELECT * FROM {site}_zones LIMIT %s OFFSET %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
recordset = rows
cur.execute(f"SELECT COUNT(*) FROM {site}_zones;")
count = cur.fetchone()[0]
if self_conn:
conn.commit()
conn.close()
return recordset, count
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def paginateLocationsTuples(site, payload, convert=True, conn=None):
""" payload (tuple): (limit, offset) """
recordset = ()
count = 0
self_conn = False
sql = f"SELECT * FROM {site}_locations LIMIT %s OFFSET %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
recordset = rows
cur.execute(f"SELECT COUNT(*) FROM {site}_locations;")
count = cur.fetchone()[0]
if self_conn:
conn.commit()
conn.close()
return recordset, count
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def paginateVendorsTuples(site, payload, convert=True, conn=None):
""" payload (tuple): (limit, offset) """
recordset = ()
count = 0
self_conn = False
sql = f"SELECT * FROM {site}_vendors LIMIT %s OFFSET %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
recordset = rows
cur.execute(f"SELECT COUNT(*) FROM {site}_vendors;")
count = cur.fetchone()[0]
if self_conn:
conn.commit()
conn.close()
return recordset, count
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def paginateBrandsTuples(site, payload, convert=True, conn=None):
""" payload (tuple): (limit, offset) """
recordset = ()
count = 0
self_conn = False
sql = f"SELECT * FROM {site}_brands LIMIT %s OFFSET %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
recordset = rows
cur.execute(f"SELECT COUNT(*) FROM {site}_brands;")
count = cur.fetchone()[0]
if self_conn:
conn.commit()
conn.close()
return recordset, count
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def paginatePrefixesTuples(site, payload, convert=True, conn=None):
""" payload (_type_): (limit, offset) """
recordset = []
self_conn = False
count = 0
sql = f"SELECT * FROM {site}_sku_prefix LIMIT %s OFFSET %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
if rows and not convert:
recordset = rows
cur.execute(f"SELECT COUNT(*) FROM {site}_sku_prefix;")
count = cur.fetchone()[0]
if self_conn:
conn.commit()
conn.close()
return recordset, count
except (Exception, psycopg2.DatabaseError) as error:
raise postsqldb.DatabaseError(error, payload, sql)
def selectSitesTupleByName(payload, convert=True, conn=None):
""" payload (_type_): (site_name,) """
record = ()
self_conn = False
sql = f"SELECT id FROM sites WHERE site_name = %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
record = postsqldb.tupleDictionaryFactory(cur.description, rows)
if rows and not convert:
record = rows
if self_conn:
conn.commit()
conn.close()
return record
except (Exception, psycopg2.DatabaseError) as error:
raise postsqldb.DatabaseError(error, payload, sql)
def selectZonesTuples(site, convert=True, conn=None):
""" payload (tuple): (limit, offset) """
recordset = ()
self_conn = False
sql = f"SELECT * FROM {site}_zones;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall()
if rows and convert:
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
recordset = rows
if self_conn:
conn.commit()
conn.close()
return recordset
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def insertZonesTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (name[str],) """
zone = ()
self_conn = False
with open(f"application/site_management/sql/insertZonesTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
zone = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
zone = rows
if self_conn:
conn.commit()
conn.close()
return zone
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertLocationsTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (name[str],) """
zone = ()
self_conn = False
with open(f"application/site_management/sql/insertLocationsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
zone = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
zone = rows
if self_conn:
conn.commit()
conn.close()
return zone
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertVendorsTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (name[str],) """
zone = ()
self_conn = False
with open(f"application/site_management/sql/insertVendorsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
zone = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
zone = rows
if self_conn:
conn.commit()
conn.close()
return zone
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertBrandsTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (name[str],) """
brand = ()
self_conn = False
with open(f"application/site_management/sql/insertBrandsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
brand = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
brand = rows
if self_conn:
conn.commit()
conn.close()
return brand
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertSKUPrefixesTuple(site, payload, convert=True, conn=None):
""" payload (tuple): (name[str],) """
prefix = ()
self_conn = False
with open(f"application/site_management/sql/insertSKUPrefixTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
prefix = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
prefix = rows
if self_conn:
conn.commit()
conn.close()
return prefix
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateSKUPrefixesTuple(site, payload, convert=True, conn=None):
""" payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}} """
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE {site}_sku_prefix SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateZonesTuple(site, payload, convert=True, conn=None):
""" payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}} """
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE {site}_zones SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateVendorsTuple(site, payload, convert=True, conn=None):
""" payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}} """
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE {site}_vendors SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateBrandsTuple(site, payload, convert=True, conn=None):
""" payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}} """
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE {site}_brands SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)