pantry-track/application/receipts/receipts_database.py
2025-08-02 15:59:07 -05:00

724 lines
24 KiB
Python

# 3RD PARTY IMPORTS
import psycopg2
# APPLICATION IMPORTS
import config
from application import postsqldb
def requestNextReceiptID(site_name, conn=None):
"""gets the next id for receipts_id, currently returns a 8 digit number
Args:
site (str): site to get the next id for
Returns:
json: receipt_id, message, error keys
"""
next_receipt_id = None
self_conn = False
sql = f"SELECT receipt_id FROM {site_name}_receipts ORDER BY id DESC LIMIT 1;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql)
next_receipt_id = cur.fetchone()
if next_receipt_id == None:
next_receipt_id = "00000001"
else:
next_receipt_id = next_receipt_id[0]
next_receipt_id = int(next_receipt_id.split("-")[1]) + 1
y = str(next_receipt_id)
len_str = len(y)
x = "".join(["0" for _ in range(8 - len_str)])
next_receipt_id = x + y
if self_conn:
conn.commit()
conn.close()
return next_receipt_id
except (Exception, psycopg2.DatabaseError) as error:
raise postsqldb.DatabaseError(error, payload=(), sql=sql)
def getItemsWithQOH(site, payload, convert=True, conn=None):
recordset = []
count = 0
self_conn = False
with open(f"application/receipts/sql/getItemsWithQOH.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site).replace("%%sort_order%%", payload[3])
payload = list(payload)
payload.pop(3)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
if convert:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(sql, payload)
recordset = cur.fetchall()
recordset = [dict(record) for record in recordset]
cur.execute(f"SELECT COUNT(*) FROM {site}_items WHERE search_string LIKE '%%' || %s || '%%';", (payload[0], ))
count = cur.fetchone()
else:
with conn.cursor() as cur:
cur.execute(sql, payload)
recordset = cur.fetchall()
cur.execute(f"SELECT COUNT(*) FROM {site}_items WHERE search_string LIKE '%%' || %s || '%%';", (payload[0], ))
count = cur.fetchone()
if self_conn:
conn.close()
return recordset, count
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def getLinkedItemByBarcode(site, payload, convert=True, conn=None):
item = ()
self_conn = False
sql = f"SELECT * FROM {site}_itemlinks WHERE barcode=%s;"
if convert:
item = {}
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
item = postsqldb.tupleDictionaryFactory(cur.description, rows)
if rows and not convert:
item = rows
if self_conn:
conn.close()
return item
except (Exception, psycopg2.DatabaseError) as error:
raise postsqldb.DatabaseError(error, payload, sql)
def getItemAllByBarcode(site, payload, convert=True, conn=None):
item = ()
self_conn = False
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
if convert:
item = {}
linked_item = getLinkedItemByBarcode(site, (payload[0],), conn=conn)
if len(linked_item) > 1:
item = getItemAllByID(site, payload=(linked_item['link'], ), convert=convert, conn=conn)
item['item_info']['uom_quantity'] = linked_item['conv_factor']
if self_conn:
conn.close()
return item
else:
with open(f"application/receipts/sql/getItemAllByBarcode.sql", "r+") as file:
getItemAllByBarcode_sql = file.read().replace("%%site_name%%", site)
try:
with conn.cursor() as cur:
cur.execute(getItemAllByBarcode_sql, payload)
rows = cur.fetchone()
if rows and convert:
item = postsqldb.tupleDictionaryFactory(cur.description, rows)
if rows and not convert:
item = rows
if self_conn:
conn.close()
return item
except (Exception, psycopg2.DatabaseError) as error:
raise postsqldb.DatabaseError(error, payload, getItemAllByBarcode_sql)
def getItemAllByID(site, payload, convert=True, conn=None):
item = ()
self_conn = False
with open(f"application/receipts/sql/getItemAllByID.sql", "r+") as file:
getItemAllByID_sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(getItemAllByID_sql, payload)
rows = cur.fetchone()
if rows and convert:
item = postsqldb.tupleDictionaryFactory(cur.description, rows)
if rows and not convert:
item = rows
if self_conn:
conn.close()
return item
except (Exception, psycopg2.DatabaseError) as error:
raise postsqldb.DatabaseError(error, payload, getItemAllByID_sql)
def getReceiptByID(site, payload, convert=True, conn=None):
receipt = []
self_conn = False
with open(f"application/receipts/sql/getReceiptByID.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
row = cur.fetchone()
if row and convert:
receipt = postsqldb.tupleDictionaryFactory(cur.description, row)
if row and not convert:
receipt = row
if self_conn:
conn.close()
return receipt
except (Exception, psycopg2.DatabaseError) as error:
raise postsqldb.DatabaseError(error, payload, sql)
def paginateReceiptsTuples(site, payload, convert=True, conn=None):
"""payload=(limit, offset)"""
receipts = []
count = 0
self_conn = False
with open(f"application/receipts/sql/getReceipts.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
receipts = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
if rows and not convert:
receipts = rows
cur.execute(f"SELECT COUNT(*) FROM {site}_receipts;")
count = cur.fetchone()[0]
if self_conn:
conn.commit()
conn.close()
return receipts, count
except (Exception, psycopg2.DatabaseError) as error:
raise postsqldb.DatabaseError(error, payload, sql)
def paginateVendorsTuples(site, payload, convert=True, conn=None):
"""payload (tuple): (limit, offset)"""
recordset = ()
count = 0
self_conn = False
sql = f"SELECT * FROM {site}_vendors LIMIT %s OFFSET %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
recordset = rows
cur.execute(f"SELECT COUNT(*) FROM {site}_vendors;")
count = cur.fetchone()[0]
if self_conn:
conn.close()
return recordset, count
except Exception as error:
raise postsqldb.DatabaseError(error, (), sql)
def paginateLinkedLists(site, payload, convert=True, conn=None):
records = []
count = 0
self_conn = False
sql = f"SELECT * FROM {site}_items WHERE row_type = 'list' LIMIT %s OFFSET %s;"
sql_count = f"SELECT COUNT(*) FROM {site}_items WHERE row_type = 'list' LIMIT %s OFFSET %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
records = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
if rows and not convert:
records = rows
cur.execute(sql_count, payload)
count = cur.fetchone()[0]
if self_conn:
conn.close()
return records, count
except (Exception, psycopg2.DatabaseError) as error:
raise postsqldb.DatabaseError(error, payload, sql)
def selectItemLocationsTuple(site_name, payload, convert=True, conn=None):
"""payload (tuple): [item_id, location_id]"""
item_locations = ()
self_conn = False
select_item_location_sql = f"SELECT * FROM {site_name}_item_locations WHERE part_id = %s AND location_id = %s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(select_item_location_sql, payload)
rows = cur.fetchone()
if rows and convert:
item_locations = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
item_locations = rows
if self_conn:
conn.close()
return item_locations
except Exception as error:
return error
def selectLocationsTuple(site, payload, convert=True, conn=None):
selected = ()
self_conn = False
sql = f"SELECT * FROM {site}_locations WHERE id=%s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
selected = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
selected = rows
if self_conn:
conn.close()
return selected
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def selectReceiptsTuple(site, payload, convert=True, conn=None):
selected = ()
self_conn = False
sql = f"SELECT * FROM {site}_receipts WHERE id=%s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
selected = postsqldb.tupleDictionaryFactory(cur.description, rows)
if rows and not convert:
selected = rows
if self_conn:
conn.close()
return selected
except (Exception, psycopg2.DatabaseError) as error:
raise postsqldb.DatabaseError(error, payload, sql)
def selectReceiptItemsTuple(site, payload, convert=True, conn=None):
selected = ()
self_conn = False
sql = f"SELECT * FROM {site}_receipt_items WHERE id=%s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
selected = postsqldb.tupleDictionaryFactory(cur.description, rows)
if rows and not convert:
selected = rows
if self_conn:
conn.close()
return selected
except (Exception, psycopg2.DatabaseError) as error:
raise postsqldb.DatabaseError(error, payload, sql)
def deleteReceiptItemsTuple(site, payload, convert=True, conn=None):
deleted = ()
self_conn = False
sql = f"WITH deleted_rows AS (DELETE FROM {site}_receipt_items WHERE id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
deleted = [postsqldb.tupleDictionaryFactory(cur.description, r) for r in rows]
elif rows and not convert:
deleted = rows
if self_conn:
conn.commit()
conn.close()
return deleted
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertTransactionsTuple(site, payload, convert=True, conn=None):
"""
payload (tuple): (timestamp[timestamp], logistics_info_id[int], barcode[str], name[str],
transaction_type[str], quantity[float], description[str], user_id[int], data[jsonb])
"""
transaction = ()
self_conn = False
with open(f"application/receipts/sql/insertTransactionsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
transaction = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
transaction = rows
if self_conn:
conn.commit()
conn.close()
return transaction
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertItemLinksTuple(site, payload, convert=True, conn=None):
"""payload (tuple): (barcode[str], link[int], data[jsonb], conv_factor[float]) """
link = ()
self_conn = False
with open(f"application/receipts/sql/insertItemLinksTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
link = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
link = rows
if self_conn:
conn.commit()
conn.close()
return link
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertCostLayersTuple(site, payload, convert=True, conn=None):
"""payload (tuple): (aquisition_date[timestamp], quantity[float], cost[float], currency_type[str], expires[timestamp/None], vendor[int])"""
cost_layer = ()
self_conn = False
with open(f"application/receipts/sql/insertCostLayersTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
cost_layer = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
cost_layer = rows
if self_conn:
conn.commit()
conn.close()
return cost_layer
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertReceiptItemsTuple(site, payload, convert=True, conn=None):
receipt_item = ()
self_conn = False
with open(f"application/receipts/sql/insertReceiptItemsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
receipt_item = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
receipt_item = rows
if self_conn:
conn.commit()
conn.close()
return receipt_item
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertReceiptsTuple(site, payload, convert=True, conn=None):
receipt = ()
self_conn = False
with open(f"application/receipts/sql/insertReceiptsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
receipt = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
receipt = rows
if self_conn:
conn.commit()
conn.close()
return receipt
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateItemsTuple(site, payload, convert=True, conn=None):
"""payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}"""
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE {site}_items SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateItemLocation(site, payload, convert=True, conn=None):
item_location = ()
self_conn = False
with open(f"application/receipts/sql/updateItemLocation.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
item_location = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
item_location = rows
if self_conn:
conn.commit()
conn.close()
return item_location
except Exception as error:
return error
def updateReceiptsTuple(site, payload, convert=True, conn=None):
"""payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}"""
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE {site}_receipts SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def updateReceiptItemsTuple(site, payload, convert=True, conn=None):
"""_summary_
Args:
conn (_T_connector@connect): Postgresql Connector
site (str):
payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
convert (bool, optional): determines if to return tuple as dictionary. Defaults to True.
Raises:
DatabaseError:
Returns:
tuple or dict: updated tuple
"""
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE {site}_receipt_items SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)