| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323 |
- from email.policy import default
- import os
- import re
- import bcrypt
- import dotenv
- from flask import (
- Flask,
- flash,
- redirect,
- render_template,
- request,
- session,
- url_for,
- )
- from flask_sqlalchemy import SQLAlchemy
- from sqlalchemy.orm import DeclarativeBase
- from sqlalchemy.sql import func
- from sqlalchemy.exc import IntegrityError
# Load variables from a .env file (when one exists) into the process
# environment before any of the settings below are read.
dotenv.load_dotenv()

app = Flask(__name__)
# Key used by Flask to sign the session cookie.
# NOTE(review): the fallback value is visible in the source, so SECRET_KEY
# should always be provided through the environment outside local development.
app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", "whataloadofnonsense")
class Base(DeclarativeBase):
    """Declarative base passed to Flask-SQLAlchemy as its ``model_class``."""
# Resolve the host outside the f-string: reusing double quotes inside a
# double-quoted f-string is only valid syntax on Python 3.12+ (PEP 701), and
# nothing else in this module needs an interpreter that new.
_db_host = os.getenv("DB_HOST", "db")

# NOTE(review): the credentials, port and schema name are hard-coded; only the
# host can currently be overridden (via DB_HOST).
app.config["SQLALCHEMY_DATABASE_URI"] = (
    f"mysql://root:root@{_db_host}:3306/topperstasks"
)

# Flask-SQLAlchemy handle; every model below derives from ``db.Model``.
db = SQLAlchemy(model_class=Base)
db.init_app(app)
class User(db.Model):
    """An account stored in the ``users`` table."""

    __tablename__ = "users"
    id = db.Column(db.Integer, primary_key=True)
    # Login name; declared unique and required.
    username = db.Column(db.String, unique=True, nullable=False)
    # bcrypt hash of the password, written by the login and account views.
    # NOTE(review): declared NOT NULL here, yet admin() stores None to reset a
    # password and creates users without a hash -- confirm the real schema.
    passhash = db.Column(db.String, nullable=False)
    # When true, the user may use the /admin view.
    is_admin = db.Column(db.Boolean, default=False)
    # Tasks linked to this user through Task.created_by.
    tasks = db.relationship("Task", back_populates="user")
    # Task_Assignment rows that reference this user.
    task_assignments = db.relationship("Task_Assignment", back_populates="user")
class Task(db.Model):
    """A to-do item stored in the ``tasks`` table."""

    __tablename__ = "tasks"
    id = db.Column(db.Integer, primary_key=True)
    # Creation timestamp; defaults to the SQL now() expression on insert.
    # Declared as a string column in this model.
    created_datetime = db.Column(db.String, nullable=False, default=func.now())
    # Free-form description entered by the user.
    tasktext = db.Column(db.String, nullable=False)
    # Id of the user who created the task.
    created_by = db.Column(db.Integer, db.ForeignKey(User.id), nullable=False)
    # Optional due date/time, stored as text by updatetask().
    due = db.Column(db.String)
    # Toggled by updatetask(); home() filters on it.
    completed = db.Column(db.Boolean, default=False)
    # Soft-delete flag: home() hides rows where this is true.
    deleted = db.Column(db.Boolean, default=False)
    # The creating user (other side of User.tasks).
    user = db.relationship(User, back_populates="tasks")
    # Task_Assignment rows that reference this task.
    task_assignments = db.relationship("Task_Assignment", back_populates="task")
class Task_Assignment(db.Model):
    """Association row linking one user to one task (``task_assignments``)."""

    __tablename__ = "task_assignments"
    id = db.Column(db.Integer, primary_key=True)
    # The assigned user.
    user_id = db.Column(db.Integer, db.ForeignKey(User.id), nullable=False)
    # The task the user is assigned to.
    task_id = db.Column(db.Integer, db.ForeignKey(Task.id), nullable=False)
    # ORM navigation to both ends of the association.
    user = db.relationship(User, back_populates="task_assignments")
    task = db.relationship(Task, back_populates="task_assignments")
@app.route("/")
def home(completed=False):
    """Render the task list for a logged-in user.

    Visitors without a ``userid`` in the session are redirected to the login
    page.  Otherwise all non-deleted tasks are loaded, newest first, keeping
    only completed or only open tasks depending on ``completed``.  Each task
    gets a display-only ``assignees`` string: the usernames of its assigned
    users joined with ", ", or "[NONE]" when nobody is assigned.

    Args:
        completed: When truthy, list completed tasks; otherwise open ones.

    Returns:
        A redirect to the login page or the rendered ``home.html`` template.
    """
    if not session.get("userid"):
        return redirect(url_for("login"))

    # ``== False`` / ``== <bool>`` build SQL expressions here; they are not
    # Python truth tests, so they must not be rewritten as ``not ...``.
    task_query = (
        db.select(Task)
        .where(Task.deleted == False)
        .order_by(Task.created_datetime.desc())
        .where(Task.completed == bool(completed))
    )
    tasks = db.session.execute(task_query).scalars().all()

    for task in tasks:
        assignee_query = (
            db.select(User)
            .join(Task_Assignment)
            .where(Task_Assignment.task_id == task.id)
        )
        assigned_users = db.session.execute(assignee_query).scalars().all()
        names = [person.username for person in assigned_users]
        # Attached only for the template; ``assignees`` is not a mapped column.
        task.assignees = ", ".join(names) if names else "[NONE]"

    every_user = db.session.execute(db.select(User)).scalars().all()
    return render_template(
        "home.html", tasks=tasks, completed=completed, users=every_user
    )
@app.route("/completed")
def completed():
    """Render the task list restricted to completed tasks via ``home()``."""
    return home(completed=True)
@app.route("/account", methods=["GET", "POST"])
def account():
    """Show the account page and process password or username changes.

    GET renders ``account.html`` for the logged-in user.  A POST handles one
    change per request: a non-empty ``newpassword`` field takes precedence and
    requires a matching ``oldpassword``; otherwise a non-empty ``username``
    field renames the account.  Every POST ends with a flashed status message
    and a redirect back to this page.

    Returns:
        A redirect to the login page when the session has no valid user, a
        redirect to the account page after a POST, or the rendered template.
    """
    if not session.get("userid"):
        return redirect(url_for("login"))
    user = db.session.execute(
        db.select(User).where(User.id == session.get("userid"))
    ).scalar_one_or_none()
    if not user:
        return redirect(url_for("login"))

    if request.method != "POST":
        return render_template("account.html", user=user)

    if newpassword := request.form.get("newpassword"):
        # A missing ``oldpassword`` field or an account whose hash was reset
        # must count as a failed check instead of raising AttributeError.
        oldpassword = request.form.get("oldpassword") or ""
        stored_hash = user.passhash
        if not stored_hash or not bcrypt.checkpw(
            oldpassword.encode(), stored_hash.encode()
        ):
            flash("Incorrect Password", category="warning")
            return redirect(url_for("account"))
        # hashpw() returns bytes; decode so the string column receives text.
        user.passhash = bcrypt.hashpw(
            newpassword.encode(), bcrypt.gensalt()
        ).decode()
        db.session.commit()
        flash("Updated Password", category="success")
        return redirect(url_for("account"))

    if username := request.form.get("username"):
        user.username = username
        try:
            db.session.commit()
        except IntegrityError:
            # Usernames are unique; undo the failed rename so the session
            # stays usable and tell the user why nothing changed.
            db.session.rollback()
            flash("That username is already taken", category="warning")
            return redirect(url_for("account"))
        flash("Updated Username", category="success")
        return redirect(url_for("account"))

    flash("Failed to update", category="warning")
    return redirect(url_for("account"))
@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form and authenticate submitted credentials.

    On POST the user is looked up by the ``user`` form field.  If the account
    has no stored hash (new account or admin reset), the submitted password
    becomes the account's password and the user is asked to log in again.
    Otherwise the password is checked with bcrypt and, on success, the user's
    id is stored in the session.

    NOTE(review): while an account has no hash, anyone who knows its username
    can set the password -- this is the existing first-login design.

    Returns:
        The rendered ``login.html`` on GET, otherwise a redirect to the home
        page (success) or back to the login page (any failure).
    """
    if request.method != "POST":
        return render_template("login.html")

    try:
        username = request.form.get("user")
        password = request.form.get("password")
        user = db.session.execute(
            db.select(User).where(User.username == username)
        ).scalar_one_or_none()
        if not user:
            flash("Failed to login", category="warning")
            return redirect(url_for("login"))

        if not user.passhash:
            if not password:
                flash("Can't set a blank password", category="warning")
                return redirect(url_for("login"))
            # hashpw() returns bytes; decode so the string column gets text.
            user.passhash = bcrypt.hashpw(
                password.encode(), bcrypt.gensalt()
            ).decode()
            db.session.commit()
            flash("Password set succesfully!", category="success")
            return redirect(url_for("login"))

        # A missing password field is an ordinary failed login, not a crash.
        if password and bcrypt.checkpw(password.encode(), user.passhash.encode()):
            session["userid"] = user.id
            return redirect(url_for("home"))

        flash("Failed to login", category="warning")
        return redirect(url_for("login"))
    except Exception:
        # Last-resort boundary for the view: record the traceback and discard
        # any half-finished transaction before reporting a generic error.
        app.logger.exception("Unexpected error while handling login")
        db.session.rollback()
        flash("Critical Error, contact Peter pls", category="danger")
        return redirect(url_for("login"))
@app.route("/addtask", methods=["POST"])
def addtask():
    """Create a task owned by the logged-in user from the ``task-text`` field.

    Returns:
        A redirect to the login page when nobody is logged in, otherwise a
        redirect to the home page (with a flashed message on failure).
    """
    user_id = session.get("userid")
    if not user_id:
        return redirect(url_for("login"))

    tasktext = request.form.get("task-text")
    if not tasktext:
        flash("Invalid task text", category="warning")
        return redirect(url_for("home"))

    try:
        db.session.add(Task(tasktext=tasktext, created_by=user_id))
        db.session.commit()
    except Exception:
        # Keep the traceback for debugging and drop the failed transaction
        # instead of silently swallowing the error.
        app.logger.exception("Failed to create task")
        db.session.rollback()
        flash("Critical Error, contact Peter pls", category="danger")
    return redirect(url_for("home"))
@app.route("/updatetask", methods=["POST"])
def updatetask():
    """Apply due-date, assignee and completion changes to one task.

    The task is selected by the ``task-id`` form field.  Each optional field
    present in the form is applied in turn:

    * ``due`` -- an ISO 8601 date/time (e.g. ``2024-05-01T13:45``), stored as
      ``YYYY-MM-DD HH:MM:SS`` text.
    * ``assignee`` -- a user id; adds a Task_Assignment row.
    * ``completed`` -- its mere presence toggles the completed flag.

    Returns:
        A redirect to the login page when nobody is logged in, otherwise a
        redirect to the home page (with a flashed message on any problem).
    """
    # Imported locally so this view does not depend on the module's import
    # block being changed.
    from datetime import datetime

    if not session.get("userid"):
        return redirect(url_for("login"))
    if "task-id" not in request.form:
        flash("Failed to find task", category="warning")
        return redirect(url_for("home"))
    task = db.session.execute(
        db.select(Task).where(Task.id == request.form.get("task-id"))
    ).scalar_one_or_none()
    if not task:
        flash("Failed to find task", category="warning")
        return redirect(url_for("home"))

    if "due" in request.form:
        due = request.form.get("due")
        try:
            parsed_due = datetime.fromisoformat(due) if due else None
        except ValueError:
            parsed_due = None
        if parsed_due is None:
            flash("Invalid due date", category="warning")
            return redirect(url_for("home"))
        # Normalise instead of blindly appending ":00", which produced
        # malformed values whenever the input already carried seconds.
        datetime_str = parsed_due.replace(tzinfo=None).isoformat(
            sep=" ", timespec="seconds"
        )
        app.logger.info(datetime_str)
        task.due = datetime_str

    if "assignee" in request.form:
        assignee = request.form.get("assignee")
        try:
            assignee_id = int(assignee) if assignee else None
        except ValueError:
            assignee_id = None
        if assignee_id is None:
            flash("Invalid assignee", category="warning")
            return redirect(url_for("home"))
        db.session.add(Task_Assignment(user_id=assignee_id, task_id=task.id))

    if "completed" in request.form:
        task.completed = not task.completed

    try:
        db.session.commit()
    except IntegrityError:
        # e.g. an assignee id that matches no user; discard the failed
        # transaction so the session is left in a clean state.
        db.session.rollback()
        flash("Database error, what did you just try to do?", category="danger")
    return redirect(url_for("home"))
@app.route("/admin", methods=["GET", "POST"])
def admin():
    """Render the admin page and process user-management actions.

    Only logged-in users with ``is_admin`` set may use this view.  A POST
    performs the first action whose form field is non-empty:

    * ``userid-reset`` -- clear that user's password hash so a new password
      can be chosen at the next login.
    * ``username-add`` -- create a user with that name and no password.
    * ``userid-admin`` -- grant admin rights to that user.

    Returns:
        A redirect to the login or home page for unauthorised visitors, a
        redirect back to this page after an action, or the rendered
        ``admin.html`` listing all users.
    """
    if not session.get("userid"):
        return redirect(url_for("login"))
    user = db.session.execute(
        db.select(User).where(User.id == session.get("userid"))
    ).scalar_one_or_none()
    if not user or not user.is_admin:
        flash("You are not an admin!", category="danger")
        return redirect(url_for("home"))

    if request.method == "POST":
        if userid := request.form.get("userid-reset"):
            resetuser = db.session.execute(
                db.select(User).where(User.id == userid)
            ).scalar_one_or_none()
            if resetuser is None:
                flash("User not found", category="warning")
                return redirect(url_for("admin"))
            resetuser.passhash = None
            try:
                db.session.commit()
            except IntegrityError:
                # NOTE(review): the model declares passhash NOT NULL; if the
                # database enforces that, storing None is rejected here.
                db.session.rollback()
                flash("Could not reset the user's password", category="danger")
                return redirect(url_for("admin"))
            flash("Reset user's password succesfully", category="success")
            return redirect(url_for("admin"))

        if newusername := request.form.get("username-add"):
            newuser = User()
            newuser.username = newusername
            db.session.add(newuser)
            try:
                db.session.commit()
            except IntegrityError:
                # Most likely a duplicate username (the column is unique).
                db.session.rollback()
                flash(
                    "Could not add user; the username may already exist",
                    category="warning",
                )
                return redirect(url_for("admin"))
            flash("New user added successfully", category="success")
            return redirect(url_for("admin"))

        if newadminid := request.form.get("userid-admin"):
            newadmin = db.session.execute(
                db.select(User).where(User.id == newadminid)
            ).scalar_one_or_none()
            if newadmin is None:
                flash("User not found", category="warning")
                return redirect(url_for("admin"))
            if newadmin.is_admin:
                flash("User is already an admin", category="warning")
                return redirect(url_for("admin"))
            newadmin.is_admin = True
            db.session.commit()
            flash("User is now an admin", category="success")
            return redirect(url_for("admin"))

    users = db.session.execute(db.select(User)).scalars().all()
    return render_template("admin.html", users=users)
@app.route("/logout", methods=["POST"])
def logout():
    """Forget the logged-in user and send the browser to the login page."""
    # pop() with a default: logging out twice is harmless.
    session.pop("userid", None)
    login_url = url_for("login")
    return redirect(login_url)
if __name__ == "__main__":
    # The interactive debugger enabled by debug mode lets anyone who can reach
    # the server run arbitrary Python.  Because the development server binds
    # to every interface here, debug mode is opt-in via FLASK_DEBUG instead of
    # being always on.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(host="0.0.0.0", debug=debug_enabled)
|