import bcrypt import shutil from dataclasses import dataclass import sqlite3 @dataclass class User: user_id: int email: str handle: str password: str display_name: str def __init__( self, user_id: int, email: str, handle: str, password: str = "", display_name: str = "", ) -> None: self.user_id: int = user_id self.handle = handle self.password = password self.display_name = display_name if display_name else handle self.email = email class UserHandler: connection: sqlite3.Connection = sqlite3.connect("../data/database.db", check_same_thread=False) cursor: sqlite3.Cursor = connection.cursor() @classmethod def create_user( cls, user: User ) -> None: salt = bcrypt.gensalt() encrypted_password = bcrypt.hashpw(user.password.encode("utf-8"), salt) cls.cursor.execute( f""" INSERT INTO users(handle, display_name, email, password) values(?, ?, ?, ?) """, (user.handle, user.display_name, user.email, encrypted_password) ) cls.connection.commit() shutil.copy( "../assets/profile_pictures/default_profile_picture.jpg", f"../assets/profile_pictures/{user.handle}" ) @classmethod def get_user_by_id(cls, user_id: int) -> User: users = cls.cursor.execute( f""" SELECT handle, display_name FROM users WHERE user_id = ?; """, (user_id, ) ).fetchall() if not users: raise KeyError(f"{user_id} does not exist!") user = users[0] return User( user_id=user_id, handle=user[0], display_name=user[1], email="", password="" ) @classmethod def get_user_by_handle(cls, handle: str) -> User: users = cls.cursor.execute( f""" SELECT user_id, handle, display_name FROM users WHERE handle = ?; """, (handle, ) ).fetchall() if not users: raise KeyError(f"{handle} does not exist!") users = users[0] return User( user_id=users[0], handle=users[1], display_name=users[2], email="", password="" ) @classmethod def get_user_by_email(cls, email: str) -> User: users = cls.cursor.execute( f""" SELECT user_id, handle, display_name FROM users WHERE email = ?; """, (email, ) ).fetchall() if not users: raise KeyError(f"{email} does not exist!") users = users[0] return User( user_id=users[0], handle=users[1], display_name=users[2], email=email, password="", ) #@classmethod #def verify_password_by_handle(cls, handle: str, password: str) -> bool: # users = cls.cursor.execute( # f""" # SELECT password # FROM users WHERE handle = ?; # """, # (handle, ) # ).fetchall() # if not users: # raise KeyError(f"{handle} does not exist!") # real_password = users[0][0] # return bcrypt.checkpw(password.encode("utf-8"), real_password) @classmethod def verify_password_by_email(cls, email: str, password: str) -> bool: users = cls.cursor.execute( f""" SELECT password FROM users WHERE email = ?; """, (email, ) ).fetchall() if not users: raise KeyError(f"{email} does not exist!") return bcrypt.checkpw(password.encode("utf-8"), users[0][0]) @classmethod def change_display_name(cls, user_id: int, display_name: str): cls.cursor.execute( f""" UPDATE users SET display_name = ? WHERE user_id = ?; """, (display_name, user_id) ).fetchall() @classmethod def get_display_name(cls, user_id: int) -> str: return cls.get_user_by_id(user_id).display_name