1130 lines
42 KiB
Python
1130 lines
42 KiB
Python
from application import postsqldb
|
|
import config
|
|
import psycopg2
|
|
import datetime
|
|
|
|
def getTransactions(site:str, payload: tuple, convert:bool=True):
|
|
""" Page through a sites Transactions by passing a logistics id, limit, and offset through a payload
|
|
|
|
Args:
|
|
site (str): _description_
|
|
payload (tuple): (logistics_id, limit, offset)
|
|
convert (bool, optional): _description_. Defaults to True.
|
|
|
|
Returns:
|
|
_type_: _description_
|
|
"""
|
|
database_config = config.config()
|
|
sql = f"SELECT * FROM {site}_transactions WHERE logistics_info_id=%s LIMIT %s OFFSET %s;"
|
|
sql_count = f"SELECT COUNT(*) FROM {site}_transactions WHERE logistics_info_id=%s;"
|
|
recordset = ()
|
|
count = 0
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchall()
|
|
if rows and convert:
|
|
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
|
|
if rows and not convert:
|
|
recordset = rows
|
|
cur.execute(sql_count, (payload[0],))
|
|
count = cur.fetchone()[0]
|
|
return recordset, count
|
|
except Exception as error:
|
|
postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def getTransaction(site:str, payload: tuple, convert:bool=True):
|
|
database_config = config.config()
|
|
sql = f"SELECT * FROM {site}_transactions WHERE id=%s;"
|
|
record = ()
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
record = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
if rows and not convert:
|
|
record = rows
|
|
return record
|
|
except Exception as error:
|
|
postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def getItemAllByID(site:str, payload: tuple, convert:bool=True):
|
|
database_config = config.config()
|
|
with open('application/items/sql/getItemAllByID.sql', 'r+') as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
record = ()
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
record = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
if rows and not convert:
|
|
record = rows
|
|
return record
|
|
except Exception as error:
|
|
postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def getItemAllByBarcode(site:str, payload: tuple, convert:bool=True):
|
|
database_config = config.config()
|
|
with open('application/items/sql/getItemAllByBarcode.sql', 'r+') as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
record = ()
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
record = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
if rows and not convert:
|
|
record = rows
|
|
return record
|
|
except Exception as error:
|
|
postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def getItemsWithQOH(site:str, payload: tuple, convert:bool=True):
|
|
database_config = config.config()
|
|
with open('application/items/sql/getItemsWithQOH.sql', 'r+') as file:
|
|
sql = file.read().replace("%%site_name%%", site).replace("%%sort_order%%", payload[3])
|
|
payload = list(payload)
|
|
payload.pop(3)
|
|
sql_count = f"SELECT COUNT(*) FROM {site}_items WHERE search_string LIKE '%%' || %s || '%%';"
|
|
recordset = ()
|
|
count = 0
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchall()
|
|
if rows and convert:
|
|
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
|
|
if rows and not convert:
|
|
recordset = rows
|
|
cur.execute(sql_count, (payload[0],))
|
|
count = cur.fetchone()[0]
|
|
return recordset, count
|
|
except Exception as error:
|
|
postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def getModalSKUs(site:str, payload:tuple, convert:bool=True):
|
|
database_config = config.config()
|
|
with open("application/items/sql/itemsModal.sql") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
with open("application/items/sql/itemsModalCount.sql") as file:
|
|
sql_count = file.read().replace("%%site_name%%", site)
|
|
recordset = []
|
|
count = 0
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
print(payload)
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchall()
|
|
if rows and convert:
|
|
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
|
|
if rows and not convert:
|
|
recordset = rows
|
|
cur.execute(sql_count, (payload[0],))
|
|
count = cur.fetchone()[0]
|
|
return recordset, count
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def getPrefixes(site:str, payload:tuple, convert:bool=True):
|
|
database_config = config.config()
|
|
recordset = []
|
|
count = 0
|
|
with open(f"application/items/sql/getSkuPrefixes.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchall()
|
|
if rows and convert:
|
|
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
|
|
if rows and not convert:
|
|
recordset = rows
|
|
|
|
cur.execute(f"SELECT COUNT(*) FROM {site}_sku_prefix;")
|
|
count = cur.fetchone()[0]
|
|
return recordset, count
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def getItemLink(site: str, payload:tuple, convert:bool=True):
|
|
database_config = config.config()
|
|
item_link = ()
|
|
sql = f"SELECT * FROM {site}_itemlinks WHERE id=%s;"
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
item_link = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
item_link = rows
|
|
return item_link
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def getLocation(site:str, payload:tuple, convert:bool=True):
|
|
selected = ()
|
|
database_config = config.config()
|
|
sql = f"SELECT * FROM {site}_locations WHERE id=%s;"
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
selected = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
selected = rows
|
|
return selected
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def getZone(site:str, payload:tuple, convert:bool=True):
|
|
selected = ()
|
|
database_config = config.config()
|
|
sql = f"SELECT * FROM {site}_zones WHERE id=%s;"
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
selected = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
selected = rows
|
|
return selected
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def selectItemLocationsTuple(site_name, payload, convert=True):
|
|
"""select a single tuple from ItemLocations table for site_name
|
|
|
|
Args:
|
|
conn (_T_connector@connect):
|
|
site_name (str):
|
|
payload (tuple): [item_id, location_id]
|
|
convert (bool): defaults to False, used to determine return of tuple/dict
|
|
|
|
Returns:
|
|
tuple: the row that was returned from the table
|
|
"""
|
|
item_locations = ()
|
|
database_config = config.config()
|
|
select_item_location_sql = f"SELECT * FROM {site_name}_item_locations WHERE part_id = %s AND location_id = %s;"
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(select_item_location_sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
item_locations = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
item_locations = rows
|
|
return item_locations
|
|
except Exception as error:
|
|
return error
|
|
|
|
def selectCostLayersTuple(site_name, payload, convert=True):
|
|
"""select a single or series of cost layers from the database for site_name
|
|
|
|
Args:
|
|
conn (_T_connector@connect):
|
|
site_name (str):
|
|
payload (tuple): (item_locations_id, )
|
|
convert (bool): defaults to False, used for determining return as tuple/dict
|
|
|
|
Returns:
|
|
list: list of tuples/dict from the cost_layers table for site_name
|
|
"""
|
|
cost_layers = ()
|
|
database_config = config.config()
|
|
select_cost_layers_sql = f"SELECT cl.* FROM {site_name}_item_locations il JOIN {site_name}_cost_layers cl ON cl.id = ANY(il.cost_layers) where il.id=%s;"
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(select_cost_layers_sql, payload)
|
|
rows = cur.fetchall()
|
|
if rows and convert:
|
|
cost_layers = rows
|
|
cost_layers = [postsqldb.tupleDictionaryFactory(cur.description, layer) for layer in rows]
|
|
elif rows and not convert:
|
|
cost_layers = rows
|
|
return cost_layers
|
|
except Exception as error:
|
|
return error
|
|
|
|
def selectSiteTuple(payload, convert=True):
|
|
"""Select a single Site from sites using site_name
|
|
|
|
Args:
|
|
conn (_T_connector@connect): Postgresql Connector
|
|
payload (tuple): (site_name,)
|
|
convert (bool, optional): determines if to return tuple as dictionary. Defaults to False.
|
|
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: selected tuples
|
|
"""
|
|
site = ()
|
|
database_config = config.config()
|
|
select_site_sql = f"SELECT * FROM sites WHERE site_name = %s;"
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(select_site_sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
site = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
site = rows
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, select_site_sql)
|
|
return site
|
|
|
|
def paginateZonesBySku(site: str, payload: tuple, convert=True):
|
|
database_config = config.config()
|
|
zones, count = (), 0
|
|
with open(f"application/items/sql/paginateZonesBySku.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
with open(f"application/items/sql/paginateZonesBySkuCount.sql", "r+") as file:
|
|
sql_count = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchall()
|
|
if rows and convert:
|
|
zones = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
|
|
elif rows and not convert:
|
|
zones = rows
|
|
cur.execute(sql_count, payload)
|
|
count = cur.fetchone()[0]
|
|
return zones, count
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def paginateLocationsWithZone(site:str, payload:tuple, convert:bool=True):
|
|
recordset, count = (), 0
|
|
database_config = config.config()
|
|
with open(f"application/items/sql/getLocationsWithZone.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchall()
|
|
if rows and convert:
|
|
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
|
|
elif rows and not convert:
|
|
recordset = rows
|
|
cur.execute(f"SELECT COUNT(*) FROM {site}_locations;")
|
|
count = cur.fetchone()[0]
|
|
return recordset, count
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, (), sql)
|
|
|
|
def paginateLocationsBySkuZone(site: str, payload: tuple, convert=True):
|
|
database_config = config.config()
|
|
locations, count = (), 0
|
|
with open(f"application/items/sql/paginateLocationsBySkuZone.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
with open(f"application/items/sql/paginateLocationsBySkuZoneCount.sql", "r+") as file:
|
|
sql_count = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchall()
|
|
if rows and convert:
|
|
locations = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
|
|
elif rows and not convert:
|
|
locations = rows
|
|
cur.execute(sql_count, payload)
|
|
count = cur.fetchone()[0]
|
|
return locations, count
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def paginateBrands(site:str, payload:tuple, convert:bool=True):
|
|
database_config = config.config()
|
|
recordset, count = [], 0
|
|
sql = f"SELECT brand.id, brand.name FROM {site}_brands brand LIMIT %s OFFSET %s;"
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchall()
|
|
if rows and convert:
|
|
recordset = [postsqldb.tupleDictionaryFactory(cur.description, row) for row in rows]
|
|
if rows and not convert:
|
|
recordset = rows
|
|
cur.execute(f"SELECT COUNT(*) FROM {site}_brands")
|
|
count = cur.fetchone()[0]
|
|
return recordset, count
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def insertCostLayersTuple(site, payload, convert=True, conn=None):
|
|
cost_layer = ()
|
|
self_conn = False
|
|
|
|
with open(f"application/items/sql/insertCostLayersTuple.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = True
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
cost_layer = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
cost_layer = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return cost_layer
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def insertItemLocationsTuple(site, payload, convert=True, conn=None):
|
|
"""insert payload into item_locations table for site
|
|
|
|
Args:
|
|
conn (_T_connector@connect): Postgresql Connector
|
|
site (str):
|
|
payload (tuple): (part_id[int], location_id[int], quantity_on_hand[float], cost_layers[lst2pgarr])
|
|
convert (bool, optional): Determines if to return tuple as dictionary. Defaults to False.
|
|
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: inserted tuple
|
|
"""
|
|
location = ()
|
|
self_conn = False
|
|
database_config = config.config()
|
|
with open(f"application/items/sql/insertItemLocationsTuple.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = True
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
location = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
location = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return location
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def insertLogisticsInfoTuple(site, payload, convert=True, conn=None):
|
|
"""insert payload into logistics_info table for site
|
|
|
|
Args:
|
|
conn (_T_connector@connect): Postgresql Connector
|
|
site (str):
|
|
payload (tuple): (barcode[str], primary_location[str], auto_issue_location[str], dynamic_locations[jsonb],
|
|
location_data[jsonb], quantity_on_hand[float])
|
|
convert (bool, optional): Determines if to return tuple as dictionary. Defaults to False.
|
|
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: inserted tuple
|
|
"""
|
|
logistics_info = ()
|
|
self_conn = False
|
|
|
|
with open(f"application/items/sql/insertLogisticsInfoTuple.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = True
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
logistics_info = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
logistics_info = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return logistics_info
|
|
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def insertItemInfoTuple(site, payload, convert=True, conn=None):
|
|
"""inserts payload into the item_info table of site
|
|
|
|
Args:
|
|
conn (_T_connector@connect): Postgresql Connector
|
|
site_name (str):
|
|
payload (tuple): (barcode[str], linked_items[lst2pgarr], shopping_lists[lst2pgarr], recipes[lst2pgarr], groups[lst2pgarr],
|
|
packaging[str], uom[str], cost[float], safety_stock[float], lead_time_days[float], ai_pick[bool])
|
|
convert (bool optional): Determines if to return tuple as dictionary. DEFAULTS to False.
|
|
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: inserted tuple
|
|
"""
|
|
item_info = ()
|
|
self_conn = False
|
|
with open(f"application/items/sql/insertItemInfoTuple.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = True
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
item_info = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
item_info = rows
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return item_info
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def insertFoodInfoTuple(site, payload, convert=True, conn=None):
|
|
"""insert payload into food_info table for site
|
|
|
|
Args:
|
|
conn (_T_connector@connect): Postgresql Connector
|
|
site (str):
|
|
payload (_type_): (ingrediants[lst2pgarr], food_groups[lst2pgarr], nutrients[jsonstr], expires[bool])
|
|
convert (bool, optional): Determines if to return tuple as dictionary. Defaults to False.
|
|
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: inserted tuple
|
|
"""
|
|
food_info = ()
|
|
self_conn = False
|
|
with open(f"application/items/sql/insertFoodInfoTuple.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = True
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
food_info = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
food_info = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return food_info
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def insertItemTuple(site, payload, convert=True, conn=None):
|
|
"""insert payload into items table for site
|
|
|
|
Args:
|
|
conn (_T_connector@connect): Postgresql Connector
|
|
site (str):
|
|
payload (tuple): (barcode[str], item_name[str], brand[int], description[str],
|
|
tags[lst2pgarr], links[jsonb], item_info_id[int], logistics_info_id[int],
|
|
food_info_id[int], row_type[str], item_type[str], search_string[str])
|
|
convert (bool, optional): Determines if to return tuple as a dictionary. Defaults to False.
|
|
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: inserted tuple
|
|
"""
|
|
item = ()
|
|
self_conn = False
|
|
with open(f"application/items/sql/insertItemTuple.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = True
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
item = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
item = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return item
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def insertSKUPrefixtuple(site:str, payload:tuple, convert=True, conn=None):
|
|
"""insert payload into zones table of site
|
|
|
|
Args:
|
|
conn (_T_connector@connect): Postgresql Connector
|
|
site (str):
|
|
payload (tuple): (name[str],)
|
|
convert (bool, optional): Determines if to return tuple as dictionary. Defaults to False.
|
|
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: inserted tuple
|
|
"""
|
|
prefix = ()
|
|
self_conn = False
|
|
with open(f"application/items/sql/insertSKUPrefixTuple.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = True
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
prefix = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
prefix = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return prefix
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def insertConversionTuple(site: str, payload: list, convert=True, conn=None):
|
|
"""insert into recipes table for site
|
|
|
|
Args:
|
|
conn (_T_connector@connect): Postgresql Connector
|
|
site (stre):
|
|
payload (tuple): (item_id, uom_id, conversion_factor)
|
|
convert (bool, optional): Determines if to return tuple as a dictionary. Defaults to False.
|
|
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: inserted tuple
|
|
"""
|
|
record = ()
|
|
self_conn = False
|
|
with open(f"sql/INSERT/insertConversionsTuple.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = True
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
record = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
record = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return record
|
|
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def postDeleteCostLayer(site_name, payload, convert=True, conn=None):
|
|
"""
|
|
payload (tuple): (table_to_delete_from, tuple_id)
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: deleted tuple
|
|
"""
|
|
deleted = ()
|
|
self_conn = False
|
|
sql = f"WITH deleted_rows AS (DELETE FROM {site_name}_cost_layers WHERE id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
|
|
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = False
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchall()
|
|
if rows and convert:
|
|
deleted = [postsqldb.tupleDictionaryFactory(cur.description, r) for r in rows]
|
|
elif rows and not convert:
|
|
deleted = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return deleted
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def deleteConversionTuple(site_name: str, payload: tuple, convert=True, conn=None):
|
|
"""This is a basic funtion to delete a tuple from a table in site with an id. All
|
|
tables in this database has id's associated with them.
|
|
|
|
Args:
|
|
conn (_T_connector@connect): Postgresql Connector
|
|
site_name (str):
|
|
payload (tuple): (tuple_id,...)
|
|
convert (bool, optional): Determines if to return tuple as dictionary. Defaults to False.
|
|
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: deleted tuple
|
|
"""
|
|
deleted = ()
|
|
self_conn = False
|
|
sql = f"WITH deleted_rows AS (DELETE FROM {site_name}_conversions WHERE id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = False
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
deleted = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
deleted = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return deleted
|
|
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def postUpdateItemLocation(site, payload, conn=None):
|
|
|
|
item_location = ()
|
|
self_conn = False
|
|
with open(f"sql/updateItemLocation.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = False
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows:
|
|
item_location = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return item_location
|
|
except Exception as error:
|
|
return error
|
|
|
|
def postUpdateItem(site:str, payload:dict):
|
|
""" POST and update to an item
|
|
|
|
Args:
|
|
site (str): name of the site the item exists in.
|
|
payload (dict): STRICT FORMAT
|
|
{id: item_id, data: SEE BELOW, user_id: updater}
|
|
|
|
data is complex structure
|
|
top level keys should be a combo of: ['item', 'item_info', 'logistics_info', 'food_info']
|
|
with in each of these top levels there are key value pairs in this format
|
|
{'column_name': 'new_value'}
|
|
"""
|
|
def postUpdateData(conn, table, payload, convert=True):
|
|
updated = ()
|
|
|
|
set_clause, values = postsqldb.updateStringFactory(payload['update'])
|
|
values.append(payload['id'])
|
|
sql = f"UPDATE {table} SET {set_clause} WHERE id=%s RETURNING *;"
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, values)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
updated = rows
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
return updated
|
|
|
|
def postAddTransaction(conn, site, payload, convert=False):
|
|
transaction = ()
|
|
with open(f"application/items/sql/insertTransactionsTuple.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
transaction = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
transaction = rows
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
return transaction
|
|
|
|
transaction_data = {}
|
|
database_config = config.config()
|
|
data = payload['update']
|
|
for key in data.keys():
|
|
for key_2 in data[key].keys():
|
|
transaction_data[f"{key_2}_new"] = data[key][key_2]
|
|
try:
|
|
with psycopg2.connect(**database_config) as conn:
|
|
item = getItemAllByID(site, (payload['id'], ))
|
|
if 'item_info' in data.keys() and data['item_info'] != {}:
|
|
for key in data['item_info'].keys():
|
|
transaction_data[f"{key}_old"] = item['item_info'][key]
|
|
postUpdateData(conn, f"{site}_item_info", {'id': item['item_info_id'], 'update': data['item_info']})
|
|
|
|
if 'food_info' in data.keys() and data['food_info'] != {}:
|
|
for key in data['food_info'].keys():
|
|
transaction_data[f"{key}_old"] = item['food_info'][key]
|
|
postUpdateData(conn, f"{site}_food_info", {'id': item['food_info_id'], 'update': data['food_info']})
|
|
|
|
if 'logistics_info' in data.keys() and data['logistics_info'] != {}:
|
|
for key in data['logistics_info'].keys():
|
|
transaction_data[f"{key}_old"] = item['logistics_info'][key]
|
|
postUpdateData(conn, f"{site}_logistics_info", {'id': item['logistics_info_id'], 'update': data['logistics_info']})
|
|
|
|
if 'item' in data.keys() and data['item'] != {}:
|
|
for key in data['item'].keys():
|
|
if key == "brand":
|
|
transaction_data[f"{key}_old"] = item['brand']['id']
|
|
else:
|
|
transaction_data[f"{key}_old"] = item[key]
|
|
postUpdateData(conn, f"{site}_items", {'id': payload['id'], 'update': data['item']})
|
|
|
|
trans = postsqldb.TransactionPayload(
|
|
timestamp=datetime.datetime.now(),
|
|
logistics_info_id=item['logistics_info_id'],
|
|
barcode=item['barcode'],
|
|
name=item['item_name'],
|
|
transaction_type="UPDATE",
|
|
quantity=0.0,
|
|
description="Item was updated!",
|
|
user_id=payload['user_id'],
|
|
data=transaction_data
|
|
)
|
|
postAddTransaction(conn, site, trans.payload())
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, "MULTICALL!")
|
|
|
|
def postUpdateItemLink(site: str, payload: dict):
|
|
""" POST update to ItemLink
|
|
|
|
Args:
|
|
site (str): _description_
|
|
payload (dict): {id, update, old_conv_factor, user_id}
|
|
"""
|
|
def postUpdateData(conn, table, payload, convert=True):
|
|
updated = ()
|
|
set_clause, values = postsqldb.updateStringFactory(payload['update'])
|
|
values.append(payload['id'])
|
|
sql = f"UPDATE {table} SET {set_clause} WHERE id=%s RETURNING *;"
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, values)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
updated = rows
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
return updated
|
|
|
|
def postAddTransaction(conn, site, payload, convert=False):
|
|
transaction = ()
|
|
with open(f"application/items/sql/insertTransactionsTuple.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
transaction = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
transaction = rows
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
return transaction
|
|
|
|
database_config = config.config()
|
|
transaction_time = datetime.datetime.now()
|
|
barcode = payload['barcode']
|
|
with psycopg2.connect(**database_config) as conn:
|
|
linkedItem = getItemAllByBarcode(site, (barcode, ))
|
|
|
|
transaction = postsqldb.TransactionPayload(
|
|
timestamp=transaction_time,
|
|
logistics_info_id=linkedItem['logistics_info_id'],
|
|
barcode=barcode,
|
|
name=linkedItem['item_name'],
|
|
transaction_type='UPDATE',
|
|
quantity=0.0,
|
|
description='Link updated!',
|
|
user_id=payload['user_id'],
|
|
data={'new_conv_factor': payload['update']['conv_factor'], 'old_conv_factor': payload['old_conv_factor']}
|
|
)
|
|
|
|
postUpdateData(conn, f"{site}_itemlinks", {'id': payload['id'], 'update': {'conv_factor': payload['update']['conv_factor']}})
|
|
postAddTransaction(conn, site, transaction.payload())
|
|
|
|
def postUpdateCostLayer(site, payload, convert=True, conn=None):
|
|
"""_summary_
|
|
|
|
Args:
|
|
conn (_T_connector@connect): Postgresql Connector
|
|
site (str):
|
|
table (str):
|
|
payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
|
|
convert (bool, optional): determines if to return tuple as dictionary. Defaults to False.
|
|
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: updated tuple
|
|
"""
|
|
updated = ()
|
|
self_conn = False
|
|
|
|
set_clause, values = postsqldb.updateStringFactory(payload['update'])
|
|
values.append(payload['id'])
|
|
sql = f"UPDATE {site}_cost_layers SET {set_clause} WHERE id=%s RETURNING *;"
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = False
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, values)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
updated = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return updated
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def postAddTransaction(site, payload, convert=False, conn=None):
|
|
transaction = ()
|
|
self_conn = False
|
|
|
|
with open(f"application/items/sql/insertTransactionsTuple.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = False
|
|
self_conn = True
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
transaction = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
transaction = rows
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return transaction
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def postInsertItemLink(site, payload, convert=True, conn=None):
|
|
"""insert payload into itemlinks table of site
|
|
|
|
Args:
|
|
conn (_T_connector@connect): Postgresql Connector
|
|
site (str):
|
|
payload (tuple): (barcode[str], link[int], data[jsonb], conv_factor[float])
|
|
convert (bool, optional): Determines if to return tuple as dictionary. Defaults to False.
|
|
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: inserted tuple
|
|
"""
|
|
link = ()
|
|
self_conn = False
|
|
|
|
with open(f"application/items/sql/insertItemLinksTuple.sql", "r+") as file:
|
|
sql = file.read().replace("%%site_name%%", site)
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = False
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, payload)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
link = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
link = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return link, conn
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|
|
|
|
def postUpdateItemByID(site, payload, convert=True, conn=None):
|
|
""" high level update of an item specific data, none of its relationships
|
|
|
|
Args:
|
|
conn (_T_connector@connect): Postgresql Connector
|
|
site (str):
|
|
table (str):
|
|
payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
|
|
convert (bool, optional): determines if to return tuple as dictionary. Defaults to False.
|
|
|
|
Raises:
|
|
DatabaseError:
|
|
|
|
Returns:
|
|
tuple or dict: updated tuple
|
|
"""
|
|
updated = ()
|
|
self_conn = False
|
|
set_clause, values = postsqldb.updateStringFactory(payload['update'])
|
|
values.append(payload['id'])
|
|
sql = f"UPDATE {site}_items SET {set_clause} WHERE id=%s RETURNING *;"
|
|
try:
|
|
if not conn:
|
|
database_config = config.config()
|
|
conn = psycopg2.connect(**database_config)
|
|
conn.autocommit = False
|
|
self_conn = True
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, values)
|
|
rows = cur.fetchone()
|
|
if rows and convert:
|
|
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
|
elif rows and not convert:
|
|
updated = rows
|
|
|
|
if self_conn:
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return updated, conn
|
|
except Exception as error:
|
|
raise postsqldb.DatabaseError(error, payload, sql)
|