first commit

This commit is contained in:
Jadowyne Ulve 2025-09-06 19:07:26 -05:00
parent b148eecace
commit 2ec55e1035
39 changed files with 2163 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
build/
dist/
avatars/
log.txt
settings.json
test.json
test.py
test.txt

View File

@ -0,0 +1,11 @@
{
"folders": [
{
"path": "."
},
{
"path": "../../../My Code/website"
}
],
"settings": {}
}

552
MainGui.py Normal file
View File

@ -0,0 +1,552 @@
from PyQt5.QtCore import Qt
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import (QMainWindow, QWidget, QToolBar, QAction,
QSizePolicy, QToolButton, QTreeWidget,
QListWidget, QListWidgetItem, QTreeWidgetItem, QGroupBox, QLineEdit,
QLabel, QTextEdit, QPushButton, QFileDialog, QSlider, QHBoxLayout, QVBoxLayout, QFrame, QGridLayout, QComboBox, QFontDialog, QTextBrowser)
from PyQt5.QtWidgets import QMessageBox
from PyQt5.QtGui import QIcon, QFont
from qtwidgets import Toggle
from striprtf.striprtf import rtf_to_text
from cloud_backend import LoginDialog, login, upload_file
import tempfile
import export_process, import_process, upload_process
import settings, pathlib, os, sys, icons, easySQL, tables, json, zipfile
global status_bar
global tool_bar
class ModificationItem(QTreeWidgetItem):
def __init__(self):
super().__init__()
self.id = 0
class ColectionItem(QListWidgetItem):
def __init__(self):
super().__init__()
self.id = 0
class MainGui(QMainWindow):
def __init__(self) -> None:
self.app_settings = settings.app_settings
super().__init__()
self.setWindowTitle(self.app_settings.get_setting('window_title'))
self.setMaximumHeight(720)
self.setMaximumWidth(1280)
self.resize(self.app_settings.get_setting('window_width'), self.app_settings.get_setting('window_height'))
self.setWindowIcon(QIcon(icons.window_icon))
self.widget = QWidget()
self.tool_bar = QToolBar()
self.addToolBar(self.tool_bar)
self.tool_bar.setMovable(False)
self.tool_bar.setFixedHeight(35)
self.exit_app = QAction("Save and exit", self)
self.exit_app.setIcon(QIcon(icons.exit_icon))
self.exit_app.triggered.connect(self.exit_application)
self.tool_bar.addAction(self.exit_app)
self.tool_bar.addSeparator()
# need an import button that imports a selected pcmp from a file dialog
self.import_tool = QAction("Import PCMP File", self)
self.import_tool.setToolTip("Imports a PCMP file from system into your Penumbra installation.")
self.import_tool.setIcon(QIcon(icons.import_icon))
self.import_tool.triggered.connect(self.import_collection)
self.tool_bar.addAction(self.import_tool)
# need an export button that exports a selected collection in the list
self.export_tool = QAction("Export Collection to Archive", self)
self.export_tool.setToolTip("Export collection from Penumbra installation into a share-able PCMP archive")
self.export_tool.setIcon(QIcon(icons.export_icon))
self.export_tool.setEnabled(False)
self.export_tool.triggered.connect(self.export_collection)
self.tool_bar.addAction(self.export_tool)
# need an upload dummy button for future features
self.upload_tool = QAction("Upload Collection to the set up server.", self)
self.upload_tool.setToolTip("If a server has been configured alongside a profile on that server, you may upload the collection straight there for others to download without needing to share it.")
self.upload_tool.setIcon(QIcon(icons.upload_icon))
self.upload_tool.setEnabled(False)
self.upload_tool.triggered.connect(self.upload_collection)
self.tool_bar.addAction(self.upload_tool)
self.refresh_tool = QAction("Refresh", self)
self.refresh_tool.setToolTip("Refreshes the loaded data and selected collection data!")
self.refresh_tool.setIcon(QIcon(icons.refresh))
self.refresh_tool.setEnabled(True)
self.tool_bar.addAction(self.refresh_tool)
#add a spacer
self.spacer = QWidget()
self.spacer.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self.tool_bar.addWidget(self.spacer)
self.mode_label = QLabel()
self.mode_label.setText('Mode')
self.tool_bar.addWidget(self.mode_label)
self.mode_slider = Toggle()
self.mode_slider.setToolTip('Sets the mode between basic or advanced details allowing for more manipulation of imports and exports. Only use this if you know what you are doing!')
self.mode_slider.toggled.connect(self.mode_change)
self.tool_bar.addWidget(self.mode_slider)
self.settings_button = QAction("Settings", self)
self.settings_button.setIcon(QIcon(icons.configure))
self.tool_bar.addAction(self.settings_button)
self.settings_button.triggered.connect(self.change_font_size)
self.help_button = QAction("Help", self)
self.help_button.setIcon(QIcon(icons.help_icon))
self.tool_bar.addAction(self.help_button)
# need a database connection indicator
self.database_indicator = QAction("Database Status", self)
self.database_indicator.setIcon(QIcon(icons.database_bad))
self.tool_bar.addAction(self.database_indicator)
# need a sign-in/profile placard that acts as a button to change login info
self.profile_indicator = QToolButton()
self.profile_indicator.setText("Profile/Login")
self.profile_indicator.setToolTip("If a server is configured use this to attempt so sign in to a profile on that server. This profile will be used for when uploading a collection.")
self.profile_indicator.setIcon(QIcon(icons.profile))
self.profile_indicator.setToolButtonStyle(Qt.ToolButtonStyle.ToolButtonTextBesideIcon)
self.profile_indicator.clicked.connect(self.login_user)
self.tool_bar.addWidget(self.profile_indicator)
# main vertical layout object
self.horizontal_layout = QHBoxLayout(self)
# collection layout V
self.collection_vertical_layout = QVBoxLayout()
# self.inner_canvas.setStyleSheet("background-color: blue")
self.collection_list = QListWidget()
self.col_frame = QFrame()
self.col_frame.setFrameStyle(QFrame.Panel)
self.collection_list.setFixedWidth(200)
self.collection_list.setFixedHeight(300)
self.collection_vertical_layout.addWidget(self.collection_list)
self.collection_vertical_layout.addWidget(self.col_frame)
# Close out collection Layout V
self.details_vertical_layout = QVBoxLayout()
self.details_group = QGroupBox()
self.details_group.setTitle('Collection Details')
self.details_horizontal_layout = QHBoxLayout()
self.horizontal_form_layout = QGridLayout()
# ---- Form Control Start ---->
self.name_label = QLabel()
self.name_label.setText("Collection Name")
self.name_edit = QLineEdit()
self.name_edit.setMaximumWidth(200)
self.version_label = QLabel()
self.version_label.setText("Version")
self.version_edit = QLineEdit()
self.version_edit.setMaximumWidth(200)
self.comment_label = QLabel()
self.comment_label.setText("Comments")
self.comment_edit = QTextEdit()
self.comment_edit.setMaximumHeight(150)
self.comment_edit.setMaximumHeight(250)
self.horizontal_form_layout.addWidget(self.name_label, 0, 0)
self.horizontal_form_layout.addWidget(self.name_edit, 0, 1)
self.horizontal_form_layout.addWidget(self.version_label, 1, 0)
self.horizontal_form_layout.addWidget(self.version_edit, 1, 1)
self.horizontal_form_layout.addWidget(self.comment_label, 2, 0)
self.horizontal_form_layout.addWidget(self.comment_edit, 2, 1)
# ---- Form Control End ---->
self.details_group.setLayout(self.horizontal_form_layout)
self.details_group.setFixedHeight(300)
self.details_group.setMaximumWidth(500)
self.character_links = QGroupBox()
self.character_links.setTitle("Character Links")
self.character_links.setFixedHeight(300)
self.character_links.setFixedWidth(200)
self.character_grid_layout = QGridLayout()
self.links_list = QListWidget()
self.character_grid_layout.addWidget(self.links_list)
self.character_links.setLayout(self.character_grid_layout)
self.search_horizontal_layout = QHBoxLayout()
self.clear_filter_mods = QPushButton()
self.clear_filter_mods.setIcon(QIcon(icons.clear_filter))
self.clear_filter_mods.setMaximumWidth(30)
self.clear_filter_mods.setMaximumHeight(30)
self.mod_list_search = QLineEdit()
self.mod_list_search.textEdited.connect(self.filter_mods)
self.mod_list_search.setFixedHeight(30)
self.mod_list_search.setFixedWidth(200)
self.search_horizontal_layout.addWidget(self.clear_filter_mods)
self.search_horizontal_layout.addWidget(self.mod_list_search)
self.search_horizontal_layout.setSpacing(5)
self.search_horizontal_layout.setAlignment(Qt.AlignLeft)
self.mod_list = QTreeWidget()
self.details_horizontal_layout.addWidget(self.details_group)
self.details_horizontal_layout.addWidget(self.character_links)
self.details_vertical_layout.addLayout(self.details_horizontal_layout)
self.details_vertical_layout.addLayout(self.search_horizontal_layout)
self.details_vertical_layout.addWidget(self.mod_list)
# ---- Details/Modlist Layout V ---- >
self.advanced_widget_canvas = QWidget()
self.advanced_widget_canvas.setLayout(self.details_vertical_layout)
self.horizontal_layout.addLayout(self.collection_vertical_layout)
self.basic_vertical_layout = QVBoxLayout()
self.basic_horizontal_layout = QHBoxLayout()
self.basic_vertical_layout.addLayout(self.basic_horizontal_layout)
self.basic_group = QGroupBox()
self.basic_group.setTitle("Basic Mode Instructions")
self.basic_horizontal_layout.addWidget(self.basic_group)
self.group_layout = QGridLayout()
self.basic_group.setLayout(self.group_layout)
self.label1 = QLabel()
#self.label1.setAcceptRichText(True)
self.label1.setText("hello World")
self.label1.setWordWrap(True)
self.label1.setMinimumWidth(400)
self.group_layout.addWidget(self.label1, 0, 0)
self.group_layout.setAlignment(Qt.AlignLeft)
self.group_layout.addWidget(self.name_edit, 1, 0)
self.basic_widget_canvas = QWidget()
self.basic_widget_canvas.setLayout(self.basic_vertical_layout)
# this is where the if split needs to happen
basic_mode = True
advanced_mode = False
self.horizontal_layout.addWidget(self.advanced_widget_canvas)
self.horizontal_layout.addWidget(self.basic_widget_canvas)
if advanced_mode:
self.basic_widget_canvas.setHidden(True)
self.advanced_widget_canvas.setHidden(False)
if basic_mode:
self.basic_widget_canvas.setHidden(False)
self.advanced_widget_canvas.setHidden(True)
self.widget.setLayout(self.horizontal_layout)
self.setCentralWidget(self.widget)
self.selected_item = None
self.selected_item_meta = {}
self.selected_item_id = 0
self.selected_item_data = {}
def change_font_size(self, a):
(font, ok) = QFontDialog.getFont(QFont("Helvetica [Cronyx]", 10), self)
if ok:
# the user clicked OK and font is set to the font the user selected
self.setFont(font)
def __post__init__(self):
self.collection_list.itemClicked.connect(self.populate_details)
if not settings.app_settings.get_setting("update_opt_in") or settings.app_settings.lock_client:
self.profile_indicator.setEnabled(False)
self.database_indicator.setEnabled(False)
self.repopulate_collections()
self.repopulate_modslist()
def exit_application(self):
self.app_settings.save_settings()
sys.exit()
def mode_change(self, a):
if a:
self.setWindowTitle(f"{self.app_settings.get_setting('window_title')} (Advanced Mode)")
self.app_settings.set_setting('advanced_mode', True)
self.basic_widget_canvas.setHidden(True)
self.advanced_widget_canvas.setHidden(False)
else:
self.setWindowTitle(f"{self.app_settings.get_setting('window_title')} (Basic Mode)")
self.app_settings.set_setting('advanced_mode', False)
self.advanced_widget_canvas.setHidden(True)
self.basic_widget_canvas.setHidden(False)
if not self.app_settings.get_setting('advanced_mode'):
self.version_edit.setDisabled(True)
self.comment_edit.setDisabled(True)
self.links_list.setDisabled(True)
self.mod_list_search.setDisabled(True)
self.clear_filter_mods.setDisabled(True)
else:
self.version_edit.setDisabled(False)
self.comment_edit.setDisabled(False)
self.links_list.setDisabled(False)
self.mod_list_search.setDisabled(False)
self.clear_filter_mods.setDisabled(False)
self.repopulate_modslist()
def get_export_upload_payload(self) -> dict:
collection_data = easySQL.fetchone_from_table(tables.collections, filter=('collection_id', self.selected_item_id))
collection_settings = json.loads(collection_data.settings)
# The below process handles the repackaging payload for a collection allowing for advanced changes such
# as enabling or putting a modification on ignore.
# TODO: enabled_mods and mods_to_copy could be consolidated into a single dictionary
enabled_mods = {}
mods_to_copy = []
count = self.mod_list.topLevelItemCount()
for index in range(count):
item = self.mod_list.topLevelItem(index)
if item.checkState(0) == 2:
modification_row = easySQL.fetchone_from_table(tables.modifications, filter=('name', item.text(0)))
mods_to_copy.append((modification_row.name, modification_row.mod_path, modification_row.total_files))
try:
mod_settings = collection_settings[item.text(0)]
mod_settings['Enabled'] = True
enabled_mods[item.text(0)] = mod_settings
except:
pass
payload: {str, any} = {
'collection_json':{
'Version': collection_data.version,
'Id': collection_data.uuid,
'Name': self.name_edit.text(),
'Settings': enabled_mods,
'Inheritance': []
},
'character_links': json.loads(collection_data.character_links),
'meta_data': {
"collection_name": self.name_edit.text(),
"collection_uuid": collection_data.uuid,
"version": self.version_edit.text(),
"comments": self.comment_edit.toPlainText()
},
'mods_to_copy': mods_to_copy
}
return payload
def upload_collection(self):
payload = self.get_export_upload_payload()
progress_dialog = upload_process.UploadProgressDialog(self, payload)
progress_dialog.start_upload()
def export_collection(self):
payload = self.get_export_upload_payload()
dialog = QFileDialog(self)
dialog.setFileMode(QFileDialog.DirectoryOnly)
if dialog.exec_():
save_directory = dialog.selectedFiles()[0]
payload['save_directory'] = save_directory
# todo: remove this
with open('test.json', 'w+') as file:
json.dump(payload, file, indent=2)
progress_dialog = export_process.ExportProgressDialog(self, payload)
progress_dialog.start_exportation()
def import_collection(self):
dialog = QFileDialog(self)
dialog.setFileMode(QFileDialog.AnyFile)
dialog.setNameFilter("Archive (*.pcmp)")
if dialog.exec_():
path_to_zip = pathlib.Path(dialog.selectedFiles()[0])
print(path_to_zip) # do some validation on the path
with zipfile.ZipFile(path_to_zip) as collection_pcmp:
collection_uuid = ''
with collection_pcmp.open("meta.json") as _file:
meta = json.load(_file)
collection_uuid = meta["collection_uuid"]
with collection_pcmp.open(f"{collection_uuid}.json") as _file:
collection_data = json.load(_file)
if collection_uuid in [row.uuid for row in easySQL.fetchall_from_table(tables.collections)]:
warning_dialog = QMessageBox(self)
warning_dialog.setWindowTitle('Collection already exists...')
warning_dialog.setText(f"It appears that this collection already exists, would you like to overwrite it?")
warning_dialog.setIcon(QMessageBox.Question)
warning_dialog.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
button = warning_dialog.exec()
if button == QMessageBox.No:
return
payload = {
'collection_json': collection_data,
'meta_data': meta,
'path_to_zip': path_to_zip,
'mods_to_copy': [key for key in collection_data['Settings'].keys()]
}
print(payload)
self.progress_dialog = import_process.ImportProgressDialog(self, payload)
self.progress_dialog.import_collection()
def login_user(self):
dialog = LoginDialog(self)
if dialog.exec_():
self.app_settings.user_data = dialog.user_data
self.app_settings.connected = True
self.profile_indicator.setIcon(QIcon(settings.app_settings.user_data['avatar']))
self.profile_indicator.setText(settings.app_settings.username)
self.database_indicator.setIcon(QIcon(icons.database_good))
def filter_mods(self):
filter = self.mod_list_search.text()
count = self.mod_list.topLevelItemCount()
for index in range(count):
item = self.mod_list.topLevelItem(index)
present_filter = [mod_name.strip() for mod_name in json.loads(self.selected_item_data.settings).keys()]
if filter in item.text(0) and item.text(0) in present_filter:
item.setHidden(False)
else:
item.setHidden(True)
def repopulate_modslist(self, enabled: list = [], present: list = []):
self.mod_list.clear()
self.mod_list.setHeaderLabel('Modification Name')
self.mod_list.setAlternatingRowColors(True)
list = easySQL.fetchall_from_table(tables.modifications)
for d, row in enumerate(list):
item = ModificationItem()
item.setText(0, row.name)
if row.name in enabled:
item.setCheckState(0, 2)
else:
item.setCheckState(0, 0)
item.id = row.mod_id
self.mod_list.addTopLevelItem(item)
if row.name in present or len(present)==0:
item.setHidden(False)
item.setDisabled(False)
else:
item.setHidden(True)
item.setDisabled(True)
if not self.app_settings.get_setting('advanced_mode'):
item.setDisabled(True)
def repopulate_collections(self):
self.collection_list.clear()
self.collection_list.setAlternatingRowColors(True)
list = [(row.collection_id, row.name) for row in easySQL.fetchall_from_table(tables.collections)]
for id, name in list:
item = ColectionItem()
item.setText(name)
item.id = id
item.setToolTip(f"{id}")
self.collection_list.addItem(item)
def repopulate_character_links(self):
self.links_list.clear()
links = json.loads(self.selected_item_data.character_links)
if links['Default']:
item = QListWidgetItem()
item.setText("Default")
item.setCheckState(2)
self.links_list.addItem(item)
if links["Yourself"]:
item = QListWidgetItem()
item.setText("Yourself")
item.setCheckState(2)
self.links_list.addItem(item)
if len(links["Individuals"]) > 0:
for link in links["Individuals"]:
item = QListWidgetItem()
item.setText(link["Display"])
item.setCheckState(2)
self.links_list.addItem(item)
def populate_details(self, item):
self.export_tool.setEnabled(True)
if self.app_settings.connected:
self.upload_tool.setEnabled(True)
else:
self.upload_tool.setEnabled(False)
self.selected_item_id = item.id
self.selected_item = item
self.selected_item_data = easySQL.fetchone_from_table(tables.collections, filter=("collection_id", item.id))
self.selected_item_meta = easySQL.fetchone_from_table(tables.metadata, filter=("collection_name", self.selected_item_data.name))
settings_filter = [mod_name.strip() for mod_name, settings in json.loads(self.selected_item_data.settings).items() if settings['Enabled']]
present_filter = [mod_name.strip() for mod_name in json.loads(self.selected_item_data.settings).keys()]
self.repopulate_modslist(enabled=settings_filter, present=present_filter)
self.repopulate_character_links()
self.name_edit.setText(self.selected_item_data.name)
self.version_edit.setText(self.selected_item_meta.version)
self.comment_edit.setPlainText(self.selected_item_meta.comments)
if not self.app_settings.get_setting('advanced_mode'):
self.version_edit.setDisabled(True)
self.comment_edit.setDisabled(True)
self.links_list.setDisabled(True)
self.mod_list_search.setDisabled(True)
self.clear_filter_mods.setDisabled(True)
self.mod_list.setDisabled(True)
else:
self.version_edit.setDisabled(False)
self.comment_edit.setDisabled(False)
self.links_list.setDisabled(False)
self.mod_list_search.setDisabled(False)
self.clear_filter_mods.setDisabled(False)
self.mod_list.setDisabled(False)

168
cloud_backend.py Normal file
View File

@ -0,0 +1,168 @@
from PyQt5.QtWidgets import QDialog, QGridLayout, QLineEdit, QPushButton, QLabel, QVBoxLayout, QRadioButton
import requests, json
from PIL import Image
from io import BytesIO
import settings
import pathlib, os
global app_settings
app_settings = settings.app_settings
class LoginDialog(QDialog):
def __init__(self, parent):
super().__init__(parent)
self.setWindowTitle('Login')
self.grid_layout = QGridLayout()
self.server_address = QLineEdit()
self.server_address.setText(settings.app_settings.connected_server)
self.server_address_label = QLabel()
self.server_address_label.setText("Server")
self.username = QLineEdit()
self.username.setText(app_settings.username)
self.username_label = QLabel()
self.username_label.setText("Username")
self.password = QLineEdit()
self.password.setText(app_settings.password)
self.password_label = QLabel()
self.password_label.setText("Password")
self.connect_button = QPushButton()
self.connect_button.setText("Connect")
self.connect_button.clicked.connect(self.__login)
self.grid_layout.addWidget(self.server_address_label, 0, 0)
self.grid_layout.addWidget(self.server_address, 0, 1)
self.grid_layout.addWidget(self.username_label, 1, 0)
self.grid_layout.addWidget(self.username, 1,1)
self.grid_layout.addWidget(self.password_label, 2, 0)
self.grid_layout.addWidget(self.password, 2, 1)
self.grid_layout.addWidget(self.connect_button, 3, 1)
self.setLayout(self.grid_layout)
self.setFixedSize(300, 200)
self.user_data = {}
self.show()
def __login(self):
settings.app_settings.username = self.username.text()
settings.app_settings.password = self.password.text()
settings.app_settings.connected_server = self.server_address.text()
user_data = login(settings.app_settings.username, settings.app_settings.password)
if user_data != False:
self.user_data = user_data
with open('log.txt', 'w+') as file:
file.write(json.dumps(user_data))
self.accept()
def ping(url):
request = requests.get(url)
if request.status_code == 200:
return True
return False
def upload_file(filename, file_path, external_data: dict):
external_data['username'] = settings.app_settings.username
external_data['password'] = settings.app_settings.password
external_data['filename'] = filename
files = [
('file', ('file', open(file_path, 'rb'), 'application/octet')),
('datas', ('datas', json.dumps(external_data), 'application/json')),
]
r = requests.post(f"{settings.app_settings.connected_server}/upload_app", files=files)
def login(username, password):
headers = {'Content-Type': 'application/json'}
data = {
"username": username,
"password": password
}
response = requests.post(f"{settings.app_settings.connected_server}/login_app", headers=headers, json=data)
user_data = {}
if response.status_code == 200:
user_data = response.json()
settings.app_settings.username = username
settings.app_settings.password = password
response = requests.post(f"{settings.app_settings.connected_server}/login_app/avatar", json=data)
if response.status_code == 200:
im = Image.open(BytesIO(response.content))
path = settings.app_settings.external_path / "avatars" / f"{data['username']}_avatar.png"
print(path)
im.save(path)
user_data['avatar'] = str(path.absolute())
return user_data
user_data['avatar'] = f"icons/avatars/default_avatar.png"
return user_data
return False
class UpdateOptInDialog(QDialog):
def __init__(self, parent, default_pipeline):
super().__init__(parent)
self.default_pipeline = default_pipeline
self.setWindowTitle(f"Connectivity Opt In")
self.setMaximumWidth(300)
self.resize(300, 100)
self.vertical_layout = QVBoxLayout()
text = """This seems to be your first time loading this app and we have a important question to ask. That is; do you wish to opt into receiving update requests from our pipelines? If you choose not to then you will not be able to connect to any servers. This is in order to ensure that you stay upto date for those servers."""
self.message_label = QLabel()
self.message_label.setText(text)
self.message_label.setWordWrap(True)
self.message_label.setFixedSize(400, 100)
self.opt_in = QRadioButton()
self.opt_in.setText("I Want to Opt In to Updates")
self.opt_in.toggled.connect(self.radio_changed)
self.pipeline_edit = QLineEdit()
self.pipeline_edit.setEnabled(False)
self.confirm_button = QPushButton()
self.confirm_button.setText("Confirm")
self.confirm_button.setMaximumWidth(60)
self.confirm_button.setMaximumHeight(30)
self.confirm_button.clicked.connect(self.confirm_selection)
self.vertical_layout.addWidget(self.message_label)
self.vertical_layout.addWidget(self.opt_in)
self.vertical_layout.addWidget(self.pipeline_edit)
self.vertical_layout.addWidget(self.confirm_button)
self.setLayout(self.vertical_layout)
self.show()
def radio_changed(self, state):
if state:
self.pipeline_edit.setEnabled(True)
self.pipeline_edit.setText(self.default_pipeline)
if not state:
self.pipeline_edit.setEnabled(False)
self.pipeline_edit.setText("")
def get_opt_in_state(self):
return self.opt_in.isChecked()
def get_pipeline(self):
return self.pipeline_edit.text()
def confirm_selection(self):
self.accept()

BIN
database.sqlite Normal file

Binary file not shown.

279
easySQL.py Normal file
View File

@ -0,0 +1,279 @@
from typing import Any
import sqlite3, pathlib
from collections import namedtuple
STRING = 'string'
INTEGER = 'integer'
UNIQUE = 'UNIQUE'
JSON = 'json' # TODO: create a function for converting lists and dict into json string and back
database = None
def VALDATED_STRING(pointer, direction=0):
"""
direction(0) = into database
direction(1) = out of database
"""
if direction == 0:
return pointer.replace("'", "%^&*").strip()
elif direction == 1:
return pointer.replace("%^&*", "'").strip()
def intergrate(database_path: pathlib.Path = None) -> sqlite3.Connection:
if not database_path:
database_path = pathlib.Path("test.sqlite")
global database
database = sqlite3.connect(database=database_path.absolute())
return database
def Table(cls):
""" easySQL decorator for table classes for easy instantiation of many of the SQL_execute strings.
This class will always need these variables defined within its __init__ method;
self.name = "foo"; This will be the name of the table in the integrated database
self.columns = {foo: dah, ...}; dictionary of foo being the column name, and dah being the columns type in the database
types for a column are:
- STRING
- INTEGER
- UNIQuE
- JSON
Returns:
Table: returns a Table class wrapped around the original class.
"""
class Table(cls):
def __init__(self, *args, **kwargs) -> None:
super(Table, self).__init__(*args, **kwargs)
self.data_object = namedtuple(f"{self.name}_row", list(self.columns.keys()))
self.columns_validation = len(self.columns)
def __repr__(self):
return f"{self.__class__.__name__} ('{self.name}')"
@property
def create_table(self):
def manufacture_create_SQL_string() -> str:
""" Takes the super()'s columns dictionary and bulds parts of the SQL_execute string.
Returns:
str: middle of create table SQL_execute string.
"""
# TODO: very crude way of doing it, research a better way.
middle_string = 'id integer PRIMARY KEY, '
current_count = 0
for column_name, column_type in self.columns.items():
if column_type == 'json':
column_type == 'string'
if current_count == len(self.columns.items())-1:
middle_string += f"{column_name} {column_type}"
else:
middle_string += f"{column_name} {column_type}, "
current_count += 1
return middle_string
return f"CREATE TABLE {self.name} ({manufacture_create_SQL_string()});"
@property
def drop_table(self):
return f"DROP TABLE {self.name};"
def select_row(self, column: str = None, match = None):
if column:
column_type = self.columns[column]
if column_type == "string": match = VALDATED_STRING(match, direction=0)
return f"SELECT * FROM {self.name} WHERE {column}= '{match}'"
else:
return f"SELECT * FROM {self.name}"
def insert_row(self, data) -> namedtuple:
__conversion__ = []
for id, pointer in enumerate(data):
column_type = list(self.columns.values())[id]
if isinstance(pointer, str) and column_type != "json":
pointer = VALDATED_STRING(pointer, direction=0)
__conversion__.append(pointer)
__conversion__ = tuple(__conversion__)
def manufacture_insert_SQL_String():
middle_string = '('
gavel_string = '('
current_count = 0
for column_name in self.columns.keys():
if current_count == len(self.columns.items())-1:
middle_string += f"{column_name})"
gavel_string += f"?)"
else:
middle_string += f"{column_name}, "
gavel_string += f"?, "
current_count += 1
return f"{middle_string} VALUES {gavel_string}"
query = namedtuple('Query', ['query', 'data'])
if len(__conversion__) == self.columns_validation:
return query(query=f"INSERT INTO {self.name}{manufacture_insert_SQL_String()}", data=__conversion__)
else:
return query(query=False, data= f"passed data to {self.name} is not the right length of entries")
def update_row_by_id(self, data: dict, id: str):
""" Update a row at {id} with {data}.
Args:
data (dict): key = column, value = data to update to
id (str): row_id in Table
"""
def manufactur_update_SQL_string(data: dict) -> str:
""" takes data and builds a SQL_execute string segment
Args:
data (dict): Key = column, value = data to update to
Returns:
_type_: middle segment of SQL_execute string
"""
# TODO: this is a very crude implementtion
middle_string = ''
current_count = 0
for key, value in data.items():
if current_count == len(data.items())-1:
middle_string += f" {key} = '{value}'"
else:
middle_string += f" {key} = '{value}', "
current_count += 1
return middle_string
return f"UPDATE {self.name} SET{manufactur_update_SQL_string(data)} WHERE id = {id}"
def convert_data(self, rows: list or tuple):
""" Takes rows returned by the tables SQL_select string and returns them as namedtuples.
Args:
rows (listortuple):
Returns:
(listortuple): returns a list of namedtuple.
"""
self.keys = list(self.columns.keys())
if isinstance(rows, list):
pointer_list =[]
for row in rows:
__conversion__ = []
for id, pointer in enumerate(row):
column_type = list(self.columns.values())[id-1]
if isinstance(pointer, str) and column_type != "json":
pointer = VALDATED_STRING(pointer, direction=1)
__conversion__.append(pointer)
__conversion__ = tuple(__conversion__)
pointer_list.append(__conversion__)
return [self.data_object(**{key: data[i+1] for i, key in enumerate(self.keys)}) for data in pointer_list]
if isinstance(rows, tuple):
__conversion__ = []
for id, pointer in enumerate(rows):
column_type = list(self.columns.values())[id-1]
if isinstance(pointer, str) and column_type != "json":
pointer = VALDATED_STRING(pointer, direction=1)
__conversion__.append(pointer)
__conversion__ = tuple(__conversion__)
return [self.data_object(**{key: __conversion__[i+1] for i, key in enumerate(self.keys)})][0]
return Table
def basic_query(query: str):
""" Used as single query functions, ex. creating tables, dropping tables, updating rows
Args:
query (str): SQL_execute string
"""
with database:
cursor = database.cursor()
cursor.execute(query)
def create_table(table: Table, drop=False):
if drop:
drop_table(table=table)
try:
with database:
cursor = database.cursor()
cursor.execute(table.create_table)
except sqlite3.OperationalError:
pass
def drop_table(table: Table):
try:
with database:
cursor = database.cursor()
cursor.execute(table.drop_table)
except sqlite3.OperationalError:
pass
def update_table_row_by_id(table: Table, row):
query = table.update_row_by_id(data=row[1], id=row[0])
with database:
cursor = database.cursor()
cursor.execute(query)
def insert_into_table(table, data):
""" Passing a query and its data as a namedtuple will insert the query into the database.
Args:
query (namedtuple): (query.query = SQL_execute string, query.data = tuple of column's data)
"""
query = table.insert_row(data)
assert query.query, query.data
with database:
cursor = database.cursor()
cursor.execute(query.query, query.data)
def fetchone_from_table(table: Table, filter: tuple([str, Any]) = None, convert_data=True) -> tuple:
if filter:
query = table.select_row(column=filter[0], match=filter[1])
else:
query = table.select_row()
with database:
cursor = database.cursor()
cursor.execute(query)
if not convert_data:
return cursor.fetchone()
return table.convert_data(cursor.fetchone())
def fetchall_from_table(table, filter: tuple([str, Any]) = None, convert_data=True) -> list:
""" Fetches all rows from the database using passed query
Args:
query (str): SQL_execute string
Returns:
list: list of rows as tuples
"""
if filter:
query = table.select_row(column=filter[0], match=filter[1])
else:
query = table.select_row()
with database:
cursor = database.cursor()
cursor.execute(query)
batch = cursor.fetchall()
if len(batch) == 1:
batch = batch[0]
if not convert_data:
return batch
return table.convert_data(batch)

209
export_process.py Normal file
View File

@ -0,0 +1,209 @@
"""
This file contains all Gui and logic directly impacting the exportation processing
"""
import datetime
import os
import json
import tempfile
import pathlib
from zipfile import ZIP_DEFLATED, ZipFile
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import QDialog, QListWidget, QListWidgetItem, QProgressBar, QLabel, QPushButton
from PyQt5.QtGui import QIcon
import icons, settings, easySQL, tables
global app_settings
app_settings = settings.app_settings
class ExportCollectionThread(QThread):
"""This thread logic takes a Penumbra Collection and compresses it into a .pcmp that can later be
imported from another client.
Args:
QThread (object): adjacent thread to main loop to allow for a progress indication for user QoL
"""
next_step_signal = pyqtSignal(int) # tells progress bar what step it is on
indicate_change = pyqtSignal(str) # used to change a label and return a string to said label
set_progress_bar_max = pyqtSignal(int) # dynamic way to calculate total progress steps
step_complete = pyqtSignal(int) # used to indicate in the progress dialog what steps are complete for QoL
export_complete = pyqtSignal(bool) # used to indicate the Ui that it needs to repopulate
def __init__(self, payload) -> None:
super().__init__()
self.payload = payload
def run(self):
"""
Main Thread logic that takes the provided collection and outputs a .pcmp file for use by the client.
"""
collection_json = self.payload['collection_json']
meta_json = self.payload['meta_data']
#determine what mods to copy and total files
total_files = self.calculate_total_files()
meta_json['totalfiles'] = total_files
#send signals for progress and next step
self.set_progress_bar_max.emit(total_files + 4)
self.next_step_signal.emit(1)
self.step_complete.emit(0)
#create timestamp and file path
now = datetime.datetime.now()
time_stamp = datetime.datetime.strftime(now, '%Y-%m-%d %H-%M')
name = collection_json['Name']
final_path = pathlib.Path(f"{self.payload['save_directory']}") / f"{time_stamp}-{name}.pcmp"
self.step_complete.emit(1)
with ZipFile(final_path, 'w', compression=ZIP_DEFLATED) as pcmp_zip_file:
self.next_step_signal.emit(2)
self.copy_manifest_collection_data(
pcmp_zip_file=pcmp_zip_file,
meta=meta_json,
collection_json=collection_json
)
self.step_complete.emit(2)
self.next_step_signal.emit(3)
self.copy_modifications_to_zip(pcmp_zip_file=pcmp_zip_file)
self.step_complete.emit(3)
self.step_complete.emit(4)
self.next_step_signal.emit(total_files + 4)
self.export_complete.emit(True)
def copy_modifications_to_zip(self, pcmp_zip_file: ZipFile):
""" Overarching compression of each modification into the pcmp zip
Args:
zip_ref (zipfile.ZipFile): PenumbraCollection Modification Pack for exportation and importation
mods_to_copy (list): A pre generated list of mod folders to compress into the .pcmp zip
"""
overfile_files = 0
for modification in self.payload['mods_to_copy']:
zip_modification_final_path = os.path.join('mods', modification[0])
local_modification_directory = pathlib.Path(modification[1])
for current_file, file_path in enumerate(local_modification_directory.rglob("*")):
pcmp_zip_file.write(file_path, arcname=os.path.join(zip_modification_final_path, file_path.relative_to(local_modification_directory)))
overfile_files += 1
self.indicate_change.emit(f"({current_file + 1}/{modification[2]}) Compressing {file_path} to zip...")
self.next_step_signal.emit(3 + overfile_files)
def calculate_total_files(self):
""" Calculates the total files to copy into the .pcmp file
Returns:
total_files (int):
"""
total_files = 0
for modification in self.payload['mods_to_copy']:
total_files += modification[2]
return total_files
def copy_manifest_collection_data(self, pcmp_zip_file: ZipFile, meta:dict, collection_json: dict):
""" Takes all the required files and data and writes them to the pcmp_zip_file
Args:
pcmp_zip_file (zipfile.ZipFile): PenumbraCollection Modification Pack for exportation and importation
meta (dict): data needed for importation
collection_json_path (str): Penumbra's generated json for the collection
"""
# TODO: figure out how to save a file directly without having to use a temporary directory for it
temporary_directory = tempfile.TemporaryDirectory()
with open(os.path.join(temporary_directory.name, 'meta.json'), 'w') as file:
json.dump(meta, file, indent=3)
with open(os.path.join(temporary_directory.name, f"{collection_json['Id']}.json"), 'w') as file:
json.dump(collection_json, file, indent=3)
pcmp_zip_file.write(os.path.join(temporary_directory.name, 'meta.json'), 'meta.json')
pcmp_zip_file.write(os.path.join(temporary_directory.name, f"{collection_json['Id']}.json"), f"{collection_json['Id']}.json")
temporary_directory.cleanup()
class ExportProgressDialog(QDialog):
"""PyQt5 dialog that displays a QoL checklist and progress bar for the end user during the exportation process.
Args:
QDialog (PyQt5.QtWidgets.QDialog):
"""
export_steps = [
"Organized environment for export",
"Created manifest, paths, collection information",
"Archive created and Collection and manifest copied",
"Modifications copied",
"Cleaning Up with Finishing Touches",
"Export Done"
]
def __init__(self, parent, payload):
super().__init__(parent)
self.payload = payload
self.resize(400, 360)
self.setWindowTitle('Exporting Collection...')
self.progress_bar = QProgressBar(self)
self.progress_bar.setMaximum(10)
self.QoL_step_label = QLabel(self)
self.QoL_step_label.setWordWrap(True)
self.QoL_step_list = QListWidget(self)
self.abort_button = QPushButton(self)
self.abort_button.setText("Abort")
self.QoL_step_label.setGeometry(30, 25, 340, 25)
self.progress_bar.setGeometry(30, 60, 340, 25)
self.QoL_step_list.setGeometry(30, 95, 340, 215)
self.abort_button.setGeometry(310, 325, 80, 25)
for each in self.export_steps:
item = QListWidgetItem()
item.setIcon(QIcon(icons.step_incomplete))
item.setText(each)
self.QoL_step_list.addItem(item)
self.show()
def start_exportation(self):
self.my_thread = ExportCollectionThread(payload=self.payload)
self.my_thread.next_step_signal.connect(self.onStepChange)
self.my_thread.indicate_change.connect(self.onLabelChange)
self.my_thread.set_progress_bar_max.connect(self.set_dynamic_maximum)
self.my_thread.step_complete.connect(self.stepComplete)
self.my_thread.start()
def onStepChange(self, value):
self.progress_bar.setValue(value)
def onLabelChange(self, text):
self.QoL_step_label.setText(text)
def set_dynamic_maximum(self, value):
self.progress_bar.setMaximum(value)
def abort_export(self):
# TODO: abort logic requires that the ZIP be deleted so there is no confusion on a half baked modpack
pass
def close(self) -> bool:
self.my_thread.exit()
return super().close()
def stepComplete(self, index):
self.QoL_step_list.item(index).setIcon(QIcon(icons.step_complete))
if index == 4:
self.QoL_step_list.item(4).setIcon(QIcon(icons.step_complete))
self.QoL_step_list.item(5).setIcon(QIcon(icons.step_complete))
self.abort_button.setText('Done')
self.abort_button.clicked.connect(self.close)
else:
self.QoL_step_list.item(index).setIcon(QIcon(icons.step_complete))

9
helper_functions.py Normal file
View File

@ -0,0 +1,9 @@
import json, pathlib
""" Here i will be putting any helper functions that may be used throughout
the application in order to not have to write the same code at the same time. """
def load_bom_json(file_path: pathlib.Path) -> json:
""" helper function to encode a json as utf-8 and then load it returning the dict. """
with file_path.open(encoding="utf-8-sig") as loaded_file:
return json.load(loaded_file)

37
icons.py Normal file
View File

@ -0,0 +1,37 @@
import sys, os
def resource_path(relative_path):
try:
base_path = sys._MEIPASS
except:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
icon_dir = resource_path("icons")
# <a href="https://www.flaticon.com/free-icons/filter" title="filter icons">Filter icons created by Freepik - Flaticon</a>
clear_filter = f"{icon_dir}\\filter.png"
#<a href="https://www.flaticon.com/free-icons/letter-a" title="letter a icons">Letter a icons created by Ivan Repin - Flaticon</a>
active_collection = f'{icon_dir}\\letter-a.png'
window_icon = f'{icon_dir}\\window_icon.png'
step_incomplete = f'{icon_dir}\\checkbox.png'
step_complete = f'{icon_dir}\\accept.png'
caution = f'{icon_dir}\\caution.png'
configure = f'{icon_dir}\\settings.png'
open_folder = f'{icon_dir}\\open-folder.png'
connected_true = f'{icon_dir}\\checked.png'
connected_false = f'{icon_dir}\\cancel.png'
refresh = f'{icon_dir}\\refresh.png'
import_icon = f'{icon_dir}\\import.png'
export_icon = f'{icon_dir}\\export.png'
upload_icon = f'{icon_dir}\\upload.png'
database_good = f'{icon_dir}\\database_good.png'
database_bad = f'{icon_dir}\\database_bad.png'
profile = f'{icon_dir}\\login.png'
help_icon = f'{icon_dir}\\help.png'
exit_icon = f'{icon_dir}\\exit.png'

BIN
icons/16x16/export20.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 401 B

BIN
icons/16x16/import.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 325 B

BIN
icons/16x16/import20.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 420 B

BIN
icons/accept.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.0 KiB

BIN
icons/cancel.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

BIN
icons/caution.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.3 KiB

BIN
icons/checkbox.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

BIN
icons/checked.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

BIN
icons/database_bad.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

BIN
icons/database_good.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

BIN
icons/exit.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.2 KiB

BIN
icons/export.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.5 KiB

BIN
icons/filter.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

BIN
icons/help.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.6 KiB

BIN
icons/import.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.0 KiB

BIN
icons/letter-a.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

BIN
icons/login.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

BIN
icons/open-folder.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.3 KiB

BIN
icons/refresh.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

BIN
icons/settings.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 25 KiB

BIN
icons/upload.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

BIN
icons/window_icon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

205
import_process.py Normal file
View File

@ -0,0 +1,205 @@
from PyQt5.QtWidgets import (QDialog, QProgressBar, QLabel, QListWidget, QPushButton, QListWidgetItem)
from PyQt5.QtGui import QIcon
import json, zipfile, pathlib, shutil, os
from PyQt5.QtCore import QThread, pyqtSignal
import icons, settings, helper_functions
global app_settings
app_settings = settings.app_settings
class ImportCollectionThread(QThread):
"""This Thread logic takes a Penumbra Collection ModPack (.pcmp) and decompresses it into the users Modification
and collection folders.
Args:
QThread (object): adjacent thread to main loop to allow for a progress indication for user QoL
"""
next_step_signal = pyqtSignal(int) # tells progress bar what step it is on
indicate_change = pyqtSignal(str) # used to change a label and return a string to said label
set_progress_bar_max = pyqtSignal(int) # dynamic way to calculate total progress steps
step_complete = pyqtSignal(int) # used to indicate in the progress dialog what steps are complete for QoL
def __init__(self, payload):
"""
Args:
path_to_pcmp (str):
user_collection_directory (str):
user_modification_directory (str):
"""
super().__init__()
self.payload = payload
def run(self):
self.collection_uuid = self.payload['meta_data']['collection_uuid']
self.collection_name = self.payload['meta_data']['collection_name']
#save collection
collections_folder = pathlib.Path(app_settings.get_setting('collections_folder'))
collection_path = collections_folder / f"{self.collection_uuid}.json"
# need to modify collection json for unique tag...
collection_settings = self.payload['collection_json']['Settings']
new_settings = {}
for key, value in collection_settings.items():
new_settings[f"({self.collection_name}) {key}"] = value
self.payload['collection_json']['Settings'] = new_settings
with collection_path.open('w+') as file:
try:
json.dump(self.payload['collection_json'], file, indent=2)
except TypeError as e:
print(e)
sort_order_json = pathlib.Path(app_settings.get_setting('penumbra_path'), f"Penumbra/sort_order.json")
print(sort_order_json)
sort_order = helper_functions.load_bom_json(sort_order_json)
sort_order_data = sort_order['Data']
self.step_complete.emit(0) # copy over collection data signal
self.set_progress_bar_max.emit(self.payload['meta_data']['totalfiles'])
# TODO: create a database to save what collections are imported.
with zipfile.ZipFile(self.payload['path_to_zip']) as pcmp_file:
self.step_complete.emit(1) # open zip folder and set up environment signal
# copy mods into mod folder
name_list = pcmp_file.namelist()
overcount_files = 0
for modification in self.payload['mods_to_copy']:
sort_order_data[f"({self.collection_name}) {modification}"] = f"{self.collection_name}/({self.collection_name}) {modification}"
#makes all mod folders unique by default TODO: replace mod_folder_test with user folder
# Determine modifactions target directory and make sure it does not exists
modification_folder = pathlib.Path(app_settings.get_setting("modification_folder"), f"({self.collection_name}) {modification}")
if not os.path.exists(modification_folder):
os.mkdir(modification_folder)
# Create a filtered lists of all the found paths in the archives mods folder
filtered_paths_to_copy = [path for path in name_list if f"mods/{modification}" in path]
file_paths = [] # stores file paths and name for indexing and saving an index in the mods folder for future use.
# Start the counter and copying each filtered file in the archive over to the modification target
for current_count, path in enumerate(filtered_paths_to_copy):
target_path = modification_folder / path.replace(f"mods/{modification}/", "")
file_paths.append(tuple([path, str(target_path)]))
member_info = pcmp_file.getinfo(path)
# if the path is a inner directory then make it else, copy the file over into the target_path
if member_info.is_dir():
try:
os.makedirs(target_path)
except FileExistsError as e:
#print(e)
pass
elif "meta.json" in path:
data = json.load(pcmp_file.open(path))
data['Name'] = f"({self.collection_name}) {data['Name']}"
with target_path.open("w+") as _file:
json.dump(data, _file, indent=2)
else:
source = pcmp_file.open(path) # path into the archive
target = open(target_path, 'wb+') # target path on the local machine
with source, target_path:
shutil.copyfileobj(source, target)
target.close()
overcount_files += 1
self.indicate_change.emit(f"({current_count}/{len(filtered_paths_to_copy)}) decompressing {path} to {modification_folder}...")
self.next_step_signal.emit(overcount_files)
# Save a files index into the modifications folder
files_index_json = modification_folder / "pcmp_index.json"
with files_index_json.open('w+') as index_file:
json.dump(file_paths, index_file, indent=2)
sort_order['Data'] = sort_order_data
with sort_order_json.open('w+') as _file:
json.dump(sort_order, _file, indent=2)
self.step_complete.emit(2) # Modifications copied signal
self.step_complete.emit(3) # clean up and closing files signal
self.step_complete.emit(4) # import done
class ImportProgressDialog(QDialog):
steps = [
"Opened Archive",
"Copying Collection files",
"Modifications Copied",
"Cleaning Up with Finishing Touches",
"Import Done"
]
def __init__(self, parent, payload):
super().__init__(parent)
self.payload = payload
self.resize(400, 360)
self.setWindowTitle('Importing Collection...')
self.progress_bar = QProgressBar(self)
self.progress_bar.setMaximum(10)
self.QoL_step_label = QLabel(self)
self.QoL_step_label.setWordWrap(True)
self.QoL_step_list = QListWidget(self)
self.abort_button = QPushButton(self)
self.abort_button.setText("Abort")
self.QoL_step_label.setGeometry(30, 25, 340, 25)
self.progress_bar.setGeometry(30, 60, 340, 25)
self.QoL_step_list.setGeometry(30, 95, 340, 215)
self.abort_button.setGeometry(310, 325, 80, 25)
for each in self.steps:
item = QListWidgetItem()
item.setIcon(QIcon(icons.step_incomplete))
item.setText(each)
self.QoL_step_list.addItem(item)
self.show()
def import_collection(self):
""" main function that leads into the import thread.
Args:
path_to_pcmp (str): pcmp file to import
"""
# TODO: I want to be able to manipulate this data
self.my_thread = ImportCollectionThread(payload=self.payload)
self.my_thread.next_step_signal.connect(self.do_step_change)
self.my_thread.indicate_change.connect(self.change_label)
self.my_thread.set_progress_bar_max.connect(self.set_Progress_Bar_max)
self.my_thread.step_complete.connect(self.step_complete)
self.my_thread.start()
def do_step_change(self, progress_value: int):
self.progress_bar.setValue(progress_value)
def change_label(self, text: str):
self.QoL_step_label.setText(text)
def set_Progress_Bar_max(self, progress_max: int):
self.progress_bar.setMaximum(progress_max)
def abort_import(self):
pass
def done_button(self):
self.my_thread.exit()
return super().close()
def step_complete(self, index):
if index == 3:
self.QoL_step_list.item(3).setIcon(QIcon(icons.step_complete))
self.QoL_step_list.item(4).setIcon(QIcon(icons.step_complete))
self.abort_button.setText('Done')
self.abort_button.clicked.connect(self.done_button)
else:
self.QoL_step_list.item(index).setIcon(QIcon(icons.step_complete))

180
main.py Normal file
View File

@ -0,0 +1,180 @@
# external modules
import sys, json, pathlib, os, datetime, requests, tempfile, zipfile, subprocess
from PyQt5.QtWidgets import QApplication, QMessageBox
from PyQt5 import QtCore
from cloud_backend import UpdateOptInDialog, ping
# internal modules
from MainGui import MainGui
import settings, helper_functions, easySQL, tables
def penumbra_path_validation(parent):
""" This function is a preload warning if penumbra is not installed in the correct path which is
usually an indication it isnt installed at all."""
if not os.path.exists(settings.app_settings.get_setting('penumbra_path')):
warning_dialog = QMessageBox(parent)
warning_dialog.setWindowTitle('Incorrect Penumbra Installation...')
warning_dialog.setText(f"""It appears that Penumbra is not installed on your system at: \n{settings.app_settings.get_setting('penumbra_path')}\nThat means this program is useless to you at the moment! Please install Penumbra and follow its instructions before using this application!""")
warning_dialog.setIcon(QMessageBox.Warning)
warning_dialog.setStandardButtons(QMessageBox.Close)
button = warning_dialog.exec()
if button == QMessageBox.Close:
sys.exit()
def build_and_update_collections_database():
""" updates collections installed in the collections folder into the database """
collections_folder = pathlib.Path(settings.app_settings.get_setting('penumbra_path')) / "Penumbra" / "collections"
settings.app_settings.set_setting(key="collections_folder", value=str(collections_folder.absolute()))
active_collections_json = pathlib.Path(settings.app_settings.get_setting('penumbra_path')) / "Penumbra" / "active_collections.json"
settings.app_settings.set_setting(key="active_collections", value=str(active_collections_json.absolute()))
active_collections = helper_functions.load_bom_json(active_collections_json)
individuals = active_collections.pop("Individuals")
active_collections.pop("Version")
if os.path.isdir(collections_folder):
for id, collection_json in enumerate(collections_folder.glob("*")):
file_extension = os.path.splitext(collection_json)[1]
if file_extension == ".json":
data = helper_functions.load_bom_json(collection_json)
character_links = {}
for key, collection in active_collections.items():
if collection == data['Name']:
character_links[key] = True
else:
character_links[key] = False
collection_indvi = []
for individual in individuals:
if individual['Collection'] == data['Name']:
collection_indvi.append(individual)
character_links['Individuals'] = collection_indvi
easySQL.insert_into_table(tables.collections,
(id,
data['Version'],
data['Id'],
data['Name'],
json.dumps(data['Settings']),
json.dumps(data['Inheritance']),
json.dumps(character_links)
))
v = datetime.datetime.now().strftime("v%y.%m.%d")
easySQL.insert_into_table(tables.metadata, (
id,
data['Name'],
"Dummy Comment",
str(v)
))
def build_and_update_modifications_database():
"""get the mod directory and updates into the modification database"""
modification_folder = pathlib.Path(helper_functions.load_bom_json(pathlib.Path(settings.app_settings.get_setting('penumbra_path')) / "Penumbra.json")["ModDirectory"])
settings.app_settings.set_setting(key="modification_folder", value=str(modification_folder.absolute()))
if os.path.isdir(modification_folder):
for id, modification in enumerate(modification_folder.glob("*")):
try:
meta_path = pathlib.Path(modification / "meta.json")
if os.path.isdir(modification) and os.path.exists(meta_path):
length = 0
for file_path in modification.rglob("*"):
length += 1
meta = helper_functions.load_bom_json(modification / "meta.json")
easySQL.insert_into_table(tables.modifications, (
id,
meta['FileVersion'],
meta['Name'],
meta['Author'],
meta['Description'],
meta['Version'],
meta['Website'],
json.dumps(meta['ModTags']),
str(modification.absolute()),
length
))
elif pathlib.Path(modification).suffix == ".pmp":
print(modification)
except:
print(modification)
if hasattr(QtCore.Qt, 'AA_EnableHighDpiScaling'):
QApplication.setAttribute(QtCore.Qt.AA_EnableHighDpiScaling, False)
if hasattr(QtCore.Qt, 'AA_UseHighDpiPixmaps'):
QApplication.setAttribute(QtCore.Qt.AA_UseHighDpiPixmaps, False)
def main():
app = QApplication(sys.argv)
settings.app_settings.load_settings()
settings.app_settings.external_path = pathlib.Path.cwd()
main_gui = MainGui()
if settings.app_settings.first_load:
dialog = UpdateOptInDialog(main_gui, "https://ffxiv.treehousefullofstars.com/update_app/latest")
result = dialog.exec()
if result == 1:
settings.app_settings.set_setting("update_opt_in", dialog.get_opt_in_state())
settings.app_settings.set_setting("update_pipeline", dialog.get_pipeline())
settings.app_settings.save_settings()
else:
return False
if settings.app_settings.get_setting("update_opt_in"):
pipeline = settings.app_settings.get_setting("update_pipeline")
app_version = settings.app_settings.get_setting("app_version")
url_break = pipeline.split("/")
url = url_break[0] + "//" + url_break[2] + "/check_server"
result = ping(url)
if result:
update_info = requests.get(str(pipeline + "/" + app_version)).json()['data']
print(update_info['update_ready'])
if update_info['update_ready']:
msg_box_name = QMessageBox()
msg_box_name.setIcon(QMessageBox.Question)
msg_box_name.setWindowTitle("Update Available...")
msg_box_name.setText(f"It appears there is an update to the app available.\n\nYour current version is: {app_version}\nThe new version is: {update_info['latest_version']}\n\nPlease update your app, if you decline to then your ability to connect to servers will be disabled!")
msg_box_name.setStandardButtons(QMessageBox.Ok | QMessageBox.No)
retval = msg_box_name.exec_()
if retval == QMessageBox.No:
settings.app_settings.lock_client = True
if retval == QMessageBox.Ok:
return False
# penumbra validation to ensure the plugin is even installed and set up!
penumbra_path_validation(parent=main_gui)
# build database
build_and_update_collections_database()
build_and_update_modifications_database()
main_gui.__post__init__()
main_gui.show()
settings.app_settings.save_settings()
avatars = pathlib.Path(settings.app_settings.external_path.absolute()) / "avatars"
avatars.mkdir(parents=True, exist_ok=True)
sys.exit(app.exec_())
if __name__ == '__main__':
print(sys.argv)
try:
main()
sys.exit()
except Exception as e:
with open('log.txt', "w+") as file:
file.write(str(e))

44
main.spec Normal file
View File

@ -0,0 +1,44 @@
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
['main.py'],
pathex=[],
binaries=[],
datas=[('./icons', './icons')],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=["database.sqlite", "settings.json"],
noarchive=False,
optimize=0,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
[],
exclude_binaries=True,
name='Collection Sharing App',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)
coll = COLLECT(
exe,
a.binaries,
a.datas,
strip=False,
upx=True,
upx_exclude=[],
name='Collection Sharing App V1.0',
)

45
main_old.spec Normal file
View File

@ -0,0 +1,45 @@
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
['main.py'],
pathex=[],
binaries=[],
datas=[('./icons', './icons')],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=['settings.json', 'database,sqlite'],
noarchive=False,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
[],
exclude_binaries=True,
name='Collection Sharing App',
debug=True,
bootloader_ignore_signals=False,
strip=False,
upx=True,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
contents_directory='.',
)
coll = COLLECT(
exe,
a.binaries,
a.datas,
strip=False,
upx=True,
upx_exclude=[],
name='Collection Sharing App v1.0',
)

71
settings.py Normal file
View File

@ -0,0 +1,71 @@
import json, os, pathlib
class AppSettings(object):
def __init__(self, path) -> None:
self.path = pathlib.Path(path)
self.__get__ = {}
self.username = ""
self.password = ""
self.connected_server = ""
self.port = ""
self.user_data = {}
self.connected = False
self.external_path = ""
self.update_opt_in = False
self.update_pipeline = ""
self.first_load = False
self.app_version = ""
self.lock_client = False
def get_setting(self, key=None):
""" getter for any key in the settings dictionary! call it without a provided key to get the whole dictionary or pass a
key value and it returns that settings.
"""
if key:
return self.__get__[key]
return self.__get__
def set_setting(self, key, value):
""" setter for any value in the settings """
self.__get__[key] = value
def load_settings(self):
""" This loads pre-existing settings or the default if none exists! """
# check if settings exists if does then load that into variable
if os.path.exists(self.path.absolute()):
with self.path.open('r+') as file:
file_contents = json.load(file)
self.__get__ = file_contents
self.first_load = False
self.lock_client = False
# if not then create it and load in defaults
else:
global default_settings
with self.path.open('w+') as file:
json.dump(default_settings, file)
self.__get__ = default_settings
self.first_load = True
def save_settings(self):
""" Saves the current settings values to self.path! """
# check to see if the path exists for failsafe and then write to json
if os.path.exists(self.path.absolute()):
with self.path.open('w+') as file:
json.dump(self.__get__, file, indent=2)
global app_settings
app_settings = AppSettings('settings.json')
global default_settings
default_settings = {
"window_title": "Collection Sharing App",
"window_height": 400,
"window_width": 640,
"penumbra_path": str(pathlib.Path(pathlib.Path.home() / "AppData/Roaming" / "XIVLauncher" / "pluginConfigs").absolute()),
"advanced_mode": False,
"update_opt_in": False,
"update_pipeline": "",
"app_version": "alpha_v1.0"
}

54
tables.py Normal file
View File

@ -0,0 +1,54 @@
import easySQL
import pathlib
@easySQL.Table
class Collections:
def __init__(self) -> None:
self.name = "Collections"
self.columns = {
'collection_id': easySQL.INTEGER,
'version': easySQL.INTEGER,
'uuid': easySQL.STRING,
'name': easySQL.STRING,
'settings': easySQL.JSON,
'inheritance': easySQL.JSON,
'character_links': easySQL.JSON
}
@easySQL.Table
class CollectionMeta:
def __init__(self) -> None:
self.name = "MetaTable"
self.columns = {
'meta_id': easySQL.INTEGER,
'collection_name': easySQL.STRING,
'comments': easySQL.STRING,
'version': easySQL.STRING
}
@easySQL.Table
class Modifications:
def __init__(self) -> None:
self.name = "Modifications"
self.columns = {
'mod_id': easySQL.INTEGER,
'fileversion': easySQL.INTEGER,
'name': easySQL.STRING,
'author': easySQL.STRING,
'description': easySQL.STRING,
'version': easySQL.INTEGER,
'website': easySQL.STRING,
'modtags': easySQL.JSON,
'mod_path': easySQL.STRING,
'total_files': easySQL.INTEGER
}
collections = Collections()
modifications = Modifications()
metadata = CollectionMeta()
easySQL.intergrate(database_path=pathlib.Path('database.sqlite'))
easySQL.create_table(collections, drop=True)
easySQL.create_table(modifications, drop=True)
easySQL.create_table(metadata, drop=True)

34
update_app.py Normal file
View File

@ -0,0 +1,34 @@
import sys, json, psutil, requests, tempfile, os, zipfile, signal
def update_app(url, external_path):
print(url)
result = requests.get(url)
print(external_path)
with tempfile.TemporaryDirectory() as temp_dir:
print(temp_dir)
with open(f"{temp_dir}/temp.zip", "wb") as file:
file.write(result.content)
with zipfile.ZipFile(f"{temp_dir}/temp.zip", 'r') as zip_ref:
for member in zip_ref.namelist():
print(member)
filepath = os.path.join(external_path, member)
os.remove(filepath) if os.path.exists(filepath) else None # Remove if exists
zip_ref.extractall(external_path)
with open('log.txt', "w+") as file:
file.write(str("updated"))
try:
test =psutil.Process(int(sys.argv[1]))
os.kill(int(sys.argv[1]), signal.SIGTERM)
except psutil.NoSuchProcess as e:
try:
update_app(url=sys.argv[2], external_path=sys.argv[3])
except Exception as e:
test = str(e)
with open("log.txt", "w+") as file:
file.write(json.dumps(test))
sys.exit()

257
upload_process.py Normal file
View File

@ -0,0 +1,257 @@
"""
This file contains all Gui and logic directly impacting the exportation processing
"""
import httpx
import datetime
import os
import json
import tempfile
import pathlib
from zipfile import ZIP_DEFLATED, ZipFile
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import QDialog, QListWidget, QListWidgetItem, QProgressBar, QLabel, QPushButton
from PyQt5.QtGui import QIcon
import icons, settings
from cloud_backend import upload_file
global app_settings
app_settings = settings.app_settings
class UploadCollectionThread(QThread):
"""This thread logic takes a Penumbra Collection and compresses it into a .pcmp that can later be
imported from another client.
Args:
QThread (object): adjacent thread to main loop to allow for a progress indication for user QoL
"""
next_step_signal = pyqtSignal(int) # tells progress bar what step it is on
indicate_change = pyqtSignal(str) # used to change a label and return a string to said label
set_progress_bar_max = pyqtSignal(int) # dynamic way to calculate total progress steps
step_complete = pyqtSignal(int) # used to indicate in the progress dialog what steps are complete for QoL
export_complete = pyqtSignal(bool) # used to indicate the Ui that it needs to repopulate
def __init__(self, payload) -> None:
super().__init__()
self.payload = payload
self.final_path = ""
def run(self):
"""
Main Thread logic that takes the provided collection and outputs a .pcmp file for use by the client.
"""
collection_json = self.payload['collection_json']
meta_json = self.payload['meta_data']
#determine what mods to copy and total files
total_files = self.calculate_total_files()
meta_json['totalfiles'] = total_files
#send signals for progress and next step
self.set_progress_bar_max.emit(total_files + 4)
self.next_step_signal.emit(1)
self.step_complete.emit(0)
#create timestamp and temp folder for file path
temporary_directory = tempfile.TemporaryDirectory()
now = datetime.datetime.now()
time_stamp = datetime.datetime.strftime(now, '%Y-%m-%d %H-%M')
name = collection_json['Name']
filename = f"{time_stamp}-{name}.pcmp"
self.final_path = pathlib.Path(f"{temporary_directory.name}") / filename
self.step_complete.emit(1)
with ZipFile(self.final_path, 'w', compression=ZIP_DEFLATED) as pcmp_zip_file:
self.next_step_signal.emit(2)
self.copy_manifest_collection_data(
pcmp_zip_file=pcmp_zip_file,
meta=meta_json,
collection_json=collection_json,
temporary_directory=temporary_directory.name
)
self.step_complete.emit(2)
self.next_step_signal.emit(3)
self.copy_modifications_to_zip(pcmp_zip_file=pcmp_zip_file)
self.step_complete.emit(3)
self.upload_file(filename, self.final_path)
self.step_complete.emit(4)
self.next_step_signal.emit(total_files + 4)
temporary_directory.cleanup()
self.export_complete.emit(True)
def copy_modifications_to_zip(self, pcmp_zip_file: ZipFile):
""" Overarching compression of each modification into the pcmp zip
Args:
zip_ref (zipfile.ZipFile): PenumbraCollection Modification Pack for exportation and importation
mods_to_copy (list): A pre generated list of mod folders to compress into the .pcmp zip
"""
overfile_files = 0
for modification in self.payload['mods_to_copy']:
zip_modification_final_path = os.path.join('mods', modification[0])
local_modification_directory = pathlib.Path(modification[1])
for current_file, file_path in enumerate(local_modification_directory.rglob("*")):
pcmp_zip_file.write(file_path, arcname=os.path.join(zip_modification_final_path, file_path.relative_to(local_modification_directory)))
overfile_files += 1
self.indicate_change.emit(f"({current_file + 1}/{modification[2]}) Compressing {file_path} to zip...")
self.next_step_signal.emit(3 + overfile_files)
def calculate_total_files(self):
""" Calculates the total files to copy into the .pcmp file
Returns:
total_files (int):
"""
total_files = 0
for modification in self.payload['mods_to_copy']:
total_files += modification[2]
return total_files
def copy_manifest_collection_data(self, pcmp_zip_file: ZipFile, meta:dict, collection_json: dict, temporary_directory: str):
""" Takes all the required files and data and writes them to the pcmp_zip_file
Args:
pcmp_zip_file (zipfile.ZipFile): PenumbraCollection Modification Pack for exportation and importation
meta (dict): data needed for importation
collection_json_path (str): Penumbra's generated json for the collection
"""
# TODO: figure out how to save a file directly without having to use a temporary directory for it
with open(os.path.join(temporary_directory, 'meta.json'), 'w') as file:
json.dump(meta, file, indent=3)
with open(os.path.join(temporary_directory, f"{collection_json['Id']}.json"), 'w') as file:
json.dump(collection_json, file, indent=3)
pcmp_zip_file.write(os.path.join(temporary_directory, 'meta.json'), 'meta.json')
pcmp_zip_file.write(os.path.join(temporary_directory, f"{collection_json['Id']}.json"), f"{collection_json['Id']}.json")
def upload_file(self, filename, path):
chunk_size = 10485760
url = f"{settings.app_settings.connected_server}/upload_file"
total = os.path.getsize(path)
self.next_step_signal.emit(0)
self.set_progress_bar_max.emit(total)
self.indicate_change.emit(f"Uploading to {settings.app_settings.connected_server}...")
data = {'filename': filename}
total_chunks_transferred = 0
with open(path, 'rb') as f:
while (chunk := f.read(chunk_size)):
files = {'file': (filename, chunk)}
response = httpx.post(url, files=files, timeout=None)
total_chunks_transferred += len(chunk)
self.next_step_signal.emit(total_chunks_transferred)
if response.status_code != 200:
break
headers = {'Content-Type': 'application/json'}
meta_data = self.payload['meta_data']
collection_json = self.payload['collection_json']
character_links = self.payload['character_links']
mod_list = self.payload['mods_to_copy']
data = {
"username": settings.app_settings.username,
"password": settings.app_settings.password,
"filename": filename,
"meta_data": meta_data,
"collection_json": collection_json,
"character_links": character_links,
"mod_list": mod_list
}
response = httpx.post(f"{settings.app_settings.connected_server}/upload_info", headers=headers, json=data)
print(response)
class UploadProgressDialog(QDialog):
"""PyQt5 dialog that displays a QoL checklist and progress bar for the end user during the exportation process.
Args:
QDialog (PyQt5.QtWidgets.QDialog):
"""
export_steps = [
"Organized environment for upload",
"Created manifest and collection information",
"Archive created and Collection and manifest copied",
"Modifications copied",
"Cleaning Up with Finishing Touches, Uploading",
"Upload Done"
]
def __init__(self, parent, payload):
super().__init__(parent)
self.payload = payload
self.resize(400, 360)
self.setWindowTitle('Uploading Collection...')
self.progress_bar = QProgressBar(self)
self.progress_bar.setMaximum(10)
self.QoL_step_label = QLabel(self)
self.QoL_step_label.setWordWrap(True)
self.QoL_step_list = QListWidget(self)
self.abort_button = QPushButton(self)
self.abort_button.setText("Abort")
self.QoL_step_label.setGeometry(30, 25, 340, 25)
self.progress_bar.setGeometry(30, 60, 340, 25)
self.QoL_step_list.setGeometry(30, 95, 340, 215)
self.abort_button.setGeometry(310, 325, 80, 25)
for each in self.export_steps:
item = QListWidgetItem()
item.setIcon(QIcon(icons.step_incomplete))
item.setText(each)
self.QoL_step_list.addItem(item)
self.final_path = ""
self.show()
def start_upload(self):
self.my_thread = UploadCollectionThread(payload=self.payload)
self.my_thread.next_step_signal.connect(self.onStepChange)
self.my_thread.indicate_change.connect(self.onLabelChange)
self.my_thread.set_progress_bar_max.connect(self.set_dynamic_maximum)
self.my_thread.step_complete.connect(self.stepComplete)
self.my_thread.start()
def onStepChange(self, value):
self.progress_bar.setValue(value)
def onLabelChange(self, text):
self.QoL_step_label.setText(text)
def set_dynamic_maximum(self, value):
self.progress_bar.setMaximum(value)
def abort_upload(self):
# TODO: abort logic requires that the ZIP be deleted so there is no confusion on a half baked modpack
pass
def close(self) -> bool:
self.final_path = self.my_thread.final_path
self.my_thread.exit()
return super().close()
def stepComplete(self, index):
self.QoL_step_list.item(index).setIcon(QIcon(icons.step_complete))
if index == 4:
self.QoL_step_list.item(4).setIcon(QIcon(icons.step_complete))
self.QoL_step_list.item(5).setIcon(QIcon(icons.step_complete))
self.abort_button.setText('Done')
self.abort_button.clicked.connect(self.close)
else:
self.QoL_step_list.item(index).setIcon(QIcon(icons.step_complete))