from flask import Flask, Request, render_template, request, redirect, url_for, flash, send_file, send_from_directory, Response, session, jsonify, current_app from flask_avatars import Avatars import os, pathlib, io from flask_sqlalchemy import SQLAlchemy from sqlalchemy.ext.mutable import MutableList, MutableDict from flask_login import LoginManager, current_user, login_user, logout_user, UserMixin, login_required from werkzeug.utils import secure_filename import random, string, bcrypt, datetime from copy import deepcopy from dataclasses import dataclass from PIL import Image import base64, json, pathlib app = Flask(__name__) avatars = Avatars(app) app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///db.sqlite" app.config["SECRET_KEY"] = "MYSUPERCOOLAPPKEY%#!%#!GJDAJVNEIALDOFJGNA" app.config["UPLOAD_FOLDER"] = "media" app.config['SECURITY_PASSWORD_HASH'] = 'bcrypt' app.config['SECURITY_PASSWORD_SALT'] = b'$2b$12$wqKlYjmOfXPghx3FuC3Pu' app.config['AVATARS_SAVE_PATH'] = 'static/avatars/' db = SQLAlchemy() login_manager = LoginManager() login_manager.init_app(app) @dataclass class Users(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(250), unique=True, nullable=False) email = db.Column(db.String(250), unique=True, nullable=False) password = db.Column(db.String(250), nullable=False) firstname = db.Column(db.String(250), nullable=True) lastname = db.Column(db.String(250), nullable=True) friendcode = db.Column(db.String(250), nullable=False) subs = db.Column(MutableList.as_mutable(db.JSON), nullable=False) latest = db.Column(db.Integer, nullable=True) raw_avatar = db.Column(db.String(250), nullable=False) s_avatar = db.Column(db.String(250), nullable=False) m_avatar = db.Column(db.String(250), nullable=False) l_avatar = db.Column(db.String(250), nullable=False) class Uploads(db.Model): id = db.Column(db.Integer, primary_key=True) userid = db.Column(db.Integer, nullable=False) upload_date = db.Column(db.String(250), nullable=False) filename = db.Column(db.String(250), nullable=False) author = db.Column(db.String(250), nullable=False) version = db.Column(db.String(250), nullable=False) description = db.Column(db.String(250), nullable=True) meta_data = db.Column(MutableDict.as_mutable(db.JSON), nullable=True) collection_json = db.Column(MutableDict.as_mutable(db.JSON), nullable=True) character_links = db.Column(MutableDict.as_mutable(db.JSON), nullable=True) mod_list = db.Column(MutableList.as_mutable(db.JSON), nullable=True) path = db.Column(db.String(250), nullable=False) db.init_app(app) with app.app_context(): db.create_all() def friend_code(length): letters = string.ascii_letters return ''.join(random.choice(letters) for i in range(length)) @login_manager.user_loader def loader_user(user_id): return Users.query.get(user_id) def password_hash(input_password): bytes = input_password.encode('utf-8') salt = bcrypt.gensalt() return bcrypt.hashpw(bytes, salt) def check_password(input_password, hash): bytes = input_password.encode('utf-8') result = bcrypt.checkpw(bytes, hash) return result @app.route("/login_app/avatar", methods=["POST"]) def login_request_avatar(): if request.method == "POST": username = request.json['username'] user = Users.query.filter_by(username=username).first() if user.s_avatar != "default": path = f"static/{user.s_avatar}" filename = f"{user.username}.png" return send_file(open(path, "rb"), download_name=filename) return Response(status=201) @app.route('/check_server') def ping(): return Response(status=200) @app.route('/update_app//') def update_app(pipe, version): current_latest = "alpha_v1.0" if pipe == "latest": if not current_latest == version: update_ready = True else: update_ready = False data = { 'latest_version': current_latest, 'update_ready': update_ready, 'download_url': f"/download_app/{pipe}" } return jsonify(data=data) return Response(status=200) @app.route("/upload_file", methods=["POST"]) def save_chunks(): if 'file' in request.files: uploaded_file = request.files['file'] filename = uploaded_file.filename with open(f'{app.config['UPLOAD_FOLDER']}/{filename}', 'ab') as f: f.write(uploaded_file.read()) return '', 200 return 404 @app.route("/upload_info", methods=['POST']) async def upload_info(): if not request.method == 'POST': return Response(status=415) data = request.json user = Users.query.filter_by(username=data['username']).first() if not check_password(data['password'], user.password): return Response(status=415) filename = data['filename'] # database upload date = datetime.date.today() save_path = f"{app.config['UPLOAD_FOLDER']}/{filename}" meta_data = data['meta_data'] collection_json = data['collection_json'] character_links = data['character_links'] mod_list = data['mod_list'] print(meta_data) upload = Uploads( userid=user.id, upload_date = date, filename=filename, author=user.username, version="test", description="", meta_data = meta_data, collection_json = collection_json, character_links = character_links, mod_list = mod_list, path=save_path ) db.session.add(upload) db.session.commit() return Response(status=200) @app.route("/upload_app", methods=['POST']) async def upload_app(): data = request.files['datas'].read() data = json.loads(data) if not request.method == 'POST': return Response(status=415) user = Users.query.filter_by(username=data['username']).first() if not check_password(data['password'], user.password): return Response(status=415) if 'file' not in request.files: print(request.files) return Response(status=415) file = request.files['file'] if file.filename == '': return Response(status=415) filename = data['filename'] filename = secure_filename(filename) # database upload date = datetime.date.today() save_path = f"{app.config['UPLOAD_FOLDER']}/{filename}" meta_data = data['meta_data'] collection_json = data['collection_json'] character_links = data['character_links'] mod_list = data['mods_to_copy'] print(meta_data) upload = Uploads( userid=user.id, upload_date = date, filename=filename, author=user.firstname, version="test", description="", meta_data = meta_data, collection_json = collection_json, character_links = character_links, mod_list = mod_list, path=save_path ) db.session.add(upload) db.session.commit() # saves file print(save_path) file.save(save_path) return Response(status=200) @app.route("/login_app", methods=["POST"]) def login_app(): if request.method == "POST": username = request.json['username'] user = Users.query.filter_by(username=username).first() data= { "id": user.id, "friendcode": user.friendcode, "subs": user.subs, } if check_password(request.json['password'], user.password): return data, 200 return Response(status=500) @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": user = Users.query.filter_by(username=request.form.get("username")).first() if check_password(request.form.get("password"), user.password): login_user(user) return redirect(url_for("home")) return render_template("login.html") @app.route("/logout") def logout(): logout_user() return redirect(url_for("home")) @app.route("/signup", methods=["GET", "POST"]) def signup(): if request.method == "POST": if request.form.get('password') == request.form.get('confirm_password'): user = Users( username=request.form.get('username'), email=request.form.get("email"), password=password_hash(request.form.get("password")), firstname=request.form.get('firstname'), lastname=request.form.get('lastname'), friendcode=friend_code(6), subs=[], raw_avatar='default', s_avatar='default', m_avatar='default', l_avatar='default' ) db.session.add(user) db.session.commit() return redirect(url_for("login")) else: return redirect(request.url) return render_template("signup.html") @app.route("/") def home(): return render_template("home.html") @app.route("/about") def about(): return render_template("about.html") was_upload = (False, "") @app.route("/collections/upload", methods=["POST"]) def upload(): if request.method == "POST": if 'file' not in request.files: print('no file') flash('No file part') return redirect(url_for('collections')) file = request.files['file'] if file.filename == '': print("no selected file") flash('No selected file') return redirect(url_for('collections')) if file: filename= secure_filename(file.filename) global was_upload was_upload = (True, filename) print(filename) path = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(path) date = datetime.date.today() upload = Uploads( userid=current_user.id, upload_date = date, filename=filename, author=current_user.username, version="test", description="", path=path ) db.session.add(upload) db.session.commit() return redirect(url_for('collections')) def generate_file_chunks(filename): with open(filename, 'rb') as file: while True: chunk = file.read(4096) # Read 4096 bytes at a time if not chunk: break yield chunk @app.route("/download_app/latest") def download_app(): path = pathlib.Path(f"static/versions/latest.zip") filename = "Collection Sharing App.zip" return send_file(open(path, "rb"), download_name=filename) @app.route("/collections/download/") def download(id): upload = Uploads.query.filter_by(id=id).first() abs_path = os.path.join(current_app.root_path, str(upload.path).replace('/', '\\')) print(abs_path) if not os.path.isfile(abs_path): print("File not found!") try: with open(abs_path, "rb") as f: print("File read success, first 10 bytes:", f.read(10)) except Exception as e: print("Error reading file:", e) filename = upload.filename return send_file(abs_path, as_attachment=True, download_name=filename) @app.route("/collections/delete/", methods=["GET", "POST"]) def delete_collection(id): upload = Uploads.query.filter_by(id=id).first() os.remove(upload.path) row_to_delete = db.session.query(Uploads).filter(Uploads.id == id).one() db.session.delete(row_to_delete) db.session.commit() return redirect(url_for('collections')) @app.route("/collections/update/", methods=["GET", "POST"]) def update_collection(id): print(f"this is an update call for id:{id}") if request.method == "POST": print(f"this is an update call for id:{id}, request is a POST") upload = Uploads.query.filter_by(id=id).first() upload.author = request.form.get(f"author-{id}") upload.version = request.form.get(f"version-{id}") upload.description = request.form.get(f"description-{id}") print(upload.description) db.session.commit() if request.form.get(f"latest-{id}") == "on": user = Users.query.filter_by(id=current_user.id).first() user.latest = upload.id db.session.commit() return redirect(url_for('collections')) return redirect(url_for('collections')) @app.route("/collections") def collections(): page = request.args.get('page', 1, type=int) pagination = Uploads.query.filter_by(userid=current_user.id).order_by(Uploads.upload_date) pagination = pagination.paginate(page=page, per_page=5, error_out=False) data=[] friends_data = [] for code in current_user.subs: try: user = Users.query.filter_by(friendcode=code).first() upload = Uploads.query.filter_by(id=user.latest).first() up_data = { "id": upload.id, "upload_date": upload.upload_date, "name": upload.filename, "author": upload.author, "version": upload.version, "description": upload.description, "path": pathlib.Path(f"{upload.path}").absolute() } friends_data.append(up_data) except: pass global was_upload data=[pagination, friends_data, deepcopy(was_upload)] was_upload = (False, "") return render_template("collections.html", data=data) @app.route('/profile') def profile(): return render_template("profile.html") @app.route('/profile/upload', methods=["POST"]) def upload_avatar(): if request.method == 'POST': f = request.files.get('file') print(f) raw_filename = avatars.save_avatar(f) user = Users.query.filter_by(id=current_user.id).first() user.raw_avatar = f"{app.config['AVATARS_SAVE_PATH']}{raw_filename}" db.session.commit() session['raw_filename'] = raw_filename # you will need to store this filename in database in reality return redirect(url_for('crop')) return redirect(url_for('profile')) # serve avatar image @app.route('/avatars/') def get_avatar(filename): return send_from_directory(app.config['AVATARS_SAVE_PATH'], filename) @app.route('/crop', methods=['GET', 'POST']) def crop(): if request.method == 'POST': x = request.form.get('x') y = request.form.get('y') w = request.form.get('w') h = request.form.get('h') filenames = avatars.crop_avatar(session['raw_filename'], x, y, w, h) user = Users.query.filter_by(id=current_user.id).first() user.s_avatar = f"avatars/{filenames[0]}" print(user.s_avatar) db.session.commit() user.m_avatar = f"avatars/{filenames[1]}" db.session.commit() user.l_avatar = f"avatars/{filenames[2]}" db.session.commit() return redirect(url_for('profile')) return render_template('crop.html') @app.route('/profile/friends') def friends_code(): data = [] for code in current_user.subs: try: user = Users.query.filter_by(friendcode=code).first() x = { 'username': user.username, 'friendcode': user.friendcode, 'm_avatar': user.m_avatar } data.append(x) except: pass return render_template("friends.html", data=data) @app.route("/addcode", methods=["POST"]) def addcode(): if request.method == "POST": if request.form.get("friendcode") == "": return redirect(url_for('friends_code')) user = Users.query.filter_by(id=current_user.id).first() user.subs.append(request.form.get("friendcode")) user.subs = user.subs db.session.commit() return redirect(url_for('friends_code')) if __name__ == "__main__": app.run(host="0.0.0.0", port="5003", debug=True)