Join GitHub today
GitHub is home to over 31 million developers working together to host and review code, manage projects, and build software together.
Sign up
Pure-L0G1C DRYing code1e8a7a9 on 7 Feb
1 contributor
# Date: 07/02/2018 | |
# Author: Pure-L0G1C | |
# Description: DBMS | |
import bcrypt | |
import sqlite3 | |
from . import const | |
from time import time | |
from os import urandom | |
from hashlib import sha256 | |
from base64 import b64encode | |
from datetime import datetime | |
class Database(object): | |
def __init__(self): | |
self.db_path = const.DATABASE | |
self.create_tables() | |
self.create_default_account() | |
def create_tables(self): | |
self.db_create(''' | |
CREATE TABLE IF NOT EXISTS | |
Account( | |
user_id TEXT PRIMARY KEY, | |
username TEXT, | |
password TEXT | |
); | |
''') | |
self.db_create(''' | |
CREATE TABLE IF NOT EXISTS | |
Status( | |
last_online INTEGER, | |
date_created INTEGER, | |
stat_id TEXT NOT NULL, | |
FOREIGN KEY(stat_id) REFERENCES Account(user_id) | |
); | |
''') | |
self.db_create(''' | |
CREATE TABLE IF NOT EXISTS | |
Attempt( | |
last_attempt INTEGER, | |
attempts_made INTEGER, | |
ampt_id TEXT NOT NULL, | |
FOREIGN KEY(ampt_id) REFERENCES Account(user_id) | |
); | |
''') | |
self.db_create(''' | |
CREATE TABLE IF NOT EXISTS | |
Lock( | |
time_locked INTEGER DEFAULT 0, | |
lock_id TEXT NOT NULL, | |
FOREIGN KEY(lock_id) REFERENCES Account(user_id) | |
); | |
''') | |
def add_account(self, username, password): | |
username = username.lower() | |
user_id = self.gen_user_id(username, password) | |
hashed_password = self.hash_password(password) | |
self.db_update(''' | |
INSERT INTO Account(user_id, username, password) | |
VALUES(?, ?, ?); | |
''', [user_id, username, hashed_password]) | |
self.db_update(''' | |
INSERT INTO Status(last_online, date_created, stat_id) | |
VALUES(?, ?, ?); | |
''', [time(), time(), user_id]) | |
self.db_update(''' | |
INSERT INTO Attempt(last_attempt, attempts_made, ampt_id) | |
VALUES(?, ?, ?); | |
''', [time(), 0, user_id]) | |
self.db_update(''' | |
INSERT INTO Lock(lock_id) | |
VALUES(?); | |
''', [user_id]) | |
def account_exists(self, username): | |
data = self.db_query('SELECT * FROM Account WHERE username=?;', [username], False) | |
return True if len(data) else False | |
def compare_passwords(self, user_id, password): | |
hashed_password = self.db_query('SELECT password FROM Account WHERE user_id=?;', [user_id]) | |
return True if bcrypt.hashpw(password.encode('utf-8'), hashed_password) == hashed_password else False | |
def check_password(self, username, password): | |
hashed_password = self.db_query('SELECT password FROM Account WHERE username=?;', [username]) | |
return True if bcrypt.hashpw(password.encode('utf-8'), hashed_password) == hashed_password else False | |
def authenticate(self, username, password): | |
username = username.lower() | |
if self.account_exists(username): | |
user_id = self.get_user_id(username) | |
if not self.is_locked(user_id): | |
if self.check_password(username, password): | |
return self.get_user_id(username) | |
else: | |
self.failed_attempt(user_id) | |
return None | |
def is_empty(self): | |
data = self.db_query('SELECT * FROM Account;', [], False) | |
return False if len(data) else True | |
def create_default_account(self): | |
if self.is_empty(): | |
self.add_account('loki', 'ikol') | |
# -------- Attempts -------- # | |
def lock_account(self, user_id): | |
self.db_update('UPDATE Lock SET time_locked=? WHERE lock_id=?;', [time(), user_id]) | |
def failed_attempt(self, user_id): | |
current_value = self.failed_attempts_counts(user_id) | |
if current_value >= const.MAX_FAILED_ATTEMPTS-1: | |
if not self.is_locked(user_id): | |
self.lock_account(user_id) | |
else: | |
self.db_update('UPDATE Attempt SET attempts_made=? WHERE ampt_id=?;', [current_value + 1, user_id]) | |
def failed_attempts_counts(self, user_id): | |
return self.db_query('SELECT attempts_made FROM Attempt WHERE ampt_id=?;', [user_id]) | |
def is_locked(self, user_id): | |
time_locked = self.locked(user_id) | |
if time_locked: | |
if (time() - time_locked) >= const.LOCK_TIME: | |
self.remove_locked_account(user_id) | |
return False | |
else: | |
return True | |
else: | |
return False | |
def locked(self, user_id): | |
return self.db_query(''' | |
SELECT time_locked | |
FROM Lock | |
INNER JOIN Account ON account.user_id = Lock.lock_id | |
WHERE Lock.lock_id=?; | |
''', [user_id]) | |
def remove_locked_account(self, user_id): | |
self.db_update('UPDATE Attempt SET attempts_made=? WHERE ampt_id=?;', [0, user_id]) | |
# -------- Database Wrappers -------- # | |
def db_query(self, cmd, args, fetchone=True): | |
database = sqlite3.connect(self.db_path) | |
sql = database.cursor().execute(cmd, args) | |
data = sql.fetchone()[0] if fetchone else sql.fetchall() | |
database.close() | |
return data | |
def db_update(self, cmd, args): | |
database = sqlite3.connect(self.db_path) | |
database.cursor().execute(cmd, args) | |
database.commit() | |
database.close() | |
def db_create(self, cmd): | |
database = sqlite3.connect(self.db_path) | |
database.cursor().execute(cmd) | |
database.commit() | |
database.close() | |
# -------- Update -------- # | |
def update_password(self, user_id, password): | |
hashed_password = self.hash_password(password) | |
self.db_update('UPDATE Account SET password=? WHERE user_id=?;', [hashed_password, user_id]) | |
def update_username(self, user_id, username): | |
self.db_update('UPDATE Account SET username=? WHERE user_id=?;', [username.lower(), user_id]) | |
# -------- Misc -------- # | |
def hash_password(self, password): | |
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) | |
def gen_user_id(self, username, password): | |
_username = username.encode('utf-8') + urandom(64 * 1024) | |
_password = password.encode('utf-8') + urandom(64 * 1024) | |
_username_password = b64encode(_username + _password + urandom(64 * 64)) | |
secure_hash = sha256(_username_password).digest().hex() | |
return secure_hash | |
def get_date_created(self, user_id): | |
return self.db_query('SELECT date_created FROM Status WHERE stat_id=?;', [user_id]) | |
def get_user_id(self, username): | |
return self.db_query('SELECT user_id FROM Account WHERE username=?;', [username]) | |
def get_last_active(self, user_id): | |
epoch_time = self.db_query('SELECT last_online FROM Status WHERE stat_id=?;', [user_id]) | |
self.db_update('UPDATE Status SET last_online=? WHERE stat_id=?;', [time(), user_id]) | |
return datetime.fromtimestamp(epoch_time).strftime('%b %d, %Y at %I:%M %p') | |
def get_account_status(self, user_id, username): | |
default_username = 'loki' | |
default_password = 'ikol' | |
username = username.lower() | |
is_same_password = self.compare_passwords(user_id, default_password) | |
if all([username == default_username, is_same_password]): | |
status = '** Please consider changing your username and password **' | |
elif username == default_username: | |
status = '** Please consider changing your username **' | |
elif is_same_password: | |
status = '** Please consider changing your passsword **' | |
else: | |
status = None | |
return status |
No comments:
Post a Comment