listfix/db.py (view raw)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
import sqlite3
class DB:
def __init__(self, path):
try:
self.db = sqlite3.connect(path)
except sqlite3.OperationalError:
print(f"Could not open database file {path}")
exit()
self.check_tables()
def check_tables(self):
row = self.db.execute("select count(*) from sqlite_master where type = 'table' and name = 'lists'").fetchone()
if (not row[0]):
self.db.execute("create table lists(id INTEGER primary key autoincrement, name text, email text)")
self.db.commit()
row = self.db.execute("select count(*) from sqlite_master where type = 'table' and name = 'recipients'").fetchone()
if (not row[0]):
self.db.execute("create table recipients(id INTEGER primary key autoincrement, list_id int, name text, email text)")
self.db.commit()
return
def check_list_exists(self, list_email):
lists = self.get_lists()
if (list_email in lists):
return True
else:
return False
def create_list(self, list_email, list_name):
if (self.check_list_exists(list_email)):
return False
self.db.execute("INSERT INTO lists (name, email) VALUES (?,?)", [list_name, list_email])
self.db.commit()
return True
def destroy_list(self, list_email):
list_id = self.get_list_id(list_email)
if (not list_id):
return False
self.db.execute("DELETE FROM lists WHERE id = ?", [list_id])
self.db.execute("DELETE FROM recipients WHERE list_id = ?", [list_id])
self.db.commit()
return True
def get_lists(self):
lists = []
rows = self.db.execute("SELECT email FROM lists")
if (rows.rowcount == 0):
return False
for row in rows:
lists.append(row[0])
return lists
def get_list_id(self, list_email):
row = self.db.execute("SELECT id FROM lists WHERE email = ?", [list_email]).fetchone()
if (not row):
return None
else:
return row[0]
def get_list_name(self, list_email):
row = self.db.execute("SELECT name FROM lists WHERE email = ?", [list_email]).fetchone()
if (not row):
return None
else:
return row[0]
def get_list_recipients(self, list_email):
recipients = []
list_id = self.get_list_id(list_email)
if (not list_id):
return None
rows = self.db.execute("SELECT email FROM recipients WHERE list_id = ?", [list_id])
for row in rows:
recipients.append(row[0])
return recipients
def check_recipient_exists(self, list_email, recipient_email):
recipients = self.get_list_recipients(list_email)
if (recipient_email in recipients):
return True
else:
return False
def create_recipient(self, list_email, recipient_email, recipient_name):
if (not self.check_list_exists(list_email)):
return False
if (self.check_recipient_exists(list_email, recipient_email)):
return False
list_id = self.get_list_id(list_email)
if (list_id == None):
return False
self.db.execute("INSERT INTO recipients (list_id, name, email) VALUES (?,?,?)", [list_id, recipient_name, recipient_email])
self.db.commit()
return True
def destroy_recipient(self, list_email, recipient_email):
if (not self.check_list_exists(list_email)):
return False
if (not self.check_recipient_exists(list_email, recipient_email)):
return False
list_id = self.get_list_id(list_email)
if (list_id == None):
return False
recipient_id = self.get_recipient_id(list_email, recipient_email)
if (recipient_id == None):
return False
self.db.execute("DELETE FROM recipients WHERE id = ?", [recipient_id])
self.db.commit()
return True
def get_recipient_name(self, list_email, recipient_email):
if (not self.check_list_exists(list_email)):
return None
list_id = self.get_list_id(list_email)
if (not list_id):
return None
row = self.db.execute("SELECT name FROM recipients WHERE list_id = ? and email = ?", [list_id, recipient_email]).fetchone()
if (not row):
return None
else:
return row[0]
def get_recipient_id(self, list_email, recipient_email):
if (not self.check_list_exists(list_email)):
return None
list_id = self.get_list_id(list_email)
if (not list_id):
return None
row = self.db.execute("SELECT id FROM recipients WHERE list_id = ? and email = ?", [list_id, recipient_email]).fetchone()
if (not row):
return None
else:
return row[0]
def close(self):
self.db.close()
|