Compare commits

...

8 Commits
main ... dev

Author SHA1 Message Date
Mechseroms
9b7ba8d8da setup for linux 2025-10-12 17:09:28 -05:00
Jadowyne Ulve
d261076fd4 Moving PCS 2025-10-08 10:18:31 -05:00
Jadowyne Ulve
cbfe74c868 more database migration 2025-09-30 15:07:59 -05:00
Jadowyne Ulve
a412a02652 continued migration 0909202591813 2025-09-09 09:17:48 -05:00
Jadowyne Ulve
f0f85cbfcf Merge remote-tracking branch 'origin/main' into dev 2025-09-07 16:45:53 -05:00
Jadowyne Ulve
0b255d1b91 rebasing 2025-09-07 16:40:30 -05:00
Jadowyne Ulve
a3cad9622e 2nd Database oop migration 2025-08-28 17:58:05 -05:00
Jadowyne Ulve
5b5b914b35 testing dev branch 2025-08-23 15:25:56 -05:00
258 changed files with 4113 additions and 2252 deletions

1
.gitignore vendored
View File

@ -9,3 +9,4 @@ test.py
.VScodeCounter
celerybeat-schedule
instance/application.cfg.py
docs

View File

@ -15,3 +15,5 @@ You then edit and receive that receipt into the system.
There is also the ability to use a kiosk like interface to set up a scan in and out system, where as you use things
the system will remove those items by scanning them.
test

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,27 +1,20 @@
from flask import Blueprint, request, render_template, redirect, session, url_for, jsonify
from authlib.integrations.flask_client import OAuth
import hashlib, psycopg2
import hashlib
from config import config, sites_config
from functools import wraps
import postsqldb
import requests
from application.access_module import access_database
from application.database_postgres.UsersModel import UsersModel
from outh import oauth
access_api = Blueprint('access_api', __name__, template_folder="templates", static_folder="static")
def update_session_user():
user = access_database.selectLoginsTupleByID((session['user_id'],))
user = access_database.washUserDictionary(user)
user = UsersModel.select_tuple(session['selected_site'], {'key': session['user_uuid']})
user = UsersModel.washUserDictionary(user)
session['user'] = user
print(user)
def login_required(func):
@wraps(func)
def wrapper(*args, **kwargs):
@ -89,20 +82,13 @@ def login():
password = hashlib.sha256(password.encode()).hexdigest()
database_config = config()
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
sql = f"SELECT * FROM logins WHERE username=%s;"
cur.execute(sql, (username,))
user = cur.fetchone()
except (Exception, psycopg2.DatabaseError) as error:
conn.rollback()
return jsonify({'error': True, 'message': str(error)})
if user and user[2] == password:
session['user_id'] = user[0]
session['user'] = {'id': user[0], 'username': user[1], 'sites': user[13], 'site_roles': user[14], 'system_admin': user[15], 'flags': user[16]}
session['login_type'] = 'Internal'
user = UsersModel.select_tuple_by_username({'key': username})
if user and user['user_password'] == password:
session['user_uuid'] = user['user_uuid']
session['user'] = UsersModel.washUserDictionary(user)
session['user_login_type'] = 'Internal'
return jsonify({'error': False, 'message': 'Logged In Sucessfully!'})
else:
return jsonify({'error': True, 'message': 'Username or Password was incorrect!'})
@ -111,16 +97,8 @@ def login():
if 'user' not in session.keys():
session['user'] = None
print(instance_config)
return render_template("login.html", instance_settings=instance_config)
@access_api.route('/dashboard')
def dashboard():
if 'user' not in session:
return redirect('/')
return f"Hello, {session['user']['name']}! <a href='/logout'>Logout</a>"
@access_api.route('/signup', methods=['POST', 'GET'])
def signup():
instance_config = sites_config()
@ -132,14 +110,14 @@ def signup():
password = request.get_json()['password']
email = request.get_json()['email']
password = hashlib.sha256(password.encode()).hexdigest()
database_config = config()
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
sql = f"INSERT INTO logins(username, password, email, row_type) VALUES(%s, %s, %s, %s);"
cur.execute(sql, (username, password, email, 'user'))
except (Exception, psycopg2.DatabaseError) as error:
conn.rollback()
return jsonify({'error': True, 'message': str(error)})
new_user = UsersModel.Payload(
user_name=username,
user_password=password,
user_email=email
)
new_user = UsersModel.insert_tuple('', new_user.payload_dictionary())
return jsonify({'error': False, 'message': 'You have been signed up successfully, you will have to wait until the server admin finishes your onboarding!'})
return jsonify({'error': True, 'message': 'There was a problem with this POST request!'})

View File

@ -7,7 +7,7 @@ import hashlib
# APPLICATION IMPORTS
from application.access_module import access_api
from application.administration import administration_database, administration_processes
from application.administration import administration_database, administration_services
from application import database_payloads, postsqldb
@ -69,7 +69,7 @@ def first_time_setup():
"site_description": request.form['site_description']
}
administration_processes.addSite(payload)
administration_services.addSite(payload)
return redirect("/login")
@ -127,7 +127,7 @@ def postDeleteSite():
return jsonify({'error': True, 'message': f"You must be the owner of this site to delete."})
try:
administration_processes.deleteSite(site, user)
administration_services.deleteSite(site, user)
except Exception as err:
print(err)
@ -144,7 +144,7 @@ def postAddSite():
user = administration_database.selectLoginsTuple((user_id,))
payload['admin_user'] = (user['username'], user['password'], user['email'], user['row_type'])
administration_processes.addSite(payload)
administration_services.addSite(payload)
return jsonify({'error': False, 'message': f"Zone added to {site_name}."})

View File

@ -0,0 +1,148 @@
from application.database_postgres.UsersModel import UsersModel
from application.database_postgres.RolesModel import RolesModel
from application.database_postgres.BaseModel import DatabaseError, tupleDictionaryFactory
import config
import psycopg2
class ExtendedRolesModel(RolesModel):
@classmethod
def select_by_site_uuid(self, payload, convert=True, conn=None):
roles = ()
self_conn = False
select_roles_sql = f"SELECT * FROM roles WHERE role_site_uuid = %(site_uuid)s::uuid;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(select_roles_sql, payload)
rows = cur.fetchall()
if rows and convert:
roles = [tupleDictionaryFactory(cur.description, role) for role in rows]
elif rows and not convert:
roles = rows
if self_conn:
conn.close()
return roles
except Exception as error:
raise DatabaseError(error, payload, select_roles_sql)
class ExtendedUsersModel(UsersModel):
@classmethod
def add_admin_user(self, payload:dict, convert=True, conn=None):
admin_user = ()
self_conn = False
sql = f"""INSERT INTO users (user_name, user_password, user_email, user_row_type)
VALUES (%(user_name)s, %(user_password)s, %(user_email)s, %(user_row_type)s) ON CONFLICT (user_name) DO UPDATE SET user_name = excluded.user_name RETURNING *;"""
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
admin_user = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
admin_user = rows
if self_conn:
conn.commit()
conn.close()
return admin_user
except Exception as e:
DatabaseError(str(e), payload, sql)
@classmethod
def update_roles(self, payload, convert=True, conn=None):
""" payload: {'role_uuid': x,} """
self_conn = False
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
select_sql = f"SELECT users.user_uuid FROM users WHERE user_roles @> ARRAY[%(role_uuid)s::uuid];"
with conn.cursor() as cur:
cur.execute(select_sql, payload)
users = tuple([row[0] for row in cur.fetchall()])
update_sql = f"UPDATE users SET user_roles = array_remove(user_roles, %(role_uuid)s::uuid) WHERE user_uuid = %(user_uuid)s::uuid;"
with conn.cursor() as cur:
for user_uuid in users:
cur.execute(update_sql, {'role_uuid': payload['role_uuid'], 'user_uuid': user_uuid})
if self_conn:
conn.commit()
conn.close()
except Exception as error:
raise error
@classmethod
def update_sites(self, payload, convert=True, conn=None):
""" payload: {'site_uuid',} """
self_conn = False
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
select_sql = f"SELECT users.user_uuid FROM users WHERE user_sites @> ARRAY[%(site_uuid)s::uuid];"
with conn.cursor() as cur:
cur.execute(select_sql, payload)
user = tuple([row[0] for row in cur.fetchall()])
update_sql = f"UPDATE users SET user_sites = array_remove(user_sites, %(site_uuid)s::uuid) WHERE user_uuid = %(user_uuid)s::uuid;"
with conn.cursor() as cur:
for user_uuid in user:
cur.execute(update_sql, {'site_uuid': payload['site_uuid'], 'user_uuid': user_uuid})
if self_conn:
conn.commit()
conn.close()
except Exception as error:
raise error
@classmethod
def update_user_site_roles(self, payload, convert=True, conn=None):
""" payload (tuple): (site_uuid, role_uuid, user_uuid) """
sql = f"UPDATE users SET user_sites = user_sites || %(site_uuid)s::uuid, user_roles = user_roles || %(role_uuid)s::uuid WHERE user_uuid=%(user_uuid)s RETURNING *;"
user = ()
self_conn = False
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
user = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
user = rows
if self_conn:
conn.commit()
conn.close()
return user
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -1,167 +0,0 @@
# 3RD PARTY IMPORTS
import psycopg2
import datetime
# APPLICATION IMPORTS
import config
from application import postsqldb, database_payloads
from application.administration import administration_database
def dropSiteTables(conn, site_manager):
try:
for table in site_manager.drop_order:
administration_database.dropTable(site_manager.site_name, table, conn=conn)
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- INFO --- {table} DROPPED!\n")
except Exception as error:
raise error
def deleteSite(site, user, conn=None):
"""Uses a Site Manager to delete a site from the system.
Args:
site_manager (MyDataclasses.SiteManager):
Raises:
Exception:
"""
self_conn = False
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
try:
admin_user = (user['username'], user['password'], user['email'], user['row_type'])
site_manager = database_payloads.SiteManager(
site['site_name'],
admin_user,
site['default_zone'],
site['default_primary_location'],
site['site_description']
)
roles = administration_database.selectRolesTupleBySite((site['id'],), conn=conn)
administration_database.deleteRolesTuple([role['id'] for role in roles], conn=conn)
dropSiteTables(conn, site_manager)
for role in roles:
administration_database.updateUsersRoles({'role_id': role['id']}, conn=conn)
administration_database.updateUsersSites({'site_id': site['id']}, conn=conn)
site = administration_database.deleteSitesTuple((site['id'], ), conn=conn)
if self_conn:
conn.commit()
conn.close()
except Exception as error:
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- ERROR --- {error}\n")
conn.rollback()
conn.close()
def addAdminUser(conn, site_manager, convert=True):
admin_user = ()
try:
sql = f"INSERT INTO logins (username, password, email, row_type) VALUES (%s, %s, %s, %s) ON CONFLICT (username) DO UPDATE SET username = excluded.username RETURNING *;"
with conn.cursor() as cur:
cur.execute(sql, site_manager.admin_user)
rows = cur.fetchone()
if rows and convert:
admin_user = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
admin_user = rows
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- INFO --- Admin User Created!\n")
except Exception as error:
raise error
return admin_user
def setupSiteTables(conn, site_manager):
try:
for table in site_manager.create_order:
administration_database.createTable(site_manager.site_name, table, conn=conn)
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- INFO --- {table} Created!\n")
except Exception as error:
raise error
def addSite(payload, conn=None):
"""uses a Site Manager to add a site to the system
Args:
site_manager (MyDataclasses.SiteManager):
"""
self_conn = False
site_manager = database_payloads.SiteManager(
payload['site_name'],
payload['admin_user'],
payload['default_zone'],
payload['default_primary_location'],
payload['site_description']
)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
setupSiteTables(conn, site_manager)
admin_user = addAdminUser(conn, site_manager)
site = database_payloads.SitePayload(
site_name=site_manager.site_name,
site_description=site_manager.description,
site_owner_id=admin_user['id']
)
site = administration_database.insertSitesTuple(site.payload(), conn=conn)
role = database_payloads.RolePayload("Admin", f"Admin for {site['site_name']}", site['id'])
role = administration_database.insertRolesTuple(role.payload(), conn=conn)
admin_user = administration_database.updateAddLoginSitesRoles((site["id"], role["id"], admin_user["id"]), conn=conn)
default_zone = database_payloads.ZonesPayload(site_manager.default_zone)
default_zone = administration_database.insertZonesTuple(site["site_name"], default_zone.payload(), conn=conn)
uuid = f"{site_manager.default_zone}@{site_manager.default_location}"
default_location = database_payloads.LocationsPayload(uuid, site_manager.default_location, default_zone['id'])
default_location = administration_database.insertLocationsTuple(site['site_name'], default_location.payload(), conn=conn)
payload = {
'id': site['id'],
'update': {
'default_zone': default_zone['id'],
'default_auto_issue_location': default_location['id'],
'default_primary_location': default_location['id']
}
}
administration_database.updateSitesTuple(payload, conn=conn)
blank_vendor = database_payloads.VendorsPayload("None", admin_user['id'])
blank_brand = database_payloads.BrandsPayload("None")
blank_vendor = administration_database.insertVendorsTuple(site['site_name'], blank_vendor.payload(), conn=conn)
blank_brand = administration_database.insertBrandsTuple(site['site_name'], blank_brand.payload(), conn=conn)
if self_conn:
conn.commit()
conn.close()
except Exception as error:
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- ERROR --- {error}\n")
conn.rollback()
raise error

View File

@ -0,0 +1,261 @@
# 3RD PARTY IMPORTS
import psycopg2
import datetime
# APPLICATION IMPORTS
import config
from application import postsqldb, database_payloads
from application.administration import administration_database, administration_models
from dataclasses import dataclass, field
from application.database_postgres import (
CostLayersModel,
BrandsModel,
FoodInfoModel,
ItemInfoModel,
ZonesModel,
LocationsModel,
LogisticsInfoModel,
TransactionsModel,
ItemsModel,
ItemLocationsModel,
ConversionsModel,
SKUPrefixModel,
BarcodesModel,
VendorsModel,
ReceiptsModel,
ReceiptItemsModel,
RecipesModel,
RecipeItemsModel,
ShoppingListsModel,
ShoppingListItemsModel,
PlansModel,
PlanEventsModel,
SitesModel,
UsersModel,
RolesModel,
UnitsModel
)
from application.database_postgres import BaseModel
@dataclass
class SiteManager:
site_name: str
admin_user: tuple
default_zone: int
default_location: int
description: str
create_order: list = field(init=False)
drop_order: list = field(init=False)
def create_tables(self, conn):
UsersModel.UsersModel.create_table(self.site_name, conn=conn)
SitesModel.SitesModel.create_table(self.site_name, conn=conn)
RolesModel.RolesModel.create_table(self.site_name, conn=conn)
UnitsModel.UnitsModel.create_table(self.site_name, conn=conn)
# Needed for Items and Logistics
BrandsModel.BrandsModel.create_table(self.site_name, conn=conn)
ZonesModel.ZonesModel.create_table(self.site_name, conn=conn)
LocationsModel.LocationsModel.create_table(self.site_name, conn=conn)
ItemsModel.ItemsModel.create_table(self.site_name, conn=conn)
FoodInfoModel.FoodInfoModel.create_table(self.site_name, conn=conn)
ItemInfoModel.ItemInfoModel.create_table(self.site_name, conn=conn)
LogisticsInfoModel.LogisticsInfoModel.create_table(self.site_name, conn=conn)
ItemLocationsModel.ItemLocationsModel.create_table(self.site_name, conn=conn)
CostLayersModel.CostLayersModel.create_table(self.site_name, conn=conn)
ConversionsModel.ConversionsModel.create_table(self.site_name, conn=conn)
TransactionsModel.TransactionsModel.create_table(self.site_name, conn=conn)
SKUPrefixModel.SKUPrefixModel.create_table(self.site_name, conn=conn)
BarcodesModel.BarcodesModel.create_table(self.site_name, conn=conn)
# Vendors is used losely in Planner and in receipts.
VendorsModel.VendorsModel.create_table(self.site_name, conn=conn)
ReceiptsModel.ReceiptsModel.create_table(self.site_name, conn=conn)
ReceiptItemsModel.ReceiptItemsModel.create_table(self.site_name, conn=conn)
# This is the Recipe Module
RecipesModel.RecipesModel.create_table(self.site_name, conn=conn)
RecipeItemsModel.RecipeItemsModel.create_table(self.site_name, conn=conn)
# this is the Shopping List Module
ShoppingListsModel.ShoppingListsModel.create_table(self.site_name, conn=conn)
ShoppingListItemsModel.ShoppingListItemsModel.create_table(self.site_name, conn=conn)
# Planner Module
PlansModel.PlansModel.create_table(self.site_name, conn=conn)
PlanEventsModel.PlanEventsModel.create_table(self.site_name, conn=conn)
def drop_tables(self, conn):
# Needed for Items and Logistics
BrandsModel.BrandsModel.drop_table(self.site_name,conn=conn)
CostLayersModel.CostLayersModel.drop_table(self.site_name, conn=conn)
FoodInfoModel.FoodInfoModel.drop_table(self.site_name, conn=conn)
ItemInfoModel.ItemInfoModel.drop_table(self.site_name, conn=conn)
ZonesModel.ZonesModel.drop_table(self.site_name, conn=conn)
LocationsModel.LocationsModel.drop_table(self.site_name, conn=conn)
LogisticsInfoModel.LogisticsInfoModel.drop_table(self.site_name, conn=conn)
TransactionsModel.TransactionsModel.drop_table(self.site_name, conn=conn)
ItemsModel.ItemsModel.drop_table(self.site_name, conn=conn)
ItemLocationsModel.ItemLocationsModel.drop_table(self.site_name, conn=conn)
ConversionsModel.ConversionsModel.drop_table(self.site_name, conn=conn)
SKUPrefixModel.SKUPrefixModel.drop_table(self.site_name, conn=conn)
BarcodesModel.BarcodesModel.drop_table(self.site_name, conn=conn)
# Vendors is used losely in Planner and in receipts.
VendorsModel.VendorsModel.drop_table(self.site_name, conn=conn)
ReceiptsModel.ReceiptsModel.drop_table(self.site_name, conn=conn)
ReceiptItemsModel.ReceiptItemsModel.drop_table(self.site_name, conn=conn)
# This is the Recipe Module
RecipesModel.RecipesModel.drop_table(self.site_name, conn=conn)
RecipeItemsModel.RecipeItemsModel.drop_table(self.site_name, conn=conn)
# this is the Shopping List Module
ShoppingListsModel.ShoppingListsModel.drop_table(self.site_name, conn=conn)
ShoppingListItemsModel.ShoppingListItemsModel.drop_table(self.site_name, conn=conn)
# Planner Module
PlansModel.PlansModel.drop_table(self.site_name, conn=conn)
PlanEventsModel.PlanEventsModel.drop_table(self.site_name, conn=conn)
def deleteSite(payload, conn=None):
"""Uses a Site Manager to delete a site from the system.
Args:
site_manager (MyDataclasses.SiteManager):
Raises:
Exception:
"""
self_conn = False
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
site_manager = SiteManager(
payload['site_name'],
payload['admin_user'],
payload['default_zone'],
payload['default_primary_location'],
payload['site_description']
)
roles = administration_models.ExtendedRolesModel.select_by_site_uuid({'site_uuid': payload['site_uuid']}, conn=conn)
roles = RolesModel.RolesModel.delete_tuples([role['role_uuid'] for role in roles], conn=conn)
site_manager.drop_tables(conn=conn)
for role in roles:
administration_models.ExtendedUsersModel.update_roles({'role_uuid': role['role_uuid']}, conn=conn)
administration_models.ExtendedUsersModel.update_sites({'site_uuid': payload['site_uuid']}, conn=conn)
SitesModel.SitesModel.delete_tuples((payload['site_uuid'],), conn=conn)
if self_conn:
conn.commit()
conn.close()
def addSite(payload, conn=None):
"""uses a Site Manager to add a site to the system
Args:
site_manager (MyDataclasses.SiteManager):
"""
self_conn = False
site_manager = SiteManager(
payload['site_name'],
payload['admin_user'],
payload['default_zone'],
payload['default_primary_location'],
payload['site_description']
)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
sql = 'CREATE EXTENSION IF NOT EXISTS "uuid-ossp";'
with conn.cursor() as cur:
cur.execute(sql)
site_manager.create_tables(conn=conn)
admin_user = administration_models.ExtendedUsersModel.add_admin_user(site_manager.admin_user, conn=conn)
site = SitesModel.SitesModel.Payload(
site_name=site_manager.site_name,
site_description=site_manager.description,
site_created_by=admin_user['user_uuid']
)
# have to build site table
site = SitesModel.SitesModel.insert_tuple(site.site_name, site.payload_dictionary(), conn=conn)
# have to build roles table
role = RolesModel.RolesModel.Payload(
role_name="Admin",
role_description=f"Admin for {site['site_name']}",
role_site_uuid=site['site_uuid']
)
role = RolesModel.RolesModel.insert_tuple(site['site_name'], role.payload_dictionary(), conn=conn)
# have to build logins table
payload = {
'user_uuid': admin_user['user_uuid'],
'site_uuid': site['site_uuid'],
'role_uuid': role['role_uuid']
}
admin_user = administration_models.ExtendedUsersModel.update_user_site_roles(payload, conn=conn)
default_zone = ZonesModel.ZonesModel.Payload(zone_name=site_manager.default_zone)
default_zone = ZonesModel.ZonesModel.insert_tuple(site["site_name"], default_zone.payload_dictionary(), conn=conn)
uuid = f"{site_manager.default_zone}@{site_manager.default_location}"
default_location = LocationsModel.LocationsModel.Payload(
location_shortname=uuid,
location_name=site_manager.default_location,
zone_uuid=default_zone['zone_uuid']
)
default_location = LocationsModel.LocationsModel.insert_tuple(site['site_name'], default_location.payload_dictionary(), conn=conn)
payload = {
'key': site['site_uuid'],
'update': {
'site_default_zone_uuid': default_zone['zone_uuid'],
'site_default_auto_issue_location_uuid': default_location['location_uuid'],
'site_default_primary_location_uuid': default_location['location_uuid']
}
}
SitesModel.SitesModel.update_tuple(payload, conn=conn)
blank_vendor = VendorsModel.VendorsModel.Payload("None", admin_user['user_uuid'])
blank_brand = BrandsModel.BrandsModel.Payload("None")
VendorsModel.VendorsModel.insert_tuple(site['site_name'], blank_vendor.payload_dictionary(), conn=conn)
BrandsModel.BrandsModel.insert_tuple(site['site_name'], blank_brand.payload_dictionary(), conn=conn)
if self_conn:
conn.commit()
conn.close()
except Exception as error:
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- ERROR --- {error}\n")
conn.rollback()
raise error

View File

@ -204,7 +204,7 @@
payload: payload
}),
});
location.href = '/administration'
//location.href = '/administration'
}
async function postEditSite(){

View File

@ -1,5 +1,6 @@
from abc import ABC
import psycopg2
import psycopg2.extras
import datetime
import uuid
import json
@ -9,7 +10,6 @@ from copy import deepcopy
import config
def validateUUID(uuid_string, version):
try:
u = uuid.UUID(uuid_string, version=version)
@ -29,8 +29,8 @@ class DatabaseError(Exception):
with open("logs/database.log", "a+") as file:
file.write("\n")
file.write(f"{datetime.datetime.now()} --- ERROR --- DatabaseError(message='{self.message}',\n")
file.write(f"{" "*41}payload={self.payload},\n")
file.write(f"{" "*41}sql='{self.sql}')")
file.write(f"{' '*41}payload={self.payload},\n")
file.write(f"{' '*41}sql='{self.sql}')")
def __str__(self):
return f"DatabaseError(message='{self.message}', payload={self.payload}, sql='{self.sql}')"
@ -58,6 +58,10 @@ def getUUID(n):
class BasePayload(ABC):
"""BasePayloads holds the bare minimum methods required of a Payload. """
def __repr__(self):
return self.__dict__
def payload_dictionary(self):
return deepcopy(self.__dict__)
@ -76,6 +80,8 @@ class BaseModel(ABC):
"""
table_name: str = None # All extended class must assign a table name that CRUD uses to call upon
primary_key: str = 'id' # All extended class can assign a different primary key/cloumn which is used to call delete and update queries on.
primary_key_type: str = 'int'
site_agnostic: bool = False #all extended class can set this to true to avoid site injection
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
@ -225,3 +231,104 @@ class BaseModel(ABC):
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def select_tuple(self, site: str, payload: dict, convert: bool = True, conn=None):
''' payload = {'key': value_to_filter}'''
record = ()
self_conn = False
if self.site_agnostic:
sql = f"SELECT * FROM {self.table_name} WHERE {self.primary_key} = %(key)s::{self.primary_key_type};"
else:
sql = f"SELECT * FROM {site}_{self.table_name} WHERE {self.primary_key} = %(key)s::{self.primary_key_type};"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
record = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
record = rows
if self_conn:
conn.commit()
conn.close()
return record
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def select_tuples_by_key(self, site: str, payload: dict, convert: bool = True, conn=None):
'''payload = {'key'}'''
records = ()
self_conn = False
if self.site_agnostic:
sql = f"SELECT * FROM {self.table_name} WHERE {self.primary_key} = %(key)s::{self.primary_key_type};"
else:
sql = f"SELECT * FROM {site}_{self.table_name} WHERE {self.primary_key} = %(key)s::{self.primary_key_type};"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
records = [tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
records = rows
if self_conn:
conn.commit()
conn.close()
return records
except Exception as error:
raise DatabaseError(error, {}, sql)
@classmethod
def select_tuples(self, site: str, convert: bool = True, conn=None):
records = ()
self_conn = False
if self.site_agnostic:
sql = f"SELECT * FROM {self.table_name};"
else:
sql = f"SELECT * FROM {site}_{self.table_name};"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall()
if rows and convert:
records = [tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
records = rows
if self_conn:
conn.commit()
conn.close()
return records
except Exception as error:
raise DatabaseError(error, {}, sql)

View File

@ -7,5 +7,5 @@ class BrandsModel(BaseModel):
@dataclass
class Payload(BasePayload):
name: str
brand_name: str

View File

@ -1,17 +1,93 @@
from dataclasses import dataclass
import datetime
from application.database_postgres.BaseModel import BasePayload, BaseModel
import config
import psycopg2
from application.database_postgres.BaseModel import BasePayload, BaseModel, tupleDictionaryFactory, DatabaseError, updateStringFactory
class CostLayersModel(BaseModel):
table_name = "cost_layers"
primary_key = "item_location_uuid"
primary_key_type = "uuid"
@dataclass
class Payload(BasePayload):
aquisition_date: datetime.datetime
quantity: float
cost: float
currency_type: str
vendor: int = 0
expires: datetime.datetime = None
item_location_uuid: str
layer_aquisition_date: datetime.datetime
layer_quantity: float
layer_cost: float
layer_currency_type: str
layer_vendor: str = None
layer_expires: datetime.datetime = None
@classmethod
def delete_by_layer_id(self, site: str, payload: tuple, convert: bool = True, conn=None):
""" Pass a tuple of layer_ids to remove from the database.
Args:
site (str): name of the site to delete from
payload (tuple): a tuple of layer_ids
convert (bool, optional): whether to return the deleted rows as dictionaries. Defaults to True.
conn (_type_, optional): postgresql connector object. Defaults to None.
Raises:
DatabaseError: raised for all errors with database handling, logs to database.log
Returns:
dict, list: returns a list of all deleted rows.
"""
deleted = ()
self_conn = False
sql = f"WITH deleted_rows AS (DELETE FROM {site}_{self.table_name} WHERE layer_id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
deleted = [tupleDictionaryFactory(cur.description, r) for r in rows]
elif rows and not convert:
deleted = rows
if self_conn:
conn.commit()
conn.close()
return deleted
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def update_by_layer_id(self, site: str, payload:dict, convert=True, conn=None):
updated = ()
self_conn = False
set_clause, values = updateStringFactory(payload['update'])
values.append(payload['key'])
sql = f"UPDATE {site}_{self.table_name} SET {set_clause} WHERE layer_id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -5,20 +5,23 @@ from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2
class FoodInfoModel(BaseModel):
table_name = "food_info"
primary_key = "item_uuid"
primary_key_type = "uuid"
@dataclass
class Payload(BasePayload):
food_groups: list = field(default_factory=list)
ingrediants: list = field(default_factory=list)
nutrients: dict = field(default_factory=dict)
expires: bool = False
default_expiration: float = 0.0
item_uuid: str
item_food_groups: list = field(default_factory=list)
item_ingredients: list = field(default_factory=list)
item_nutrients: dict = field(default_factory=dict)
item_expires: bool = False
item_default_expiration: float = 0.0
def payload_dictionary(self):
return {
'food_groups': lst2pgarr(self.food_groups),
'ingrediants': lst2pgarr(self.ingrediants),
'nutrients': json.dumps(self.nutrients),
'expires': self.expires,
'default_expiration': self.default_expiration
}
payload = super().payload_dictionary()
payload['item_food_groups'] = lst2pgarr(self.item_food_groups)
payload['item_ingredients'] = lst2pgarr(self.item_ingredients)
payload['item_nutrients'] = json.dumps(self.item_nutrients)
return payload

View File

@ -5,20 +5,22 @@ from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2
class ItemInfoModel(BaseModel):
table_name = "item_info"
primary_key = "item_uuid"
primary_key_type = "uuid"
@dataclass
class Payload(BasePayload):
barcode: str
packaging: str = ""
uom_quantity: float = 1.0
uom: int = 1
cost: float = 0.0
safety_stock: float = 0.0
lead_time_days: float = 0.0
ai_pick: bool = False
prefixes: list = field(default_factory=list)
item_uuid: str
item_uom: str = None
item_packaging: str = ""
item_uom_quantity: float = 1.0
item_cost: float = 0.0
item_safety_stock: float = 0.0
item_lead_time_days: float = 0.0
item_ai_pick: bool = False
item_prefixes: list = field(default_factory=list)
def payload_dictionary(self):
payload = super().payload_dictionary()
payload['prefixes'] = lst2pgarr(self.prefixes)
payload['item_prefixes'] = lst2pgarr(self.item_prefixes)
return payload

View File

@ -1,18 +1,46 @@
from dataclasses import dataclass, field
from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2pgarr
from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2pgarr, tupleDictionaryFactory, DatabaseError
import config
import psycopg2
class ItemLocationsModel(BaseModel):
table_name = "item_locations"
primary_key = "item_location_uuid"
primary_key_type = 'uuid'
@dataclass
class Payload(BasePayload):
part_id: int
location_id: int
quantity_on_hand: float = 0.0
cost_layers: list = field(default_factory=list)
item_uuid: str
location_uuid: str
item_quantity_on_hand: float = 0.0
def payload_dictionary(self):
payload = super().payload_dictionary()
payload['cost_layers'] = lst2pgarr(self.cost_layers)
return payload
@classmethod
def select_by_location_and_item(self, site:str, payload:dict, convert: bool=True, conn = None):
recordset = ()
self_conn = False
sql = f"SELECT * FROM {site}_item_locations WHERE item_uuid = %(item_uuid)s AND location_uuid = %(location_uuid)s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
recordset = tupleDictionaryFactory(cur.description, rows)
if rows and not convert:
recordset = rows
if self_conn:
conn.close()
return recordset
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -1,31 +1,135 @@
from dataclasses import dataclass, field
import json
import psycopg2
import datetime
from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2pgarr
from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2pgarr, DatabaseError, tupleDictionaryFactory
import config
class ItemsModel(BaseModel):
table_name = "items"
primary_key = "item_uuid"
primary_key_type = "uuid"
@dataclass
class Payload(BasePayload):
item_info_id: int
item_info_uuid: str
logistics_info_id: int
logistics_info_uuid: str
food_info_id: int
food_info_uuid: str
barcode: str = ""
item_name: str = ""
brand: int = 0
description: str = ""
tags: list = field(default_factory=list)
links: dict = field(default_factory=dict)
row_type: str = ""
item_type: str = ""
search_string: str =""
item_category: str
item_name: str
item_created_at: datetime.datetime = field(init=False)
item_updated_at: datetime.datetime = field(init=False)
item_description: str = ""
item_tags: list = field(default_factory=list)
item_links: dict = field(default_factory=dict)
item_brand_uuid: str = None
item_search_string: str = ""
item_inactive: bool = False
def __post_init__(self):
self.item_created_at = datetime.datetime.now()
self.item_updated_at = datetime.datetime.now()
def payload_dictionary(self):
payload = super().payload_dictionary()
payload['tags'] = lst2pgarr(self.tags)
payload['links'] = json.dumps(self.links)
payload['item_tags'] = lst2pgarr(self.item_tags)
payload['item_links'] = json.dumps(self.item_links)
return payload
@classmethod
def get_item_by_uuid(self, site:str, payload: dict, convert: bool=True, conn = None):
record = ()
self_conn = False
with open('application/database_postgres/sql/ItemsModel/getItemAllByUUID.sql', 'r+') as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
record = tupleDictionaryFactory(cur.description, rows)
if rows and not convert:
record = rows
if self_conn:
conn.close()
return record
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def paginate_items_with_qoh(self, site:str, payload: dict, convert: bool=True, conn = None):
recordset = ()
count = 0
self_conn = False
with open('application/database_postgres/sql/ItemsModel/paginateItemsWithQOH.sql', 'r+') as file:
sql = file.read().replace("%%site_name%%", site).replace("%%sort_order%%", payload['sort_order'])
sql_count = f"SELECT COUNT(*) FROM {site}_{self.table_name} items WHERE items.item_search_string LIKE '%%' || %(search_string)s || '%%';"
recordset = ()
count = 0
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [tupleDictionaryFactory(cur.description, row) for row in rows]
if rows and not convert:
recordset = rows
cur.execute(sql_count, payload)
count = cur.fetchone()[0]
if self_conn:
conn.close()
return recordset, count
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def paginate_items_for_modal(self, site:str, payload: dict, convert: bool=True, conn = None):
recordset = ()
count = 0
self_conn = False
with open('application/database_postgres/sql/ItemsModel/paginateItemsForModal.sql', 'r+') as file:
sql = file.read().replace("%%site_name%%", site).replace("%%sort_order%%", payload['sort_order'])
sql_count = f"SELECT COUNT(*) FROM {site}_{self.table_name} items WHERE items.item_search_string LIKE '%%' || %(search_string)s || '%%';"
recordset = ()
count = 0
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [tupleDictionaryFactory(cur.description, row) for row in rows]
if rows and not convert:
recordset = rows
cur.execute(sql_count, payload)
count = cur.fetchone()[0]
if self_conn:
conn.close()
return recordset, count
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -7,7 +7,8 @@ class LocationsModel(BaseModel):
@dataclass
class Payload(BasePayload):
uuid: str
name: str
zone_id: int
location_shortname: str
location_name: str
zone_uuid: str

View File

@ -4,12 +4,14 @@ from application.database_postgres.BaseModel import BasePayload, BaseModel
class LogisticsInfoModel(BaseModel):
table_name = "logistics_info"
primary_key_type = "uuid"
primary_key = "item_uuid"
@dataclass
class Payload(BasePayload):
barcode: str
primary_location: int
primary_zone: int
auto_issue_location: int
auto_issue_zone: int
item_uuid: str
item_primary_location: str = None
item_primary_zone: str = None
item_auto_issue_location: str = None
item_auto_issue_zone: str = None

View File

@ -0,0 +1,50 @@
from dataclasses import dataclass, field
import json
import config
import psycopg2
from application.database_postgres.BaseModel import BasePayload, BaseModel, DatabaseError, tupleDictionaryFactory
class RolesModel(BaseModel):
table_name = "roles"
primary_key = "role_uuid"
@dataclass
class Payload(BasePayload):
role_name: str
role_description: str
role_site_uuid: str
role_flags: dict = field(default_factory=dict)
def payload_dictionary(self):
payload = super().payload_dictionary()
payload['role_flags'] = json.dumps(self.role_flags)
return payload
@classmethod
def delete_tuples(self, payload: tuple, convert: bool = True, conn=None):
deleted = ()
self_conn = False
sql = f"WITH deleted_rows AS (DELETE FROM {self.table_name} WHERE {self.primary_key} IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
deleted = [tupleDictionaryFactory(cur.description, r) for r in rows]
elif rows and not convert:
deleted = rows
if self_conn:
conn.commit()
conn.close()
return deleted
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -0,0 +1,121 @@
from dataclasses import dataclass, field
import json
import datetime
import psycopg2
from application.database_postgres.BaseModel import (
BasePayload, BaseModel, tupleDictionaryFactory, DatabaseError, updateStringFactory
)
import config
class SitesModel(BaseModel):
table_name = "sites"
primary_key = "site_uuid"
primary_key_type = "uuid"
site_agnostic = True
@dataclass
class Payload(BasePayload):
site_name: str
site_description: str
site_created_by: str
site_default_zone_uuid: str = None
site_default_auto_issue_location_uuid: str = None
site_default_primary_location_uuid: str = None
site_created_on: datetime.datetime = field(init=False)
site_flags: dict = field(default_factory=dict)
def __post_init__(self):
self.site_created_on = datetime.datetime.now()
def payload_dictionary(self):
payload = super().payload_dictionary()
payload['site_flags'] = json.dumps(self.site_flags)
return payload
@classmethod
def delete_tuples(self, payload: tuple, convert: bool = True, conn=None):
deleted = ()
self_conn = False
sql = f"WITH deleted_rows AS (DELETE FROM {self.table_name} WHERE {self.primary_key} IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
deleted = [tupleDictionaryFactory(cur.description, r) for r in rows]
elif rows and not convert:
deleted = rows
if self_conn:
conn.commit()
conn.close()
return deleted
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def update_tuple(self, payload: dict, convert=True, conn=None):
""" payload (dict): {'key': row_id, 'update': {... column_to_update: value_to_update_to...}} """
updated = ()
self_conn = False
set_clause, values = updateStringFactory(payload['update'])
values.append(payload['key'])
sql = f"UPDATE {self.table_name} SET {set_clause} WHERE {self.primary_key}=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def select_all(self, payload: dict, convert=True, conn=None):
record = ()
self_conn = False
sql = f"SELECT * FROM {self.table_name} WHERE {self.primary_key}=%(key)s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
record = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
record = rows
if self_conn:
conn.commit()
conn.close()
return record
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -6,20 +6,25 @@ from application.database_postgres.BaseModel import BasePayload, BaseModel
class TransactionsModel(BaseModel):
table_name = "transactions"
primary_key = "transaction_uuid"
primary_key_type = "uuid"
@dataclass
class Payload(BasePayload):
timestamp: datetime.datetime
logistics_info_id: int
barcode: str
name: str
item_uuid: str
transaction_created_by: str
transaction_name: str
transaction_type: str
quantity: float
description: str
user_id: int
data: dict = field(default_factory=dict)
transaction_created_at: datetime.datetime = field(init=False)
transaction_quantity: float = 0.00
transaction_description: str = ''
transaction_cost: float = 0.00
transaction_data: dict = field(default_factory=dict)
def __post_init__(self):
self.transaction_created_at = datetime.datetime.now()
def payload_dictionary(self):
payload = super().payload_dictionary()
payload['data'] = json.dumps(self.data)
payload['transaction_data'] = json.dumps(self.transaction_data)
return payload

View File

@ -0,0 +1,17 @@
from dataclasses import dataclass, field
from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2pgarr, tupleDictionaryFactory, DatabaseError
class UnitsModel(BaseModel):
table_name = "units"
primary_key = "units_uuid"
primary_key_type = "uuid"
site_agnostic = True
@dataclass
class Payload(BasePayload):
unit_plural:str
unit_single:str
unit_fullname: str
unit_description: str

View File

@ -0,0 +1,82 @@
from dataclasses import dataclass, field
import json
import datetime
import psycopg2
from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2pgarr, tupleDictionaryFactory, DatabaseError
import config
class UsersModel(BaseModel):
table_name = "users"
primary_key = "user_uuid"
primary_key_type = "uuid"
site_agnostic = True
@dataclass
class Payload(BasePayload):
user_name:str
user_password:str
user_email: str
user_flags: dict = field(default_factory=dict)
user_favorites: dict = field(default_factory=dict)
user_sites: list = field(default_factory=list)
user_roles: list = field(default_factory=list)
user_is_system_admin: bool = False
user_row_type: str = "user"
user_profile_pic_url: str = ""
user_login_type: str = "Internal"
user_joined_on: datetime.datetime = field(init=False)
def __post_init__(self):
self.creation_date = datetime.datetime.now()
def payload_dictionary(self):
payload = super().payload_dictionary()
payload['user_flags'] = json.dumps(self.user_flags)
payload['user_favorites'] = json.dumps(self.user_favorites)
payload['user_sites'] = lst2pgarr(self.user_sites)
payload['user_roles'] = lst2pgarr(self.user_roles)
return payload
@staticmethod
def washUserDictionary(user):
return {
'user_uuid': user['user_uuid'],
'user_name': user['user_name'],
'user_sites': user['user_sites'],
'user_roles': user['user_roles'],
'user_is_system_admin': user['user_is_system_admin'],
'user_flags': user['user_flags'],
'user_profile_pic_url': user['user_profile_pic_url'],
'user_login_type': user['user_login_type']
}
@classmethod
def select_tuple_by_username(self, payload: dict, convert: bool = True, conn=None):
record = ()
self_conn = False
sql = f"SELECT * FROM {self.table_name} WHERE user_name = %(key)s"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
record = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
record = rows
if self_conn:
conn.commit()
conn.close()
return record
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -9,10 +9,10 @@ class VendorsModel(BaseModel):
@dataclass
class Payload(BasePayload):
vendor_name: str
created_by: int
vendor_created_by: str
vendor_address: str = ""
creation_date: datetime.datetime = field(init=False)
phone_number: str = ""
vendor_creation_date: datetime.datetime = field(init=False)
vendor_phone_number: str = ""
def __post_init__(self):
self.creation_date = datetime.datetime.now()
self.vendor_creation_date = datetime.datetime.now()

View File

@ -7,6 +7,6 @@ class ZonesModel(BaseModel):
@dataclass
class Payload(BasePayload):
name: str
description: str = ""
zone_name: str
zone_description: str = ""

Some files were not shown because too many files have changed in this diff Show More