155 lines
4.4 KiB
Python
155 lines
4.4 KiB
Python
import bcrypt
|
|
import shutil
|
|
from dataclasses import dataclass
|
|
import sqlite3
|
|
|
|
|
|
@dataclass
class User:
    """Plain record describing one user account.

    The constructor accepts ``user_id``, ``email`` and ``handle`` as required
    arguments, followed by the optional ``password`` and ``display_name``
    (both default to the empty string).

    When ``display_name`` is empty (or otherwise falsy) it falls back to
    ``handle``, so a ``User`` always has something to show.
    """

    user_id: int
    email: str
    handle: str
    # Optional: callers that only need profile data may leave this empty.
    password: str = ""
    # Optional: replaced by ``handle`` in ``__post_init__`` when left empty.
    display_name: str = ""

    def __post_init__(self) -> None:
        """Apply the display-name fallback after the generated ``__init__``."""
        # A falsy display name means "not set"; show the handle instead.
        if not self.display_name:
            self.display_name = self.handle
|
|
|
|
|
|
class UserHandler:
    """Class-level data-access helpers for the ``users`` table.

    Every method is a classmethod operating on one shared SQLite connection
    and cursor, both of which are created when this class body is executed
    (i.e. when the module is imported).
    """

    # NOTE: the database path is relative, so it is resolved against the
    # process's current working directory. ``check_same_thread=False`` turns
    # off sqlite3's same-thread check; no locking is done in this class.
    connection: sqlite3.Connection = sqlite3.connect("../data/database.db", check_same_thread=False)
    cursor: sqlite3.Cursor = connection.cursor()

    @classmethod
    def _fetch_first_row(cls, query: str, params: tuple, missing_key: object) -> tuple:
        """Run *query* with *params* and return its first result row.

        Args:
            query: Parameterized SQL ``SELECT`` statement.
            params: Values bound to the ``?`` placeholders in *query*.
            missing_key: Value named in the error when no row matches.

        Raises:
            KeyError: If the query yields no rows.
        """
        row = cls.cursor.execute(query, params).fetchone()
        if row is None:
            raise KeyError(f"{missing_key} does not exist!")
        return row

    @classmethod
    def create_user(
        cls,
        user: User
    ) -> None:
        """Insert *user* into ``users`` and give them the default profile picture.

        The password is hashed with bcrypt (fresh salt per call) before it is
        stored; ``user.password`` is therefore expected to be the plain-text
        password. The insert is committed before the picture is copied.

        Args:
            user: Account to persist. ``user.user_id`` is not written.
        """
        salt = bcrypt.gensalt()
        hashed_password = bcrypt.hashpw(user.password.encode("utf-8"), salt)
        cls.cursor.execute(
            """
            INSERT INTO users(handle, display_name, email, password)
            values(?, ?, ?, ?)
            """,
            (user.handle, user.display_name, user.email, hashed_password),
        )
        cls.connection.commit()
        # NOTE(review): the handle is interpolated into a filesystem path
        # without validation; a handle containing path separators or ".."
        # would write outside the pictures directory. Confirm handles are
        # validated by the caller.
        shutil.copy(
            "../assets/profile_pictures/default_profile_picture.jpg",
            f"../assets/profile_pictures/{user.handle}",
        )

    @classmethod
    def get_user_by_id(cls, user_id: int) -> User:
        """Return the user whose ``user_id`` matches.

        Only ``handle`` and ``display_name`` are read from the database; the
        returned ``User`` has empty ``email`` and ``password``.

        Raises:
            KeyError: If no user has this id.
        """
        handle, display_name = cls._fetch_first_row(
            """
            SELECT handle, display_name
            FROM users WHERE user_id = ?;
            """,
            (user_id,),
            user_id,
        )
        return User(
            user_id=user_id,
            handle=handle,
            display_name=display_name,
            email="",
            password="",
        )

    @classmethod
    def get_user_by_handle(cls, handle: str) -> User:
        """Return the user whose ``handle`` matches.

        The returned ``User`` has empty ``email`` and ``password``.

        Raises:
            KeyError: If no user has this handle.
        """
        user_id, stored_handle, display_name = cls._fetch_first_row(
            """
            SELECT user_id, handle, display_name
            FROM users WHERE handle = ?;
            """,
            (handle,),
            handle,
        )
        return User(
            user_id=user_id,
            handle=stored_handle,
            display_name=display_name,
            email="",
            password="",
        )

    @classmethod
    def get_user_by_email(cls, email: str) -> User:
        """Return the user whose ``email`` matches.

        The returned ``User`` carries the *email* argument as its email and
        an empty ``password``.

        Raises:
            KeyError: If no user has this email.
        """
        user_id, handle, display_name = cls._fetch_first_row(
            """
            SELECT user_id, handle, display_name
            FROM users WHERE email = ?;
            """,
            (email,),
            email,
        )
        return User(
            user_id=user_id,
            handle=handle,
            display_name=display_name,
            email=email,
            password="",
        )

    # NOTE: disabled handle-based variant of verify_password_by_email, kept
    # for reference.
    #@classmethod
    #def verify_password_by_handle(cls, handle: str, password: str) -> bool:
    #    users = cls.cursor.execute(
    #        f"""
    #        SELECT password
    #        FROM users WHERE handle = ?;
    #        """,
    #        (handle, )
    #    ).fetchall()
    #    if not users:
    #        raise KeyError(f"{handle} does not exist!")
    #    real_password = users[0][0]
    #    return bcrypt.checkpw(password.encode("utf-8"), real_password)

    @classmethod
    def verify_password_by_email(cls, email: str, password: str) -> bool:
        """Check *password* against the stored bcrypt hash for *email*.

        Returns:
            ``True`` if the password matches, ``False`` otherwise.

        Raises:
            KeyError: If no user has this email.
        """
        (stored_hash,) = cls._fetch_first_row(
            """
            SELECT password
            FROM users WHERE email = ?;
            """,
            (email,),
            email,
        )
        return bcrypt.checkpw(password.encode("utf-8"), stored_hash)

    @classmethod
    def change_display_name(cls, user_id: int, display_name: str) -> None:
        """Set the display name of the user with the given id.

        Does nothing (and raises nothing) when no row matches *user_id*.
        """
        cls.cursor.execute(
            """
            UPDATE users
            SET display_name = ?
            WHERE user_id = ?;
            """,
            (display_name, user_id),
        )
        # Commit explicitly: sqlite3 opens an implicit transaction for
        # UPDATE, and without this the change would stay pending.
        cls.connection.commit()

    @classmethod
    def get_display_name(cls, user_id: int) -> str:
        """Return the display name of the user with the given id.

        Raises:
            KeyError: If no user has this id.
        """
        return cls.get_user_by_id(user_id).display_name
|