Compare commits

..

8 Commits
main ... dev

Author SHA1 Message Date
Mechseroms
9b7ba8d8da setup for linux 2025-10-12 17:09:28 -05:00
Jadowyne Ulve
d261076fd4 Moving PCS 2025-10-08 10:18:31 -05:00
Jadowyne Ulve
cbfe74c868 more database migration 2025-09-30 15:07:59 -05:00
Jadowyne Ulve
a412a02652 continued migration 0909202591813 2025-09-09 09:17:48 -05:00
Jadowyne Ulve
f0f85cbfcf Merge remote-tracking branch 'origin/main' into dev 2025-09-07 16:45:53 -05:00
Jadowyne Ulve
0b255d1b91 rebasing 2025-09-07 16:40:30 -05:00
Jadowyne Ulve
a3cad9622e 2nd Database oop migration 2025-08-28 17:58:05 -05:00
Jadowyne Ulve
5b5b914b35 testing dev branch 2025-08-23 15:25:56 -05:00
258 changed files with 4113 additions and 2252 deletions

1
.gitignore vendored
View File

@ -9,3 +9,4 @@ test.py
.VScodeCounter .VScodeCounter
celerybeat-schedule celerybeat-schedule
instance/application.cfg.py instance/application.cfg.py
docs

View File

@ -14,4 +14,6 @@ purchase into a receipt.
You then edit and receive that receipt into the system. You then edit and receive that receipt into the system.
There is also the ability to use a kiosk like interface to set up a scan in and out system, where as you use things There is also the ability to use a kiosk like interface to set up a scan in and out system, where as you use things
the system will remove those items by scanning them. the system will remove those items by scanning them.
test

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,27 +1,20 @@
from flask import Blueprint, request, render_template, redirect, session, url_for, jsonify from flask import Blueprint, request, render_template, redirect, session, url_for, jsonify
from authlib.integrations.flask_client import OAuth import hashlib
import hashlib, psycopg2
from config import config, sites_config from config import config, sites_config
from functools import wraps from functools import wraps
import postsqldb
import requests import requests
from application.access_module import access_database from application.access_module import access_database
from application.database_postgres.UsersModel import UsersModel
from outh import oauth from outh import oauth
access_api = Blueprint('access_api', __name__, template_folder="templates", static_folder="static") access_api = Blueprint('access_api', __name__, template_folder="templates", static_folder="static")
def update_session_user(): def update_session_user():
user = access_database.selectLoginsTupleByID((session['user_id'],)) user = UsersModel.select_tuple(session['selected_site'], {'key': session['user_uuid']})
user = access_database.washUserDictionary(user) user = UsersModel.washUserDictionary(user)
session['user'] = user session['user'] = user
print(user)
def login_required(func): def login_required(func):
@wraps(func) @wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
@ -89,20 +82,13 @@ def login():
password = hashlib.sha256(password.encode()).hexdigest() password = hashlib.sha256(password.encode()).hexdigest()
database_config = config() database_config = config()
with psycopg2.connect(**database_config) as conn:
try:
with conn.cursor() as cur:
sql = f"SELECT * FROM logins WHERE username=%s;"
cur.execute(sql, (username,))
user = cur.fetchone()
except (Exception, psycopg2.DatabaseError) as error:
conn.rollback()
return jsonify({'error': True, 'message': str(error)})
if user and user[2] == password: user = UsersModel.select_tuple_by_username({'key': username})
session['user_id'] = user[0]
session['user'] = {'id': user[0], 'username': user[1], 'sites': user[13], 'site_roles': user[14], 'system_admin': user[15], 'flags': user[16]} if user and user['user_password'] == password:
session['login_type'] = 'Internal' session['user_uuid'] = user['user_uuid']
session['user'] = UsersModel.washUserDictionary(user)
session['user_login_type'] = 'Internal'
return jsonify({'error': False, 'message': 'Logged In Sucessfully!'}) return jsonify({'error': False, 'message': 'Logged In Sucessfully!'})
else: else:
return jsonify({'error': True, 'message': 'Username or Password was incorrect!'}) return jsonify({'error': True, 'message': 'Username or Password was incorrect!'})
@ -111,16 +97,8 @@ def login():
if 'user' not in session.keys(): if 'user' not in session.keys():
session['user'] = None session['user'] = None
print(instance_config)
return render_template("login.html", instance_settings=instance_config) return render_template("login.html", instance_settings=instance_config)
@access_api.route('/dashboard')
def dashboard():
if 'user' not in session:
return redirect('/')
return f"Hello, {session['user']['name']}! <a href='/logout'>Logout</a>"
@access_api.route('/signup', methods=['POST', 'GET']) @access_api.route('/signup', methods=['POST', 'GET'])
def signup(): def signup():
instance_config = sites_config() instance_config = sites_config()
@ -132,14 +110,14 @@ def signup():
password = request.get_json()['password'] password = request.get_json()['password']
email = request.get_json()['email'] email = request.get_json()['email']
password = hashlib.sha256(password.encode()).hexdigest() password = hashlib.sha256(password.encode()).hexdigest()
database_config = config()
with psycopg2.connect(**database_config) as conn: new_user = UsersModel.Payload(
try: user_name=username,
with conn.cursor() as cur: user_password=password,
sql = f"INSERT INTO logins(username, password, email, row_type) VALUES(%s, %s, %s, %s);" user_email=email
cur.execute(sql, (username, password, email, 'user')) )
except (Exception, psycopg2.DatabaseError) as error:
conn.rollback() new_user = UsersModel.insert_tuple('', new_user.payload_dictionary())
return jsonify({'error': True, 'message': str(error)})
return jsonify({'error': False, 'message': 'You have been signed up successfully, you will have to wait until the server admin finishes your onboarding!'}) return jsonify({'error': False, 'message': 'You have been signed up successfully, you will have to wait until the server admin finishes your onboarding!'})
return jsonify({'error': True, 'message': 'There was a problem with this POST request!'}) return jsonify({'error': True, 'message': 'There was a problem with this POST request!'})

View File

@ -7,7 +7,7 @@ import hashlib
# APPLICATION IMPORTS # APPLICATION IMPORTS
from application.access_module import access_api from application.access_module import access_api
from application.administration import administration_database, administration_processes from application.administration import administration_database, administration_services
from application import database_payloads, postsqldb from application import database_payloads, postsqldb
@ -69,7 +69,7 @@ def first_time_setup():
"site_description": request.form['site_description'] "site_description": request.form['site_description']
} }
administration_processes.addSite(payload) administration_services.addSite(payload)
return redirect("/login") return redirect("/login")
@ -127,7 +127,7 @@ def postDeleteSite():
return jsonify({'error': True, 'message': f"You must be the owner of this site to delete."}) return jsonify({'error': True, 'message': f"You must be the owner of this site to delete."})
try: try:
administration_processes.deleteSite(site, user) administration_services.deleteSite(site, user)
except Exception as err: except Exception as err:
print(err) print(err)
@ -144,7 +144,7 @@ def postAddSite():
user = administration_database.selectLoginsTuple((user_id,)) user = administration_database.selectLoginsTuple((user_id,))
payload['admin_user'] = (user['username'], user['password'], user['email'], user['row_type']) payload['admin_user'] = (user['username'], user['password'], user['email'], user['row_type'])
administration_processes.addSite(payload) administration_services.addSite(payload)
return jsonify({'error': False, 'message': f"Zone added to {site_name}."}) return jsonify({'error': False, 'message': f"Zone added to {site_name}."})

View File

@ -0,0 +1,148 @@
from application.database_postgres.UsersModel import UsersModel
from application.database_postgres.RolesModel import RolesModel
from application.database_postgres.BaseModel import DatabaseError, tupleDictionaryFactory
import config
import psycopg2
class ExtendedRolesModel(RolesModel):
@classmethod
def select_by_site_uuid(self, payload, convert=True, conn=None):
roles = ()
self_conn = False
select_roles_sql = f"SELECT * FROM roles WHERE role_site_uuid = %(site_uuid)s::uuid;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(select_roles_sql, payload)
rows = cur.fetchall()
if rows and convert:
roles = [tupleDictionaryFactory(cur.description, role) for role in rows]
elif rows and not convert:
roles = rows
if self_conn:
conn.close()
return roles
except Exception as error:
raise DatabaseError(error, payload, select_roles_sql)
class ExtendedUsersModel(UsersModel):
@classmethod
def add_admin_user(self, payload:dict, convert=True, conn=None):
admin_user = ()
self_conn = False
sql = f"""INSERT INTO users (user_name, user_password, user_email, user_row_type)
VALUES (%(user_name)s, %(user_password)s, %(user_email)s, %(user_row_type)s) ON CONFLICT (user_name) DO UPDATE SET user_name = excluded.user_name RETURNING *;"""
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
admin_user = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
admin_user = rows
if self_conn:
conn.commit()
conn.close()
return admin_user
except Exception as e:
DatabaseError(str(e), payload, sql)
@classmethod
def update_roles(self, payload, convert=True, conn=None):
""" payload: {'role_uuid': x,} """
self_conn = False
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
select_sql = f"SELECT users.user_uuid FROM users WHERE user_roles @> ARRAY[%(role_uuid)s::uuid];"
with conn.cursor() as cur:
cur.execute(select_sql, payload)
users = tuple([row[0] for row in cur.fetchall()])
update_sql = f"UPDATE users SET user_roles = array_remove(user_roles, %(role_uuid)s::uuid) WHERE user_uuid = %(user_uuid)s::uuid;"
with conn.cursor() as cur:
for user_uuid in users:
cur.execute(update_sql, {'role_uuid': payload['role_uuid'], 'user_uuid': user_uuid})
if self_conn:
conn.commit()
conn.close()
except Exception as error:
raise error
@classmethod
def update_sites(self, payload, convert=True, conn=None):
""" payload: {'site_uuid',} """
self_conn = False
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
select_sql = f"SELECT users.user_uuid FROM users WHERE user_sites @> ARRAY[%(site_uuid)s::uuid];"
with conn.cursor() as cur:
cur.execute(select_sql, payload)
user = tuple([row[0] for row in cur.fetchall()])
update_sql = f"UPDATE users SET user_sites = array_remove(user_sites, %(site_uuid)s::uuid) WHERE user_uuid = %(user_uuid)s::uuid;"
with conn.cursor() as cur:
for user_uuid in user:
cur.execute(update_sql, {'site_uuid': payload['site_uuid'], 'user_uuid': user_uuid})
if self_conn:
conn.commit()
conn.close()
except Exception as error:
raise error
@classmethod
def update_user_site_roles(self, payload, convert=True, conn=None):
""" payload (tuple): (site_uuid, role_uuid, user_uuid) """
sql = f"UPDATE users SET user_sites = user_sites || %(site_uuid)s::uuid, user_roles = user_roles || %(role_uuid)s::uuid WHERE user_uuid=%(user_uuid)s RETURNING *;"
user = ()
self_conn = False
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
user = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
user = rows
if self_conn:
conn.commit()
conn.close()
return user
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -1,167 +0,0 @@
# 3RD PARTY IMPORTS
import psycopg2
import datetime
# APPLICATION IMPORTS
import config
from application import postsqldb, database_payloads
from application.administration import administration_database
def dropSiteTables(conn, site_manager):
try:
for table in site_manager.drop_order:
administration_database.dropTable(site_manager.site_name, table, conn=conn)
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- INFO --- {table} DROPPED!\n")
except Exception as error:
raise error
def deleteSite(site, user, conn=None):
"""Uses a Site Manager to delete a site from the system.
Args:
site_manager (MyDataclasses.SiteManager):
Raises:
Exception:
"""
self_conn = False
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
try:
admin_user = (user['username'], user['password'], user['email'], user['row_type'])
site_manager = database_payloads.SiteManager(
site['site_name'],
admin_user,
site['default_zone'],
site['default_primary_location'],
site['site_description']
)
roles = administration_database.selectRolesTupleBySite((site['id'],), conn=conn)
administration_database.deleteRolesTuple([role['id'] for role in roles], conn=conn)
dropSiteTables(conn, site_manager)
for role in roles:
administration_database.updateUsersRoles({'role_id': role['id']}, conn=conn)
administration_database.updateUsersSites({'site_id': site['id']}, conn=conn)
site = administration_database.deleteSitesTuple((site['id'], ), conn=conn)
if self_conn:
conn.commit()
conn.close()
except Exception as error:
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- ERROR --- {error}\n")
conn.rollback()
conn.close()
def addAdminUser(conn, site_manager, convert=True):
admin_user = ()
try:
sql = f"INSERT INTO logins (username, password, email, row_type) VALUES (%s, %s, %s, %s) ON CONFLICT (username) DO UPDATE SET username = excluded.username RETURNING *;"
with conn.cursor() as cur:
cur.execute(sql, site_manager.admin_user)
rows = cur.fetchone()
if rows and convert:
admin_user = postsqldb.tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
admin_user = rows
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- INFO --- Admin User Created!\n")
except Exception as error:
raise error
return admin_user
def setupSiteTables(conn, site_manager):
try:
for table in site_manager.create_order:
administration_database.createTable(site_manager.site_name, table, conn=conn)
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- INFO --- {table} Created!\n")
except Exception as error:
raise error
def addSite(payload, conn=None):
"""uses a Site Manager to add a site to the system
Args:
site_manager (MyDataclasses.SiteManager):
"""
self_conn = False
site_manager = database_payloads.SiteManager(
payload['site_name'],
payload['admin_user'],
payload['default_zone'],
payload['default_primary_location'],
payload['site_description']
)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
setupSiteTables(conn, site_manager)
admin_user = addAdminUser(conn, site_manager)
site = database_payloads.SitePayload(
site_name=site_manager.site_name,
site_description=site_manager.description,
site_owner_id=admin_user['id']
)
site = administration_database.insertSitesTuple(site.payload(), conn=conn)
role = database_payloads.RolePayload("Admin", f"Admin for {site['site_name']}", site['id'])
role = administration_database.insertRolesTuple(role.payload(), conn=conn)
admin_user = administration_database.updateAddLoginSitesRoles((site["id"], role["id"], admin_user["id"]), conn=conn)
default_zone = database_payloads.ZonesPayload(site_manager.default_zone)
default_zone = administration_database.insertZonesTuple(site["site_name"], default_zone.payload(), conn=conn)
uuid = f"{site_manager.default_zone}@{site_manager.default_location}"
default_location = database_payloads.LocationsPayload(uuid, site_manager.default_location, default_zone['id'])
default_location = administration_database.insertLocationsTuple(site['site_name'], default_location.payload(), conn=conn)
payload = {
'id': site['id'],
'update': {
'default_zone': default_zone['id'],
'default_auto_issue_location': default_location['id'],
'default_primary_location': default_location['id']
}
}
administration_database.updateSitesTuple(payload, conn=conn)
blank_vendor = database_payloads.VendorsPayload("None", admin_user['id'])
blank_brand = database_payloads.BrandsPayload("None")
blank_vendor = administration_database.insertVendorsTuple(site['site_name'], blank_vendor.payload(), conn=conn)
blank_brand = administration_database.insertBrandsTuple(site['site_name'], blank_brand.payload(), conn=conn)
if self_conn:
conn.commit()
conn.close()
except Exception as error:
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- ERROR --- {error}\n")
conn.rollback()
raise error

View File

@ -0,0 +1,261 @@
# 3RD PARTY IMPORTS
import psycopg2
import datetime
# APPLICATION IMPORTS
import config
from application import postsqldb, database_payloads
from application.administration import administration_database, administration_models
from dataclasses import dataclass, field
from application.database_postgres import (
CostLayersModel,
BrandsModel,
FoodInfoModel,
ItemInfoModel,
ZonesModel,
LocationsModel,
LogisticsInfoModel,
TransactionsModel,
ItemsModel,
ItemLocationsModel,
ConversionsModel,
SKUPrefixModel,
BarcodesModel,
VendorsModel,
ReceiptsModel,
ReceiptItemsModel,
RecipesModel,
RecipeItemsModel,
ShoppingListsModel,
ShoppingListItemsModel,
PlansModel,
PlanEventsModel,
SitesModel,
UsersModel,
RolesModel,
UnitsModel
)
from application.database_postgres import BaseModel
@dataclass
class SiteManager:
site_name: str
admin_user: tuple
default_zone: int
default_location: int
description: str
create_order: list = field(init=False)
drop_order: list = field(init=False)
def create_tables(self, conn):
UsersModel.UsersModel.create_table(self.site_name, conn=conn)
SitesModel.SitesModel.create_table(self.site_name, conn=conn)
RolesModel.RolesModel.create_table(self.site_name, conn=conn)
UnitsModel.UnitsModel.create_table(self.site_name, conn=conn)
# Needed for Items and Logistics
BrandsModel.BrandsModel.create_table(self.site_name, conn=conn)
ZonesModel.ZonesModel.create_table(self.site_name, conn=conn)
LocationsModel.LocationsModel.create_table(self.site_name, conn=conn)
ItemsModel.ItemsModel.create_table(self.site_name, conn=conn)
FoodInfoModel.FoodInfoModel.create_table(self.site_name, conn=conn)
ItemInfoModel.ItemInfoModel.create_table(self.site_name, conn=conn)
LogisticsInfoModel.LogisticsInfoModel.create_table(self.site_name, conn=conn)
ItemLocationsModel.ItemLocationsModel.create_table(self.site_name, conn=conn)
CostLayersModel.CostLayersModel.create_table(self.site_name, conn=conn)
ConversionsModel.ConversionsModel.create_table(self.site_name, conn=conn)
TransactionsModel.TransactionsModel.create_table(self.site_name, conn=conn)
SKUPrefixModel.SKUPrefixModel.create_table(self.site_name, conn=conn)
BarcodesModel.BarcodesModel.create_table(self.site_name, conn=conn)
# Vendors is used losely in Planner and in receipts.
VendorsModel.VendorsModel.create_table(self.site_name, conn=conn)
ReceiptsModel.ReceiptsModel.create_table(self.site_name, conn=conn)
ReceiptItemsModel.ReceiptItemsModel.create_table(self.site_name, conn=conn)
# This is the Recipe Module
RecipesModel.RecipesModel.create_table(self.site_name, conn=conn)
RecipeItemsModel.RecipeItemsModel.create_table(self.site_name, conn=conn)
# this is the Shopping List Module
ShoppingListsModel.ShoppingListsModel.create_table(self.site_name, conn=conn)
ShoppingListItemsModel.ShoppingListItemsModel.create_table(self.site_name, conn=conn)
# Planner Module
PlansModel.PlansModel.create_table(self.site_name, conn=conn)
PlanEventsModel.PlanEventsModel.create_table(self.site_name, conn=conn)
def drop_tables(self, conn):
# Needed for Items and Logistics
BrandsModel.BrandsModel.drop_table(self.site_name,conn=conn)
CostLayersModel.CostLayersModel.drop_table(self.site_name, conn=conn)
FoodInfoModel.FoodInfoModel.drop_table(self.site_name, conn=conn)
ItemInfoModel.ItemInfoModel.drop_table(self.site_name, conn=conn)
ZonesModel.ZonesModel.drop_table(self.site_name, conn=conn)
LocationsModel.LocationsModel.drop_table(self.site_name, conn=conn)
LogisticsInfoModel.LogisticsInfoModel.drop_table(self.site_name, conn=conn)
TransactionsModel.TransactionsModel.drop_table(self.site_name, conn=conn)
ItemsModel.ItemsModel.drop_table(self.site_name, conn=conn)
ItemLocationsModel.ItemLocationsModel.drop_table(self.site_name, conn=conn)
ConversionsModel.ConversionsModel.drop_table(self.site_name, conn=conn)
SKUPrefixModel.SKUPrefixModel.drop_table(self.site_name, conn=conn)
BarcodesModel.BarcodesModel.drop_table(self.site_name, conn=conn)
# Vendors is used losely in Planner and in receipts.
VendorsModel.VendorsModel.drop_table(self.site_name, conn=conn)
ReceiptsModel.ReceiptsModel.drop_table(self.site_name, conn=conn)
ReceiptItemsModel.ReceiptItemsModel.drop_table(self.site_name, conn=conn)
# This is the Recipe Module
RecipesModel.RecipesModel.drop_table(self.site_name, conn=conn)
RecipeItemsModel.RecipeItemsModel.drop_table(self.site_name, conn=conn)
# this is the Shopping List Module
ShoppingListsModel.ShoppingListsModel.drop_table(self.site_name, conn=conn)
ShoppingListItemsModel.ShoppingListItemsModel.drop_table(self.site_name, conn=conn)
# Planner Module
PlansModel.PlansModel.drop_table(self.site_name, conn=conn)
PlanEventsModel.PlanEventsModel.drop_table(self.site_name, conn=conn)
def deleteSite(payload, conn=None):
"""Uses a Site Manager to delete a site from the system.
Args:
site_manager (MyDataclasses.SiteManager):
Raises:
Exception:
"""
self_conn = False
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
site_manager = SiteManager(
payload['site_name'],
payload['admin_user'],
payload['default_zone'],
payload['default_primary_location'],
payload['site_description']
)
roles = administration_models.ExtendedRolesModel.select_by_site_uuid({'site_uuid': payload['site_uuid']}, conn=conn)
roles = RolesModel.RolesModel.delete_tuples([role['role_uuid'] for role in roles], conn=conn)
site_manager.drop_tables(conn=conn)
for role in roles:
administration_models.ExtendedUsersModel.update_roles({'role_uuid': role['role_uuid']}, conn=conn)
administration_models.ExtendedUsersModel.update_sites({'site_uuid': payload['site_uuid']}, conn=conn)
SitesModel.SitesModel.delete_tuples((payload['site_uuid'],), conn=conn)
if self_conn:
conn.commit()
conn.close()
def addSite(payload, conn=None):
"""uses a Site Manager to add a site to the system
Args:
site_manager (MyDataclasses.SiteManager):
"""
self_conn = False
site_manager = SiteManager(
payload['site_name'],
payload['admin_user'],
payload['default_zone'],
payload['default_primary_location'],
payload['site_description']
)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
sql = 'CREATE EXTENSION IF NOT EXISTS "uuid-ossp";'
with conn.cursor() as cur:
cur.execute(sql)
site_manager.create_tables(conn=conn)
admin_user = administration_models.ExtendedUsersModel.add_admin_user(site_manager.admin_user, conn=conn)
site = SitesModel.SitesModel.Payload(
site_name=site_manager.site_name,
site_description=site_manager.description,
site_created_by=admin_user['user_uuid']
)
# have to build site table
site = SitesModel.SitesModel.insert_tuple(site.site_name, site.payload_dictionary(), conn=conn)
# have to build roles table
role = RolesModel.RolesModel.Payload(
role_name="Admin",
role_description=f"Admin for {site['site_name']}",
role_site_uuid=site['site_uuid']
)
role = RolesModel.RolesModel.insert_tuple(site['site_name'], role.payload_dictionary(), conn=conn)
# have to build logins table
payload = {
'user_uuid': admin_user['user_uuid'],
'site_uuid': site['site_uuid'],
'role_uuid': role['role_uuid']
}
admin_user = administration_models.ExtendedUsersModel.update_user_site_roles(payload, conn=conn)
default_zone = ZonesModel.ZonesModel.Payload(zone_name=site_manager.default_zone)
default_zone = ZonesModel.ZonesModel.insert_tuple(site["site_name"], default_zone.payload_dictionary(), conn=conn)
uuid = f"{site_manager.default_zone}@{site_manager.default_location}"
default_location = LocationsModel.LocationsModel.Payload(
location_shortname=uuid,
location_name=site_manager.default_location,
zone_uuid=default_zone['zone_uuid']
)
default_location = LocationsModel.LocationsModel.insert_tuple(site['site_name'], default_location.payload_dictionary(), conn=conn)
payload = {
'key': site['site_uuid'],
'update': {
'site_default_zone_uuid': default_zone['zone_uuid'],
'site_default_auto_issue_location_uuid': default_location['location_uuid'],
'site_default_primary_location_uuid': default_location['location_uuid']
}
}
SitesModel.SitesModel.update_tuple(payload, conn=conn)
blank_vendor = VendorsModel.VendorsModel.Payload("None", admin_user['user_uuid'])
blank_brand = BrandsModel.BrandsModel.Payload("None")
VendorsModel.VendorsModel.insert_tuple(site['site_name'], blank_vendor.payload_dictionary(), conn=conn)
BrandsModel.BrandsModel.insert_tuple(site['site_name'], blank_brand.payload_dictionary(), conn=conn)
if self_conn:
conn.commit()
conn.close()
except Exception as error:
with open("logs/process.log", "a+") as file:
file.write(f"{datetime.datetime.now()} --- ERROR --- {error}\n")
conn.rollback()
raise error

View File

@ -204,7 +204,7 @@
payload: payload payload: payload
}), }),
}); });
location.href = '/administration' //location.href = '/administration'
} }
async function postEditSite(){ async function postEditSite(){

View File

@ -1,5 +1,6 @@
from abc import ABC from abc import ABC
import psycopg2 import psycopg2
import psycopg2.extras
import datetime import datetime
import uuid import uuid
import json import json
@ -9,7 +10,6 @@ from copy import deepcopy
import config import config
def validateUUID(uuid_string, version): def validateUUID(uuid_string, version):
try: try:
u = uuid.UUID(uuid_string, version=version) u = uuid.UUID(uuid_string, version=version)
@ -29,8 +29,8 @@ class DatabaseError(Exception):
with open("logs/database.log", "a+") as file: with open("logs/database.log", "a+") as file:
file.write("\n") file.write("\n")
file.write(f"{datetime.datetime.now()} --- ERROR --- DatabaseError(message='{self.message}',\n") file.write(f"{datetime.datetime.now()} --- ERROR --- DatabaseError(message='{self.message}',\n")
file.write(f"{" "*41}payload={self.payload},\n") file.write(f"{' '*41}payload={self.payload},\n")
file.write(f"{" "*41}sql='{self.sql}')") file.write(f"{' '*41}sql='{self.sql}')")
def __str__(self): def __str__(self):
return f"DatabaseError(message='{self.message}', payload={self.payload}, sql='{self.sql}')" return f"DatabaseError(message='{self.message}', payload={self.payload}, sql='{self.sql}')"
@ -58,6 +58,10 @@ def getUUID(n):
class BasePayload(ABC): class BasePayload(ABC):
"""BasePayloads holds the bare minimum methods required of a Payload. """ """BasePayloads holds the bare minimum methods required of a Payload. """
def __repr__(self):
return self.__dict__
def payload_dictionary(self): def payload_dictionary(self):
return deepcopy(self.__dict__) return deepcopy(self.__dict__)
@ -76,6 +80,8 @@ class BaseModel(ABC):
""" """
table_name: str = None # All extended class must assign a table name that CRUD uses to call upon table_name: str = None # All extended class must assign a table name that CRUD uses to call upon
primary_key: str = 'id' # All extended class can assign a different primary key/cloumn which is used to call delete and update queries on. primary_key: str = 'id' # All extended class can assign a different primary key/cloumn which is used to call delete and update queries on.
primary_key_type: str = 'int'
site_agnostic: bool = False #all extended class can set this to true to avoid site injection
def __init_subclass__(cls, **kwargs): def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs) super().__init_subclass__(**kwargs)
@ -225,3 +231,104 @@ class BaseModel(ABC):
except Exception as error: except Exception as error:
raise DatabaseError(error, payload, sql) raise DatabaseError(error, payload, sql)
@classmethod
def select_tuple(self, site: str, payload: dict, convert: bool = True, conn=None):
''' payload = {'key': value_to_filter}'''
record = ()
self_conn = False
if self.site_agnostic:
sql = f"SELECT * FROM {self.table_name} WHERE {self.primary_key} = %(key)s::{self.primary_key_type};"
else:
sql = f"SELECT * FROM {site}_{self.table_name} WHERE {self.primary_key} = %(key)s::{self.primary_key_type};"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
record = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
record = rows
if self_conn:
conn.commit()
conn.close()
return record
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def select_tuples_by_key(self, site: str, payload: dict, convert: bool = True, conn=None):
'''payload = {'key'}'''
records = ()
self_conn = False
if self.site_agnostic:
sql = f"SELECT * FROM {self.table_name} WHERE {self.primary_key} = %(key)s::{self.primary_key_type};"
else:
sql = f"SELECT * FROM {site}_{self.table_name} WHERE {self.primary_key} = %(key)s::{self.primary_key_type};"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
records = [tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
records = rows
if self_conn:
conn.commit()
conn.close()
return records
except Exception as error:
raise DatabaseError(error, {}, sql)
@classmethod
def select_tuples(self, site: str, convert: bool = True, conn=None):
records = ()
self_conn = False
if self.site_agnostic:
sql = f"SELECT * FROM {self.table_name};"
else:
sql = f"SELECT * FROM {site}_{self.table_name};"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall()
if rows and convert:
records = [tupleDictionaryFactory(cur.description, row) for row in rows]
elif rows and not convert:
records = rows
if self_conn:
conn.commit()
conn.close()
return records
except Exception as error:
raise DatabaseError(error, {}, sql)

View File

@ -7,5 +7,5 @@ class BrandsModel(BaseModel):
@dataclass @dataclass
class Payload(BasePayload): class Payload(BasePayload):
name: str brand_name: str

View File

@ -1,17 +1,93 @@
from dataclasses import dataclass from dataclasses import dataclass
import datetime import datetime
from application.database_postgres.BaseModel import BasePayload, BaseModel import config
import psycopg2
from application.database_postgres.BaseModel import BasePayload, BaseModel, tupleDictionaryFactory, DatabaseError, updateStringFactory
class CostLayersModel(BaseModel): class CostLayersModel(BaseModel):
table_name = "cost_layers" table_name = "cost_layers"
primary_key = "item_location_uuid"
primary_key_type = "uuid"
@dataclass @dataclass
class Payload(BasePayload): class Payload(BasePayload):
aquisition_date: datetime.datetime item_location_uuid: str
quantity: float layer_aquisition_date: datetime.datetime
cost: float layer_quantity: float
currency_type: str layer_cost: float
vendor: int = 0 layer_currency_type: str
expires: datetime.datetime = None layer_vendor: str = None
layer_expires: datetime.datetime = None
@classmethod
def delete_by_layer_id(self, site: str, payload: tuple, convert: bool = True, conn=None):
""" Pass a tuple of layer_ids to remove from the database.
Args:
site (str): name of the site to delete from
payload (tuple): a tuple of layer_ids
convert (bool, optional): whether to return the deleted rows as dictionaries. Defaults to True.
conn (_type_, optional): postgresql connector object. Defaults to None.
Raises:
DatabaseError: raised for all errors with database handling, logs to database.log
Returns:
dict, list: returns a list of all deleted rows.
"""
deleted = ()
self_conn = False
sql = f"WITH deleted_rows AS (DELETE FROM {site}_{self.table_name} WHERE layer_id IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
deleted = [tupleDictionaryFactory(cur.description, r) for r in rows]
elif rows and not convert:
deleted = rows
if self_conn:
conn.commit()
conn.close()
return deleted
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def update_by_layer_id(self, site: str, payload:dict, convert=True, conn=None):
updated = ()
self_conn = False
set_clause, values = updateStringFactory(payload['update'])
values.append(payload['key'])
sql = f"UPDATE {site}_{self.table_name} SET {set_clause} WHERE layer_id=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -5,20 +5,23 @@ from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2
class FoodInfoModel(BaseModel): class FoodInfoModel(BaseModel):
table_name = "food_info" table_name = "food_info"
primary_key = "item_uuid"
primary_key_type = "uuid"
@dataclass @dataclass
class Payload(BasePayload): class Payload(BasePayload):
food_groups: list = field(default_factory=list) item_uuid: str
ingrediants: list = field(default_factory=list) item_food_groups: list = field(default_factory=list)
nutrients: dict = field(default_factory=dict) item_ingredients: list = field(default_factory=list)
expires: bool = False item_nutrients: dict = field(default_factory=dict)
default_expiration: float = 0.0 item_expires: bool = False
item_default_expiration: float = 0.0
def payload_dictionary(self): def payload_dictionary(self):
return { payload = super().payload_dictionary()
'food_groups': lst2pgarr(self.food_groups), payload['item_food_groups'] = lst2pgarr(self.item_food_groups)
'ingrediants': lst2pgarr(self.ingrediants), payload['item_ingredients'] = lst2pgarr(self.item_ingredients)
'nutrients': json.dumps(self.nutrients), payload['item_nutrients'] = json.dumps(self.item_nutrients)
'expires': self.expires, return payload
'default_expiration': self.default_expiration
}

View File

@ -5,20 +5,22 @@ from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2
class ItemInfoModel(BaseModel): class ItemInfoModel(BaseModel):
table_name = "item_info" table_name = "item_info"
primary_key = "item_uuid"
primary_key_type = "uuid"
@dataclass @dataclass
class Payload(BasePayload): class Payload(BasePayload):
barcode: str item_uuid: str
packaging: str = "" item_uom: str = None
uom_quantity: float = 1.0 item_packaging: str = ""
uom: int = 1 item_uom_quantity: float = 1.0
cost: float = 0.0 item_cost: float = 0.0
safety_stock: float = 0.0 item_safety_stock: float = 0.0
lead_time_days: float = 0.0 item_lead_time_days: float = 0.0
ai_pick: bool = False item_ai_pick: bool = False
prefixes: list = field(default_factory=list) item_prefixes: list = field(default_factory=list)
def payload_dictionary(self): def payload_dictionary(self):
payload = super().payload_dictionary() payload = super().payload_dictionary()
payload['prefixes'] = lst2pgarr(self.prefixes) payload['item_prefixes'] = lst2pgarr(self.item_prefixes)
return payload return payload

View File

@ -1,18 +1,46 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2pgarr from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2pgarr, tupleDictionaryFactory, DatabaseError
import config
import psycopg2
class ItemLocationsModel(BaseModel): class ItemLocationsModel(BaseModel):
table_name = "item_locations" table_name = "item_locations"
primary_key = "item_location_uuid"
primary_key_type = 'uuid'
@dataclass @dataclass
class Payload(BasePayload): class Payload(BasePayload):
part_id: int item_uuid: str
location_id: int location_uuid: str
quantity_on_hand: float = 0.0 item_quantity_on_hand: float = 0.0
cost_layers: list = field(default_factory=list)
@classmethod
def select_by_location_and_item(self, site:str, payload:dict, convert: bool=True, conn = None):
recordset = ()
self_conn = False
sql = f"SELECT * FROM {site}_item_locations WHERE item_uuid = %(item_uuid)s AND location_uuid = %(location_uuid)s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
def payload_dictionary(self): with conn.cursor() as cur:
payload = super().payload_dictionary() cur.execute(sql, payload)
payload['cost_layers'] = lst2pgarr(self.cost_layers) rows = cur.fetchone()
return payload if rows and convert:
recordset = tupleDictionaryFactory(cur.description, rows)
if rows and not convert:
recordset = rows
if self_conn:
conn.close()
return recordset
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -1,31 +1,135 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
import json import json
import psycopg2
import datetime
from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2pgarr from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2pgarr, DatabaseError, tupleDictionaryFactory
import config
class ItemsModel(BaseModel): class ItemsModel(BaseModel):
table_name = "items" table_name = "items"
primary_key = "item_uuid"
primary_key_type = "uuid"
@dataclass @dataclass
class Payload(BasePayload): class Payload(BasePayload):
item_info_id: int item_category: str
item_info_uuid: str item_name: str
logistics_info_id: int item_created_at: datetime.datetime = field(init=False)
logistics_info_uuid: str item_updated_at: datetime.datetime = field(init=False)
food_info_id: int item_description: str = ""
food_info_uuid: str item_tags: list = field(default_factory=list)
barcode: str = "" item_links: dict = field(default_factory=dict)
item_name: str = "" item_brand_uuid: str = None
brand: int = 0 item_search_string: str = ""
description: str = "" item_inactive: bool = False
tags: list = field(default_factory=list)
links: dict = field(default_factory=dict) def __post_init__(self):
row_type: str = "" self.item_created_at = datetime.datetime.now()
item_type: str = "" self.item_updated_at = datetime.datetime.now()
search_string: str =""
def payload_dictionary(self): def payload_dictionary(self):
payload = super().payload_dictionary() payload = super().payload_dictionary()
payload['tags'] = lst2pgarr(self.tags) payload['item_tags'] = lst2pgarr(self.item_tags)
payload['links'] = json.dumps(self.links) payload['item_links'] = json.dumps(self.item_links)
return payload return payload
@classmethod
def get_item_by_uuid(self, site:str, payload: dict, convert: bool=True, conn = None):
record = ()
self_conn = False
with open('application/database_postgres/sql/ItemsModel/getItemAllByUUID.sql', 'r+') as file:
sql = file.read().replace("%%site_name%%", site)
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
record = tupleDictionaryFactory(cur.description, rows)
if rows and not convert:
record = rows
if self_conn:
conn.close()
return record
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def paginate_items_with_qoh(self, site:str, payload: dict, convert: bool=True, conn = None):
recordset = ()
count = 0
self_conn = False
with open('application/database_postgres/sql/ItemsModel/paginateItemsWithQOH.sql', 'r+') as file:
sql = file.read().replace("%%site_name%%", site).replace("%%sort_order%%", payload['sort_order'])
sql_count = f"SELECT COUNT(*) FROM {site}_{self.table_name} items WHERE items.item_search_string LIKE '%%' || %(search_string)s || '%%';"
recordset = ()
count = 0
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [tupleDictionaryFactory(cur.description, row) for row in rows]
if rows and not convert:
recordset = rows
cur.execute(sql_count, payload)
count = cur.fetchone()[0]
if self_conn:
conn.close()
return recordset, count
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def paginate_items_for_modal(self, site:str, payload: dict, convert: bool=True, conn = None):
recordset = ()
count = 0
self_conn = False
with open('application/database_postgres/sql/ItemsModel/paginateItemsForModal.sql', 'r+') as file:
sql = file.read().replace("%%site_name%%", site).replace("%%sort_order%%", payload['sort_order'])
sql_count = f"SELECT COUNT(*) FROM {site}_{self.table_name} items WHERE items.item_search_string LIKE '%%' || %(search_string)s || '%%';"
recordset = ()
count = 0
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
recordset = [tupleDictionaryFactory(cur.description, row) for row in rows]
if rows and not convert:
recordset = rows
cur.execute(sql_count, payload)
count = cur.fetchone()[0]
if self_conn:
conn.close()
return recordset, count
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -7,7 +7,8 @@ class LocationsModel(BaseModel):
@dataclass @dataclass
class Payload(BasePayload): class Payload(BasePayload):
uuid: str location_shortname: str
name: str location_name: str
zone_id: int zone_uuid: str

View File

@ -4,12 +4,14 @@ from application.database_postgres.BaseModel import BasePayload, BaseModel
class LogisticsInfoModel(BaseModel): class LogisticsInfoModel(BaseModel):
table_name = "logistics_info" table_name = "logistics_info"
primary_key_type = "uuid"
primary_key = "item_uuid"
@dataclass @dataclass
class Payload(BasePayload): class Payload(BasePayload):
barcode: str item_uuid: str
primary_location: int item_primary_location: str = None
primary_zone: int item_primary_zone: str = None
auto_issue_location: int item_auto_issue_location: str = None
auto_issue_zone: int item_auto_issue_zone: str = None

View File

@ -0,0 +1,50 @@
from dataclasses import dataclass, field
import json
import config
import psycopg2
from application.database_postgres.BaseModel import BasePayload, BaseModel, DatabaseError, tupleDictionaryFactory
class RolesModel(BaseModel):
table_name = "roles"
primary_key = "role_uuid"
@dataclass
class Payload(BasePayload):
role_name: str
role_description: str
role_site_uuid: str
role_flags: dict = field(default_factory=dict)
def payload_dictionary(self):
payload = super().payload_dictionary()
payload['role_flags'] = json.dumps(self.role_flags)
return payload
@classmethod
def delete_tuples(self, payload: tuple, convert: bool = True, conn=None):
deleted = ()
self_conn = False
sql = f"WITH deleted_rows AS (DELETE FROM {self.table_name} WHERE {self.primary_key} IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
deleted = [tupleDictionaryFactory(cur.description, r) for r in rows]
elif rows and not convert:
deleted = rows
if self_conn:
conn.commit()
conn.close()
return deleted
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -0,0 +1,121 @@
from dataclasses import dataclass, field
import json
import datetime
import psycopg2
from application.database_postgres.BaseModel import (
BasePayload, BaseModel, tupleDictionaryFactory, DatabaseError, updateStringFactory
)
import config
class SitesModel(BaseModel):
table_name = "sites"
primary_key = "site_uuid"
primary_key_type = "uuid"
site_agnostic = True
@dataclass
class Payload(BasePayload):
site_name: str
site_description: str
site_created_by: str
site_default_zone_uuid: str = None
site_default_auto_issue_location_uuid: str = None
site_default_primary_location_uuid: str = None
site_created_on: datetime.datetime = field(init=False)
site_flags: dict = field(default_factory=dict)
def __post_init__(self):
self.site_created_on = datetime.datetime.now()
def payload_dictionary(self):
payload = super().payload_dictionary()
payload['site_flags'] = json.dumps(self.site_flags)
return payload
@classmethod
def delete_tuples(self, payload: tuple, convert: bool = True, conn=None):
deleted = ()
self_conn = False
sql = f"WITH deleted_rows AS (DELETE FROM {self.table_name} WHERE {self.primary_key} IN ({','.join(['%s'] * len(payload))}) RETURNING *) SELECT * FROM deleted_rows;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchall()
if rows and convert:
deleted = [tupleDictionaryFactory(cur.description, r) for r in rows]
elif rows and not convert:
deleted = rows
if self_conn:
conn.commit()
conn.close()
return deleted
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def update_tuple(self, payload: dict, convert=True, conn=None):
""" payload (dict): {'key': row_id, 'update': {... column_to_update: value_to_update_to...}} """
updated = ()
self_conn = False
set_clause, values = updateStringFactory(payload['update'])
values.append(payload['key'])
sql = f"UPDATE {self.table_name} SET {set_clause} WHERE {self.primary_key}=%s RETURNING *;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, values)
rows = cur.fetchone()
if rows and convert:
updated = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
updated = rows
if self_conn:
conn.commit()
conn.close()
return updated
except Exception as error:
raise DatabaseError(error, payload, sql)
@classmethod
def select_all(self, payload: dict, convert=True, conn=None):
record = ()
self_conn = False
sql = f"SELECT * FROM {self.table_name} WHERE {self.primary_key}=%(key)s;"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = False
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
record = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
record = rows
if self_conn:
conn.commit()
conn.close()
return record
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -6,20 +6,25 @@ from application.database_postgres.BaseModel import BasePayload, BaseModel
class TransactionsModel(BaseModel): class TransactionsModel(BaseModel):
table_name = "transactions" table_name = "transactions"
primary_key = "transaction_uuid"
primary_key_type = "uuid"
@dataclass @dataclass
class Payload(BasePayload): class Payload(BasePayload):
timestamp: datetime.datetime item_uuid: str
logistics_info_id: int transaction_created_by: str
barcode: str transaction_name: str
name: str
transaction_type: str transaction_type: str
quantity: float transaction_created_at: datetime.datetime = field(init=False)
description: str transaction_quantity: float = 0.00
user_id: int transaction_description: str = ''
data: dict = field(default_factory=dict) transaction_cost: float = 0.00
transaction_data: dict = field(default_factory=dict)
def __post_init__(self):
self.transaction_created_at = datetime.datetime.now()
def payload_dictionary(self): def payload_dictionary(self):
payload = super().payload_dictionary() payload = super().payload_dictionary()
payload['data'] = json.dumps(self.data) payload['transaction_data'] = json.dumps(self.transaction_data)
return payload return payload

View File

@ -0,0 +1,17 @@
from dataclasses import dataclass, field
from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2pgarr, tupleDictionaryFactory, DatabaseError
class UnitsModel(BaseModel):
table_name = "units"
primary_key = "units_uuid"
primary_key_type = "uuid"
site_agnostic = True
@dataclass
class Payload(BasePayload):
unit_plural:str
unit_single:str
unit_fullname: str
unit_description: str

View File

@ -0,0 +1,82 @@
from dataclasses import dataclass, field
import json
import datetime
import psycopg2
from application.database_postgres.BaseModel import BasePayload, BaseModel, lst2pgarr, tupleDictionaryFactory, DatabaseError
import config
class UsersModel(BaseModel):
table_name = "users"
primary_key = "user_uuid"
primary_key_type = "uuid"
site_agnostic = True
@dataclass
class Payload(BasePayload):
user_name:str
user_password:str
user_email: str
user_flags: dict = field(default_factory=dict)
user_favorites: dict = field(default_factory=dict)
user_sites: list = field(default_factory=list)
user_roles: list = field(default_factory=list)
user_is_system_admin: bool = False
user_row_type: str = "user"
user_profile_pic_url: str = ""
user_login_type: str = "Internal"
user_joined_on: datetime.datetime = field(init=False)
def __post_init__(self):
self.creation_date = datetime.datetime.now()
def payload_dictionary(self):
payload = super().payload_dictionary()
payload['user_flags'] = json.dumps(self.user_flags)
payload['user_favorites'] = json.dumps(self.user_favorites)
payload['user_sites'] = lst2pgarr(self.user_sites)
payload['user_roles'] = lst2pgarr(self.user_roles)
return payload
@staticmethod
def washUserDictionary(user):
return {
'user_uuid': user['user_uuid'],
'user_name': user['user_name'],
'user_sites': user['user_sites'],
'user_roles': user['user_roles'],
'user_is_system_admin': user['user_is_system_admin'],
'user_flags': user['user_flags'],
'user_profile_pic_url': user['user_profile_pic_url'],
'user_login_type': user['user_login_type']
}
@classmethod
def select_tuple_by_username(self, payload: dict, convert: bool = True, conn=None):
record = ()
self_conn = False
sql = f"SELECT * FROM {self.table_name} WHERE user_name = %(key)s"
try:
if not conn:
database_config = config.config()
conn = psycopg2.connect(**database_config)
conn.autocommit = True
self_conn = True
with conn.cursor() as cur:
cur.execute(sql, payload)
rows = cur.fetchone()
if rows and convert:
record = tupleDictionaryFactory(cur.description, rows)
elif rows and not convert:
record = rows
if self_conn:
conn.commit()
conn.close()
return record
except Exception as error:
raise DatabaseError(error, payload, sql)

View File

@ -9,10 +9,10 @@ class VendorsModel(BaseModel):
@dataclass @dataclass
class Payload(BasePayload): class Payload(BasePayload):
vendor_name: str vendor_name: str
created_by: int vendor_created_by: str
vendor_address: str = "" vendor_address: str = ""
creation_date: datetime.datetime = field(init=False) vendor_creation_date: datetime.datetime = field(init=False)
phone_number: str = "" vendor_phone_number: str = ""
def __post_init__(self): def __post_init__(self):
self.creation_date = datetime.datetime.now() self.vendor_creation_date = datetime.datetime.now()

View File

@ -7,6 +7,6 @@ class ZonesModel(BaseModel):
@dataclass @dataclass
class Payload(BasePayload): class Payload(BasePayload):
name: str zone_name: str
description: str = "" zone_description: str = ""

Some files were not shown because too many files have changed in this diff Show More