Raise exceptions instead of passing back values indicating errors. modified: listfix/db.py
Brian Barto bartobrian@gmail.com
Wed, 30 Mar 2022 11:07:36 -0400
1 files changed,
38 insertions(+),
97 deletions(-)
jump to
M
listfix/db.py
→
listfix/db.py
@@ -3,11 +3,7 @@
class DB: def __init__(self, path): - try: - self.db = sqlite3.connect(path) - except sqlite3.OperationalError: - print(f"Could not open database file {path}") - exit() + self.db = sqlite3.connect(path) self.check_tables() def check_tables(self):@@ -20,150 +16,95 @@ if (not row[0]):
self.db.execute("create table recipients(id INTEGER primary key autoincrement, list_id int, name text, email text)") self.db.commit() - return - def check_list_exists(self, list_email): - lists = self.get_lists() - if (list_email in lists): return True else: return False - def create_list(self, list_email, list_name): - if (self.check_list_exists(list_email)): - return False - - self.db.execute("INSERT INTO lists (name, email) VALUES (?,?)", [list_name, list_email]) - self.db.commit() - - return True - - def destroy_list(self, list_email): - list_id = self.get_list_id(list_email) - if (not list_id): - return False - - self.db.execute("DELETE FROM lists WHERE id = ?", [list_id]) - self.db.execute("DELETE FROM recipients WHERE list_id = ?", [list_id]) - self.db.commit() - - return True - def get_lists(self): lists = [] - rows = self.db.execute("SELECT email FROM lists") - - if (rows.rowcount == 0): - return False - for row in rows: lists.append(row[0]) - return lists def get_list_id(self, list_email): row = self.db.execute("SELECT id FROM lists WHERE email = ?", [list_email]).fetchone() if (not row): - return None - else: - return row[0] + raise ValueError(f"Email list does not exist: {list_email}") + return row[0] def get_list_name(self, list_email): row = self.db.execute("SELECT name FROM lists WHERE email = ?", [list_email]).fetchone() if (not row): - return None - else: - return row[0] + raise ValueError(f"Email list does not exist: {list_email}") + return row[0] def get_list_recipients(self, list_email): recipients = [] - list_id = self.get_list_id(list_email) - if (not list_id): - return None - rows = self.db.execute("SELECT email FROM recipients WHERE list_id = ?", [list_id]) for row in rows: recipients.append(row[0]) - return recipients + def create_list(self, list_email, list_name): + if (self.check_list_exists(list_email)): + raise ValueError(f"Email list already exists: {list_email}") + self.db.execute("INSERT INTO lists (name, email) VALUES (?,?)", [list_name, list_email]) + self.db.commit() + + def destroy_list(self, list_email): + list_id = self.get_list_id(list_email) + self.db.execute("DELETE FROM lists WHERE id = ?", [list_id]) + self.db.execute("DELETE FROM recipients WHERE list_id = ?", [list_id]) + self.db.commit() + def check_recipient_exists(self, list_email, recipient_email): recipients = self.get_list_recipients(list_email) - if (recipient_email in recipients): return True else: return False - def create_recipient(self, list_email, recipient_email, recipient_name): + def get_recipient_id(self, list_email, recipient_email): + if (not self.check_list_exists(list_email)): + raise ValueError(f"Email list does not exist: {list_email}") + list_id = self.get_list_id(list_email) + row = self.db.execute("SELECT id FROM recipients WHERE list_id = ? and email = ?", [list_id, recipient_email]).fetchone() + if (not row): + raise ValueError(f"Recipient does not exist: {recipient_email}") + return row[0] + def get_recipient_name(self, list_email, recipient_email): if (not self.check_list_exists(list_email)): - return False + raise ValueError(f"Email list does not exist: {list_email}") + list_id = self.get_list_id(list_email) + row = self.db.execute("SELECT name FROM recipients WHERE list_id = ? and email = ?", [list_id, recipient_email]).fetchone() + if (not row): + raise ValueError(f"Recipient does not exist: {recipient_email}") + return row[0] + def create_recipient(self, list_email, recipient_email, recipient_name): + if (not self.check_list_exists(list_email)): + raise ValueError(f"Email list does not exist: {list_email}") if (self.check_recipient_exists(list_email, recipient_email)): - return False - + raise ValueError(f"Recipient already exists: {recipient_email}") list_id = self.get_list_id(list_email) - if (list_id == None): - return False - self.db.execute("INSERT INTO recipients (list_id, name, email) VALUES (?,?,?)", [list_id, recipient_name, recipient_email]) self.db.commit() - return True - def destroy_recipient(self, list_email, recipient_email): - if (not self.check_list_exists(list_email)): - return False - + raise ValueError(f"Email list does not exist: {list_email}") if (not self.check_recipient_exists(list_email, recipient_email)): - return False - + raise ValueError(f"Recipient does not exist: {recipient_email}") list_id = self.get_list_id(list_email) - if (list_id == None): - return False - recipient_id = self.get_recipient_id(list_email, recipient_email) - if (recipient_id == None): - return False - self.db.execute("DELETE FROM recipients WHERE id = ?", [recipient_id]) self.db.commit() - - return True - - def get_recipient_name(self, list_email, recipient_email): - if (not self.check_list_exists(list_email)): - return None - - list_id = self.get_list_id(list_email) - if (not list_id): - return None - - row = self.db.execute("SELECT name FROM recipients WHERE list_id = ? and email = ?", [list_id, recipient_email]).fetchone() - if (not row): - return None - else: - return row[0] - - def get_recipient_id(self, list_email, recipient_email): - if (not self.check_list_exists(list_email)): - return None - - list_id = self.get_list_id(list_email) - if (not list_id): - return None - - row = self.db.execute("SELECT id FROM recipients WHERE list_id = ? and email = ?", [list_id, recipient_email]).fetchone() - if (not row): - return None - else: - return row[0] def close(self): self.db.close()