Compare commits

..

2 Commits

Author SHA1 Message Date
Jadowyne Ulve
a279e2a95a test 2025-04-30 17:35:35 -05:00
Jadowyne Ulve
e2eea3ca84 items_API.additemlink updated to new schema 2025-04-30 17:35:02 -05:00
18 changed files with 606 additions and 68 deletions

View File

@ -96,7 +96,7 @@ class ItemsPayload:
self.search_string self.search_string
) )
# done
@dataclass @dataclass
class TransactionPayload: class TransactionPayload:
timestamp: datetime.datetime timestamp: datetime.datetime

Binary file not shown.

View File

@ -165,6 +165,23 @@ def getItemLink(site: str, payload:tuple, convert:bool=True):
except Exception as error: except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql) raise postsqldb.DatabaseError(error, payload, sql)
def getLocation(site:str, payload:tuple, convert:bool=True):
selected = ()
database_config = config.config()
sql = f"SELECT * FROM {site}_locations WHERE id=%s;"
try:
with psycopg2.connect(**database_config) as conn:
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
selected = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
selected = rows
return selected
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def paginateZonesBySku(site: str, payload: tuple, convert=True): def paginateZonesBySku(site: str, payload: tuple, convert=True):
database_config = config.config() database_config = config.config()
zones, count = (), 0 zones, count = (), 0
@ -403,3 +420,330 @@ def postUpdateItemLink(site: str, payload: dict):
postUpdateData(conn, f"{site}_itemlinks", {'id': payload['id'], 'update': {'conv_factor': payload['update']['conv_factor']}}) postUpdateData(conn, f"{site}_itemlinks", {'id': payload['id'], 'update': {'conv_factor': payload['update']['conv_factor']}})
postAddTransaction(conn, site, transaction.payload()) postAddTransaction(conn, site, transaction.payload())
def postUpdateCostLayer(site, payload, convert=True, conn=None):
"""_summary_
Args:
conn (_T_connector@connect): Postgresql Connector
site (str):
table (str):
payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
convert (bool, optional): determines if to return tuple as dictionary. Defaults to False.
Raises:
DatabaseError:
Returns:
tuple or dict: updated tuple
"""
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE {site}_cost_layers SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertCostLayersTuple(site, payload, convert=True, conn=None):
cost_layer = ()
self_conn = False
with open(f"application/items/sql/insertCostLayersTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
cost_layer = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
cost_layer = rows
if self_conn:
conn.commit()
conn.close()
return cost_layer
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def insertItemLocationsTuple(conn, site, payload, convert=True):
location = ()
database_config = config.config()
with open(f"application/items/sql/insertItemLocationsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
conn = psycopg2.connect(**database_config)
conn.autocommit = False
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
location = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
location = rows
return location, conn
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def selectItemLocationsTuple(site_name, payload, convert=True):
"""select a single tuple from ItemLocations table for site_name
Args:
conn (_T_connector@connect):
site_name (str):
payload (tuple): [item_id, location_id]
convert (bool): defaults to False, used to determine return of tuple/dict
Returns:
tuple: the row that was returned from the table
"""
item_locations = ()
database_config = config.config()
select_item_location_sql = f"SELECT * FROM {site_name}_item_locations WHERE part_id = %s AND location_id = %s;"
try:
with psycopg2.connect(**database_config) as conn:
with conn.cursor() as cur:
cur.execute(select_item_location_sql, payload)
rows = cur.fetchone()
if rows and convert:
item_locations = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
item_locations = rows
return item_locations
except Exception as error:
return error
def selectCostLayersTuple(site_name, payload, convert=True):
"""select a single or series of cost layers from the database for site_name
Args:
conn (_T_connector@connect):
site_name (str):
payload (tuple): (item_locations_id, )
convert (bool): defaults to False, used for determining return as tuple/dict
Returns:
list: list of tuples/dict from the cost_layers table for site_name
"""
cost_layers = ()
database_config = config.config()
select_cost_layers_sql = f"SELECT cl.* FROM {site_name}_item_locations il JOIN {site_name}_cost_layers cl ON cl.id = ANY(il.cost_layers) where il.id=%s;"
try:
with psycopg2.connect(**database_config) as conn:
with conn.cursor() as cur:
cur.execute(select_cost_layers_sql, payload)
rows = cur.fetchall()
if rows and convert:
cost_layers = rows
cost_layers = [postsqldb.tupleDictionaryFactory(cur.description, layer) for layer in rows]
elif rows and not convert:
cost_layers = rows
return cost_layers
except Exception as error:
return error
def postDeleteCostLayer(site_name, payload, convert=True, conn=None):
"""
payload (tuple): (table_to_delete_from, tuple_id)
Raises:
DatabaseError:
Returns:
tuple or dict: deleted tuple
"""
deleted = ()
self_conn = False
sql = f"WITH deleted_rows AS (DELETE FROM {site_name}_cost_layers WHERE id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
deleted = [postsqldb.tupleDictionaryFactory(cur.description, r) for r in rows]
elif rows and not convert:
deleted = rows
if self_conn:
conn.commit()
conn.close()
return deleted
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def postUpdateItemLocation(site, payload, conn=None):
item_location = ()
self_conn = False
with open(f"sql/updateItemLocation.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows:
item_location = rows
if self_conn:
conn.commit()
conn.close()
return item_location
except Exception as error:
return error
def postAddTransaction(site, payload, convert=False, conn=None):
transaction = ()
self_conn = False
with open(f"application/items/sql/insertTransactionsTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
transaction = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
transaction = rows
if self_conn:
conn.commit()
conn.close()
return transaction
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def postInsertItemLink(site, payload, convert=True, conn=None):
"""insert payload into itemlinks table of site
Args:
conn (_T_connector@connect): Postgresql Connector
site (str):
payload (tuple): (barcode[str], link[int], data[jsonb], conv_factor[float])
convert (bool, optional): Determines if to return tuple as dictionary. Defaults to False.
Raises:
DatabaseError:
Returns:
tuple or dict: inserted tuple
"""
link = ()
self_conn = False
with open(f"application/items/sql/insertItemLinksTuple.sql", "r+") as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
link = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
link = rows
if self_conn:
conn.commit()
conn.close()
return link, conn
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)
def postUpdateItemByID(site, payload, convert=True, conn=None):
""" high level update of an item specific data, none of its relationships
Args:
conn (_T_connector@connect): Postgresql Connector
site (str):
table (str):
payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
convert (bool, optional): determines if to return tuple as dictionary. Defaults to False.
Raises:
DatabaseError:
Returns:
tuple or dict: updated tuple
"""
updated = ()
self_conn = False
set_clause, values = postsqldb.updateStringFactory(payload['update'])
values.append(payload['id'])
sql = f"UPDATE {site}_items SET {set_clause} WHERE id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated, conn
except Exception as error:
raise postsqldb.DatabaseError(error, payload, sql)

View File

@ -5,6 +5,7 @@ from main import unfoldCostLayers
from user_api import login_required from user_api import login_required
import application.postsqldb as db import application.postsqldb as db
from application.items import database_items from application.items import database_items
from application.items import items_processes
items_api = Blueprint('items_api', __name__) items_api = Blueprint('items_api', __name__)
@ -486,76 +487,49 @@ def getLinkedItem():
return jsonify({'linked_item': linked_item, 'error': True, 'message': f'method {request.method} not allowed'}) return jsonify({'linked_item': linked_item, 'error': True, 'message': f'method {request.method} not allowed'})
@items_api.route('/item/addLinkedItem', methods=["POST"]) @items_api.route('/item/addLinkedItem', methods=["POST"])
@login_required
def addLinkedItem(): def addLinkedItem():
""" POST a link between items by passing a parent_id, a child_id, conv_factor
---
parameters:
- in: query
name: parent_id
schema:
type: integer
default: 1
required: true
description: id to linked list item
- in: query
name: child_id
schema:
type: integer
default: 1
required: true
description: id to item to be linked to list.
- in: query
name: conv_factor
schema:
type: integer
default: 1
required: true
description: integer factor between child id to parent id.
responses:
200:
description: Items linked successfully.
"""
if request.method == "POST": if request.method == "POST":
parent_id = request.get_json()['parent_id'] parent_id = request.get_json()['parent_id']
child_id = request.get_json()['child_id'] child_id = request.get_json()['child_id']
conv_factor = request.get_json()['conv_factor'] conv_factor = request.get_json()['conv_factor']
database_config = config()
site_name = session['selected_site'] site_name = session['selected_site']
user_id = session['user_id'] user_id = session['user_id']
with psycopg2.connect(**database_config) as conn:
print(parent_id, child_id, conv_factor)
parent_item = database.getItemAllByID(conn, site_name, (parent_id, ), convert=True)
child_item = database.getItemAllByID(conn, site_name, (child_id, ), convert=True)
# i need to transact out ALL locations for child item. items_processes.postLinkedItem(site_name, {
pprint.pprint(child_item) 'parent_id': parent_id,
sum_child_qoh = 0 'child_id': child_id,
for location in child_item['item_locations']: 'user_id': user_id,
print(location) 'conv_factor': conv_factor
sum_child_qoh += location['quantity_on_hand'] })
payload = {
'item_id': child_item['id'],
'logistics_info_id': child_item['logistics_info_id'],
'barcode': child_item['barcode'],
'item_name': child_item['item_name'],
'transaction_type': 'Adjust Out',
'quantity': location['quantity_on_hand'],
'description': f'Converted to {parent_item['barcode']}',
'cost': child_item['item_info']['cost'],
'vendor': 1,
'expires': False,
'location_id': location['location_id']
}
process.postTransaction(conn, site_name, user_id, payload)
print(sum_child_qoh)
primary_location = database.selectItemLocationsTuple(conn, site_name, (parent_item['id'], parent_item['logistics_info']['primary_location']['id']), convert=True)
payload = {
'item_id': parent_item['id'],
'logistics_info_id': parent_item['logistics_info_id'],
'barcode': parent_item['barcode'],
'item_name': parent_item['item_name'],
'transaction_type': 'Adjust In',
'quantity': (float(sum_child_qoh)*float(conv_factor)),
'description': f'Converted from {child_item['barcode']}',
'cost': child_item['item_info']['cost'],
'vendor': 1,
'expires': None,
'location_id': primary_location['location_id']
}
pprint.pprint(payload)
result = process.postTransaction(conn, site_name, user_id, payload)
if result['error']:
return jsonify(result)
itemLink = MyDataclasses.ItemLinkPayload(
barcode=child_item['barcode'],
link=parent_item['id'],
data=child_item,
conv_factor=conv_factor
)
database.insertItemLinksTuple(conn, site_name, itemLink.payload())
database.__updateTuple(conn, site_name, f"{site_name}_items", {'id': child_item['id'], 'update': {'row_type': 'link'}})
return jsonify({'error': False, 'message': 'Linked Item added!!'}) return jsonify({'error': False, 'message': 'Linked Item added!!'})
return jsonify({'error': True, 'message': 'These was an error with adding to the linked list!'}) return jsonify({'error': True, 'message': 'These was an error with adding to the linked list!'})

View File

@ -0,0 +1,169 @@
from application.items import database_items
import application.postsqldb as db
import config
import datetime
import psycopg2
def postLinkedItem(site, payload):
"""
payload = {parent_id, child_id, user_id, conv_factor}
"""
parent_item = database_items.getItemAllByID(site, (payload['parent_id'],))
child_item = database_items.getItemAllByID(site, (payload['child_id'],))
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
# i need to transact out ALL locations for child item.
sum_child_qoh = 0
for location in child_item['item_locations']:
sum_child_qoh += location['quantity_on_hand']
adjustment_payload = {
'item_id': child_item['id'],
'logistics_info_id': child_item['logistics_info_id'],
'barcode': child_item['barcode'],
'item_name': child_item['item_name'],
'transaction_type': 'Adjust Out',
'quantity': location['quantity_on_hand'],
'description': f'Converted to {parent_item['barcode']}',
'cost': child_item['item_info']['cost'],
'vendor': 1,
'expires': False,
'location_id': location['location_id']
}
print(conn)
conn = postAdjustment(site, payload['user_id'], adjustment_payload, conn=conn)
print(conn)
#process.postTransaction(conn, site_name, user_id, payload)
print(sum_child_qoh)
primary_location = database_items.selectItemLocationsTuple(site, (parent_item['id'], parent_item['logistics_info']['primary_location']['id']), convert=True)
print(primary_location)
adjustment_payload = {
'item_id': parent_item['id'],
'logistics_info_id': parent_item['logistics_info_id'],
'barcode': parent_item['barcode'],
'item_name': parent_item['item_name'],
'transaction_type': 'Adjust In',
'quantity': (float(sum_child_qoh)*float(payload['conv_factor'])),
'description': f'Converted from {child_item['barcode']}',
'cost': child_item['item_info']['cost'],
'vendor': 1,
'expires': None,
'location_id': primary_location['location_id']
}
print(conn)
conn=postAdjustment(site, payload['user_id'], adjustment_payload, conn=conn)
print(conn)
itemLink = db.ItemLinkPayload(
barcode=child_item['barcode'],
link=parent_item['id'],
data=child_item,
conv_factor=payload['conv_factor']
)
_, conn = database_items.postInsertItemLink(site, itemLink.payload(), conn=conn)
print(conn)
print(_['id'])
_, conn = database_items.postUpdateItemByID(site, {'id': child_item['id'], 'update': {'row_type': 'link'}}, conn=conn)
print(conn)
print(_['id'])
conn.commit()
conn.close()
def postAdjustment(site_name, user_id, data: dict, conn=None):
"""dict_keys(['item_id', 'logistics_info_id', 'barcode', 'item_name', 'transaction_type',
'quantity', 'description', 'cost', 'vendor', 'expires', 'location_id'])"""
def quantityFactory(quantity_on_hand:float, quantity:float, transaction_type:str):
if transaction_type == "Adjust In":
quantity_on_hand += quantity
return quantity_on_hand
if transaction_type == "Adjust Out":
quantity_on_hand -= quantity
return quantity_on_hand
raise Exception("The transaction type is wrong!")
transaction_time = datetime.datetime.now()
self_conn = False
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
cost_layer = db.CostLayerPayload(
aquisition_date=transaction_time,
quantity=float(data['quantity']),
cost=float(data['cost']),
currency_type="USD",
vendor=int(data['vendor']),
expires=data['expires']
)
transaction = db.TransactionPayload(
timestamp=transaction_time,
logistics_info_id=int(data['logistics_info_id']),
barcode=data['barcode'],
name=data['item_name'],
transaction_type=data['transaction_type'],
quantity=float(data['quantity']),
description=data['description'],
user_id=user_id,
)
location = database_items.selectItemLocationsTuple(site_name, payload=(data['item_id'], data['location_id']))
cost_layers: list = location['cost_layers']
if data['transaction_type'] == "Adjust In":
cost_layer = database_items.insertCostLayersTuple(site_name, cost_layer.payload(), conn=conn)
cost_layers.append(cost_layer['id'])
if data['transaction_type'] == "Adjust Out":
if float(location['quantity_on_hand']) < float(data['quantity']):
pass
else:
cost_layers = database_items.selectCostLayersTuple(site_name, (location['id'], ))
new_cost_layers = []
qty = float(data['quantity'])
for layer in cost_layers:
if qty == 0.0:
new_cost_layers.append(layer['id'])
elif qty >= float(layer['quantity']):
qty -= float(layer['quantity'])
layer['quantity'] = 0.0
else:
layer['quantity'] -= qty
new_cost_layers.append(layer['id'])
database_items.postUpdateCostLayer(site_name, {'id': layer['id'], 'update': {'quantity': layer['quantity']}}, conn=conn)
qty = 0.0
if layer['quantity'] == 0.0:
database_items.postDeleteCostLayer(site_name, (layer['id'], ), conn=conn)
cost_layers = new_cost_layers
quantity_on_hand = quantityFactory(float(location['quantity_on_hand']), data['quantity'], data['transaction_type'])
updated_item_location_payload = (cost_layers, quantity_on_hand, data['item_id'], data['location_id'])
database_items.postUpdateItemLocation(site_name, updated_item_location_payload)
site_location = database_items.getLocation(site_name, (location['location_id'], ))
transaction.data = {'location': site_location['uuid']}
database_items.postAddTransaction(site_name, transaction.payload(), conn=conn)
if self_conn:
conn.commit()
conn.close()
return False
return conn

View File

@ -0,0 +1,4 @@
INSERT INTO %%site_name%%_cost_layers
(aquisition_date, quantity, cost, currency_type, expires, vendor)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING *;

View File

@ -0,0 +1,4 @@
INSERT INTO %%site_name%%_itemlinks
(barcode, link, data, conv_factor)
VALUES (%s, %s, %s, %s)
RETURNING *;

View File

@ -0,0 +1,4 @@
INSERT INTO %%site_name%%_item_locations
(part_id, location_id, quantity_on_hand, cost_layers)
VALUES (%s, %s, %s, %s)
RETURNING *;

View File

@ -2370,3 +2370,42 @@ class TransactionPayload:
json.dumps(self.data) json.dumps(self.data)
) )
@dataclass
class CostLayerPayload:
aquisition_date: datetime.datetime
quantity: float
cost: float
currency_type: str
vendor: int = 0
expires: datetime.datetime = None
def payload(self):
return (
self.aquisition_date,
self.quantity,
self.cost,
self.currency_type,
self.expires,
self.vendor
)
@dataclass
class ItemLinkPayload:
barcode: str
link: int
data: dict = field(default_factory=dict)
conv_factor: float = 1
def __post_init__(self):
if not isinstance(self.barcode, str):
raise TypeError(f"barcode must be of type str; not {type(self.barocde)}")
if not isinstance(self.link, int):
raise TypeError(f"link must be of type str; not {type(self.link)}")
def payload(self):
return (
self.barcode,
self.link,
json.dumps(self.data),
self.conv_factor
)

View File

@ -227,7 +227,7 @@ def postTransaction(conn, site_name, user_id, data: dict):
transaction_time = datetime.datetime.now() transaction_time = datetime.datetime.now()
cost_layer = MyDataclasses.CostLayerPayload( cost_layer = postsqldb.CostLayerPayload(
aquisition_date=transaction_time, aquisition_date=transaction_time,
quantity=float(data['quantity']), quantity=float(data['quantity']),
cost=float(data['cost']), cost=float(data['cost']),
@ -235,7 +235,7 @@ def postTransaction(conn, site_name, user_id, data: dict):
vendor=int(data['vendor']), vendor=int(data['vendor']),
expires=data['expires'] expires=data['expires']
) )
transaction = MyDataclasses.TransactionPayload( transaction = postsqldb.TransactionPayload(
timestamp=transaction_time, timestamp=transaction_time,
logistics_info_id=int(data['logistics_info_id']), logistics_info_id=int(data['logistics_info_id']),
barcode=data['barcode'], barcode=data['barcode'],

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.2 KiB