Convert sqlite database to json file modified: .gitignore modified: listfix.py modified: listfix/db.py
Brian Barto bartobrian@gmail.com
Thu, 14 Apr 2022 18:02:51 -0400
3 files changed,
53 insertions(+),
80 deletions(-)
M
listfix.py
→
listfix.py
@@ -13,7 +13,7 @@
er = Errors(log, debug=True) er.set_exception_handler() -db = DB(listfix_dir + "/listfix.sqlite3") +db = DB(listfix_dir + "/listfix.json") args = Args(sys.argv) command = args.get_command()@@ -101,4 +101,4 @@ raise ValueError(f"Unknown command: {command}")
## Disconnect from DB -db.close() +db.save()
M
listfix/db.py
→
listfix/db.py
@@ -1,100 +1,72 @@
-import sqlite3 +import json +from os.path import exists class DB: def __init__(self, path): - self.db = sqlite3.connect(path) - self.check_tables() - - def check_tables(self): - row = self.db.execute("select count(*) from sqlite_master where type = 'table' and name = 'lists'").fetchone() - if (not row[0]): - self.db.execute("create table lists(id INTEGER primary key autoincrement, name text, email text)") - self.db.commit() - row = self.db.execute("select count(*) from sqlite_master where type = 'table' and name = 'recipients'").fetchone() - if (not row[0]): - self.db.execute("create table recipients(id INTEGER primary key autoincrement, list_id int, name text, email text)") - self.db.commit() + self.json = {} + self.path = path + if (exists(path)): + with open(path) as f: + self.json = json.load(f) def check_list_exists(self, list_email): - try: - self.get_list_id(list_email) - except ValueError: + if (list_email in self.json.keys()): + return True + else: return False - return True - - def get_lists(self): - lists = [] - rows = self.db.execute("SELECT email FROM lists") - for row in rows: - lists.append(row[0]) - return lists - - def get_list_id(self, list_email): - row = self.db.execute("SELECT id FROM lists WHERE email = ?", [list_email]).fetchone() - if (not row): - raise ValueError(f"Email list does not exist: {list_email}") - return row[0] - - def get_list_name(self, list_email): - row = self.db.execute("SELECT name FROM lists WHERE email = ?", [list_email]).fetchone() - if (not row): - raise ValueError(f"Email list does not exist: {list_email}") - return row[0] - - def get_list_recipients(self, list_email): - recipients = [] - list_id = self.get_list_id(list_email) - rows = self.db.execute("SELECT email FROM recipients WHERE list_id = ?", [list_id]) - for row in rows: - recipients.append(row[0]) - return recipients def create_list(self, list_email, list_name): if (self.check_list_exists(list_email)): raise ValueError(f"Email list already exists: {list_email}") - self.db.execute("INSERT INTO lists (name, email) VALUES (?,?)", [list_name, list_email]) - self.db.commit() + self.json[list_email] = { 'name': list_name, 'recipients': {} } - def destroy_list(self, list_email): - list_id = self.get_list_id(list_email) - self.db.execute("DELETE FROM lists WHERE id = ?", [list_id]) - self.db.execute("DELETE FROM recipients WHERE list_id = ?", [list_id]) - self.db.commit() + def get_lists(self): + return list(self.json.keys()) + + def get_list_name(self, list_email): + if (not self.check_list_exists(list_email)): + raise ValueError(f"Email list does not exist: {list_email}") + return self.json[list_email]['name'] def check_recipient_exists(self, list_email, recipient_email): - try: - self.get_recipient_id(list_email, recipient_email) - except ValueError: + if (not self.check_list_exists(list_email)): + raise ValueError(f"Email list does not exist: {list_email}") + if (recipient_email in self.json[list_email]['recipients'].keys()): + return True + else: return False - return True - - def get_recipient_id(self, list_email, recipient_email): - list_id = self.get_list_id(list_email) - row = self.db.execute("SELECT id FROM recipients WHERE list_id = ? and email = ?", [list_id, recipient_email]).fetchone() - if (not row): - raise ValueError(f"Recipient does not exist: {recipient_email}") - return row[0] - - def get_recipient_name(self, list_email, recipient_email): - list_id = self.get_list_id(list_email) - row = self.db.execute("SELECT name FROM recipients WHERE list_id = ? and email = ?", [list_id, recipient_email]).fetchone() - if (not row): - raise ValueError(f"Recipient does not exist: {recipient_email}") - return row[0] def create_recipient(self, list_email, recipient_email, recipient_name): if (self.check_recipient_exists(list_email, recipient_email)): raise ValueError(f"Recipient already exists: {recipient_email}") - list_id = self.get_list_id(list_email) - self.db.execute("INSERT INTO recipients (list_id, name, email) VALUES (?,?,?)", [list_id, recipient_name, recipient_email]) - self.db.commit() + self.json[list_email]['recipients'][recipient_email] = { 'name': recipient_name } + + def get_list_recipients(self, list_email): + if (not self.check_list_exists(list_email)): + raise ValueError(f"Email list does not exist: {list_email}") + return list(self.json[list_email]['recipients'].keys()) + + def get_recipient_name(self, list_email, recipient_email): + if (not self.check_list_exists(list_email)): + raise ValueError(f"Email list does not exist: {list_email}") + if (not self.check_recipient_exists(list_email, recipient_email)): + raise ValueError(f"Recipient does not exist: {recipient_email}") + return self.json[list_email]['recipients'][recipient_email]['name'] def destroy_recipient(self, list_email, recipient_email): - list_id = self.get_list_id(list_email) - recipient_id = self.get_recipient_id(list_email, recipient_email) - self.db.execute("DELETE FROM recipients WHERE id = ?", [recipient_id]) - self.db.commit() + if (not self.check_list_exists(list_email)): + raise ValueError(f"Email list does not exist: {list_email}") + if (not self.check_recipient_exists(list_email, recipient_email)): + raise ValueError(f"Recipient does not exist: {recipient_email}") + del self.json[list_email]['recipients'][recipient_email] - def close(self): - self.db.close()+ def destroy_list(self, list_email): + if (not self.check_list_exists(list_email)): + raise ValueError(f"Email list does not exist: {list_email}") + del self.json[list_email] + + def save(self): + f = open(self.path, "w") + json.dump(self.json, f, indent=4) + f.close()