483 lines
16 KiB
Python
483 lines
16 KiB
Python
from flask import Flask, Request, render_template, request, redirect, url_for, flash, send_file, send_from_directory, Response, session, jsonify, current_app
|
|
from flask_avatars import Avatars
|
|
import os, pathlib, io
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from sqlalchemy.ext.mutable import MutableList, MutableDict
|
|
from flask_login import LoginManager, current_user, login_user, logout_user, UserMixin, login_required
|
|
from werkzeug.utils import secure_filename
|
|
import random, string, bcrypt, datetime
|
|
from copy import deepcopy
|
|
from dataclasses import dataclass
|
|
from PIL import Image
|
|
import base64, json, pathlib
|
|
|
|
# --- Application and extension setup ---------------------------------------
app = Flask(__name__)
avatars = Avatars(app)

app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///db.sqlite"
# Prefer a secret supplied through the environment so the session-signing key
# does not have to live in source control. The literal is kept only as a
# fallback so existing deployments keep working unchanged.
app.config["SECRET_KEY"] = os.environ.get(
    "SECRET_KEY", "MYSUPERCOOLAPPKEY%#!%#!GJDAJVNEIALDOFJGNA"
)
# Folder that the upload endpoints write received files into.
app.config["UPLOAD_FOLDER"] = "media"
# NOTE(review): the two SECURITY_* settings are not read by any code visible
# in this file (password hashing calls bcrypt.gensalt() directly) -- confirm
# whether something else consumes them.
app.config['SECURITY_PASSWORD_HASH'] = 'bcrypt'
app.config['SECURITY_PASSWORD_SALT'] = b'$2b$12$wqKlYjmOfXPghx3FuC3Pu'
# Directory used by the avatar views when saving and serving avatar images.
app.config['AVATARS_SAVE_PATH'] = 'static/avatars/'

# The SQLAlchemy handle is created unbound; it is attached to the app after
# the models are declared.
db = SQLAlchemy()

login_manager = LoginManager()
login_manager.init_app(app)
|
|
|
|
@dataclass
class Users(UserMixin, db.Model):
    """Account row; UserMixin lets Flask-Login use instances as the session user."""

    # NOTE(review): @dataclass is applied although no attribute carries a type
    # annotation -- confirm the decorator is still wanted on this model.
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(250), unique=True, nullable=False)
    email = db.Column(db.String(250), unique=True, nullable=False)
    # Filled at signup with the value returned by password_hash().
    password = db.Column(db.String(250), nullable=False)
    firstname = db.Column(db.String(250), nullable=True)
    lastname = db.Column(db.String(250), nullable=True)
    # Generated at signup by friend_code(6); other users add it to their `subs`.
    friendcode = db.Column(db.String(250), nullable=False)
    # JSON list of friend codes this user has added (see the /addcode view).
    subs = db.Column(MutableList.as_mutable(db.JSON), nullable=False)
    # Uploads.id of the upload the user marked as "latest"; NULL until one is set.
    latest = db.Column(db.Integer, nullable=True)
    # Avatar locations; each is the literal "default" until an avatar is
    # uploaded (raw) and cropped (s/m/l).
    raw_avatar = db.Column(db.String(250), nullable=False)
    s_avatar = db.Column(db.String(250), nullable=False)
    m_avatar = db.Column(db.String(250), nullable=False)
    l_avatar = db.Column(db.String(250), nullable=False)
|
|
|
|
|
|
class Uploads(db.Model):
    """One uploaded collection file plus the metadata recorded alongside it."""

    id = db.Column(db.Integer, primary_key=True)
    # Users.id of the uploader (plain integer; no foreign-key constraint is declared).
    userid = db.Column(db.Integer, nullable=False)
    # Declared as a string column; the upload views pass datetime.date.today().
    upload_date = db.Column(db.String(250), nullable=False)
    filename = db.Column(db.String(250), nullable=False)
    author = db.Column(db.String(250), nullable=False)
    version = db.Column(db.String(250), nullable=False)
    description = db.Column(db.String(250), nullable=True)
    # Optional JSON payloads supplied by the /upload_info and /upload_app endpoints.
    meta_data = db.Column(MutableDict.as_mutable(db.JSON), nullable=True)
    collection_json = db.Column(MutableDict.as_mutable(db.JSON), nullable=True)
    character_links = db.Column(MutableDict.as_mutable(db.JSON), nullable=True)
    mod_list = db.Column(MutableList.as_mutable(db.JSON), nullable=True)
    # Where the file was written, built from UPLOAD_FOLDER and the file name.
    path = db.Column(db.String(250), nullable=False)
|
|
|
|
# Bind the SQLAlchemy handle to the app now that the models are declared.
db.init_app(app)

# create_all() needs an application context; it runs once at import time.
with app.app_context():
    db.create_all()
|
|
|
|
def friend_code(length):
    """Return a random friend code made of ``length`` ASCII letters.

    A friend code is what another user enters to see this user's latest
    upload, so it is drawn from the ``secrets`` CSPRNG rather than the
    predictable ``random`` generator.

    Args:
        length: Number of characters to generate; values <= 0 yield ``""``.

    Returns:
        A string of ``length`` characters taken from ``string.ascii_letters``.
    """
    import secrets  # local import: not part of this file's top-level imports

    alphabet = string.ascii_letters
    return ''.join(secrets.choice(alphabet) for _ in range(length))
|
|
|
|
@login_manager.user_loader
def loader_user(user_id):
    """Flask-Login callback: look up the ``Users`` row for ``user_id``.

    Returns whatever ``Users.query.get`` returns for that primary key.
    """
    matching_user = Users.query.get(user_id)
    return matching_user
|
|
|
|
def password_hash(input_password):
    """Hash ``input_password`` with bcrypt using a freshly generated salt.

    The password is UTF-8 encoded first; the value produced by
    ``bcrypt.hashpw`` is returned as-is.
    """
    return bcrypt.hashpw(input_password.encode('utf-8'), bcrypt.gensalt())
|
|
|
|
def check_password(input_password, hash):
    """Return True when ``input_password`` matches the stored bcrypt ``hash``.

    Args:
        input_password: Plain-text password supplied by the client. ``None``
            (e.g. a missing form field) is treated as a failed check.
        hash: Stored bcrypt hash. It may arrive as ``bytes`` or, depending on
            how the database driver returns the string column, as ``str``;
            a ``str`` is encoded before comparison because ``bcrypt.checkpw``
            only accepts bytes.

    Returns:
        bool: result of ``bcrypt.checkpw``; False when either argument is
        ``None``.
    """
    # Callers pass request data straight in; a missing value must be a clean
    # "no match" rather than an AttributeError.
    if input_password is None or hash is None:
        return False

    candidate = input_password.encode('utf-8')
    stored = hash.encode('utf-8') if isinstance(hash, str) else hash
    return bcrypt.checkpw(candidate, stored)
|
|
|
|
|
|
@app.route("/login_app/avatar", methods=["POST"])
def login_request_avatar():
    """Send the small avatar of the user named in the JSON body.

    Expects a JSON object with a ``username`` key.

    Returns:
        * the avatar file (download name ``<username>.png``) when the user
          has a non-default small avatar,
        * 201 with an empty body when the user still has the default avatar,
        * 400 when the body is not a JSON object with a string ``username``,
        * 404 when the user or the avatar file does not exist.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or not isinstance(payload.get('username'), str):
        return Response(status=400)

    user = Users.query.filter_by(username=payload['username']).first()
    if user is None:
        # Previously this fell through to an AttributeError (HTTP 500).
        return Response(status=404)

    if user.s_avatar == "default":
        return Response(status=201)

    # Path is resolved against the process working directory, as before.
    path = f"static/{user.s_avatar}"
    if not os.path.isfile(path):
        return Response(status=404)

    filename = f"{user.username}.png"
    # The response takes ownership of the file object and closes it when done.
    return send_file(open(path, "rb"), download_name=filename)
|
|
|
|
@app.route('/check_server')
def ping():
    """Reachability check: always answers with an empty 200 response."""
    ok = Response(status=200)
    return ok
|
|
|
|
@app.route('/update_app/<pipe>/<version>')
def update_app(pipe, version):
    """Tell a client whether a newer build exists on the given release pipe.

    Only the ``latest`` pipe is handled; any other pipe gets an empty 200.
    For ``latest`` the JSON body carries the current version string, a flag
    that is true when the client's ``version`` differs from it, and the
    download URL for that pipe.
    """
    current_latest = "alpha_v1.0"

    # Guard clause: unknown pipes get the bare 200.
    if pipe != "latest":
        return Response(status=200)

    payload = {
        'latest_version': current_latest,
        'update_ready': current_latest != version,
        'download_url': f"/download_app/{pipe}",
    }
    return jsonify(data=payload)
|
|
|
|
@app.route("/upload_file", methods=["POST"])
def save_chunks():
    """Append one uploaded chunk to ``<UPLOAD_FOLDER>/<filename>``.

    The multipart field ``file`` carries the chunk; its client-supplied
    filename selects the target file, which is opened in append mode so
    successive requests build the file up piece by piece.

    Returns:
        * ``('', 200)`` after the chunk was written,
        * ``('', 400)`` when the filename is empty or contains a path
          separator / traversal component,
        * ``('', 404)`` when no ``file`` part was sent.
    """
    if 'file' not in request.files:
        # The original `return 404` is not a valid Flask response (a bare int
        # raises TypeError); return a proper empty 404 instead.
        return '', 404

    uploaded_file = request.files['file']
    filename = uploaded_file.filename or ''

    # The name comes straight from the client. Names that could escape the
    # upload folder are rejected, not rewritten, so accepted names are stored
    # exactly as sent.
    has_separator = any(ch in filename for ch in ('/', '\\', '\x00'))
    if filename in ('', '.', '..') or has_separator:
        return '', 400

    target = f"{app.config['UPLOAD_FOLDER']}/{filename}"
    with open(target, 'ab') as f:
        f.write(uploaded_file.read())
    return '', 200
|
|
|
|
@app.route("/upload_info", methods=['POST'])
def upload_info():
    """Record the metadata row for a file uploaded through ``/upload_file``.

    Expects a JSON object with ``username``, ``password``, ``filename``,
    ``meta_data``, ``collection_json``, ``character_links`` and ``mod_list``.

    The view is a plain (synchronous) function: the body never awaits, and
    async views need Flask's optional async extra to run at all.

    Returns:
        * 200 after the ``Uploads`` row was committed,
        * 400 when the body is not a JSON object, a required key is missing,
          or ``filename`` is not a plain file name,
        * 415 when the credentials do not match (status kept from the
          original implementation for client compatibility).
    """
    data = request.json
    if not isinstance(data, dict):
        return Response(status=400)

    try:
        username = data['username']
        password = data['password']
        filename = data['filename']
        meta_data = data['meta_data']
        collection_json = data['collection_json']
        character_links = data['character_links']
        mod_list = data['mod_list']
    except KeyError:
        return Response(status=400)

    if not isinstance(username, str) or not isinstance(password, str):
        return Response(status=400)

    user = Users.query.filter_by(username=username).first()
    # An unknown user gets the same answer as a wrong password.
    if user is None or not check_password(password, user.password):
        return Response(status=415)

    # The stored path is later opened and deleted by other views, so refuse
    # names that could point outside the upload folder.
    if (
        not isinstance(filename, str)
        or filename in ('', '.', '..')
        or any(ch in filename for ch in ('/', '\\', '\x00'))
    ):
        return Response(status=400)

    save_path = f"{app.config['UPLOAD_FOLDER']}/{filename}"
    record = Uploads(
        userid=user.id,
        # upload_date is a string column; store the ISO date explicitly.
        upload_date=datetime.date.today().isoformat(),
        filename=filename,
        author=user.username,
        version="test",
        description="",
        meta_data=meta_data,
        collection_json=collection_json,
        character_links=character_links,
        mod_list=mod_list,
        path=save_path,
    )
    db.session.add(record)
    db.session.commit()

    return Response(status=200)
|
|
|
|
@app.route("/upload_app", methods=['POST'])
def upload_app():
    """Receive a collection file plus its metadata in one multipart request.

    Multipart parts:
        ``datas``: a JSON document with ``username``, ``password``,
            ``filename``, ``meta_data``, ``collection_json``,
            ``character_links`` and ``mods_to_copy``.
        ``file``: the file itself.

    The view is a plain (synchronous) function: the body never awaits, and
    async views need Flask's optional async extra to run at all.

    Returns:
        * 200 after the file was saved and the ``Uploads`` row committed,
        * 400 when ``datas`` is missing, is not a valid JSON object, lacks a
          required key, or the sanitised file name is empty,
        * 415 when the credentials do not match or no usable ``file`` part was
          sent (statuses kept from the original implementation).
    """
    if 'datas' not in request.files:
        return Response(status=400)
    try:
        data = json.loads(request.files['datas'].read())
    except ValueError:  # covers JSONDecodeError and undecodable bytes
        return Response(status=400)
    if not isinstance(data, dict):
        return Response(status=400)

    try:
        username = data['username']
        password = data['password']
        raw_filename = data['filename']
        meta_data = data['meta_data']
        collection_json = data['collection_json']
        character_links = data['character_links']
        mod_list = data['mods_to_copy']
    except KeyError:
        return Response(status=400)

    if not isinstance(username, str) or not isinstance(password, str):
        return Response(status=400)

    user = Users.query.filter_by(username=username).first()
    # An unknown user gets the same answer as a wrong password.
    if user is None or not check_password(password, user.password):
        return Response(status=415)

    if 'file' not in request.files:
        return Response(status=415)
    file = request.files['file']
    if file.filename == '':
        return Response(status=415)

    filename = secure_filename(str(raw_filename))
    if not filename:
        # secure_filename() can strip a name down to nothing; without this
        # check the save path would be the upload folder itself.
        return Response(status=400)

    save_path = f"{app.config['UPLOAD_FOLDER']}/{filename}"

    # Write the file first so a failed save never leaves a database row that
    # points at a file which does not exist.
    file.save(save_path)

    record = Uploads(
        userid=user.id,
        # upload_date is a string column; store the ISO date explicitly.
        upload_date=datetime.date.today().isoformat(),
        filename=filename,
        # firstname is nullable but author is not; fall back to the username.
        author=user.firstname or user.username,
        version="test",
        description="",
        meta_data=meta_data,
        collection_json=collection_json,
        character_links=character_links,
        mod_list=mod_list,
        path=save_path,
    )
    db.session.add(record)
    db.session.commit()

    return Response(status=200)
|
|
|
|
|
|
@app.route("/login_app", methods=["POST"])
def login_app():
    """Credential check for the desktop client.

    Expects a JSON object with ``username`` and ``password``.

    Returns:
        * ``({"id", "friendcode", "subs"}, 200)`` when the password matches,
        * 400 when the body is not a JSON object,
        * 500 for an unknown user or a wrong password.

    NOTE(review): 500 is kept for failed logins because that is what the
    original implementation answered and the client's expectations are not
    visible here; 401 would describe the situation more accurately.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return Response(status=400)

    username = payload.get('username')
    password = payload.get('password')

    user = None
    if isinstance(username, str):
        user = Users.query.filter_by(username=username).first()

    # Previously a missing user raised AttributeError; answer deliberately.
    if (
        user is None
        or not isinstance(password, str)
        or not check_password(password, user.password)
    ):
        return Response(status=500)

    # Build the profile payload only after the credentials were verified.
    data = {
        "id": user.id,
        "friendcode": user.friendcode,
        "subs": user.subs,
    }
    return data, 200
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form (GET) or sign the user in (POST).

    On POST the ``username`` / ``password`` form fields are checked; a
    successful check logs the user in and redirects to the home page. Any
    failure -- unknown user, missing field, wrong password -- re-renders the
    login form, which is what the original did for a wrong password.
    """
    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")

        user = None
        if username and password:
            user = Users.query.filter_by(username=username).first()

        # `user is not None` guards the AttributeError the original raised
        # for unknown usernames.
        if user is not None and check_password(password, user.password):
            login_user(user)
            return redirect(url_for("home"))

    return render_template("login.html")
|
|
|
|
@app.route("/logout")
def logout():
    """End the current Flask-Login session and go back to the home page."""
    logout_user()
    home_url = url_for("home")
    return redirect(home_url)
|
|
|
|
@app.route("/signup", methods=["GET", "POST"])
def signup():
    """Render the signup form (GET) or create a new account (POST).

    On POST the form must carry ``username``, ``email``, ``password`` and a
    matching ``confirm_password``; ``firstname`` / ``lastname`` are optional.
    A new account starts with a 6-letter friend code, no subscriptions and
    the ``"default"`` avatar placeholders.

    Returns:
        * redirect to the login page after the account was created,
        * redirect back to the signup page when a required field is missing,
          the passwords differ, or the username/email is already taken,
        * the rendered signup form for GET.
    """
    # Local import: IntegrityError is not among this file's top-level imports.
    from sqlalchemy.exc import IntegrityError

    if request.method != "POST":
        return render_template("signup.html")

    form = request.form
    username = form.get('username')
    email = form.get("email")
    password = form.get("password")

    # Required columns are NOT NULL and the password gets encoded, so a
    # missing field used to surface as a 500 instead of a retry.
    if not username or not email or not password:
        return redirect(request.url)
    if password != form.get('confirm_password'):
        return redirect(request.url)

    user = Users(
        username=username,
        email=email,
        password=password_hash(password),
        firstname=form.get('firstname'),
        lastname=form.get('lastname'),
        friendcode=friend_code(6),
        subs=[],
        raw_avatar='default',
        s_avatar='default',
        m_avatar='default',
        l_avatar='default',
    )
    db.session.add(user)
    try:
        db.session.commit()
    except IntegrityError:
        # username and email are unique; roll back so the session stays usable.
        db.session.rollback()
        flash('Username or email is already in use')
        return redirect(request.url)

    return redirect(url_for("login"))
|
|
|
|
@app.route("/")
def home():
    """Serve the landing page."""
    page = render_template("home.html")
    return page
|
|
|
|
@app.route("/about")
def about():
    """Serve the static "about" page."""
    page = render_template("about.html")
    return page
|
|
|
|
# (flag, filename) describing the most recent web upload. The upload view sets
# it and the collections view reads and resets it.
# NOTE(review): this is a single module-level value, so it is shared by every
# request handled by this process -- confirm that is acceptable.
was_upload = (False, "")
|
|
|
|
@app.route("/collections/upload", methods=["POST"])
@login_required
def upload():
    """Store a file uploaded from the collections page and record it.

    Requires a logged-in user, because the new ``Uploads`` row is attributed
    to ``current_user``. The multipart field ``file`` carries the upload; its
    name is sanitised with ``secure_filename`` before it is written into the
    upload folder.

    Every outcome redirects back to the collections page; problems are
    reported through ``flash``.
    """
    global was_upload

    if 'file' not in request.files:
        flash('No file part')
        return redirect(url_for('collections'))

    file = request.files['file']
    if file.filename == '':
        flash('No selected file')
        return redirect(url_for('collections'))

    filename = secure_filename(file.filename)
    if not filename:
        # Sanitising can leave nothing usable (e.g. a name made of dots);
        # saving would then target the upload folder itself.
        flash('No selected file')
        return redirect(url_for('collections'))

    path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(path)
    # Raise the "just uploaded" flag only once the file is actually on disk.
    was_upload = (True, filename)

    record = Uploads(
        userid=current_user.id,
        # upload_date is a string column; store the ISO date explicitly.
        upload_date=datetime.date.today().isoformat(),
        filename=filename,
        author=current_user.username,
        version="test",
        description="",
        path=path,
    )
    db.session.add(record)
    db.session.commit()

    return redirect(url_for('collections'))
|
|
|
|
def generate_file_chunks(filename, chunk_size=4096):
    """Yield the contents of ``filename`` as successive byte chunks.

    Args:
        filename: Path of the file to read; opened in binary mode.
        chunk_size: Maximum number of bytes per chunk (default 4096, the
            size the original hard-coded).

    Yields:
        bytes: consecutive chunks; the last one may be shorter. Nothing is
        yielded for an empty file.

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1 (raised when
            iteration starts, since this is a generator).
    """
    if chunk_size < 1:
        # read(0) would end at once and read(-1) would load the whole file.
        raise ValueError("chunk_size must be a positive integer")

    with open(filename, 'rb') as file:
        # An empty bytes object marks end-of-file and ends the loop.
        while chunk := file.read(chunk_size):
            yield chunk
|
|
|
|
|
|
@app.route("/download_app/latest")
def download_app():
    """Send the latest packaged client build as ``Collection Sharing App.zip``."""
    archive = pathlib.Path("static/versions/latest.zip")
    # The open handle is passed to send_file, exactly as before.
    return send_file(open(archive, "rb"), download_name="Collection Sharing App.zip")
|
|
|
|
@app.route("/collections/download/<id>")
def download(id):
    """Send the stored file of upload ``id`` as an attachment.

    The row's ``path`` is resolved against the application root.
    ``os.path.normpath`` converts the separators for the running platform,
    replacing the previous Windows-only ``'/'`` -> ``'\\'`` substitution.

    Returns:
        The file as an attachment named after the row's ``filename``, or 404
        when the row or the file on disk does not exist.
    """
    upload = Uploads.query.filter_by(id=id).first()
    if upload is None:
        return Response(status=404)

    abs_path = os.path.join(current_app.root_path, os.path.normpath(str(upload.path)))
    if not os.path.isfile(abs_path):
        # The original only printed a message here and then failed in send_file.
        return Response(status=404)

    return send_file(abs_path, as_attachment=True, download_name=upload.filename)
|
|
|
|
|
|
@app.route("/collections/delete/<id>", methods=["GET", "POST"])
@login_required
def delete_collection(id):
    """Delete upload ``id``: remove its file and its database row.

    Only the logged-in owner of the upload may delete it; previously any
    visitor could delete any upload by id.

    Returns:
        * redirect to the collections page after deletion,
        * 404 when no upload with that id exists,
        * 403 when the upload belongs to another user.
    """
    upload = Uploads.query.filter_by(id=id).first()
    if upload is None:
        return Response(status=404)
    if upload.userid != current_user.id:
        return Response(status=403)

    try:
        os.remove(upload.path)
    except FileNotFoundError:
        # The file is already gone; still drop the stale row.
        pass

    # Delete the row that was already loaded; a second query is unnecessary.
    db.session.delete(upload)
    db.session.commit()
    return redirect(url_for('collections'))
|
|
|
|
|
|
@app.route("/collections/update/<id>", methods=["GET", "POST"])
@login_required
def update_collection(id):
    """Apply the edit form of upload ``id`` and optionally mark it as latest.

    On POST the form fields ``author-<id>``, ``version-<id>`` and
    ``description-<id>`` overwrite the row; a field absent from the form
    leaves the stored value untouched. When ``latest-<id>`` is ``"on"`` the
    current user's ``latest`` pointer is set to this upload. GET requests
    change nothing.

    Returns:
        * redirect to the collections page (GET, or after a successful POST),
        * 404 when no upload with that id exists,
        * 403 when the upload belongs to another user.
    """
    if request.method != "POST":
        return redirect(url_for('collections'))

    upload = Uploads.query.filter_by(id=id).first()
    if upload is None:
        return Response(status=404)
    if upload.userid != current_user.id:
        return Response(status=403)

    # Fall back to the stored value so a missing field cannot write None into
    # the NOT NULL author/version columns.
    upload.author = request.form.get(f"author-{id}", upload.author)
    upload.version = request.form.get(f"version-{id}", upload.version)
    upload.description = request.form.get(f"description-{id}", upload.description)

    if request.form.get(f"latest-{id}") == "on":
        user = Users.query.filter_by(id=current_user.id).first()
        user.latest = upload.id

    # One commit covers both the row edit and the latest pointer.
    db.session.commit()
    return redirect(url_for('collections'))
|
|
|
|
|
|
@app.route("/collections")
@login_required
def collections():
    """Render the collections page for the logged-in user.

    The template receives ``data`` as a three-item list:
        0. a pagination object over the user's own uploads (5 per page,
           ordered by ``upload_date``; page taken from ``?page=``),
        1. a list of dicts describing the latest upload of every friend code
           in the user's ``subs``,
        2. the ``was_upload`` (flag, filename) tuple, which is reset afterwards
           so the "just uploaded" notice is shown once.

    Friend codes that match no user, or users without a latest upload, are
    skipped.
    """
    global was_upload

    page = request.args.get('page', 1, type=int)
    pagination = (
        Uploads.query
        .filter_by(userid=current_user.id)
        .order_by(Uploads.upload_date)
        .paginate(page=page, per_page=5, error_out=False)
    )

    friends_data = []
    for code in current_user.subs:
        # Explicit checks replace the former bare `except: pass`, which also
        # hid unrelated errors.
        friend = Users.query.filter_by(friendcode=code).first()
        if friend is None or friend.latest is None:
            continue
        upload = Uploads.query.filter_by(id=friend.latest).first()
        if upload is None:
            continue
        friends_data.append({
            "id": upload.id,
            "upload_date": upload.upload_date,
            "name": upload.filename,
            "author": upload.author,
            "version": upload.version,
            "description": upload.description,
            "path": pathlib.Path(f"{upload.path}").absolute(),
        })

    # The tuple is immutable, so it can be handed over without copying.
    data = [pagination, friends_data, was_upload]
    was_upload = (False, "")
    return render_template("collections.html", data=data)
|
|
|
|
@app.route('/profile')
def profile():
    """Serve the profile page."""
    page = render_template("profile.html")
    return page
|
|
|
|
@app.route('/profile/upload', methods=["POST"])
@login_required
def upload_avatar():
    """Save a newly uploaded raw avatar and continue to the crop page.

    Requires a logged-in user. The multipart field ``file`` is stored via
    ``avatars.save_avatar``; the resulting file name is recorded on the user
    row (prefixed with ``AVATARS_SAVE_PATH``) and in the session under
    ``raw_filename`` for the crop view to pick up.

    Returns:
        redirect to the crop page on success, or back to the profile page
        when no file was submitted.
    """
    f = request.files.get('file')
    if f is None or f.filename == '':
        # Nothing to save; previously None was handed to save_avatar().
        return redirect(url_for('profile'))

    raw_filename = avatars.save_avatar(f)

    user = Users.query.filter_by(id=current_user.id).first()
    user.raw_avatar = f"{app.config['AVATARS_SAVE_PATH']}{raw_filename}"
    db.session.commit()

    # The crop view reads the raw file name from the session.
    session['raw_filename'] = raw_filename
    return redirect(url_for('crop'))
|
|
|
|
# Serve a stored avatar image by file name.
@app.route('/avatars/<path:filename>')
def get_avatar(filename):
    """Return ``filename`` from the configured avatar directory."""
    avatar_dir = app.config['AVATARS_SAVE_PATH']
    return send_from_directory(avatar_dir, filename)
|
|
|
|
@app.route('/crop', methods=['GET', 'POST'])
@login_required
def crop():
    """Show the crop page (GET) or crop the pending raw avatar (POST).

    On POST the form fields ``x``, ``y``, ``w`` and ``h`` are passed to
    ``avatars.crop_avatar`` together with the raw file name that the avatar
    upload view stored in the session. The three returned file names are
    saved as the user's small, medium and large avatar.

    Returns:
        * the rendered crop page for GET,
        * redirect to the profile page after cropping, or when the session
          holds no pending raw avatar.
    """
    if request.method != 'POST':
        return render_template('crop.html')

    raw_filename = session.get('raw_filename')
    if raw_filename is None:
        # No upload preceded this request; previously a KeyError (HTTP 500).
        return redirect(url_for('profile'))

    x = request.form.get('x')
    y = request.form.get('y')
    w = request.form.get('w')
    h = request.form.get('h')
    filenames = avatars.crop_avatar(raw_filename, x, y, w, h)

    user = Users.query.filter_by(id=current_user.id).first()
    user.s_avatar = f"avatars/{filenames[0]}"
    user.m_avatar = f"avatars/{filenames[1]}"
    user.l_avatar = f"avatars/{filenames[2]}"
    # A single commit stores all three sizes together.
    db.session.commit()

    return redirect(url_for('profile'))
|
|
|
|
@app.route('/profile/friends')
@login_required
def friends_code():
    """Render the friends page for the logged-in user.

    For every friend code in the user's ``subs`` that matches an account,
    the template receives a dict with that account's ``username``,
    ``friendcode`` and ``m_avatar``. Codes that match no account are skipped.
    """
    data = []
    for code in current_user.subs:
        friend = Users.query.filter_by(friendcode=code).first()
        if friend is None:
            # Explicit skip instead of the former bare `except: pass`.
            continue
        data.append({
            'username': friend.username,
            'friendcode': friend.friendcode,
            'm_avatar': friend.m_avatar,
        })
    return render_template("friends.html", data=data)
|
|
|
|
|
|
@app.route("/addcode", methods=["POST"])
@login_required
def addcode():
    """Add the submitted friend code to the logged-in user's ``subs`` list.

    The ``friendcode`` form field is stripped of surrounding whitespace.
    An empty or missing value, or a code that is already in the list, leaves
    the list unchanged. Every outcome redirects to the friends page.
    """
    # `or ""` covers a missing field, which the original `== ""` test let
    # through and appended as None.
    code = (request.form.get("friendcode") or "").strip()
    if code:
        user = Users.query.filter_by(id=current_user.id).first()
        if code not in user.subs:
            # subs is a MutableList, so append() already marks the row dirty.
            user.subs.append(code)
            db.session.commit()
    return redirect(url_for('friends_code'))
|
|
|
|
# Start the development server when the file is executed directly.
# NOTE(review): debug=True together with host="0.0.0.0" makes the interactive
# debugger reachable from other machines -- confirm this is never used outside
# local development.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port="5003", debug=True)