2489 lines
81 KiB
Python
2489 lines
81 KiB
Python
import datetime
|
|
import psycopg2, json
|
|
import psycopg2.extras
|
|
from dataclasses import dataclass, field
|
|
import random
|
|
import string
|
|
import config
|
|
|
|
class DatabaseError(Exception):
    """Error raised when a database operation in this module fails.

    Constructing an instance immediately appends a record of the error to
    ``database.log`` in the current working directory (see ``log_error``).

    Attributes:
        message: ``str(message)`` with newline characters removed.
        payload: The parameters associated with the failed statement.
        sql: The SQL text with newline characters removed.
    """

    def __init__(self, message, payload=None, sql=""):
        """Store the error details and write them to the log file.

        Args:
            message: The underlying error or a description of it.
            payload: Parameters of the failed statement. Defaults to a new
                empty list.
            sql: The SQL text that failed. Defaults to "".
        """
        super().__init__(message)
        # Build a fresh list per instance; a literal ``[]`` default would be
        # one object shared by every error created without a payload.
        self.payload = [] if payload is None else payload
        self.message = str(message).replace("\n", "")
        # Coerce to str so a non-string argument cannot make the construction
        # of the error itself fail with AttributeError.
        self.sql = ("" if sql is None else str(sql)).replace("\n", "")
        self.log_error()

    def log_error(self):
        """Append a three-line description of this error to ``database.log``."""
        # The continuation lines are padded with 41 spaces, as before. The
        # padding is computed outside the f-strings because nesting the same
        # quote character inside an f-string only parses on Python 3.12+.
        padding = " " * 41
        with open("database.log", "a+") as file:
            file.write("\n")
            file.write(f"{datetime.datetime.now()} --- ERROR --- DatabaseError(message='{self.message}',\n")
            file.write(f"{padding}payload={self.payload},\n")
            file.write(f"{padding}sql='{self.sql}')")

    def __str__(self):
        """Return a single-line representation of message, payload and SQL."""
        return f"DatabaseError(message='{self.message}', payload={self.payload}, sql='{self.sql}')"
|
|
|
|
def tupleDictionaryFactory(columns, row):
    """Build a dictionary that maps column names to the values of one row.

    Args:
        columns: Sequence of column descriptors; element 0 of each descriptor
            is used as the key (callers in this file pass
            ``cursor.description``).
        row: Sequence of values, positionally aligned with ``columns``.

    Returns:
        dict: ``{column_name: value}``, pairing entries by position.
    """
    names = []
    for descriptor in columns:
        names.append(descriptor[0])
    return dict(zip(names, row))
|
|
|
|
def lst2pgarr(alist):
    """Render an iterable as a PostgreSQL array literal such as ``{a,b}``.

    Simple string elements are emitted unchanged, so existing output for
    plain values is identical. In addition:

    * non-string elements are converted with ``str`` instead of raising;
    * ``None`` becomes an unquoted ``NULL``;
    * elements that would otherwise be misparsed (empty, the word NULL,
      leading/trailing whitespace, or containing ``{ } , " \\``) are
      double-quoted with backslash escaping.

    Args:
        alist: Iterable of elements.

    Returns:
        str: The array literal.
    """

    def _element(value):
        # Render a single element, quoting only when the literal needs it.
        if value is None:
            return "NULL"
        text = str(value)
        needs_quotes = (
            text == ""
            or text.upper() == "NULL"
            or text != text.strip()
            or any(ch in text for ch in '{},"\\')
        )
        if not needs_quotes:
            return text
        escaped = text.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    return "{" + ",".join(_element(item) for item in alist) + "}"
|
|
|
|
def updateStringFactory(updated_values: dict):
    """Build the SET clause and parameter list for an UPDATE statement.

    Args:
        updated_values: Mapping of column name to new value. Dictionary
            values are serialised with ``json.dumps``; all other values are
            passed through untouched.

    Returns:
        tuple: ``(set_clause, values)`` where ``set_clause`` is a string of
        ``"column = %s"`` fragments joined by ``", "`` and ``values`` is the
        list of parameters in the same order.
    """
    assignments = []
    parameters = []
    for column, new_value in updated_values.items():
        assignments.append(f"{column} = %s")
        parameters.append(json.dumps(new_value) if isinstance(new_value, dict) else new_value)
    return ", ".join(assignments), parameters
|
|
|
|
def getUUID(n):
    """Return a random string of ``n`` ASCII letters and digits.

    The characters are drawn with ``random.choices``, so repeats are possible
    and the result depends on the state of the ``random`` module.

    Args:
        n: Number of characters to generate.

    Returns:
        str: The generated string (empty when ``n`` is 0).
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choices(alphabet, k=n))
|
|
|
|
def get_sites(sites=None):
    """Fetch the ``sites`` row for each id in ``sites``.

    A new connection is opened from ``config.config()`` for the call and is
    always closed before returning.

    Args:
        sites: Iterable of site ids. Defaults to no ids.

    Returns:
        list | bool: One entry per requested id, in order (the result of
        ``fetchone``, so ``None`` for an id with no row). ``False`` when a
        query fails; the error is printed and the transaction rolled back.
    """
    # ``None`` replaces the former mutable ``[]`` default argument.
    site_ids = [] if sites is None else sites
    database_config = config.config()
    conn = psycopg2.connect(**database_config)
    try:
        # psycopg2's ``with conn`` only ends the transaction; it does not
        # close the connection, hence the explicit ``finally`` below.
        with conn:
            try:
                with conn.cursor() as cur:
                    site_rows = []
                    for site_id in site_ids:
                        cur.execute("SELECT * FROM sites WHERE id=%s;", (site_id, ))
                        site_rows.append(cur.fetchone())
                    return site_rows
            except (Exception, psycopg2.DatabaseError) as error:
                print(error)
                conn.rollback()
                return False
    finally:
        conn.close()
|
|
|
|
|
|
class ConversionsTable:
    """Data-access helpers for a site's ``{site}_conversions`` table."""

    @dataclass
    class Payload:
        """Values for one conversions row."""

        item_id: int
        uom_id: int
        conv_factor: float

        def payload(self):
            """Return ``(item_id, uom_id, conv_factor)``."""
            return (
                self.item_id,
                self.uom_id,
                self.conv_factor
            )

    @classmethod
    def create_table(cls, conn, site):
        """Run ``sql/CREATE/conversions.sql`` for ``site``.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.

        Raises:
            DatabaseError: If executing the statement fails.
        """
        with open("sql/CREATE/conversions.sql", 'r') as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
        except Exception as error:
            # Argument order is (message, payload, sql); the SQL text used to
            # be passed as the payload together with an unrelated table label.
            raise DatabaseError(error, 'ConversionsTable', sql) from error

    @classmethod
    def delete_table(cls, conn, site):
        """Run ``sql/DROP/conversions.sql`` for ``site``.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.

        Raises:
            DatabaseError: If executing the statement fails.
        """
        with open("sql/DROP/conversions.sql", 'r') as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
        except Exception as error:
            raise DatabaseError(error, 'ConversionsTable', sql) from error

    @classmethod
    def insert_tuple(cls, conn, site: str, payload: list, convert=True):
        """Insert a row using ``sql/INSERT/insertConversionsTuple.sql``.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.
            payload (tuple): (item_id, uom_id, conv_factor)
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: First row returned by the statement, or ``()``
            when it returns none.
        """
        record = ()
        # The file is only read, so it is opened read-only rather than "r+".
        with open("sql/INSERT/insertConversionsTuple.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    record = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return record

    @classmethod
    def delete_item_tuple(cls, conn, site_name, payload, convert=True):
        """Delete rows from ``{site_name}_conversions`` by id.

        Args:
            conn: Open database connection.
            site_name (str): Site whose table is modified.
            payload (tuple): (tuple_id, ...)
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: The first deleted row only (even when several ids
            are given), or ``()`` when nothing was deleted.
        """
        deleted = ()
        # An empty id list would render as "IN ()", which is not valid SQL;
        # there is nothing to delete, so skip the round trip.
        if not payload:
            return deleted
        sql = f"WITH deleted_rows AS (DELETE FROM {site_name}_conversions WHERE id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    deleted = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return deleted

    @classmethod
    def update_item_tuple(cls, conn, site, payload, convert=False):
        """Update one row of ``{site}_conversions``.

        Args:
            conn: Open database connection.
            site (str): Site whose table is modified.
            payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to False.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: The updated row, or ``()`` when no row matched.
        """
        updated = ()
        set_clause, values = updateStringFactory(payload['update'])
        values.append(payload['id'])
        sql = f"UPDATE {site}_conversions SET {set_clause} WHERE id=%s RETURNING *;"
        try:
            with conn.cursor() as cur:
                cur.execute(sql, values)
                rows = cur.fetchone()
                if rows:
                    updated = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return updated
|
|
|
|
class ShoppingListsTable:
    """Payload definitions and queries for a site's shopping lists."""

    @dataclass
    class Payload:
        """Values for one shopping list; ``creation_date`` is set on creation."""

        name: str
        description: str
        author: int
        type: str = "plain"
        creation_date: datetime.datetime = field(init=False)

        def __post_init__(self):
            # Stamp the instance with the local time at which it was built.
            self.creation_date = datetime.datetime.now()

        def payload(self):
            """Return ``(name, description, author, creation_date, type)``."""
            values = (self.name, self.description, self.author, self.creation_date, self.type)
            return values

    @dataclass
    class ItemPayload:
        """Values for one shopping list item."""

        uuid: str
        sl_id: int
        item_type: str
        item_name: str
        uom: str
        qty: float
        item_id: int = None
        links: dict = field(default_factory=dict)

        def payload(self):
            """Return the fields as a tuple, with ``links`` encoded as JSON text."""
            encoded_links = json.dumps(self.links)
            return (
                self.uuid, self.sl_id, self.item_type, self.item_name,
                self.uom, self.qty, self.item_id, encoded_links,
            )

    @classmethod
    def getItem(cls, conn, site, payload, convert=True):
        """Run ``sql/SELECT/selectShoppingListItem.sql`` and return its first row.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.
            payload (tuple): (id, )
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the query fails.

        Returns:
            tuple or dict: The first row, or ``()`` when there is none.
        """
        with open('sql/SELECT/selectShoppingListItem.sql', 'r') as file:
            sql = file.read().replace("%%site_name%%", site)
        found = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                row = cur.fetchone()
                if row:
                    found = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return found
|
|
|
|
class UnitsTable:
    """Data-access helpers for the ``units`` table (not site-specific)."""

    @dataclass
    class Payload:
        """Values for one units row."""

        __slots__ = ('plural', 'single', 'fullname', 'description')

        plural: str
        single: str
        fullname: str
        description: str

        def payload(self):
            """Return ``(plural, single, fullname, description)``."""
            return (
                self.plural,
                self.single,
                self.fullname,
                self.description
            )

    @classmethod
    def create_table(cls, conn):
        """Run ``sql/CREATE/units.sql``.

        Args:
            conn: Open database connection.

        Raises:
            DatabaseError: If executing the statement fails.
        """
        with open("sql/CREATE/units.sql", 'r') as file:
            sql = file.read()
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
        except Exception as error:
            # Argument order is (message, payload, sql); these two used to be
            # swapped, so the log showed the label where the SQL belongs.
            raise DatabaseError(error, 'UnitsTable', sql) from error

    @classmethod
    def delete_table(cls, conn):
        """Run ``sql/DROP/units.sql``.

        Args:
            conn: Open database connection.

        Raises:
            DatabaseError: If executing the statement fails.
        """
        with open("sql/DROP/units.sql", 'r') as file:
            sql = file.read()
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
        except Exception as error:
            # Label the error with this table rather than 'PrefixTable'.
            raise DatabaseError(error, 'UnitsTable', sql) from error

    @classmethod
    def insert_tuple(cls, conn, payload: list, convert=True):
        """Insert a row using ``sql/INSERT/insertUnitsTuple.sql``.

        Args:
            conn: Open database connection.
            payload (tuple): (plural, single, fullname, description)
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: First row returned by the statement, or ``()``
            when it returns none.
        """
        record = ()
        # The file is only read, so it is opened read-only rather than "r+".
        with open("sql/INSERT/insertUnitsTuple.sql", "r") as file:
            sql = file.read()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    record = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return record

    @classmethod
    def getAll(cls, conn, convert=True):
        """Return every row of the ``units`` table.

        Args:
            conn: Open database connection.
            convert (bool, optional): Return rows as dictionaries instead of
                tuples. Defaults to True.

        Raises:
            DatabaseError: If executing the query fails.

        Returns:
            list or tuple: The rows; an empty tuple ``()`` when the table has
            no rows.
        """
        records = ()
        sql = "SELECT * FROM units;"
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
                rows = cur.fetchall()
                if rows:
                    records = [tupleDictionaryFactory(cur.description, row) for row in rows] if convert else rows
        except Exception as error:
            raise DatabaseError(error, "", sql) from error
        return records
|
|
|
|
class SKUPrefixTable:
    """Data-access helpers for a site's ``{site}_sku_prefix`` table."""

    @dataclass
    class Payload:
        """Values for one SKU prefix row."""

        __slots__ = ('uuid', 'name', 'description')

        uuid: str
        name: str
        description: str

        def payload(self):
            """Return ``(uuid, name, description)``."""
            return (
                self.uuid,
                self.name,
                self.description
            )

    @classmethod
    def create_table(cls, conn, site):
        """Run ``sql/CREATE/sku_prefix.sql`` for ``site``.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.

        Raises:
            DatabaseError: If executing the statement fails.
        """
        with open("sql/CREATE/sku_prefix.sql", 'r') as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
        except Exception as error:
            # Argument order is (message, payload, sql); these two used to be
            # swapped. Now matches delete_table.
            raise DatabaseError(error, 'PrefixTable', sql) from error

    @classmethod
    def delete_table(cls, conn, site):
        """Run ``sql/DROP/sku_prefix.sql`` for ``site``.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.

        Raises:
            DatabaseError: If executing the statement fails.
        """
        with open("sql/DROP/sku_prefix.sql", 'r') as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
        except Exception as error:
            raise DatabaseError(error, 'PrefixTable', sql) from error

    @classmethod
    def paginatePrefixes(cls, conn, site: str, payload: tuple, convert=True):
        """Return one page of SKU prefixes plus the total row count.

        The page comes from ``sql/SELECT/getSkuPrefixes.sql``; the count is
        taken over the whole ``{site}_sku_prefix`` table.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.
            payload (tuple): (limit, offset)
            convert (bool, optional): Return rows as dictionaries instead of
                tuples. Defaults to True.

        Raises:
            DatabaseError: If executing either query fails.

        Returns:
            tuple: ``(recordset, count)``; ``recordset`` is a list of rows.
        """
        recordset = []
        count = 0
        # The file is only read, so it is opened read-only rather than "r+".
        with open("sql/SELECT/getSkuPrefixes.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchall()
                if rows:
                    recordset = [tupleDictionaryFactory(cur.description, row) for row in rows] if convert else rows

                cur.execute(f"SELECT COUNT(*) FROM {site}_sku_prefix;")
                count = cur.fetchone()[0]
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return recordset, count

    @classmethod
    def insert_tuple(cls, conn, site, payload, convert=True):
        """Insert a row using ``sql/INSERT/insertSKUPrefixTuple.sql``.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.
            payload (tuple): Parameters for the statement; presumably
                (uuid, name, description) as produced by ``Payload.payload``
                -- confirm against the SQL file.
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: First row returned by the statement, or ``()``
            when it returns none.
        """
        prefix = ()
        with open("sql/INSERT/insertSKUPrefixTuple.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    prefix = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return prefix

    @classmethod
    def update_tuple(cls, conn, site, payload, convert=True):
        """Update one row of ``{site}_sku_prefix``.

        Args:
            conn: Open database connection.
            site (str): Site whose table is modified.
            payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: The updated row, or ``()`` when no row matched.
        """
        updated = ()
        set_clause, values = updateStringFactory(payload['update'])
        values.append(payload['id'])
        sql = f"UPDATE {site}_sku_prefix SET {set_clause} WHERE id=%s RETURNING *;"
        try:
            with conn.cursor() as cur:
                cur.execute(sql, values)
                rows = cur.fetchone()
                if rows:
                    updated = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return updated
|
|
|
|
class RecipesTable:
    """Data-access helpers for a site's ``{site}_recipes`` and
    ``{site}_recipe_items`` tables."""

    @dataclass
    class Payload:
        """Values for one recipe; ``created_date`` is set on creation."""

        name: str
        author: int
        description: str
        # Annotated with the class, not the ``datetime`` module.
        created_date: datetime.datetime = field(init=False)
        instructions: list = field(default_factory=list)
        picture_path: str = ""

        def __post_init__(self):
            # Stamp the instance with the local time at which it was built.
            self.created_date = datetime.datetime.now()

        def payload(self):
            """Return the fields as a tuple, with ``instructions`` rendered
            by ``lst2pgarr``."""
            return (
                self.name,
                self.author,
                self.description,
                self.created_date,
                lst2pgarr(self.instructions),
                self.picture_path
            )

    @dataclass
    class ItemPayload:
        """Values for one recipe item."""

        uuid: str
        rp_id: int
        item_type: str
        item_name: str
        uom: int
        qty: float = 0.0
        item_id: int = None
        links: dict = field(default_factory=dict)

        def payload(self):
            """Return the fields as a tuple, with ``links`` encoded as JSON text."""
            return (
                self.uuid,
                self.rp_id,
                self.item_type,
                self.item_name,
                self.uom,
                self.qty,
                self.item_id,
                json.dumps(self.links)
            )

    @classmethod
    def create_table(cls, conn, site):
        """Run ``sql/CREATE/recipes.sql`` for ``site``.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.

        Raises:
            DatabaseError: If executing the statement fails.
        """
        with open("sql/CREATE/recipes.sql", 'r') as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
        except Exception as error:
            # Argument order is (message, payload, sql); these two used to be
            # swapped and labelled with another table's name.
            raise DatabaseError(error, 'RecipesTable', sql) from error

    @classmethod
    def delete_table(cls, conn, site):
        """Run ``sql/DROP/recipes.sql`` for ``site``.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.

        Raises:
            DatabaseError: If executing the statement fails.
        """
        with open("sql/DROP/recipes.sql", 'r') as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
        except Exception as error:
            raise DatabaseError(error, 'RecipesTable', sql) from error

    @classmethod
    def insert_tuple(cls, conn, site: str, payload: list, convert=True):
        """Insert a recipe using ``sql/INSERT/insertRecipesTuple.sql``.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.
            payload (tuple): (name[str], author[int], description[str], creation_date[timestamp], instructions[list], picture_path[str])
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: First row returned by the statement, or ``()``
            when it returns none.
        """
        recipe = ()
        # The file is only read, so it is opened read-only rather than "r+".
        with open("sql/INSERT/insertRecipesTuple.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    recipe = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return recipe

    @classmethod
    def insert_item_tuple(cls, conn, site, payload, convert=True):
        """Insert a recipe item using ``sql/INSERT/insertRecipeItemsTuple.sql``.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.
            payload (tuple): (uuid[str], rp_id[int], item_type[str], item_name[str], uom, qty[float], item_id[int], links[jsonb])
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: First row returned by the statement, or ``()``
            when it returns none.
        """
        recipe_item = ()
        with open("sql/INSERT/insertRecipeItemsTuple.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    recipe_item = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return recipe_item

    @classmethod
    def delete_item_tuple(cls, conn, site_name, payload, convert=True):
        """Delete rows from ``{site_name}_recipe_items`` by id.

        Args:
            conn: Open database connection.
            site_name (str): Site whose table is modified.
            payload (tuple): (tuple_id, ...)
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: The first deleted row only (even when several ids
            are given), or ``()`` when nothing was deleted.
        """
        deleted = ()
        # An empty id list would render as "IN ()", which is not valid SQL;
        # there is nothing to delete, so skip the round trip.
        if not payload:
            return deleted
        sql = f"WITH deleted_rows AS (DELETE FROM {site_name}_recipe_items WHERE id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    deleted = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return deleted

    @classmethod
    def update_item_tuple(cls, conn, site, payload, convert=False):
        """Update one row of ``{site}_recipe_items``.

        Args:
            conn: Open database connection.
            site (str): Site whose table is modified.
            payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to False.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: The updated row, or ``()`` when no row matched.
        """
        updated = ()
        set_clause, values = updateStringFactory(payload['update'])
        values.append(payload['id'])
        sql = f"UPDATE {site}_recipe_items SET {set_clause} WHERE id=%s RETURNING *;"
        try:
            with conn.cursor() as cur:
                cur.execute(sql, values)
                rows = cur.fetchone()
                if rows:
                    updated = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return updated

    @classmethod
    def getRecipes(cls, conn, site: str, payload: tuple, convert=True):
        """Return one page of recipes plus the total row count.

        The page comes from ``sql/SELECT/getRecipes.sql``; the count is taken
        over the whole ``{site}_recipes`` table.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.
            payload (tuple): (limit, offset)
            convert (bool, optional): Return rows as dictionaries instead of
                tuples. Defaults to True.

        Raises:
            DatabaseError: If executing either query fails.

        Returns:
            tuple: ``(recordset, count)``; ``recordset`` is a list of rows.
        """
        recordset = []
        count = 0
        with open("sql/SELECT/getRecipes.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchall()
                if rows:
                    recordset = [tupleDictionaryFactory(cur.description, row) for row in rows] if convert else rows

                cur.execute(f"SELECT COUNT(*) FROM {site}_recipes;")
                count = cur.fetchone()[0]
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return recordset, count

    @classmethod
    def getRecipe(cls, conn, site: str, payload: tuple, convert=True):
        """Run ``sql/SELECT/getRecipeByID.sql`` and return its first row.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.
            payload (tuple): (id, )
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the query fails.

        Returns:
            tuple or dict: The first row, or ``()`` when there is none.
        """
        record = ()
        with open("sql/SELECT/getRecipeByID.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    record = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return record

    @classmethod
    def updateRecipe(cls, conn, site, payload, convert=True):
        """Update one row of ``{site}_recipes``.

        Args:
            conn: Open database connection.
            site (str): Site whose table is modified.
            payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: The updated row, or ``()`` when no row matched.
        """
        updated = ()
        set_clause, values = updateStringFactory(payload['update'])
        values.append(payload['id'])
        sql = f"UPDATE {site}_recipes SET {set_clause} WHERE id=%s RETURNING *;"
        try:
            with conn.cursor() as cur:
                cur.execute(sql, values)
                rows = cur.fetchone()
                if rows:
                    updated = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return updated
|
|
|
|
class ItemInfoTable:
    """Payload definition and queries for a site's ``{site}_item_info`` table."""

    @dataclass
    class Payload:
        """Values for one item_info row; only ``barcode`` is required."""

        barcode: str
        packaging: str = ""
        uom_quantity: float = 1.0
        uom: int = 1
        cost: float = 0.0
        safety_stock: float = 0.0
        lead_time_days: float = 0.0
        ai_pick: bool = False
        prefixes: list = field(default_factory=list)

        def __post_init__(self):
            # Reject a barcode that is not a string as soon as it is built.
            if isinstance(self.barcode, str):
                return
            raise TypeError(f"barcode must be of type str; not {type(self.barcode)}")

        def payload(self):
            """Return the fields as a tuple, with ``prefixes`` rendered by
            ``lst2pgarr``."""
            prefix_array = lst2pgarr(self.prefixes)
            return (
                self.barcode, self.packaging, self.uom_quantity, self.uom,
                self.cost, self.safety_stock, self.lead_time_days,
                self.ai_pick, prefix_array,
            )

    @classmethod
    def select_tuple(cls, conn, site: str, payload: tuple, convert=True):
        """Fetch one row of ``{site}_item_info`` by id.

        Args:
            conn: Open database connection.
            site (str): Site whose table is queried.
            payload (tuple): (item_info_id,)
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the query fails.

        Returns:
            tuple or dict: The matching row, or ``()`` when there is none.
        """
        sql = f"SELECT * FROM {site}_item_info WHERE id=%s;"
        found = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                row = cur.fetchone()
                if row:
                    found = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return found

    @classmethod
    def update_tuple(cls, conn, site: str, payload: dict, convert=True):
        """Update one row of ``{site}_item_info``.

        Args:
            conn: Open database connection.
            site (str): Site whose table is modified.
            payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: The updated row, or ``()`` when no row matched.
        """
        assignments, params = updateStringFactory(payload['update'])
        params.append(payload['id'])
        sql = f"UPDATE {site}_item_info SET {assignments} WHERE id=%s RETURNING *;"
        changed = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                row = cur.fetchone()
                if row:
                    changed = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return changed
|
|
|
|
class ItemTable:
    """Queries against a site's ``{site}_items`` and ``{site}_itemlinks`` tables."""

    @classmethod
    def paginateLinkedLists(cls, conn, site: str, payload: tuple, convert=True):
        """Return one page of ``row_type = 'list'`` items plus their total count.

        Args:
            conn: Open database connection.
            site (str): Site whose table is queried.
            payload (tuple): (limit, offset)
            convert (bool, optional): Return rows as dictionaries instead of
                tuples. Defaults to True.

        Raises:
            DatabaseError: If executing either query fails.

        Returns:
            tuple: ``(records, count)``; ``records`` is a list of rows and
            ``count`` the number of all matching rows.
        """
        records = []
        count = 0

        sql = f"SELECT * FROM {site}_items WHERE row_type = 'list' LIMIT %s OFFSET %s;"
        # COUNT(*) yields exactly one row. Applying the page's LIMIT/OFFSET to
        # it skipped that row for any offset > 0, so the count is taken
        # without paging.
        sql_count = f"SELECT COUNT(*) FROM {site}_items WHERE row_type = 'list';"
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchall()
                if rows:
                    records = [tupleDictionaryFactory(cur.description, row) for row in rows] if convert else rows

                cur.execute(sql_count)
                count = cur.fetchone()[0]
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return records, count

    @classmethod
    def getItemAllByID(cls, conn, site, payload, convert=True):
        """Run ``sql/SELECT/getItemAllByID.sql`` and return its first row.

        Args:
            conn: Open database connection.
            site (str): Site name substituted for ``%%site_name%%``.
            payload (tuple): (item_id, )
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the query fails.

        Returns:
            tuple or dict: The first row, or ``()`` when there is none.
        """
        item = ()
        # The file is only read, so it is opened read-only rather than "r+".
        with open("sql/SELECT/getItemAllByID.sql", "r") as file:
            getItemAllByID_sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(getItemAllByID_sql, payload)
                rows = cur.fetchone()
                if rows:
                    item = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, getItemAllByID_sql) from error
        return item

    @classmethod
    def getLinkedItemByBarcode(cls, conn, site, payload, convert=True):
        """Fetch the ``{site}_itemlinks`` row for a barcode.

        Args:
            conn: Open database connection.
            site (str): Site whose table is queried.
            payload (tuple): (barcode, )
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the query fails.

        Returns:
            tuple or dict: The matching row; when there is none, ``{}`` if
            ``convert`` else ``()``.
        """
        item = {} if convert else ()
        sql = f"SELECT * FROM {site}_itemlinks WHERE barcode=%s;"
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    item = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return item

    @classmethod
    def getItemAllByBarcode(cls, conn, site, payload, convert=True):
        """Look up an item by barcode, following an item link when one exists.

        If ``{site}_itemlinks`` has a row for the barcode, the item it links
        to is loaded by id and its ``item_info['uom_quantity']`` is replaced
        with the link's ``conv_factor``. Otherwise the item is loaded with
        ``sql/SELECT/getItemAllByBarcode.sql``.

        Args:
            conn: Open database connection.
            site (str): Site name.
            payload (tuple): Query parameters; element 0 is the barcode.
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing a query fails.

        Returns:
            tuple or dict: The item; when none is found, ``{}`` if
            ``convert`` else ``()``.
        """
        item = {} if convert else ()
        linked_item = cls.getLinkedItemByBarcode(conn, site, (payload[0],))

        if len(linked_item) > 1:
            # Always load the linked item as a dictionary so 'item_info' can
            # be addressed by name; indexing the tuple form by key (or an
            # empty result) used to raise TypeError.
            linked = cls.getItemAllByID(conn, site, payload=(linked_item['link'], ), convert=True)
            if linked:
                linked['item_info']['uom_quantity'] = linked_item['conv_factor']
                # The dictionary keeps column order, so its values rebuild the row.
                item = linked if convert else tuple(linked.values())
            return item

        with open("sql/SELECT/getItemAllByBarcode.sql", "r") as file:
            getItemAllByBarcode_sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(getItemAllByBarcode_sql, payload)
                rows = cur.fetchone()
                if rows:
                    item = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, getItemAllByBarcode_sql) from error
        return item

    @classmethod
    def update_tuple(cls, conn, site, payload, convert=True):
        """Update one row of ``{site}_items``.

        Args:
            conn: Open database connection.
            site (str): Site whose table is modified.
            payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: The updated row, or ``()`` when no row matched.
        """
        updated = ()
        set_clause, values = updateStringFactory(payload['update'])
        values.append(payload['id'])
        sql = f"UPDATE {site}_items SET {set_clause} WHERE id=%s RETURNING *;"
        try:
            with conn.cursor() as cur:
                cur.execute(sql, values)
                rows = cur.fetchone()
                if rows:
                    updated = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        return updated
|
|
|
|
class ReceiptTable:
    """Queries against a site's ``{site}_receipts`` and
    ``{site}_receipt_items`` tables."""

    @classmethod
    def update_receipt(cls, conn, site: str, payload: dict, convert=True):
        """Update one row of ``{site}_receipts``.

        Args:
            conn: Open database connection.
            site (str): Site whose table is modified.
            payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: The updated row, or ``()`` when no row matched.
        """
        assignments, params = updateStringFactory(payload['update'])
        params.append(payload['id'])
        sql = f"UPDATE {site}_receipts SET {assignments} WHERE id=%s RETURNING *;"
        changed = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                row = cur.fetchone()
                if row:
                    changed = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return changed

    @classmethod
    def update_receipt_item(cls, conn, site: str, payload: dict, convert=True):
        """Update one row of ``{site}_receipt_items``.

        Args:
            conn: Open database connection.
            site (str): Site whose table is modified.
            payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the statement fails.

        Returns:
            tuple or dict: The updated row, or ``()`` when no row matched.
        """
        assignments, params = updateStringFactory(payload['update'])
        params.append(payload['id'])
        sql = f"UPDATE {site}_receipt_items SET {assignments} WHERE id=%s RETURNING *;"
        changed = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                row = cur.fetchone()
                if row:
                    changed = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return changed

    @classmethod
    def select_tuple(cls, conn, site: str, payload: tuple, convert=True):
        """Fetch one row of ``{site}_receipts`` by id.

        Args:
            conn: Open database connection.
            site (str): Site whose table is queried.
            payload (tuple): (receipt_id,)
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the query fails.

        Returns:
            tuple or dict: The matching row, or ``()`` when there is none.
        """
        sql = f"SELECT * FROM {site}_receipts WHERE id=%s;"
        found = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                row = cur.fetchone()
                if row:
                    found = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return found

    @classmethod
    def select_item_tuple(cls, conn, site: str, payload: tuple, convert=True):
        """Fetch one row of ``{site}_receipt_items`` by its ``id`` column.

        Args:
            conn: Open database connection.
            site (str): Site whose table is queried.
            payload (tuple): One-element tuple holding the id to match.
            convert (bool, optional): Return the row as a dictionary instead
                of a tuple. Defaults to True.

        Raises:
            DatabaseError: If executing the query fails.

        Returns:
            tuple or dict: The matching row, or ``()`` when there is none.
        """
        sql = f"SELECT * FROM {site}_receipt_items WHERE id=%s;"
        found = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                row = cur.fetchone()
                if row:
                    found = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return found
|
|
|
|
class ZonesTable:
    """Queries against a site's ``{site}_zones`` table."""

    @dataclass
    class Payload:
        """Values for a new zone row; ``name`` must be a ``str``."""
        name: str
        description: str = ""

        def __post_init__(self):
            # Guard clause: a string name needs no further handling.
            if isinstance(self.name, str):
                return
            raise TypeError(f"Zone name should be of type str; not {type(self.name)}")

        def payload(self):
            """Return ``(name, description)``."""
            return (self.name, self.description)

    @classmethod
    def insert_tuple(self, conn, site, payload, convert=True):
        """Run ``sql/INSERT/insertZonesTuple.sql`` for ``site`` and return the row.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): replaces the ``%%site_name%%`` marker in the SQL file.
            payload (tuple): values bound to the statement's placeholders.
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the row produced by the statement, or an empty
            tuple when it yields none.
        """
        with open("sql/INSERT/insertZonesTuple.sql", "r+") as sql_file:
            sql = sql_file.read().replace("%%site_name%%", site)

        inserted = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                row = cur.fetchone()
                if row:
                    inserted = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return inserted

    @classmethod
    def update_tuple(self, conn, site, payload, convert=True):
        """Update one row of ``{site}_zones`` and return it.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): site name, used as the table-name prefix.
            payload (dict): {'id': row_id, 'update': {column: new_value, ...}}
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the updated row, or an empty tuple when no row matched.
        """
        assignments, params = updateStringFactory(payload['update'])
        params.append(payload['id'])  # bound to the trailing "WHERE id=%s"
        sql = f"UPDATE {site}_zones SET {assignments} WHERE id=%s RETURNING *;"

        result = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                row = cur.fetchone()
                if row:
                    result = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return result

    @classmethod
    def paginateZones(self, conn, site: str, payload: tuple, convert=True):
        """Return one page of ``{site}_zones`` plus the table's total row count.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): site name, used as the table-name prefix.
            payload (tuple): (limit, offset)
            convert (bool, optional): when truthy each row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised by either query; it is
                built with an empty payload and the page query's SQL.

        Returns:
            tuple: (rows, count); rows is an empty tuple when the page is empty.
        """
        sql = f"SELECT * FROM {site}_zones LIMIT %s OFFSET %s;"
        page, total = (), 0
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                fetched = cur.fetchall()
                if fetched:
                    if convert:
                        page = [tupleDictionaryFactory(cur.description, r) for r in fetched]
                    else:
                        page = fetched

                cur.execute(f"SELECT COUNT(*) FROM {site}_zones;")
                total = cur.fetchone()[0]
        except Exception as error:
            raise DatabaseError(error, (), sql)
        return page, total

    @classmethod
    def paginateZonesBySku(self, conn, site: str, payload: tuple, convert=True):
        """Run the ``paginateZonesBySku`` page and count queries for ``site``.

        Both SQL files live under ``sql/SELECT/zones/`` and have their
        ``%%site_name%%`` marker replaced by ``site``. The same ``payload`` is
        bound to both statements; its layout is defined by those files.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): replaces the ``%%site_name%%`` marker.
            payload (tuple): values bound to the placeholders of both queries.
            convert (bool, optional): when truthy each row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised by either query.

        Returns:
            tuple: (rows, count); rows is an empty tuple when the page is empty.
        """
        with open("sql/SELECT/zones/paginateZonesBySku.sql", "r+") as sql_file:
            sql = sql_file.read().replace("%%site_name%%", site)
        with open("sql/SELECT/zones/paginateZonesBySkuCount.sql", "r+") as sql_file:
            sql_count = sql_file.read().replace("%%site_name%%", site)

        page, total = (), 0
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                fetched = cur.fetchall()
                if fetched:
                    if convert:
                        page = [tupleDictionaryFactory(cur.description, r) for r in fetched]
                    else:
                        page = fetched

                cur.execute(sql_count, payload)
                total = cur.fetchone()[0]
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return page, total
|
|
|
|
class LocationsTable:
    """Queries against a site's ``{site}_locations`` table."""

    @dataclass
    class Payload:
        """Values for a new location row.

        ``uuid`` and ``name`` must be ``str`` and ``zone_id`` must be ``int``;
        anything else raises ``TypeError`` at construction time.
        """
        uuid: str
        name: str
        zone_id: int

        def __post_init__(self):
            if not isinstance(self.uuid, str):
                raise TypeError(f"uuid must be of type str; not {type(self.uuid)}")
            if not isinstance(self.name, str):
                raise TypeError(f"Location name must be of type str; not {type(self.name)}")
            if not isinstance(self.zone_id, int):
                # The message now names the type that is actually required.
                raise TypeError(f"zone_id must be of type int; not {type(self.zone_id)}")

        def payload(self):
            """Return ``(uuid, name, zone_id)``."""
            return (
                self.uuid,
                self.name,
                self.zone_id
            )

    @classmethod
    def paginateLocations(self, conn, site: str, payload: tuple, convert=True):
        """Return one page of ``{site}_locations`` plus the table's total row count.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): site name, used as the table-name prefix.
            payload (tuple): (limit, offset)
            convert (bool, optional): when truthy each row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised by either query.

        Returns:
            tuple: (rows, count); rows is an empty tuple when the page is empty.
        """
        recordset = ()
        count = 0
        sql = f"SELECT * FROM {site}_locations LIMIT %s OFFSET %s;"
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchall()
                if rows:
                    if convert:
                        recordset = [tupleDictionaryFactory(cur.description, row) for row in rows]
                    else:
                        recordset = rows

                cur.execute(f"SELECT COUNT(*) FROM {site}_locations;")
                count = cur.fetchone()[0]
        except Exception as error:
            # Keep the payload in the error so the log shows the failing page.
            raise DatabaseError(error, payload, sql)
        return recordset, count

    @classmethod
    def paginateLocationsWithZone(self, conn, site: str, payload: tuple, convert=True):
        """Run ``sql/SELECT/getLocationsWithZone.sql`` and count all locations.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): replaces the ``%%site_name%%`` marker and prefixes the
                table name of the count query.
            payload (tuple): (limit, offset)
            convert (bool, optional): when truthy each row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised by either query.

        Returns:
            tuple: (rows, count); rows is an empty tuple when the page is empty.
        """
        recordset = ()
        count = 0
        # The file is only read, so plain read mode is enough.
        with open("sql/SELECT/getLocationsWithZone.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchall()
                if rows:
                    if convert:
                        recordset = [tupleDictionaryFactory(cur.description, row) for row in rows]
                    else:
                        recordset = rows
                cur.execute(f"SELECT COUNT(*) FROM {site}_locations;")
                count = cur.fetchone()[0]
        except Exception as error:
            # Keep the payload in the error so the log shows the failing page.
            raise DatabaseError(error, payload, sql)
        return recordset, count

    @classmethod
    def paginateLocationsBySkuZone(self, conn, site: str, payload: tuple, convert=True):
        """Run the ``paginateLocationsBySkuZone`` page and count queries.

        Both SQL files live under ``sql/SELECT/locations/``; the same
        ``payload`` is bound to both statements.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): replaces the ``%%site_name%%`` marker.
            payload (tuple): (item_id, zone_id, limit, offset)
            convert (bool, optional): when truthy each row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised by either query.

        Returns:
            tuple: (rows, count); rows is an empty tuple when the page is empty.
        """
        locations = ()
        count = 0
        with open("sql/SELECT/locations/paginateLocationsBySkuZone.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        with open("sql/SELECT/locations/paginateLocationsBySkuZoneCount.sql", "r") as file:
            sql_count = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchall()
                if rows:
                    if convert:
                        locations = [tupleDictionaryFactory(cur.description, row) for row in rows]
                    else:
                        locations = rows

                cur.execute(sql_count, payload)
                count = cur.fetchone()[0]
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return locations, count

    @classmethod
    def insert_tuple(self, conn, site, payload, convert=True):
        """Run ``sql/INSERT/insertLocationsTuple.sql`` for ``site`` and return the row.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): replaces the ``%%site_name%%`` marker in the SQL file.
            payload (tuple): values bound to the statement's placeholders.
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the row produced by the statement, or an empty
            tuple when it yields none.
        """
        location = ()
        with open("sql/INSERT/insertLocationsTuple.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    location = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return location
|
|
|
|
class VendorsTable:
    """Queries against a site's ``{site}_vendors`` table."""

    @dataclass
    class Payload:
        """Values for a new vendor row.

        ``creation_date`` is not a constructor argument; it is set to
        ``datetime.datetime.now()`` once ``vendor_name`` has been validated.
        """
        vendor_name: str
        created_by: int
        vendor_address: str = ""
        creation_date: datetime.datetime = field(init=False)
        phone_number: str = ""

        def __post_init__(self):
            name_is_text = isinstance(self.vendor_name, str)
            if not name_is_text:
                raise TypeError(f"vendor_name should be of type str; not {type(self.vendor_name)}")
            self.creation_date = datetime.datetime.now()

        def payload(self):
            """Return ``(vendor_name, vendor_address, creation_date, created_by, phone_number)``."""
            return (self.vendor_name, self.vendor_address, self.creation_date,
                    self.created_by, self.phone_number)

    @classmethod
    def paginateVendors(self, conn, site: str, payload: tuple, convert=True):
        """Return one page of ``{site}_vendors`` plus the table's total row count.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): site name, used as the table-name prefix.
            payload (tuple): (limit, offset)
            convert (bool, optional): when truthy each row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised by either query; it is
                built with an empty payload and the page query's SQL.

        Returns:
            tuple: (rows, count); rows is an empty tuple when the page is empty.
        """
        sql = f"SELECT * FROM {site}_vendors LIMIT %s OFFSET %s;"
        page, total = (), 0
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                fetched = cur.fetchall()
                if fetched:
                    if convert:
                        page = [tupleDictionaryFactory(cur.description, r) for r in fetched]
                    else:
                        page = fetched

                cur.execute(f"SELECT COUNT(*) FROM {site}_vendors;")
                total = cur.fetchone()[0]
        except Exception as error:
            raise DatabaseError(error, (), sql)
        return page, total

    @classmethod
    def insert_tuple(self, conn, site, payload, convert=True):
        """Run ``sql/INSERT/insertVendorsTuple.sql`` for ``site`` and return the row.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): replaces the ``%%site_name%%`` marker in the SQL file.
            payload (tuple): values bound to the statement's placeholders.
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the row produced by the statement, or an empty
            tuple when it yields none.
        """
        with open("sql/INSERT/insertVendorsTuple.sql", "r+") as sql_file:
            sql = sql_file.read().replace("%%site_name%%", site)

        vendor = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                row = cur.fetchone()
                if row:
                    vendor = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return vendor

    @classmethod
    def update_tuple(self, conn, site, payload, convert=True):
        """Update one row of ``{site}_vendors`` and return it.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): site name, used as the table-name prefix.
            payload (dict): {'id': row_id, 'update': {column: new_value, ...}}
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the updated row, or an empty tuple when no row matched.
        """
        assignments, params = updateStringFactory(payload['update'])
        params.append(payload['id'])  # bound to the trailing "WHERE id=%s"
        sql = f"UPDATE {site}_vendors SET {assignments} WHERE id=%s RETURNING *;"

        result = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                row = cur.fetchone()
                if row:
                    result = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return result
|
|
|
|
class BrandsTable:
    """Queries against a site's ``{site}_brands`` table."""

    @dataclass
    class Payload:
        """Values for a new brand row; ``name`` must be a ``str``."""
        name: str

        def __post_init__(self):
            if not isinstance(self.name, str):
                # Must be raised: a returned exception is discarded by the
                # dataclass-generated __init__, which skipped validation.
                raise TypeError(f"brand name should be of type str; not {type(self.name)}")

        def payload(self):
            """Return ``(name,)``."""
            return (
                self.name,
            )

    @classmethod
    def paginateBrands(self, conn, site: str, payload: tuple, convert=True):
        """Return one page of ``{site}_brands`` plus the table's total row count.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): site name, used as the table-name prefix.
            payload (tuple): (limit, offset)
            convert (bool, optional): when truthy each row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised by either query.

        Returns:
            tuple: (rows, count); rows is an empty tuple when the page is empty.
        """
        recordset = ()
        count = 0
        sql = f"SELECT * FROM {site}_brands LIMIT %s OFFSET %s;"
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchall()
                if rows:
                    if convert:
                        recordset = [tupleDictionaryFactory(cur.description, row) for row in rows]
                    else:
                        recordset = rows

                cur.execute(f"SELECT COUNT(*) FROM {site}_brands;")
                count = cur.fetchone()[0]
        except Exception as error:
            # Keep the payload in the error so the log shows the failing page.
            raise DatabaseError(error, payload, sql)
        return recordset, count

    @classmethod
    def insert_tuple(self, conn, site, payload, convert=True):
        """Run ``sql/INSERT/insertBrandsTuple.sql`` for ``site`` and return the row.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): replaces the ``%%site_name%%`` marker in the SQL file.
            payload (tuple): values bound to the statement's placeholders.
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the row produced by the statement, or an empty
            tuple when it yields none.
        """
        brand = ()
        # The file is only read, so plain read mode is enough.
        with open("sql/INSERT/insertBrandsTuple.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    brand = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return brand

    @classmethod
    def update_tuple(self, conn, site, payload, convert=True):
        """Update one row of ``{site}_brands`` and return it.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): site name, used as the table-name prefix.
            payload (dict): {'id': row_id, 'update': {column: new_value, ...}}
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the updated row, or an empty tuple when no row matched.
        """
        updated = ()

        set_clause, values = updateStringFactory(payload['update'])
        values.append(payload['id'])  # bound to the trailing "WHERE id=%s"
        sql = f"UPDATE {site}_brands SET {set_clause} WHERE id=%s RETURNING *;"
        try:
            with conn.cursor() as cur:
                cur.execute(sql, values)
                rows = cur.fetchone()
                if rows:
                    updated = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return updated
|
|
|
|
class ItemLocationsTable:
    """Queries against a site's item-locations table (SQL kept in ``sql/``)."""

    @dataclass
    class Payload:
        """Values for a new item-location row.

        ``part_id`` and ``location_id`` must both be ``int``.
        """
        part_id: int
        location_id: int
        quantity_on_hand: float = 0.0
        cost_layers: list = field(default_factory=list)

        def __post_init__(self):
            if not isinstance(self.part_id, int):
                raise TypeError(f"part_id must be of type int; not {type(self.part_id)}")
            if not isinstance(self.location_id, int):
                # Report the field that actually failed, and its own type.
                raise TypeError(f"location_id must be of type int; not {type(self.location_id)}")

        def payload(self):
            """Return ``(part_id, location_id, quantity_on_hand, cost_layers)``.

            ``cost_layers`` is passed through ``lst2pgarr``, which joins the
            elements with ``','.join`` and therefore needs them to be strings.
            """
            return (
                self.part_id,
                self.location_id,
                self.quantity_on_hand,
                lst2pgarr(self.cost_layers)
            )

    @classmethod
    def insert_tuple(self, conn, site, payload, convert=True):
        """Run ``sql/INSERT/insertItemLocationsTuple.sql`` and return the row.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): replaces the ``%%site_name%%`` marker in the SQL file.
            payload (tuple): values bound to the statement's placeholders.
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the row produced by the statement, or an empty
            tuple when it yields none.
        """
        item_location = ()
        # The file is only read, so plain read mode is enough.
        with open("sql/INSERT/insertItemLocationsTuple.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    item_location = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return item_location

    @classmethod
    def select_by_id(self, conn, site: str, payload: tuple, convert=True):
        """Run the ``selectItemLocations`` query and return every row it yields.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): replaces the ``%%site_name%%`` marker in the SQL file.
            payload (tuple): values bound to the statement's placeholders.
            convert (bool, optional): when truthy each row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            list or tuple: the fetched rows, or an empty tuple when there are none.
        """
        item_locations = ()
        # NOTE(review): this path has no ".sql" suffix, unlike every other
        # query file opened in this section - confirm the file name on disk.
        with open("sql/SELECT/selectItemLocations", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchall()
                if rows:
                    if convert:
                        item_locations = [tupleDictionaryFactory(cur.description, row) for row in rows]
                    else:
                        item_locations = rows
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return item_locations
|
|
|
|
class ItemLinksTable:
    """Queries against a site's item-links table (SQL kept in ``sql/``)."""

    @dataclass
    class Payload:
        """Values for a new item-link row.

        ``barcode`` must be a ``str`` and ``link`` must be an ``int``.
        """
        barcode: str
        link: int
        data: dict = field(default_factory=dict)
        conv_factor: float = 1

        def __post_init__(self):
            if not isinstance(self.barcode, str):
                # A misspelt attribute here used to raise AttributeError
                # instead of the intended TypeError.
                raise TypeError(f"barcode must be of type str; not {type(self.barcode)}")
            if not isinstance(self.link, int):
                raise TypeError(f"link must be of type int; not {type(self.link)}")

        def payload(self):
            """Return ``(barcode, link, data as a JSON string, conv_factor)``."""
            return (
                self.barcode,
                self.link,
                json.dumps(self.data),
                self.conv_factor
            )

    @classmethod
    def insert_tuple(self, conn, site: str, payload: tuple, convert=True):
        """Run ``sql/INSERT/insertItemLinksTuple.sql`` for ``site`` and return the row.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            site (str): replaces the ``%%site_name%%`` marker in the SQL file.
            payload (tuple): (barcode[str], link[int], data[jsonb], conv_factor[float])
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the row produced by the statement, or an empty
            tuple when it yields none.
        """
        link = ()
        # The file is only read, so plain read mode is enough.
        with open("sql/INSERT/insertItemLinksTuple.sql", "r") as file:
            sql = file.read().replace("%%site_name%%", site)
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchone()
                if rows:
                    link = tupleDictionaryFactory(cur.description, rows) if convert else rows
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return link
|
|
|
|
class CycleCountsTable:
    """Placeholder for cycle-count queries; no query methods are defined here."""

    @dataclass
    class Payload:
        """Empty payload: no fields are declared."""
|
|
|
|
class SitesTable:
    """Queries against the global ``sites`` table."""

    @dataclass
    class Manager:
        """Site settings plus the table orderings filled in by ``__post_init__``.

        ``create_order`` and ``drop_order`` are not constructor arguments.
        """
        site_name: str
        admin_user: tuple
        default_zone: int
        default_location: int
        description: str
        create_order: list = field(init=False)
        drop_order: list = field(init=False)

        def __post_init__(self):
            # NOTE(review): create_order lists "item" while drop_order lists
            # "items" - confirm which table name is intended.
            self.create_order = [
                "logins", "sites", "roles", "units", "cost_layers",
                "linked_items", "brands", "food_info", "item_info", "zones",
                "locations", "logistics_info", "transactions", "item",
                "vendors", "groups", "group_items", "receipts",
                "receipt_items", "recipes", "recipe_items", "shopping_lists",
                "shopping_list_items", "item_locations", "conversions",
            ]
            self.drop_order = [
                "item_info", "items", "cost_layers", "linked_items",
                "transactions", "brands", "food_info", "logistics_info",
                "zones", "locations", "vendors", "group_items", "groups",
                "receipt_items", "receipts", "recipe_items", "recipes",
                "shopping_list_items", "shopping_lists", "item_locations",
                "conversions",
            ]

    @dataclass
    class Payload:
        """Values for a new ``sites`` row.

        ``creation_date`` is not a constructor argument; it is set to
        ``datetime.datetime.now()`` in ``__post_init__``.
        """
        site_name: str
        site_description: str
        site_owner_id: int
        default_zone: str = None
        default_auto_issue_location: str = None
        default_primary_location: str = None
        creation_date: datetime.datetime = field(init=False)
        flags: dict = field(default_factory=dict)

        def __post_init__(self):
            self.creation_date = datetime.datetime.now()

        def payload(self):
            """Return the row values as a tuple, with ``flags`` as a JSON string.

            Order: site_name, site_description, creation_date, site_owner_id,
            flags, default_zone, default_auto_issue_location,
            default_primary_location.
            """
            flags_json = json.dumps(self.flags)
            return (self.site_name, self.site_description, self.creation_date,
                    self.site_owner_id, flags_json, self.default_zone,
                    self.default_auto_issue_location,
                    self.default_primary_location)

        def get_dictionary(self):
            """Return the instance's own ``__dict__`` (not a copy)."""
            return self.__dict__

    @classmethod
    def insert_tuple(self, conn, payload: tuple, convert=True):
        """Run ``sql/INSERT/insertSitesTuple.sql`` and return the row.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            payload (tuple): (site_name[str], site_description[str],
                creation_date[timestamp], site_owner_id[int], flags[dict],
                default_zone[str], default_auto_issue_location[str],
                default_primary_location[str])
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the row produced by the statement, or an empty
            tuple when it yields none.
        """
        with open("sql/INSERT/insertSitesTuple.sql", "r+") as sql_file:
            sql = sql_file.read()

        inserted = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                row = cur.fetchone()
                if row:
                    inserted = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return inserted

    @classmethod
    def paginateTuples(self, conn, payload: tuple, convert=True):
        """Return one page of ``sites`` plus the table's total row count.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            payload (tuple): (limit, offset)
            convert (bool, optional): when truthy each row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised by either query; it is
                built with an empty payload and the page query's SQL.

        Returns:
            tuple: (rows, count); rows is an empty list when the page is empty.
        """
        sql = "SELECT * FROM sites LIMIT %s OFFSET %s;"
        sql_count = "SELECT COUNT(*) FROM sites;"
        page, total = [], 0
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                fetched = cur.fetchall()
                if fetched:
                    if convert:
                        page = [tupleDictionaryFactory(cur.description, r) for r in fetched]
                    else:
                        page = fetched
                cur.execute(sql_count)
                total = cur.fetchone()[0]
        except Exception as error:
            raise DatabaseError(error, (), sql)
        return page, total

    @classmethod
    def selectTuples(self, conn, convert=True):
        """Return every row of the ``sites`` table.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            convert (bool, optional): when truthy each row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            list: the fetched rows; empty when the table has none.
        """
        sql = "SELECT * FROM sites"
        sites = []
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
                fetched = cur.fetchall()
                if fetched:
                    if convert:
                        sites = [tupleDictionaryFactory(cur.description, r) for r in fetched]
                    else:
                        sites = fetched
        except Exception as error:
            raise DatabaseError(error, (), sql)
        return sites

    @classmethod
    def select_tuple(self, conn, payload: tuple, convert=True):
        """Fetch a single ``sites`` row by id.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            payload (tuple): one-element tuple bound to ``WHERE id=%s``.
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query;
                it is built with an empty payload.

        Returns:
            dict, tuple or list: the matching row, or an empty list when none exists.
        """
        sql = "SELECT * FROM sites WHERE id=%s;"
        site_row = []
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                row = cur.fetchone()
                if row:
                    site_row = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, (), sql)
        return site_row

    @classmethod
    def update_tuple(self, conn, payload, convert=True):
        """Update one ``sites`` row and return it.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            payload (dict): {'id': row_id, 'update': {column: new_value, ...}}
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the updated row, or an empty tuple when no row matched.
        """
        assignments, params = updateStringFactory(payload['update'])
        params.append(payload['id'])  # bound to the trailing "WHERE id=%s"
        sql = f"UPDATE sites SET {assignments} WHERE id=%s RETURNING *;"

        result = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                row = cur.fetchone()
                if row:
                    result = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return result
|
|
|
|
class RolesTable:
    """Queries against the global ``roles`` table."""

    @dataclass
    class Payload:
        """Values for a new ``roles`` row."""
        role_name: str
        role_description: str
        site_id: int
        flags: dict = field(default_factory=dict)

        def payload(self):
            """Return ``(role_name, role_description, site_id, flags as JSON)``."""
            flags_json = json.dumps(self.flags)
            return (self.role_name, self.role_description, self.site_id, flags_json)

        def get_dictionary(self):
            """Return the instance's own ``__dict__`` (not a copy)."""
            return self.__dict__

    @classmethod
    def insert_tuple(self, conn, payload: tuple, convert=True):
        """Run ``sql/INSERT/insertRolesTuple.sql`` and return the row.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            payload (tuple): (role_name[str], role_description[str], site_id[int], flags[jsonb])
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the row produced by the statement, or an empty
            tuple when it yields none.
        """
        with open("sql/INSERT/insertRolesTuple.sql", "r+") as sql_file:
            sql = sql_file.read()

        inserted = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                row = cur.fetchone()
                if row:
                    inserted = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return inserted

    @classmethod
    def select_tuple(self, conn, payload: tuple, convert=True):
        """Fetch one role by id, with its site joined in as a JSON ``site`` column.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            payload (tuple): one-element tuple bound to ``WHERE roles.id=%s``.
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query;
                it is built with an empty payload.

        Returns:
            dict, tuple or list: the matching row, or an empty list when none exists.
        """
        sql = "SELECT roles.*, row_to_json(sites.*) as site FROM roles LEFT JOIN sites ON sites.id = roles.site_id WHERE roles.id=%s;"
        role = []
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                row = cur.fetchone()
                if row:
                    role = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, (), sql)
        return role

    @classmethod
    def paginate_tuples(self, conn, payload: tuple, convert=True):
        """Return one page of roles (site joined in) plus the total role count.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            payload (tuple): (limit, offset)
            convert (bool, optional): when truthy each row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised by either query.

        Returns:
            tuple: (rows, count); rows is an empty list when the page is empty.
        """
        sql = "SELECT roles.*, row_to_json(sites.*) as site FROM roles LEFT JOIN sites ON sites.id = roles.site_id LIMIT %s OFFSET %s;"
        sql_count = "SELECT COUNT(*) FROM roles;"

        page = []
        try:
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                fetched = cur.fetchall()
                if fetched:
                    if convert:
                        page = [tupleDictionaryFactory(cur.description, r) for r in fetched]
                    else:
                        page = fetched

                cur.execute(sql_count)
                total = cur.fetchone()[0]
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return page, total

    @classmethod
    def update_tuple(self, conn, payload, convert=True):
        """Update one ``roles`` row and return it.

        Args:
            conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
            payload (dict): {'id': row_id, 'update': {column: new_value, ...}}
            convert (bool, optional): when truthy the row is returned as a
                dictionary, otherwise as the raw tuple. Defaults to True.

        Raises:
            DatabaseError: wraps any exception raised while running the query.

        Returns:
            dict or tuple: the updated row, or an empty tuple when no row matched.
        """
        assignments, params = updateStringFactory(payload['update'])
        params.append(payload['id'])  # bound to the trailing "WHERE id=%s"
        sql = f"UPDATE roles SET {assignments} WHERE id=%s RETURNING *;"

        result = ()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                row = cur.fetchone()
                if row:
                    result = tupleDictionaryFactory(cur.description, row) if convert else row
        except Exception as error:
            raise DatabaseError(error, payload, sql)
        return result
|
|
|
|
class LoginsTable:
|
|
@dataclass
class Payload:
    """Values for a new ``logins`` row.

    ``username``, ``password``, ``email`` and ``row_type`` are required;
    every other field has a default.
    """
    username: str
    password: str
    email: str
    row_type: str
    system_admin: bool = False
    flags: dict = field(default_factory=dict)
    favorites: dict = field(default_factory=dict)
    unseen_pantry_items: list = field(default_factory=list)
    unseen_groups: list = field(default_factory=list)
    unseen_shopping_lists: list = field(default_factory=list)
    unseen_recipes: list = field(default_factory=list)
    seen_pantry_items: list = field(default_factory=list)
    seen_groups: list = field(default_factory=list)
    seen_shopping_lists: list = field(default_factory=list)
    seen_recipes: list = field(default_factory=list)
    sites: list = field(default_factory=list)
    site_roles: list = field(default_factory=list)

    def payload(self):
        """Return the row values as a tuple.

        ``favorites`` and ``flags`` are JSON-encoded. Every list field is
        passed through ``lst2pgarr``, which joins the elements with
        ``','.join`` and therefore needs them to be strings.
        """
        # Kept in the exact order the original tuple listed them.
        array_columns = (
            self.unseen_pantry_items,
            self.unseen_groups,
            self.unseen_shopping_lists,
            self.unseen_recipes,
            self.seen_pantry_items,
            self.seen_groups,
            self.seen_shopping_lists,
            self.seen_recipes,
            self.sites,
            self.site_roles,
        )
        return (
            self.username,
            self.password,
            self.email,
            json.dumps(self.favorites),
            *(lst2pgarr(column) for column in array_columns),
            self.system_admin,
            json.dumps(self.flags),
            self.row_type,
        )

    def get_dictionary(self):
        """Return the instance's own ``__dict__`` (not a copy)."""
        return self.__dict__
|
|
|
|
|
|
@classmethod
def insert_tuple(self, conn, payload: tuple, convert=True):
    """Run ``sql/INSERT/insertLoginsTupleTwo.sql`` and return the row.

    Args:
        conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
        payload (tuple): (username, password, email, favorites,
            unseen_pantry_items, unseen_groups, unseen_shopping_lists,
            unseen_recipes, seen_pantry_items, seen_groups,
            seen_shopping_lists, seen_recipes, sites, site_roles,
            system_admin, flags, row_type)
        convert (bool, optional): when truthy the row is returned as a
            dictionary, otherwise as the raw tuple. Defaults to True.

    Raises:
        DatabaseError: wraps any exception raised while running the query.

    Returns:
        dict or tuple: the row produced by the statement, or an empty
        tuple when it yields none.
    """
    with open("sql/INSERT/insertLoginsTupleTwo.sql", "r+") as sql_file:
        sql = sql_file.read()

    inserted = ()
    try:
        with conn.cursor() as cur:
            cur.execute(sql, payload)
            row = cur.fetchone()
            if row:
                inserted = tupleDictionaryFactory(cur.description, row) if convert else row
    except Exception as error:
        raise DatabaseError(error, payload, sql)
    return inserted
|
|
|
|
|
|
@classmethod
def select_tuple(self, conn, payload: tuple, convert=True):
    """Fetch a single ``logins`` row by id.

    Args:
        conn: open PostgreSQL connection; only ``conn.cursor()`` is used.
        payload (tuple): (user_id,)
        convert (bool, optional): when truthy the row is returned as a
            dictionary, otherwise as the raw tuple. Defaults to True.

    Raises:
        DatabaseError: wraps any exception raised while opening the cursor
            or running the query.

    Returns:
        dict or tuple: the matching row, or an empty tuple when none exists.
    """
    user = ()
    # Built before the try so the handler can always reference it; defined
    # inside, a failing conn.cursor() left it unbound and the handler raised
    # UnboundLocalError instead of DatabaseError.
    sql = "SELECT * FROM logins WHERE id=%s;"
    try:
        with conn.cursor() as cur:
            cur.execute(sql, payload)
            rows = cur.fetchone()
            if rows:
                user = tupleDictionaryFactory(cur.description, rows) if convert else rows
    except Exception as error:
        raise DatabaseError(error, payload, sql)
    return user
|
|
|
|
@classmethod
def paginate_tuples(self, conn, payload:tuple, convert=True):
    """Return one page of rows from the logins table plus the total row count.

    Args:
        conn (_T_connector@connect): Postgresql Connector
        payload (tuple): (limit, offset)
        convert (bool, optional): Determines if to return each tuple as dictionary. Defaults to True.

    Raises:
        DatabaseError: wraps any exception raised by either query; it carries
            the SQL text of the statement that was running when the failure
            occurred and keeps the original exception as the cause.

    Returns:
        tuple: (recordset, count) where recordset is a list of rows (dicts when
            convert is true, empty list when the page has no rows) and count is
            the result of COUNT(*) over the logins table
    """
    recordset = []

    # ORDER BY makes consecutive pages consistent; without it the database is
    # free to return rows in a different order for each LIMIT/OFFSET query.
    sql = "SELECT * FROM logins ORDER BY id LIMIT %s OFFSET %s;"
    sql_count = "SELECT COUNT(*) FROM logins;"

    # Tracks which statement is executing so a failure is reported against it.
    current_sql = sql
    try:
        with conn.cursor() as cur:
            cur.execute(sql, payload)
            rows = cur.fetchall()
            if rows and convert:
                recordset = [tupleDictionaryFactory(cur.description, row) for row in rows]
            elif rows:
                recordset = rows

            current_sql = sql_count
            cur.execute(sql_count)
            count = cur.fetchone()[0]
    except Exception as error:
        raise DatabaseError(error, payload, current_sql) from error
    return recordset, count
|
|
|
|
@classmethod
def get_washed_tuple(self, conn, payload:tuple, convert=True):
    """Fetch a reduced view of one logins row by id.

    Only the columns id, username, sites, site_roles, system_admin and flags
    are selected; all other columns of the row are left out of the result.

    Args:
        conn (_T_connector@connect): Postgresql Connector
        payload (tuple): (user_id,)
        convert (bool, optional): Determines if to return tuple as dictionary. Defaults to True.

    Raises:
        DatabaseError: wraps any exception raised while opening the cursor,
            executing the query or fetching the row; the original exception
            is kept as the cause.

    Returns:
        tuple or dict: the selected columns, or an empty tuple when no row matches
    """
    user = ()
    # Bind the statement before entering the try block so the except handler
    # can always report it, even when conn.cursor() itself fails.
    sql = "SELECT id, username, sites, site_roles, system_admin, flags FROM logins WHERE id=%s;"
    try:
        with conn.cursor() as cur:
            cur.execute(sql, payload)
            rows = cur.fetchone()
            if rows:
                user = tupleDictionaryFactory(cur.description, rows) if convert else rows
    except Exception as error:
        raise DatabaseError(error, payload, sql) from error
    return user
|
|
|
|
@classmethod
def update_tuple(self, conn, payload, convert=True):
    """Update columns of one logins row and return the row as it is after the update.

    Args:
        conn (_T_connector@connect): Postgresql Connector
        payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
        convert (bool, optional): determines if to return tuple as dictionary. Defaults to True.

    Raises:
        DatabaseError: wraps any exception raised while executing the statement

    Returns:
        tuple or dict: updated tuple, or an empty tuple when no row was returned
    """
    # NOTE: the column names (keys of payload['update']) end up in the SQL
    # text itself; only the values are passed as query parameters.
    assignments, params = updateStringFactory(payload['update'])
    params.append(payload['id'])
    sql = f"UPDATE logins SET {assignments} WHERE id=%s RETURNING *;"

    result = ()
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            row = cur.fetchone()
            if row:
                if convert:
                    result = tupleDictionaryFactory(cur.description, row)
                else:
                    result = row
    except Exception as error:
        raise DatabaseError(error, payload, sql)
    return result
|
|
|
|
@dataclass
class TransactionPayload:
    """Field container for a single transaction record.

    ``payload()`` flattens the fields into a tuple in declaration order, with
    ``data`` serialized to a JSON string.
    """
    timestamp: datetime.datetime
    logistics_info_id: int
    barcode: str
    name: str
    transaction_type: str
    quantity: float
    description: str
    user_id: int
    data: dict = field(default_factory=dict)  # fresh dict per instance

    def payload(self):
        """Return all fields as a tuple, ``data`` encoded as JSON text.

        Returns:
            tuple: (timestamp, logistics_info_id, barcode, name, transaction_type,
                    quantity, description, user_id, data_as_json)
        """
        data_json = json.dumps(self.data)
        return (
            self.timestamp, self.logistics_info_id, self.barcode, self.name,
            self.transaction_type, self.quantity, self.description, self.user_id,
            data_json,
        )
|
|
|
|
@dataclass
class CostLayerPayload:
    """Field container for a single cost layer record.

    Note that ``payload()`` emits ``expires`` before ``vendor``, which differs
    from the order in which the fields are declared.
    """
    aquisition_date: datetime.datetime
    quantity: float
    cost: float
    currency_type: str
    vendor: int = 0
    expires: datetime.datetime = None  # None unless an expiry is supplied

    def payload(self):
        """Return the fields as a tuple.

        Returns:
            tuple: (aquisition_date, quantity, cost, currency_type, expires, vendor)
        """
        leading = (self.aquisition_date, self.quantity, self.cost, self.currency_type)
        trailing = (self.expires, self.vendor)
        return leading + trailing
|
|
|
|
@dataclass
class ItemLinkPayload:
    """Field container for a single item link record.

    Construction validates that ``barcode`` is a ``str`` and ``link`` is an
    ``int``; ``payload()`` flattens the fields into a tuple with ``data``
    serialized to a JSON string.

    Raises:
        TypeError: if ``barcode`` is not a str or ``link`` is not an int.
    """
    barcode: str
    link: int
    data: dict = field(default_factory=dict)  # fresh dict per instance
    conv_factor: float = 1

    def __post_init__(self):
        """Reject wrongly typed ``barcode`` / ``link`` values right after construction."""
        if not isinstance(self.barcode, str):
            raise TypeError(f"barcode must be of type str; not {type(self.barcode)}")
        if not isinstance(self.link, int):
            raise TypeError(f"link must be of type int; not {type(self.link)}")

    def payload(self):
        """Return the fields as a tuple, ``data`` encoded as JSON text.

        Returns:
            tuple: (barcode, link, data_as_json, conv_factor)
        """
        return (
            self.barcode,
            self.link,
            json.dumps(self.data),
            self.conv_factor,
        )
|
|
|
|
@dataclass
class LogisticsInfoPayload:
    """Field container for a single logistics info record.

    ``payload()`` returns the fields as a tuple in declaration order.
    """
    barcode: str
    primary_location: int
    primary_zone: int
    auto_issue_location: int
    auto_issue_zone: int

    def payload(self):
        """Return the fields as a tuple.

        Returns:
            tuple: (barcode, primary_location, primary_zone,
                    auto_issue_location, auto_issue_zone)
        """
        primary = (self.primary_location, self.primary_zone)
        auto_issue = (self.auto_issue_location, self.auto_issue_zone)
        return (self.barcode, *primary, *auto_issue)
|
|
|
|
@dataclass
class ItemInfoPayload:
    """Field container for a single item info record.

    Construction validates that ``barcode`` is a ``str``. ``payload()``
    flattens the fields into a tuple in declaration order, passing
    ``prefixes`` through ``lst2pgarr``.

    Raises:
        TypeError: if ``barcode`` is not a str.
    """
    barcode: str
    packaging: str = ""
    uom_quantity: float = 1.0
    uom: int = 1
    cost: float = 0.0
    safety_stock: float = 0.0
    lead_time_days: float = 0.0
    ai_pick: bool = False
    prefixes: list = field(default_factory=list)  # fresh list per instance

    def __post_init__(self):
        """Reject a non-string ``barcode`` right after construction."""
        if isinstance(self.barcode, str):
            return
        raise TypeError(f"barcode must be of type str; not {type(self.barcode)}")

    def payload(self):
        """Return the fields as a tuple, ``prefixes`` converted by ``lst2pgarr``.

        Returns:
            tuple: (barcode, packaging, uom_quantity, uom, cost, safety_stock,
                    lead_time_days, ai_pick, prefixes_converted)
        """
        prefixes_converted = lst2pgarr(self.prefixes)
        return (
            self.barcode, self.packaging, self.uom_quantity, self.uom,
            self.cost, self.safety_stock, self.lead_time_days, self.ai_pick,
            prefixes_converted,
        )
|
|
|
|
@dataclass
class FoodInfoPayload:
    """Field container for a single food info record.

    ``payload()`` flattens the fields into a tuple in declaration order:
    the two lists go through ``lst2pgarr`` and ``nutrients`` is serialized
    to a JSON string.
    """
    food_groups: list = field(default_factory=list)  # fresh list per instance
    ingrediants: list = field(default_factory=list)  # fresh list per instance
    nutrients: dict = field(default_factory=dict)    # fresh dict per instance
    expires: bool = False
    default_expiration: float = 0.0

    def payload(self):
        """Return the fields as a tuple with the containers converted.

        Returns:
            tuple: (food_groups_converted, ingrediants_converted,
                    nutrients_as_json, expires, default_expiration)
        """
        groups = lst2pgarr(self.food_groups)
        ingredients = lst2pgarr(self.ingrediants)
        nutrients_json = json.dumps(self.nutrients)
        return (groups, ingredients, nutrients_json, self.expires, self.default_expiration)