all repos — listfix @ 0f1ae61f993032a063c32bb705826e9dab4f88a0

Postfix Mailing List Software; Maintained on behalf the of Agency Economy Incorporated NFP.

Create database module for listfix

	modified:   listfix.py
	new file:   mods/listfixdb.py
Brian Barto bartobrian@gmail.com
Mon, 28 Mar 2022 17:01:51 -0400
commit

0f1ae61f993032a063c32bb705826e9dab4f88a0

parent

b14a0aca8471e8b266420fd68b7a0c54f2785a78

2 files changed, 204 insertions(+), 114 deletions(-)

jump to
M listfix.pylistfix.py

@@ -3,7 +3,9 @@

import sys import os import re -import sqlite3 + +sys.path.insert(1, sys.path[0] + "/mods") +from listfixdb import ListfixDB ######################## ## Variable Defs

@@ -23,20 +25,11 @@ # 4 - Trace

debug_level = 3 command = None rval = None -db = None ######################## ## Function Defs ######################## -def check_database_tables(db): - row = db.execute("select count(*) from sqlite_master where type = 'table' and name = 'lists'").fetchone() - if (not row[0]): - db.execute("create table lists(id INTEGER primary key autoincrement, name text, email text)") - row = db.execute("select count(*) from sqlite_master where type = 'table' and name = 'recipients'").fetchone() - if (not row[0]): - db.execute("create table recipients(id INTEGER primary key autoincrement, list_id int, name text, email text)") - def get_header(lines, header): rval = None

@@ -117,14 +110,13 @@ f = open("/tmp/listfix_log.txt", "a")

f.write(f"[{levels[level]}] {line}\n") f.close() -def command_filter(): +def command_filter(db): recipient = None sender = None sender_name = None content = [] content_filtered = [] - list_id = None list_name = None list_recipients = []

@@ -176,19 +168,14 @@ return True

## Get email list info - row = db.execute("SELECT id, name FROM lists WHERE email = ?", [recipient]).fetchone() - if (not row): + list_name = db.get_list_name(recipient) + if (not list_name): debug_line(1, f"Recipient email list {recipient} not defined in database.") return False - list_id = row[0] - list_name = row[1] - ## Get recipient list - rows = db.execute("SELECT email FROM recipients WHERE list_id = ?", [list_id]) - for row in rows: - list_recipients.append(row[0]) + list_recipients = db.get_list_recipients(recipient) if (len(list_recipients) == 0): debug_line(1, f"No recipients defined for email list: {recipient}.")

@@ -215,18 +202,18 @@ debug_line(3, f"List Recipient: {r}")

return True -def command_lists(): +def command_lists(db): + lists = db.get_lists() - rows = db.execute("SELECT name, email FROM lists") - for row in rows: - print(f"{row[1]} ({row[0]})") + for l in lists: + list_name = db.get_list_name(l) + print(f"{l} ({list_name})") return True -def command_dump(): +def command_dump(db): list_email = None - list_id = None ## Check args

@@ -240,24 +227,17 @@ return False

list_email = sys.argv[2] - ## Get email list id + ## Print list recipients - row = db.execute("SELECT id FROM lists WHERE email = ?", [list_email]).fetchone() - if (not row): - print(f"Email list {list_email} not defined in database.") - return False + recipients = db.get_list_recipients(list_email) - list_id = row[0] - - ## Print list recipients - - rows = db.execute("SELECT name, email FROM recipients WHERE list_id = ?", [list_id]) - for row in rows: - print(f"{row[1]} ({row[0]})") + for r in recipients: + recipient_name = db.get_recipient_name(list_email, r) + print(f"{r} ({recipient_name})") return True -def command_create(): +def command_create(db): list_email = None list_name = None

@@ -274,27 +254,18 @@ return False

list_email = sys.argv[2] list_name = sys.argv[3] - - ## Check if exists in db - - row = db.execute("SELECT id FROM lists WHERE email = ?", [list_email]).fetchone() - if (row): - print(f"Email list {list_email} already defined in database.") - return False - + ## Insert into database - db.execute("INSERT INTO lists (name, email) VALUES (?,?)", [list_name, list_email]) - db.commit() + db.create_list(list_email, list_name) print(f"New list ({list_email}) added.") return True -def command_destroy(): +def command_destroy(db): list_email = None - list_id = None ## Check args

@@ -308,29 +279,17 @@ return False

list_email = sys.argv[2] - ## Get id from database + ## Destroy list - row = db.execute("SELECT id FROM lists WHERE email = ?", [list_email]).fetchone() - if (not row): - print(f"Email list {list_email} not defined in database.") - return False - - list_id = row[0] + db.destroy_list(list_email) - ## Delete from database - - db.execute("DELETE FROM lists WHERE id = ?", [list_id]) - db.execute("DELETE FROM recipients WHERE list_id = ?", [list_id]) - db.commit() - - print(f"Email list ({list_email}) deleted.") + print(f"Email list ({list_email}) destroyed.") return True -def command_add(): +def command_add(db): list_email = None - list_id = None recipient_email = None recipient_name = None

@@ -348,37 +307,24 @@ list_email = sys.argv[2]

recipient_email = sys.argv[3] recipient_name = sys.argv[4] - ## Get list id in db - - row = db.execute("SELECT id FROM lists WHERE email = ?", [list_email]).fetchone() - if (not row): - print(f"Email list {list_email} not defined in database.") - return False - - list_id = row[0] - ## Check if recipient already exists in list - row = db.execute("SELECT count(*) FROM recipients WHERE list_id = ? AND email = ?", [list_id, recipient_email]).fetchone() - if (row and row[0] > 0): + if (db.check_recipient_exists(list_email, recipient_email)): print(f"Recipient ({recipient_email}) already exists in list ({list_email})") return False ## Insert into database - db.execute("INSERT INTO recipients (list_id, name, email) VALUES (?,?, ?)", [list_id, recipient_name, recipient_email]) - db.commit() + db.create_recipient(list_email, recipient_email, recipient_name) print(f"New recipient ({recipient_email}) added to list ({list_email})") return True -def command_remove(): +def command_remove(db): list_email = None - list_id = None recipient_email = None - recipient_id = None ## Check Args

@@ -393,28 +339,9 @@

list_email = sys.argv[2] recipient_email = sys.argv[3] - ## Get list id in db - - row = db.execute("SELECT id FROM lists WHERE email = ?", [list_email]).fetchone() - if (not row): - print(f"Email list {list_email} not defined in database.") - return False - - list_id = row[0] - - ## Check if recipient does not exist in list + ## Destroy Recipient - row = db.execute("SELECT id FROM recipients WHERE list_id = ? AND email = ?", [list_id, recipient_email]).fetchone() - if (not row): - print(f"Recipient ({recipient_email}) does not exists in list ({list_email})") - return False - - recipient_id = row[0] - - ## Insert into database - - db.execute("DELETE FROM recipients WHERE id = ?", [recipient_id]) - db.commit() + db.destroy_recipient(list_email, recipient_email) print(f"Recipient ({recipient_email}) removed from list ({list_email})")

@@ -428,9 +355,7 @@

## Connect to DB (create DB if needed) and check tables. listfix_dir = os.path.dirname(os.path.realpath(__file__)) -db = sqlite3.connect(listfix_dir + "/listfix.sqlite3") - -check_database_tables(db) +db = ListfixDB(listfix_dir + "/listfix.sqlite3") ## Get command

@@ -442,31 +367,31 @@

## Evaluate command if (command == "filter"): - rval = command_filter() + rval = command_filter(db) if (not rval): raise ValueError(f"Error while executing command_filter()") elif (command == "lists"): - rval = command_lists() + rval = command_lists(db) if (not rval): raise ValueError(f"Error while executing command_lists()") elif (command == "dump"): - rval = command_dump() + rval = command_dump(db) if (not rval): raise ValueError(f"Error while executing command_dump()") elif (command == "create"): - rval = command_create() + rval = command_create(db) if (not rval): raise ValueError(f"Error while executing command_create()") elif (command == "destroy"): - rval = command_destroy() + rval = command_destroy(db) if (not rval): raise ValueError(f"Error while executing command_destroy()") elif (command == "add"): - rval = command_add() + rval = command_add(db) if (not rval): raise ValueError(f"Error while executing command_add()") elif (command == "remove"): - rval = command_remove() + rval = command_remove(db) if (not rval): raise ValueError(f"Error while executing command_remove()") else:
A mods/listfixdb.py

@@ -0,0 +1,165 @@

+import sqlite3 + +class ListfixDB: + + def __init__(self, path): + try: + self.db = sqlite3.connect(path) + except sqlite3.OperationalError: + print(f"Could not open database file {path}") + exit() + self.check_tables() + + def check_tables(self): + row = self.db.execute("select count(*) from sqlite_master where type = 'table' and name = 'lists'").fetchone() + if (not row[0]): + self.db.execute("create table lists(id INTEGER primary key autoincrement, name text, email text)") + self.db.commit() + row = self.db.execute("select count(*) from sqlite_master where type = 'table' and name = 'recipients'").fetchone() + if (not row[0]): + self.db.execute("create table recipients(id INTEGER primary key autoincrement, list_id int, name text, email text)") + self.db.commit() + + return + + def check_list_exists(self, list_email): + + lists = self.get_lists() + + if (list_email in lists): + return True + else: + return False + + def create_list(self, list_email, list_name): + if (self.check_list_exists(list_email)): + return False + + self.db.execute("INSERT INTO lists (name, email) VALUES (?,?)", [list_name, list_email]) + self.db.commit() + + return True + + def destroy_list(self, list_email): + list_id = self.get_list_id(list_email) + if (not list_id): + return False + + self.db.execute("DELETE FROM lists WHERE id = ?", [list_id]) + self.db.execute("DELETE FROM recipients WHERE list_id = ?", [list_id]) + self.db.commit() + + return True + + def get_lists(self): + lists = [] + + rows = self.db.execute("SELECT email FROM lists") + for row in rows: + lists.append(row[0]) + + return lists + + def get_list_id(self, list_email): + row = self.db.execute("SELECT id FROM lists WHERE email = ?", [list_email]).fetchone() + if (not row): + return None + else: + return row[0] + + def get_list_name(self, list_email): + row = self.db.execute("SELECT name FROM lists WHERE email = ?", [list_email]).fetchone() + if (not row): + return None + else: + return row[0] + + def get_list_recipients(self, list_email): + recipients = [] + + list_id = self.get_list_id(list_email) + if (not list_id): + return None + + rows = self.db.execute("SELECT email FROM recipients WHERE list_id = ?", [list_id]) + for row in rows: + recipients.append(row[0]) + + return recipients + + def check_recipient_exists(self, list_email, recipient_email): + recipients = self.get_list_recipients(list_email) + + if (recipient_email in recipients): + return True + else: + return False + + def create_recipient(self, list_email, recipient_email, recipient_name): + + if (not self.check_list_exists(list_email)): + return False + + if (self.check_recipient_exists(list_email, recipient_email)): + return False + + list_id = self.get_list_id(list_email) + if (list_id == None): + return False + + self.db.execute("INSERT INTO recipients (list_id, name, email) VALUES (?,?,?)", [list_id, recipient_name, recipient_email]) + self.db.commit() + + return True + + def destroy_recipient(self, list_email, recipient_email): + + if (not self.check_list_exists(list_email)): + return False + + if (not self.check_recipient_exists(list_email, recipient_email)): + return False + + list_id = self.get_list_id(list_email) + if (list_id == None): + return False + + recipient_id = self.get_recipient_id(list_email, recipient_email) + if (recipient_id == None): + return False + + self.db.execute("DELETE FROM recipients WHERE id = ?", [recipient_id]) + self.db.commit() + + return True + + def get_recipient_name(self, list_email, recipient_email): + if (not self.check_list_exists(list_email)): + return None + + list_id = self.get_list_id(list_email) + if (not list_id): + return None + + row = self.db.execute("SELECT name FROM recipients WHERE list_id = ? and email = ?", [list_id, recipient_email]).fetchone() + if (not row): + return None + else: + return row[0] + + def get_recipient_id(self, list_email, recipient_email): + if (not self.check_list_exists(list_email)): + return None + + list_id = self.get_list_id(list_email) + if (not list_id): + return None + + row = self.db.execute("SELECT id FROM recipients WHERE list_id = ? and email = ?", [list_id, recipient_email]).fetchone() + if (not row): + return None + else: + return row[0] + + def close(self): + self.db.close()