main.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. from email.policy import default
  2. import os
  3. import re
  4. import bcrypt
  5. import dotenv
  6. from flask import (
  7. Flask,
  8. flash,
  9. redirect,
  10. render_template,
  11. request,
  12. session,
  13. url_for,
  14. )
  15. from flask_sqlalchemy import SQLAlchemy
  16. from sqlalchemy.orm import DeclarativeBase
  17. from sqlalchemy.sql import func
  18. from sqlalchemy.exc import IntegrityError
  19. dotenv.load_dotenv()
  20. app = Flask(__name__)
  21. app.secret_key = os.getenv("SECRET_KEY", "whataloadofnonsense")
  22. class Base(DeclarativeBase):
  23. pass
  24. app.config["SQLALCHEMY_DATABASE_URI"] = (
  25. f"mysql://root:root@{os.getenv('DB_HOST', "db")}:3306/topperstasks"
  26. )
  27. db = SQLAlchemy(model_class=Base)
  28. db.init_app(app)
  29. class User(db.Model):
  30. __tablename__ = "users"
  31. id = db.Column(db.Integer, primary_key=True)
  32. username = db.Column(db.String, unique=True, nullable=False)
  33. passhash = db.Column(db.String, nullable=False)
  34. is_admin = db.Column(db.Boolean, default=False)
  35. tasks = db.relationship("Task", back_populates="user")
  36. task_assignments = db.relationship("Task_Assignment", back_populates="user")
  37. class Task(db.Model):
  38. __tablename__ = "tasks"
  39. id = db.Column(db.Integer, primary_key=True)
  40. created_datetime = db.Column(db.String, nullable=False, default=func.now())
  41. tasktext = db.Column(db.String, nullable=False)
  42. created_by = db.Column(db.Integer, db.ForeignKey(User.id), nullable=False)
  43. due = db.Column(db.String)
  44. completed = db.Column(db.Boolean, default=False)
  45. deleted = db.Column(db.Boolean, default=False)
  46. user = db.relationship(User, back_populates="tasks")
  47. task_assignments = db.relationship("Task_Assignment", back_populates="task")
  48. class Task_Assignment(db.Model):
  49. __tablename__ = "task_assignments"
  50. id = db.Column(db.Integer, primary_key=True)
  51. user_id = db.Column(db.Integer, db.ForeignKey(User.id), nullable=False)
  52. task_id = db.Column(db.Integer, db.ForeignKey(Task.id), nullable=False)
  53. user = db.relationship(User, back_populates="task_assignments")
  54. task = db.relationship(Task, back_populates="task_assignments")
  55. @app.route("/")
  56. def home(completed=False):
  57. if not session.get("userid"):
  58. return redirect(url_for("login"))
  59. select = (
  60. db.select(Task)
  61. .where(Task.deleted == False)
  62. .order_by(Task.created_datetime.desc())
  63. )
  64. if completed:
  65. select = select.where(Task.completed == True)
  66. else:
  67. select = select.where(Task.completed == False)
  68. tasks = db.session.execute(select).scalars().all()
  69. for task in tasks:
  70. task.assignees = ""
  71. assignees = (
  72. db.session.execute(
  73. db.select(User)
  74. .join(Task_Assignment)
  75. .where(Task_Assignment.task_id == task.id)
  76. )
  77. .scalars()
  78. .all()
  79. )
  80. for assignee in assignees:
  81. task.assignees += assignee.username + ", "
  82. task.assignees = task.assignees[:-2] if task.assignees else "[NONE]"
  83. users = db.session.execute(db.select(User)).scalars().all()
  84. return render_template(
  85. "home.html", tasks=tasks, completed=completed, users=users
  86. )
  87. @app.route("/completed")
  88. def completed():
  89. return home(completed=True)
  90. @app.route("/account", methods=["GET", "POST"])
  91. def account():
  92. if not session.get("userid"):
  93. return redirect(url_for("login"))
  94. user = db.session.execute(
  95. db.select(User).where(User.id == session.get("userid"))
  96. ).scalar_one_or_none()
  97. if not user:
  98. return redirect(url_for("login"))
  99. if request.method == "POST":
  100. if newpassword := request.form.get("newpassword"):
  101. if not bcrypt.checkpw(
  102. request.form.get("oldpassword").encode(), user.passhash.encode()
  103. ):
  104. flash("Incorrect Password", category="warning")
  105. return redirect(url_for("account"))
  106. salt = bcrypt.gensalt()
  107. user.passhash = bcrypt.hashpw(newpassword.encode(), salt)
  108. db.session.commit()
  109. flash("Updated Password", category="success")
  110. return redirect(url_for("account"))
  111. if username := request.form.get("username"):
  112. user.username = username
  113. db.session.commit()
  114. flash("Updated Username", category="success")
  115. return redirect(url_for("account"))
  116. flash("Failed to update", category="warning")
  117. return redirect(url_for("account"))
  118. return render_template("account.html", user=user)
  119. @app.route("/login", methods=["GET", "POST"])
  120. def login():
  121. if request.method == "POST":
  122. try:
  123. username = request.form.get("user")
  124. password = request.form.get("password")
  125. user: User = db.session.execute(
  126. db.select(User).where(User.username == username)
  127. ).scalar_one_or_none()
  128. if not user:
  129. flash("Failed to login", category="warning")
  130. return redirect(url_for("login"))
  131. if not user.passhash:
  132. if not password:
  133. flash("Can't set a blank password")
  134. return redirect(url_for("login"))
  135. salt = bcrypt.gensalt()
  136. user.passhash = bcrypt.hashpw(password.encode(), salt)
  137. db.session.commit()
  138. flash("Password set succesfully!", category="success")
  139. return redirect(url_for("login"))
  140. if bcrypt.checkpw(password.encode(), user.passhash.encode()):
  141. session["userid"] = user.id
  142. return redirect(url_for("home"))
  143. else:
  144. flash("Failed to login", category="warning")
  145. return redirect(url_for("login"))
  146. except:
  147. flash("Critical Error, contact Peter pls", category="danger")
  148. return redirect(url_for("login"))
  149. return render_template("login.html")
  150. @app.route("/addtask", methods=["POST"])
  151. def addtask():
  152. if not session.get("userid"):
  153. return redirect(url_for("login"))
  154. try:
  155. tasktext = request.form.get("task-text")
  156. user_id = session.get("userid")
  157. if not tasktext:
  158. flash("Invalid task text", category="warning")
  159. return redirect(url_for("home"))
  160. if not user_id:
  161. return redirect(url_for("login"))
  162. newtask = Task(tasktext=tasktext, created_by=user_id)
  163. db.session.add(newtask)
  164. db.session.commit()
  165. except:
  166. flash("Critical Error, contact Peter pls", category="danger")
  167. return redirect(url_for("home"))
  168. return redirect(url_for("home"))
  169. @app.route("/updatetask", methods=["POST"])
  170. def updatetask():
  171. if not session.get("userid"):
  172. return redirect(url_for("login"))
  173. if "task-id" not in request.form:
  174. flash("Failed to find task", category="warning")
  175. return redirect(url_for("home"))
  176. task = db.session.execute(
  177. db.select(Task).where(Task.id == request.form.get("task-id"))
  178. ).scalar_one_or_none()
  179. if not task:
  180. flash("Failed to find task", category="warning")
  181. return redirect(url_for("home"))
  182. if "due" in request.form:
  183. due = request.form.get("due")
  184. if not due:
  185. flash("Invalid due date", category="warning")
  186. return redirect(url_for("home"))
  187. datetime_str = due
  188. datetime_str = datetime_str.replace("T", " ")
  189. datetime_str += ":00"
  190. app.logger.info(datetime_str)
  191. task.due = datetime_str
  192. if "assignee" in request.form:
  193. assignee = request.form.get("assignee")
  194. if not assignee:
  195. flash("Invalid assignee", category="warning")
  196. return redirect(url_for("home"))
  197. assignment = Task_Assignment(user_id=assignee, task_id=task.id)
  198. db.session.add(assignment)
  199. if "completed" in request.form:
  200. task.completed = not task.completed
  201. try:
  202. db.session.commit()
  203. except IntegrityError as e:
  204. flash("Database error, what did you just try to do?", category="danger")
  205. return redirect(url_for("home"))
  206. @app.route("/admin", methods=["GET", "POST"])
  207. def admin():
  208. if not session.get("userid"):
  209. return redirect(url_for("login"))
  210. user = db.session.execute(
  211. db.select(User).where(User.id == session.get("userid"))
  212. ).scalar_one_or_none()
  213. if not user or not user.is_admin:
  214. flash("You are not an admin!", category="danger")
  215. return redirect(url_for("home"))
  216. if request.method == "POST":
  217. if userid := request.form.get("userid-reset"):
  218. resetuser = db.session.execute(
  219. db.select(User).where(User.id == userid)
  220. ).scalar_one_or_none()
  221. resetuser.passhash = None
  222. db.session.commit()
  223. flash("Reset user's password succesfully", category="success")
  224. return redirect(url_for("admin"))
  225. if newusername := request.form.get("username-add"):
  226. newuser = User()
  227. newuser.username = newusername
  228. db.session.add(newuser)
  229. db.session.commit()
  230. flash("New user added successfully", category="success")
  231. return redirect(url_for("admin"))
  232. users = db.session.execute(db.select(User)).scalars().all()
  233. return render_template("admin.html", users=users)
  234. @app.route("/logout", methods=["POST"])
  235. def logout():
  236. session.pop("userid", None)
  237. return redirect(url_for("login"))
  238. if __name__ == "__main__":
  239. app.run(host="0.0.0.0", debug=True)