2025-08-28 17:58:05 -05:00

121 lines
4.2 KiB
Python

from dataclasses import dataclass, field
import json
import datetime
import psycopg2
from application.database_postgres.BaseModel import (
BasePayload, BaseModel, tupleDictionaryFactory, DatabaseError, updateStringFactory
)
import config
class SitesModel(BaseModel):
    """Database model for the ``sites`` table, keyed by ``site_uuid``.

    Every query method accepts an optional open ``conn``. When none is given,
    the method opens its own psycopg2 connection from ``config.config()``,
    commits on success and always closes it again, even on failure. A
    connection supplied by the caller is never committed or closed here.
    """

    table_name = "sites"
    primary_key = "site_uuid"
    primary_key_type = "uuid"
    # NOTE(review): consumed by BaseModel (not visible in this file);
    # presumably marks the table as shared across sites -- confirm.
    site_agnostic = True

    @dataclass
    class Payload(BasePayload):
        """Column values for a new ``sites`` row.

        ``site_created_on`` is not an ``__init__`` argument; it is stamped
        with the current local time when the payload is constructed.
        """

        site_name: str
        site_description: str
        site_created_by: str
        site_default_zone_uuid: str = None
        site_default_auto_issue_location_uuid: str = None
        site_default_primary_location_uuid: str = None
        site_created_on: datetime.datetime = field(init=False)
        site_flags: dict = field(default_factory=dict)

        def __post_init__(self):
            """Record the creation timestamp (naive local time)."""
            self.site_created_on = datetime.datetime.now()

        def payload_dictionary(self):
            """Return the base payload dict with ``site_flags`` JSON-encoded."""
            payload = super().payload_dictionary()
            payload['site_flags'] = json.dumps(self.site_flags)
            return payload

    @classmethod
    def delete_tuples(cls, payload: tuple, convert: bool = True, conn=None):
        """Delete the rows whose primary key is in ``payload`` and return them.

        Args:
            payload: Primary-key values of the rows to delete.
            convert: When true, each deleted row is passed through
                ``tupleDictionaryFactory``; otherwise raw row tuples are
                returned.
            conn: Optional open connection to reuse.

        Returns:
            A list of the deleted rows, or ``()`` when nothing was deleted.

        Raises:
            DatabaseError: Wraps any exception raised while connecting or
                executing the statement.
        """
        deleted = ()
        # An empty key list would render "IN ()", which PostgreSQL rejects as
        # a syntax error; deleting nothing trivially yields the empty result.
        if not payload:
            return deleted

        self_conn = False
        placeholders = ','.join(['%s'] * len(payload))
        sql = f"WITH deleted_rows AS (DELETE FROM {cls.table_name} WHERE {cls.primary_key} IN ({placeholders}) RETURNING *) SELECT * FROM deleted_rows;"
        try:
            if not conn:
                database_config = config.config()
                conn = psycopg2.connect(**database_config)
                self_conn = True
                conn.autocommit = True
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                rows = cur.fetchall()
                if rows:
                    if convert:
                        deleted = [tupleDictionaryFactory(cur.description, r) for r in rows]
                    else:
                        deleted = rows
            if self_conn:
                conn.commit()
            return deleted
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        finally:
            # Only close connections opened here, and do so even on failure so
            # the connection is not leaked.
            if self_conn:
                conn.close()

    @classmethod
    def update_tuple(cls, payload: dict, convert=True, conn=None):
        """Update one row by primary key and return the updated row.

        Args:
            payload: ``{'key': row_id, 'update': {column: new_value, ...}}``.
            convert: When true, the row is passed through
                ``tupleDictionaryFactory``; otherwise the raw row tuple is
                returned.
            conn: Optional open connection to reuse.

        Returns:
            The updated row, or ``()`` when no row matched the key.

        Raises:
            DatabaseError: Wraps any exception raised while connecting or
                executing the statement.
        """
        updated = ()
        self_conn = False
        set_clause, values = updateStringFactory(payload['update'])
        # The key is the final placeholder (the WHERE clause) in the statement.
        values.append(payload['key'])
        sql = f"UPDATE {cls.table_name} SET {set_clause} WHERE {cls.primary_key}=%s RETURNING *;"
        try:
            if not conn:
                database_config = config.config()
                conn = psycopg2.connect(**database_config)
                self_conn = True
                conn.autocommit = False
            with conn.cursor() as cur:
                cur.execute(sql, values)
                row = cur.fetchone()
                if row:
                    updated = tupleDictionaryFactory(cur.description, row) if convert else row
            if self_conn:
                conn.commit()
            return updated
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        finally:
            # Closing without a commit discards the pending transaction, so a
            # failed update on a connection opened here is rolled back.
            if self_conn:
                conn.close()

    @classmethod
    def select_all(cls, payload: dict, convert=True, conn=None):
        """Fetch the single row whose primary key equals ``payload['key']``.

        Despite the name, this selects all *columns* of one row, not all rows.

        Args:
            payload: Mapping with a ``'key'`` entry holding the primary-key
                value; it is passed to the query as named parameters.
            convert: When true, the row is passed through
                ``tupleDictionaryFactory``; otherwise the raw row tuple is
                returned.
            conn: Optional open connection to reuse.

        Returns:
            The matching row, or ``()`` when no row matched.

        Raises:
            DatabaseError: Wraps any exception raised while connecting or
                executing the statement.
        """
        record = ()
        self_conn = False
        sql = f"SELECT * FROM {cls.table_name} WHERE {cls.primary_key}=%(key)s;"
        try:
            if not conn:
                database_config = config.config()
                conn = psycopg2.connect(**database_config)
                self_conn = True
                conn.autocommit = False
            with conn.cursor() as cur:
                cur.execute(sql, payload)
                row = cur.fetchone()
                if row:
                    record = tupleDictionaryFactory(cur.description, row) if convert else row
            if self_conn:
                conn.commit()
            return record
        except Exception as error:
            raise DatabaseError(error, payload, sql) from error
        finally:
            # Only close connections opened here, and do so even on failure.
            if self_conn:
                conn.close()