Compare commits
No commits in common. "a279e2a95ac21ac6ea22cd23f1edd3083c0c1b6c" and "3c30adcd499532310ce5c6065bc59b15fd718c89" have entirely different histories.
a279e2a95a
...
3c30adcd49
@ -96,7 +96,7 @@ class ItemsPayload:
|
||||
self.search_string
|
||||
)
|
||||
|
||||
# done
|
||||
|
||||
@dataclass
|
||||
class TransactionPayload:
|
||||
timestamp: datetime.datetime
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -165,23 +165,6 @@ def getItemLink(site: str, payload:tuple, convert:bool=True):
|
||||
except Exception as error:
|
||||
raise postsqldb.DatabaseError(error, payload, sql)
|
||||
|
||||
def getLocation(site:str, payload:tuple, convert:bool=True):
|
||||
selected = ()
|
||||
database_config = config.config()
|
||||
sql = f"SELECT * FROM {site}_locations WHERE id=%s;"
|
||||
try:
|
||||
with psycopg2.connect(**database_config) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql, payload)
|
||||
rows = cur.fetchone()
|
||||
if rows and convert:
|
||||
selected = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
||||
elif rows and not convert:
|
||||
selected = rows
|
||||
return selected
|
||||
except Exception as error:
|
||||
raise postsqldb.DatabaseError(error, payload, sql)
|
||||
|
||||
def paginateZonesBySku(site: str, payload: tuple, convert=True):
|
||||
database_config = config.config()
|
||||
zones, count = (), 0
|
||||
@ -420,330 +403,3 @@ def postUpdateItemLink(site: str, payload: dict):
|
||||
postUpdateData(conn, f"{site}_itemlinks", {'id': payload['id'], 'update': {'conv_factor': payload['update']['conv_factor']}})
|
||||
postAddTransaction(conn, site, transaction.payload())
|
||||
|
||||
def postUpdateCostLayer(site, payload, convert=True, conn=None):
|
||||
"""_summary_
|
||||
|
||||
Args:
|
||||
conn (_T_connector@connect): Postgresql Connector
|
||||
site (str):
|
||||
table (str):
|
||||
payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
|
||||
convert (bool, optional): determines if to return tuple as dictionary. Defaults to False.
|
||||
|
||||
Raises:
|
||||
DatabaseError:
|
||||
|
||||
Returns:
|
||||
tuple or dict: updated tuple
|
||||
"""
|
||||
updated = ()
|
||||
self_conn = False
|
||||
|
||||
set_clause, values = postsqldb.updateStringFactory(payload['update'])
|
||||
values.append(payload['id'])
|
||||
sql = f"UPDATE {site}_cost_layers SET {set_clause} WHERE id=%s RETURNING *;"
|
||||
try:
|
||||
if not conn:
|
||||
database_config = config.config()
|
||||
conn = psycopg2.connect(**database_config)
|
||||
conn.autocommit = False
|
||||
self_conn = True
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql, values)
|
||||
rows = cur.fetchone()
|
||||
if rows and convert:
|
||||
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
||||
elif rows and not convert:
|
||||
updated = rows
|
||||
|
||||
if self_conn:
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return updated
|
||||
except Exception as error:
|
||||
raise postsqldb.DatabaseError(error, payload, sql)
|
||||
|
||||
def insertCostLayersTuple(site, payload, convert=True, conn=None):
|
||||
cost_layer = ()
|
||||
self_conn = False
|
||||
|
||||
with open(f"application/items/sql/insertCostLayersTuple.sql", "r+") as file:
|
||||
sql = file.read().replace("%%site_name%%", site)
|
||||
try:
|
||||
if not conn:
|
||||
database_config = config.config()
|
||||
conn = psycopg2.connect(**database_config)
|
||||
conn.autocommit = True
|
||||
self_conn = True
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql, payload)
|
||||
rows = cur.fetchone()
|
||||
if rows and convert:
|
||||
cost_layer = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
||||
elif rows and not convert:
|
||||
cost_layer = rows
|
||||
|
||||
if self_conn:
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return cost_layer
|
||||
except Exception as error:
|
||||
raise postsqldb.DatabaseError(error, payload, sql)
|
||||
|
||||
def insertItemLocationsTuple(conn, site, payload, convert=True):
|
||||
location = ()
|
||||
database_config = config.config()
|
||||
with open(f"application/items/sql/insertItemLocationsTuple.sql", "r+") as file:
|
||||
sql = file.read().replace("%%site_name%%", site)
|
||||
try:
|
||||
conn = psycopg2.connect(**database_config)
|
||||
conn.autocommit = False
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql, payload)
|
||||
rows = cur.fetchone()
|
||||
if rows and convert:
|
||||
location = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
||||
elif rows and not convert:
|
||||
location = rows
|
||||
return location, conn
|
||||
except Exception as error:
|
||||
raise postsqldb.DatabaseError(error, payload, sql)
|
||||
|
||||
def selectItemLocationsTuple(site_name, payload, convert=True):
|
||||
"""select a single tuple from ItemLocations table for site_name
|
||||
|
||||
Args:
|
||||
conn (_T_connector@connect):
|
||||
site_name (str):
|
||||
payload (tuple): [item_id, location_id]
|
||||
convert (bool): defaults to False, used to determine return of tuple/dict
|
||||
|
||||
Returns:
|
||||
tuple: the row that was returned from the table
|
||||
"""
|
||||
item_locations = ()
|
||||
database_config = config.config()
|
||||
select_item_location_sql = f"SELECT * FROM {site_name}_item_locations WHERE part_id = %s AND location_id = %s;"
|
||||
try:
|
||||
with psycopg2.connect(**database_config) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(select_item_location_sql, payload)
|
||||
rows = cur.fetchone()
|
||||
if rows and convert:
|
||||
item_locations = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
||||
elif rows and not convert:
|
||||
item_locations = rows
|
||||
return item_locations
|
||||
except Exception as error:
|
||||
return error
|
||||
|
||||
def selectCostLayersTuple(site_name, payload, convert=True):
|
||||
"""select a single or series of cost layers from the database for site_name
|
||||
|
||||
Args:
|
||||
conn (_T_connector@connect):
|
||||
site_name (str):
|
||||
payload (tuple): (item_locations_id, )
|
||||
convert (bool): defaults to False, used for determining return as tuple/dict
|
||||
|
||||
Returns:
|
||||
list: list of tuples/dict from the cost_layers table for site_name
|
||||
"""
|
||||
cost_layers = ()
|
||||
database_config = config.config()
|
||||
select_cost_layers_sql = f"SELECT cl.* FROM {site_name}_item_locations il JOIN {site_name}_cost_layers cl ON cl.id = ANY(il.cost_layers) where il.id=%s;"
|
||||
try:
|
||||
with psycopg2.connect(**database_config) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(select_cost_layers_sql, payload)
|
||||
rows = cur.fetchall()
|
||||
if rows and convert:
|
||||
cost_layers = rows
|
||||
cost_layers = [postsqldb.tupleDictionaryFactory(cur.description, layer) for layer in rows]
|
||||
elif rows and not convert:
|
||||
cost_layers = rows
|
||||
return cost_layers
|
||||
except Exception as error:
|
||||
return error
|
||||
|
||||
def postDeleteCostLayer(site_name, payload, convert=True, conn=None):
|
||||
"""
|
||||
payload (tuple): (table_to_delete_from, tuple_id)
|
||||
Raises:
|
||||
DatabaseError:
|
||||
|
||||
Returns:
|
||||
tuple or dict: deleted tuple
|
||||
"""
|
||||
deleted = ()
|
||||
self_conn = False
|
||||
sql = f"WITH deleted_rows AS (DELETE FROM {site_name}_cost_layers WHERE id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
|
||||
|
||||
try:
|
||||
if not conn:
|
||||
database_config = config.config()
|
||||
conn = psycopg2.connect(**database_config)
|
||||
conn.autocommit = False
|
||||
self_conn = True
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql, payload)
|
||||
rows = cur.fetchall()
|
||||
if rows and convert:
|
||||
deleted = [postsqldb.tupleDictionaryFactory(cur.description, r) for r in rows]
|
||||
elif rows and not convert:
|
||||
deleted = rows
|
||||
|
||||
if self_conn:
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return deleted
|
||||
except Exception as error:
|
||||
raise postsqldb.DatabaseError(error, payload, sql)
|
||||
|
||||
def postUpdateItemLocation(site, payload, conn=None):
|
||||
|
||||
item_location = ()
|
||||
self_conn = False
|
||||
with open(f"sql/updateItemLocation.sql", "r+") as file:
|
||||
sql = file.read().replace("%%site_name%%", site)
|
||||
try:
|
||||
if not conn:
|
||||
database_config = config.config()
|
||||
conn = psycopg2.connect(**database_config)
|
||||
conn.autocommit = False
|
||||
self_conn = True
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql, payload)
|
||||
rows = cur.fetchone()
|
||||
if rows:
|
||||
item_location = rows
|
||||
|
||||
if self_conn:
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return item_location
|
||||
except Exception as error:
|
||||
return error
|
||||
|
||||
def postAddTransaction(site, payload, convert=False, conn=None):
|
||||
transaction = ()
|
||||
self_conn = False
|
||||
|
||||
with open(f"application/items/sql/insertTransactionsTuple.sql", "r+") as file:
|
||||
sql = file.read().replace("%%site_name%%", site)
|
||||
try:
|
||||
if not conn:
|
||||
database_config = config.config()
|
||||
conn = psycopg2.connect(**database_config)
|
||||
conn.autocommit = False
|
||||
self_conn = True
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql, payload)
|
||||
rows = cur.fetchone()
|
||||
if rows and convert:
|
||||
transaction = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
||||
elif rows and not convert:
|
||||
transaction = rows
|
||||
if self_conn:
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return transaction
|
||||
except Exception as error:
|
||||
raise postsqldb.DatabaseError(error, payload, sql)
|
||||
|
||||
|
||||
def postInsertItemLink(site, payload, convert=True, conn=None):
|
||||
"""insert payload into itemlinks table of site
|
||||
|
||||
Args:
|
||||
conn (_T_connector@connect): Postgresql Connector
|
||||
site (str):
|
||||
payload (tuple): (barcode[str], link[int], data[jsonb], conv_factor[float])
|
||||
convert (bool, optional): Determines if to return tuple as dictionary. Defaults to False.
|
||||
|
||||
Raises:
|
||||
DatabaseError:
|
||||
|
||||
Returns:
|
||||
tuple or dict: inserted tuple
|
||||
"""
|
||||
link = ()
|
||||
self_conn = False
|
||||
|
||||
with open(f"application/items/sql/insertItemLinksTuple.sql", "r+") as file:
|
||||
sql = file.read().replace("%%site_name%%", site)
|
||||
try:
|
||||
if not conn:
|
||||
database_config = config.config()
|
||||
conn = psycopg2.connect(**database_config)
|
||||
conn.autocommit = False
|
||||
self_conn = True
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql, payload)
|
||||
rows = cur.fetchone()
|
||||
if rows and convert:
|
||||
link = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
||||
elif rows and not convert:
|
||||
link = rows
|
||||
|
||||
if self_conn:
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return link, conn
|
||||
except Exception as error:
|
||||
raise postsqldb.DatabaseError(error, payload, sql)
|
||||
|
||||
def postUpdateItemByID(site, payload, convert=True, conn=None):
|
||||
""" high level update of an item specific data, none of its relationships
|
||||
|
||||
Args:
|
||||
conn (_T_connector@connect): Postgresql Connector
|
||||
site (str):
|
||||
table (str):
|
||||
payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}}
|
||||
convert (bool, optional): determines if to return tuple as dictionary. Defaults to False.
|
||||
|
||||
Raises:
|
||||
DatabaseError:
|
||||
|
||||
Returns:
|
||||
tuple or dict: updated tuple
|
||||
"""
|
||||
updated = ()
|
||||
self_conn = False
|
||||
set_clause, values = postsqldb.updateStringFactory(payload['update'])
|
||||
values.append(payload['id'])
|
||||
sql = f"UPDATE {site}_items SET {set_clause} WHERE id=%s RETURNING *;"
|
||||
try:
|
||||
if not conn:
|
||||
database_config = config.config()
|
||||
conn = psycopg2.connect(**database_config)
|
||||
conn.autocommit = False
|
||||
self_conn = True
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql, values)
|
||||
rows = cur.fetchone()
|
||||
if rows and convert:
|
||||
updated = postsqldb.tupleDictionaryFactory(cur.description, rows)
|
||||
elif rows and not convert:
|
||||
updated = rows
|
||||
|
||||
if self_conn:
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return updated, conn
|
||||
except Exception as error:
|
||||
raise postsqldb.DatabaseError(error, payload, sql)
|
||||
|
||||
@ -5,7 +5,6 @@ from main import unfoldCostLayers
|
||||
from user_api import login_required
|
||||
import application.postsqldb as db
|
||||
from application.items import database_items
|
||||
from application.items import items_processes
|
||||
|
||||
items_api = Blueprint('items_api', __name__)
|
||||
|
||||
@ -487,49 +486,76 @@ def getLinkedItem():
|
||||
return jsonify({'linked_item': linked_item, 'error': True, 'message': f'method {request.method} not allowed'})
|
||||
|
||||
@items_api.route('/item/addLinkedItem', methods=["POST"])
|
||||
@login_required
|
||||
def addLinkedItem():
|
||||
""" POST a link between items by passing a parent_id, a child_id, conv_factor
|
||||
---
|
||||
parameters:
|
||||
- in: query
|
||||
name: parent_id
|
||||
schema:
|
||||
type: integer
|
||||
default: 1
|
||||
required: true
|
||||
description: id to linked list item
|
||||
- in: query
|
||||
name: child_id
|
||||
schema:
|
||||
type: integer
|
||||
default: 1
|
||||
required: true
|
||||
description: id to item to be linked to list.
|
||||
- in: query
|
||||
name: conv_factor
|
||||
schema:
|
||||
type: integer
|
||||
default: 1
|
||||
required: true
|
||||
description: integer factor between child id to parent id.
|
||||
responses:
|
||||
200:
|
||||
description: Items linked successfully.
|
||||
"""
|
||||
if request.method == "POST":
|
||||
parent_id = request.get_json()['parent_id']
|
||||
child_id = request.get_json()['child_id']
|
||||
conv_factor = request.get_json()['conv_factor']
|
||||
|
||||
database_config = config()
|
||||
site_name = session['selected_site']
|
||||
user_id = session['user_id']
|
||||
with psycopg2.connect(**database_config) as conn:
|
||||
print(parent_id, child_id, conv_factor)
|
||||
parent_item = database.getItemAllByID(conn, site_name, (parent_id, ), convert=True)
|
||||
child_item = database.getItemAllByID(conn, site_name, (child_id, ), convert=True)
|
||||
|
||||
items_processes.postLinkedItem(site_name, {
|
||||
'parent_id': parent_id,
|
||||
'child_id': child_id,
|
||||
'user_id': user_id,
|
||||
'conv_factor': conv_factor
|
||||
})
|
||||
# i need to transact out ALL locations for child item.
|
||||
pprint.pprint(child_item)
|
||||
sum_child_qoh = 0
|
||||
for location in child_item['item_locations']:
|
||||
print(location)
|
||||
sum_child_qoh += location['quantity_on_hand']
|
||||
payload = {
|
||||
'item_id': child_item['id'],
|
||||
'logistics_info_id': child_item['logistics_info_id'],
|
||||
'barcode': child_item['barcode'],
|
||||
'item_name': child_item['item_name'],
|
||||
'transaction_type': 'Adjust Out',
|
||||
'quantity': location['quantity_on_hand'],
|
||||
'description': f'Converted to {parent_item['barcode']}',
|
||||
'cost': child_item['item_info']['cost'],
|
||||
'vendor': 1,
|
||||
'expires': False,
|
||||
'location_id': location['location_id']
|
||||
}
|
||||
process.postTransaction(conn, site_name, user_id, payload)
|
||||
|
||||
print(sum_child_qoh)
|
||||
|
||||
primary_location = database.selectItemLocationsTuple(conn, site_name, (parent_item['id'], parent_item['logistics_info']['primary_location']['id']), convert=True)
|
||||
|
||||
|
||||
payload = {
|
||||
'item_id': parent_item['id'],
|
||||
'logistics_info_id': parent_item['logistics_info_id'],
|
||||
'barcode': parent_item['barcode'],
|
||||
'item_name': parent_item['item_name'],
|
||||
'transaction_type': 'Adjust In',
|
||||
'quantity': (float(sum_child_qoh)*float(conv_factor)),
|
||||
'description': f'Converted from {child_item['barcode']}',
|
||||
'cost': child_item['item_info']['cost'],
|
||||
'vendor': 1,
|
||||
'expires': None,
|
||||
'location_id': primary_location['location_id']
|
||||
}
|
||||
|
||||
pprint.pprint(payload)
|
||||
result = process.postTransaction(conn, site_name, user_id, payload)
|
||||
|
||||
if result['error']:
|
||||
return jsonify(result)
|
||||
|
||||
itemLink = MyDataclasses.ItemLinkPayload(
|
||||
barcode=child_item['barcode'],
|
||||
link=parent_item['id'],
|
||||
data=child_item,
|
||||
conv_factor=conv_factor
|
||||
)
|
||||
|
||||
database.insertItemLinksTuple(conn, site_name, itemLink.payload())
|
||||
|
||||
database.__updateTuple(conn, site_name, f"{site_name}_items", {'id': child_item['id'], 'update': {'row_type': 'link'}})
|
||||
|
||||
return jsonify({'error': False, 'message': 'Linked Item added!!'})
|
||||
return jsonify({'error': True, 'message': 'These was an error with adding to the linked list!'})
|
||||
|
||||
@ -1,169 +0,0 @@
|
||||
from application.items import database_items
|
||||
import application.postsqldb as db
|
||||
import config
|
||||
|
||||
import datetime
|
||||
import psycopg2
|
||||
|
||||
def postLinkedItem(site, payload):
|
||||
"""
|
||||
payload = {parent_id, child_id, user_id, conv_factor}
|
||||
"""
|
||||
parent_item = database_items.getItemAllByID(site, (payload['parent_id'],))
|
||||
child_item = database_items.getItemAllByID(site, (payload['child_id'],))
|
||||
|
||||
database_config = config.config()
|
||||
conn = psycopg2.connect(**database_config)
|
||||
conn.autocommit = False
|
||||
|
||||
# i need to transact out ALL locations for child item.
|
||||
sum_child_qoh = 0
|
||||
for location in child_item['item_locations']:
|
||||
sum_child_qoh += location['quantity_on_hand']
|
||||
adjustment_payload = {
|
||||
'item_id': child_item['id'],
|
||||
'logistics_info_id': child_item['logistics_info_id'],
|
||||
'barcode': child_item['barcode'],
|
||||
'item_name': child_item['item_name'],
|
||||
'transaction_type': 'Adjust Out',
|
||||
'quantity': location['quantity_on_hand'],
|
||||
'description': f'Converted to {parent_item['barcode']}',
|
||||
'cost': child_item['item_info']['cost'],
|
||||
'vendor': 1,
|
||||
'expires': False,
|
||||
'location_id': location['location_id']
|
||||
}
|
||||
|
||||
print(conn)
|
||||
conn = postAdjustment(site, payload['user_id'], adjustment_payload, conn=conn)
|
||||
print(conn)
|
||||
#process.postTransaction(conn, site_name, user_id, payload)
|
||||
|
||||
print(sum_child_qoh)
|
||||
|
||||
primary_location = database_items.selectItemLocationsTuple(site, (parent_item['id'], parent_item['logistics_info']['primary_location']['id']), convert=True)
|
||||
|
||||
print(primary_location)
|
||||
|
||||
adjustment_payload = {
|
||||
'item_id': parent_item['id'],
|
||||
'logistics_info_id': parent_item['logistics_info_id'],
|
||||
'barcode': parent_item['barcode'],
|
||||
'item_name': parent_item['item_name'],
|
||||
'transaction_type': 'Adjust In',
|
||||
'quantity': (float(sum_child_qoh)*float(payload['conv_factor'])),
|
||||
'description': f'Converted from {child_item['barcode']}',
|
||||
'cost': child_item['item_info']['cost'],
|
||||
'vendor': 1,
|
||||
'expires': None,
|
||||
'location_id': primary_location['location_id']
|
||||
}
|
||||
print(conn)
|
||||
conn=postAdjustment(site, payload['user_id'], adjustment_payload, conn=conn)
|
||||
print(conn)
|
||||
itemLink = db.ItemLinkPayload(
|
||||
barcode=child_item['barcode'],
|
||||
link=parent_item['id'],
|
||||
data=child_item,
|
||||
conv_factor=payload['conv_factor']
|
||||
)
|
||||
|
||||
_, conn = database_items.postInsertItemLink(site, itemLink.payload(), conn=conn)
|
||||
print(conn)
|
||||
print(_['id'])
|
||||
_, conn = database_items.postUpdateItemByID(site, {'id': child_item['id'], 'update': {'row_type': 'link'}}, conn=conn)
|
||||
print(conn)
|
||||
print(_['id'])
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
def postAdjustment(site_name, user_id, data: dict, conn=None):
|
||||
"""dict_keys(['item_id', 'logistics_info_id', 'barcode', 'item_name', 'transaction_type',
|
||||
'quantity', 'description', 'cost', 'vendor', 'expires', 'location_id'])"""
|
||||
def quantityFactory(quantity_on_hand:float, quantity:float, transaction_type:str):
|
||||
if transaction_type == "Adjust In":
|
||||
quantity_on_hand += quantity
|
||||
return quantity_on_hand
|
||||
if transaction_type == "Adjust Out":
|
||||
quantity_on_hand -= quantity
|
||||
return quantity_on_hand
|
||||
raise Exception("The transaction type is wrong!")
|
||||
|
||||
transaction_time = datetime.datetime.now()
|
||||
|
||||
self_conn = False
|
||||
|
||||
if not conn:
|
||||
database_config = config.config()
|
||||
conn = psycopg2.connect(**database_config)
|
||||
conn.autocommit = False
|
||||
self_conn = True
|
||||
|
||||
cost_layer = db.CostLayerPayload(
|
||||
aquisition_date=transaction_time,
|
||||
quantity=float(data['quantity']),
|
||||
cost=float(data['cost']),
|
||||
currency_type="USD",
|
||||
vendor=int(data['vendor']),
|
||||
expires=data['expires']
|
||||
)
|
||||
|
||||
transaction = db.TransactionPayload(
|
||||
timestamp=transaction_time,
|
||||
logistics_info_id=int(data['logistics_info_id']),
|
||||
barcode=data['barcode'],
|
||||
name=data['item_name'],
|
||||
transaction_type=data['transaction_type'],
|
||||
quantity=float(data['quantity']),
|
||||
description=data['description'],
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
location = database_items.selectItemLocationsTuple(site_name, payload=(data['item_id'], data['location_id']))
|
||||
cost_layers: list = location['cost_layers']
|
||||
if data['transaction_type'] == "Adjust In":
|
||||
cost_layer = database_items.insertCostLayersTuple(site_name, cost_layer.payload(), conn=conn)
|
||||
cost_layers.append(cost_layer['id'])
|
||||
|
||||
if data['transaction_type'] == "Adjust Out":
|
||||
if float(location['quantity_on_hand']) < float(data['quantity']):
|
||||
pass
|
||||
else:
|
||||
cost_layers = database_items.selectCostLayersTuple(site_name, (location['id'], ))
|
||||
|
||||
new_cost_layers = []
|
||||
qty = float(data['quantity'])
|
||||
for layer in cost_layers:
|
||||
if qty == 0.0:
|
||||
new_cost_layers.append(layer['id'])
|
||||
elif qty >= float(layer['quantity']):
|
||||
qty -= float(layer['quantity'])
|
||||
layer['quantity'] = 0.0
|
||||
else:
|
||||
layer['quantity'] -= qty
|
||||
new_cost_layers.append(layer['id'])
|
||||
database_items.postUpdateCostLayer(site_name, {'id': layer['id'], 'update': {'quantity': layer['quantity']}}, conn=conn)
|
||||
qty = 0.0
|
||||
|
||||
if layer['quantity'] == 0.0:
|
||||
database_items.postDeleteCostLayer(site_name, (layer['id'], ), conn=conn)
|
||||
|
||||
cost_layers = new_cost_layers
|
||||
|
||||
quantity_on_hand = quantityFactory(float(location['quantity_on_hand']), data['quantity'], data['transaction_type'])
|
||||
|
||||
updated_item_location_payload = (cost_layers, quantity_on_hand, data['item_id'], data['location_id'])
|
||||
database_items.postUpdateItemLocation(site_name, updated_item_location_payload)
|
||||
|
||||
site_location = database_items.getLocation(site_name, (location['location_id'], ))
|
||||
|
||||
transaction.data = {'location': site_location['uuid']}
|
||||
|
||||
database_items.postAddTransaction(site_name, transaction.payload(), conn=conn)
|
||||
|
||||
if self_conn:
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return False
|
||||
|
||||
return conn
|
||||
@ -1,4 +0,0 @@
|
||||
INSERT INTO %%site_name%%_cost_layers
|
||||
(aquisition_date, quantity, cost, currency_type, expires, vendor)
|
||||
VALUES (%s, %s, %s, %s, %s, %s)
|
||||
RETURNING *;
|
||||
@ -1,4 +0,0 @@
|
||||
INSERT INTO %%site_name%%_itemlinks
|
||||
(barcode, link, data, conv_factor)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
RETURNING *;
|
||||
@ -1,4 +0,0 @@
|
||||
INSERT INTO %%site_name%%_item_locations
|
||||
(part_id, location_id, quantity_on_hand, cost_layers)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
RETURNING *;
|
||||
@ -2370,42 +2370,3 @@ class TransactionPayload:
|
||||
json.dumps(self.data)
|
||||
)
|
||||
|
||||
@dataclass
|
||||
class CostLayerPayload:
|
||||
aquisition_date: datetime.datetime
|
||||
quantity: float
|
||||
cost: float
|
||||
currency_type: str
|
||||
vendor: int = 0
|
||||
expires: datetime.datetime = None
|
||||
|
||||
def payload(self):
|
||||
return (
|
||||
self.aquisition_date,
|
||||
self.quantity,
|
||||
self.cost,
|
||||
self.currency_type,
|
||||
self.expires,
|
||||
self.vendor
|
||||
)
|
||||
|
||||
@dataclass
|
||||
class ItemLinkPayload:
|
||||
barcode: str
|
||||
link: int
|
||||
data: dict = field(default_factory=dict)
|
||||
conv_factor: float = 1
|
||||
|
||||
def __post_init__(self):
|
||||
if not isinstance(self.barcode, str):
|
||||
raise TypeError(f"barcode must be of type str; not {type(self.barocde)}")
|
||||
if not isinstance(self.link, int):
|
||||
raise TypeError(f"link must be of type str; not {type(self.link)}")
|
||||
|
||||
def payload(self):
|
||||
return (
|
||||
self.barcode,
|
||||
self.link,
|
||||
json.dumps(self.data),
|
||||
self.conv_factor
|
||||
)
|
||||
|
||||
@ -227,7 +227,7 @@ def postTransaction(conn, site_name, user_id, data: dict):
|
||||
|
||||
transaction_time = datetime.datetime.now()
|
||||
|
||||
cost_layer = postsqldb.CostLayerPayload(
|
||||
cost_layer = MyDataclasses.CostLayerPayload(
|
||||
aquisition_date=transaction_time,
|
||||
quantity=float(data['quantity']),
|
||||
cost=float(data['cost']),
|
||||
@ -235,7 +235,7 @@ def postTransaction(conn, site_name, user_id, data: dict):
|
||||
vendor=int(data['vendor']),
|
||||
expires=data['expires']
|
||||
)
|
||||
transaction = postsqldb.TransactionPayload(
|
||||
transaction = MyDataclasses.TransactionPayload(
|
||||
timestamp=transaction_time,
|
||||
logistics_info_id=int(data['logistics_info_id']),
|
||||
barcode=data['barcode'],
|
||||
|
||||
Binary file not shown.
Binary file not shown.
|
Before Width: | Height: | Size: 9.2 KiB |
Loading…
x
Reference in New Issue
Block a user