From db661111e0997c9bc5e30b626e9ecc461bd81d7e Mon Sep 17 00:00:00 2001 From: Jadowyne Ulve Date: Sat, 31 May 2025 21:19:35 -0500 Subject: [PATCH] Migrated deleteConversion to new api schema --- application/items/database_items.py | 443 +++++++++++++++------------- application/items/items_API.py | 22 +- 2 files changed, 259 insertions(+), 206 deletions(-) diff --git a/application/items/database_items.py b/application/items/database_items.py index 1eb854d..3e0808f 100644 --- a/application/items/database_items.py +++ b/application/items/database_items.py @@ -378,206 +378,6 @@ def paginateBrands(site:str, payload:tuple, convert:bool=True): except Exception as error: raise postsqldb.DatabaseError(error, payload, sql) -def postUpdateItem(site:str, payload:dict): - """ POST and update to an item - - Args: - site (str): name of the site the item exists in. - payload (dict): STRICT FORMAT - {id: item_id, data: SEE BELOW, user_id: updater} - - data is complex structure - top level keys should be a combo of: ['item', 'item_info', 'logistics_info', 'food_info'] - with in each of these top levels there are key value pairs in this format - {'column_name': 'new_value'} - """ - def postUpdateData(conn, table, payload, convert=True): - updated = () - - set_clause, values = postsqldb.updateStringFactory(payload['update']) - values.append(payload['id']) - sql = f"UPDATE {table} SET {set_clause} WHERE id=%s RETURNING *;" - try: - with conn.cursor() as cur: - cur.execute(sql, values) - rows = cur.fetchone() - if rows and convert: - updated = postsqldb.tupleDictionaryFactory(cur.description, rows) - elif rows and not convert: - updated = rows - except Exception as error: - raise postsqldb.DatabaseError(error, payload, sql) - return updated - - def postAddTransaction(conn, site, payload, convert=False): - transaction = () - with open(f"application/items/sql/insertTransactionsTuple.sql", "r+") as file: - sql = file.read().replace("%%site_name%%", site) - try: - with conn.cursor() as cur: - cur.execute(sql, payload) - rows = cur.fetchone() - if rows and convert: - transaction = postsqldb.tupleDictionaryFactory(cur.description, rows) - elif rows and not convert: - transaction = rows - except Exception as error: - raise postsqldb.DatabaseError(error, payload, sql) - return transaction - - transaction_data = {} - database_config = config.config() - data = payload['update'] - for key in data.keys(): - for key_2 in data[key].keys(): - transaction_data[f"{key_2}_new"] = data[key][key_2] - try: - with psycopg2.connect(**database_config) as conn: - item = getItemAllByID(site, (payload['id'], )) - if 'item_info' in data.keys() and data['item_info'] != {}: - for key in data['item_info'].keys(): - transaction_data[f"{key}_old"] = item['item_info'][key] - postUpdateData(conn, f"{site}_item_info", {'id': item['item_info_id'], 'update': data['item_info']}) - - if 'food_info' in data.keys() and data['food_info'] != {}: - for key in data['food_info'].keys(): - transaction_data[f"{key}_old"] = item['food_info'][key] - postUpdateData(conn, f"{site}_food_info", {'id': item['food_info_id'], 'update': data['food_info']}) - - if 'logistics_info' in data.keys() and data['logistics_info'] != {}: - for key in data['logistics_info'].keys(): - transaction_data[f"{key}_old"] = item['logistics_info'][key] - postUpdateData(conn, f"{site}_logistics_info", {'id': item['logistics_info_id'], 'update': data['logistics_info']}) - - if 'item' in data.keys() and data['item'] != {}: - for key in data['item'].keys(): - if key == "brand": - transaction_data[f"{key}_old"] = item['brand']['id'] - else: - transaction_data[f"{key}_old"] = item[key] - postUpdateData(conn, f"{site}_items", {'id': payload['id'], 'update': data['item']}) - - trans = postsqldb.TransactionPayload( - timestamp=datetime.datetime.now(), - logistics_info_id=item['logistics_info_id'], - barcode=item['barcode'], - name=item['item_name'], - transaction_type="UPDATE", - quantity=0.0, - description="Item was updated!", - user_id=payload['user_id'], - data=transaction_data - ) - postAddTransaction(conn, site, trans.payload()) - except Exception as error: - raise postsqldb.DatabaseError(error, payload, "MULTICALL!") - -def postUpdateItemLink(site: str, payload: dict): - """ POST update to ItemLink - - Args: - site (str): _description_ - payload (dict): {id, update, old_conv_factor, user_id} - """ - def postUpdateData(conn, table, payload, convert=True): - updated = () - set_clause, values = postsqldb.updateStringFactory(payload['update']) - values.append(payload['id']) - sql = f"UPDATE {table} SET {set_clause} WHERE id=%s RETURNING *;" - try: - with conn.cursor() as cur: - cur.execute(sql, values) - rows = cur.fetchone() - if rows and convert: - updated = postsqldb.tupleDictionaryFactory(cur.description, rows) - elif rows and not convert: - updated = rows - except Exception as error: - raise postsqldb.DatabaseError(error, payload, sql) - return updated - - def postAddTransaction(conn, site, payload, convert=False): - transaction = () - with open(f"application/items/sql/insertTransactionsTuple.sql", "r+") as file: - sql = file.read().replace("%%site_name%%", site) - try: - with conn.cursor() as cur: - cur.execute(sql, payload) - rows = cur.fetchone() - if rows and convert: - transaction = postsqldb.tupleDictionaryFactory(cur.description, rows) - elif rows and not convert: - transaction = rows - except Exception as error: - raise postsqldb.DatabaseError(error, payload, sql) - return transaction - - database_config = config.config() - transaction_time = datetime.datetime.now() - barcode = payload['barcode'] - with psycopg2.connect(**database_config) as conn: - linkedItem = getItemAllByBarcode(site, (barcode, )) - - transaction = postsqldb.TransactionPayload( - timestamp=transaction_time, - logistics_info_id=linkedItem['logistics_info_id'], - barcode=barcode, - name=linkedItem['item_name'], - transaction_type='UPDATE', - quantity=0.0, - description='Link updated!', - user_id=payload['user_id'], - data={'new_conv_factor': payload['update']['conv_factor'], 'old_conv_factor': payload['old_conv_factor']} - ) - - postUpdateData(conn, f"{site}_itemlinks", {'id': payload['id'], 'update': {'conv_factor': payload['update']['conv_factor']}}) - postAddTransaction(conn, site, transaction.payload()) - -def postUpdateCostLayer(site, payload, convert=True, conn=None): - """_summary_ - - Args: - conn (_T_connector@connect): Postgresql Connector - site (str): - table (str): - payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}} - convert (bool, optional): determines if to return tuple as dictionary. Defaults to False. - - Raises: - DatabaseError: - - Returns: - tuple or dict: updated tuple - """ - updated = () - self_conn = False - - set_clause, values = postsqldb.updateStringFactory(payload['update']) - values.append(payload['id']) - sql = f"UPDATE {site}_cost_layers SET {set_clause} WHERE id=%s RETURNING *;" - try: - if not conn: - database_config = config.config() - conn = psycopg2.connect(**database_config) - conn.autocommit = False - self_conn = True - - with conn.cursor() as cur: - cur.execute(sql, values) - rows = cur.fetchone() - if rows and convert: - updated = postsqldb.tupleDictionaryFactory(cur.description, rows) - elif rows and not convert: - updated = rows - - if self_conn: - conn.commit() - conn.close() - - return updated - except Exception as error: - raise postsqldb.DatabaseError(error, payload, sql) - def insertCostLayersTuple(site, payload, convert=True, conn=None): cost_layer = () self_conn = False @@ -944,6 +744,49 @@ def postDeleteCostLayer(site_name, payload, convert=True, conn=None): except Exception as error: raise postsqldb.DatabaseError(error, payload, sql) +def deleteConversionTuple(site_name: str, payload: tuple, convert=True, conn=None): + """This is a basic funtion to delete a tuple from a table in site with an id. All + tables in this database has id's associated with them. + + Args: + conn (_T_connector@connect): Postgresql Connector + site_name (str): + payload (tuple): (tuple_id,...) + convert (bool, optional): Determines if to return tuple as dictionary. Defaults to False. + + Raises: + DatabaseError: + + Returns: + tuple or dict: deleted tuple + """ + deleted = () + self_conn = False + sql = f"WITH deleted_rows AS (DELETE FROM {site_name}_conversions WHERE id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;" + try: + if not conn: + database_config = config.config() + conn = psycopg2.connect(**database_config) + conn.autocommit = False + self_conn = True + + with conn.cursor() as cur: + cur.execute(sql, payload) + rows = cur.fetchone() + if rows and convert: + deleted = postsqldb.tupleDictionaryFactory(cur.description, rows) + elif rows and not convert: + deleted = rows + + if self_conn: + conn.commit() + conn.close() + + return deleted + + except Exception as error: + raise postsqldb.DatabaseError(error, payload, sql) + def postUpdateItemLocation(site, payload, conn=None): item_location = () @@ -970,7 +813,207 @@ def postUpdateItemLocation(site, payload, conn=None): return item_location except Exception as error: return error + +def postUpdateItem(site:str, payload:dict): + """ POST and update to an item + + Args: + site (str): name of the site the item exists in. + payload (dict): STRICT FORMAT + {id: item_id, data: SEE BELOW, user_id: updater} + + data is complex structure + top level keys should be a combo of: ['item', 'item_info', 'logistics_info', 'food_info'] + with in each of these top levels there are key value pairs in this format + {'column_name': 'new_value'} + """ + def postUpdateData(conn, table, payload, convert=True): + updated = () + + set_clause, values = postsqldb.updateStringFactory(payload['update']) + values.append(payload['id']) + sql = f"UPDATE {table} SET {set_clause} WHERE id=%s RETURNING *;" + try: + with conn.cursor() as cur: + cur.execute(sql, values) + rows = cur.fetchone() + if rows and convert: + updated = postsqldb.tupleDictionaryFactory(cur.description, rows) + elif rows and not convert: + updated = rows + except Exception as error: + raise postsqldb.DatabaseError(error, payload, sql) + return updated + def postAddTransaction(conn, site, payload, convert=False): + transaction = () + with open(f"application/items/sql/insertTransactionsTuple.sql", "r+") as file: + sql = file.read().replace("%%site_name%%", site) + try: + with conn.cursor() as cur: + cur.execute(sql, payload) + rows = cur.fetchone() + if rows and convert: + transaction = postsqldb.tupleDictionaryFactory(cur.description, rows) + elif rows and not convert: + transaction = rows + except Exception as error: + raise postsqldb.DatabaseError(error, payload, sql) + return transaction + + transaction_data = {} + database_config = config.config() + data = payload['update'] + for key in data.keys(): + for key_2 in data[key].keys(): + transaction_data[f"{key_2}_new"] = data[key][key_2] + try: + with psycopg2.connect(**database_config) as conn: + item = getItemAllByID(site, (payload['id'], )) + if 'item_info' in data.keys() and data['item_info'] != {}: + for key in data['item_info'].keys(): + transaction_data[f"{key}_old"] = item['item_info'][key] + postUpdateData(conn, f"{site}_item_info", {'id': item['item_info_id'], 'update': data['item_info']}) + + if 'food_info' in data.keys() and data['food_info'] != {}: + for key in data['food_info'].keys(): + transaction_data[f"{key}_old"] = item['food_info'][key] + postUpdateData(conn, f"{site}_food_info", {'id': item['food_info_id'], 'update': data['food_info']}) + + if 'logistics_info' in data.keys() and data['logistics_info'] != {}: + for key in data['logistics_info'].keys(): + transaction_data[f"{key}_old"] = item['logistics_info'][key] + postUpdateData(conn, f"{site}_logistics_info", {'id': item['logistics_info_id'], 'update': data['logistics_info']}) + + if 'item' in data.keys() and data['item'] != {}: + for key in data['item'].keys(): + if key == "brand": + transaction_data[f"{key}_old"] = item['brand']['id'] + else: + transaction_data[f"{key}_old"] = item[key] + postUpdateData(conn, f"{site}_items", {'id': payload['id'], 'update': data['item']}) + + trans = postsqldb.TransactionPayload( + timestamp=datetime.datetime.now(), + logistics_info_id=item['logistics_info_id'], + barcode=item['barcode'], + name=item['item_name'], + transaction_type="UPDATE", + quantity=0.0, + description="Item was updated!", + user_id=payload['user_id'], + data=transaction_data + ) + postAddTransaction(conn, site, trans.payload()) + except Exception as error: + raise postsqldb.DatabaseError(error, payload, "MULTICALL!") + +def postUpdateItemLink(site: str, payload: dict): + """ POST update to ItemLink + + Args: + site (str): _description_ + payload (dict): {id, update, old_conv_factor, user_id} + """ + def postUpdateData(conn, table, payload, convert=True): + updated = () + set_clause, values = postsqldb.updateStringFactory(payload['update']) + values.append(payload['id']) + sql = f"UPDATE {table} SET {set_clause} WHERE id=%s RETURNING *;" + try: + with conn.cursor() as cur: + cur.execute(sql, values) + rows = cur.fetchone() + if rows and convert: + updated = postsqldb.tupleDictionaryFactory(cur.description, rows) + elif rows and not convert: + updated = rows + except Exception as error: + raise postsqldb.DatabaseError(error, payload, sql) + return updated + + def postAddTransaction(conn, site, payload, convert=False): + transaction = () + with open(f"application/items/sql/insertTransactionsTuple.sql", "r+") as file: + sql = file.read().replace("%%site_name%%", site) + try: + with conn.cursor() as cur: + cur.execute(sql, payload) + rows = cur.fetchone() + if rows and convert: + transaction = postsqldb.tupleDictionaryFactory(cur.description, rows) + elif rows and not convert: + transaction = rows + except Exception as error: + raise postsqldb.DatabaseError(error, payload, sql) + return transaction + + database_config = config.config() + transaction_time = datetime.datetime.now() + barcode = payload['barcode'] + with psycopg2.connect(**database_config) as conn: + linkedItem = getItemAllByBarcode(site, (barcode, )) + + transaction = postsqldb.TransactionPayload( + timestamp=transaction_time, + logistics_info_id=linkedItem['logistics_info_id'], + barcode=barcode, + name=linkedItem['item_name'], + transaction_type='UPDATE', + quantity=0.0, + description='Link updated!', + user_id=payload['user_id'], + data={'new_conv_factor': payload['update']['conv_factor'], 'old_conv_factor': payload['old_conv_factor']} + ) + + postUpdateData(conn, f"{site}_itemlinks", {'id': payload['id'], 'update': {'conv_factor': payload['update']['conv_factor']}}) + postAddTransaction(conn, site, transaction.payload()) + +def postUpdateCostLayer(site, payload, convert=True, conn=None): + """_summary_ + + Args: + conn (_T_connector@connect): Postgresql Connector + site (str): + table (str): + payload (dict): {'id': row_id, 'update': {... column_to_update: value_to_update_to...}} + convert (bool, optional): determines if to return tuple as dictionary. Defaults to False. + + Raises: + DatabaseError: + + Returns: + tuple or dict: updated tuple + """ + updated = () + self_conn = False + + set_clause, values = postsqldb.updateStringFactory(payload['update']) + values.append(payload['id']) + sql = f"UPDATE {site}_cost_layers SET {set_clause} WHERE id=%s RETURNING *;" + try: + if not conn: + database_config = config.config() + conn = psycopg2.connect(**database_config) + conn.autocommit = False + self_conn = True + + with conn.cursor() as cur: + cur.execute(sql, values) + rows = cur.fetchone() + if rows and convert: + updated = postsqldb.tupleDictionaryFactory(cur.description, rows) + elif rows and not convert: + updated = rows + + if self_conn: + conn.commit() + conn.close() + + return updated + except Exception as error: + raise postsqldb.DatabaseError(error, payload, sql) + def postAddTransaction(site, payload, convert=False, conn=None): transaction = () self_conn = False diff --git a/application/items/items_API.py b/application/items/items_API.py index 213d985..585c37d 100644 --- a/application/items/items_API.py +++ b/application/items/items_API.py @@ -732,15 +732,25 @@ def addConversion(): @items_api.route('/deleteConversion', methods=['POST']) def deleteConversion(): + """ POST delete conversion to the system given a conversion_id + --- + parameters: + - in: header + name: conversion_id + schema: + type: integer + default: 1 + required: true + description: conversion_id to be deleted + responses: + 200: + description: Prefix added successfully. + """ if request.method == "POST": conversion_id = request.get_json()['conversion_id'] - print(conversion_id) - database_config = config() site_name = session['selected_site'] - with psycopg2.connect(**database_config) as conn: - db.ConversionsTable.delete_item_tuple(conn, site_name, (conversion_id,)) - - return jsonify(error=False, message="Conversion was deleted successfully") + database_items.deleteConversionTuple(site_name, (conversion_id,)) + return jsonify(error=False, message="Conversion was deleted successfully") return jsonify(error=True, message="Unable to delete this conversion, ERROR!") @items_api.route('/updateConversion', methods=['POST'])