listfix.py (view raw)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
#!/usr/bin/python3
import sys
import os
from listfix import Args, DB, Email, error_handler
########################
## Function Defs
########################
def debug_line(level, line):
    """Append *line* to the listfix log file when *level* is enabled.

    Args:
        level: Integer severity: 1 (Error), 2 (Info), 3 (Debug) or
            4 (Trace). Any other value makes the call a no-op.
        line: Message text; trailing whitespace is stripped before writing.

    The message is written only when ``level`` is less than or equal to the
    module-level name ``debug_level``. This function reads that name but
    does not define it.
    NOTE(review): ``debug_level`` must be assigned at module level before
    this is called with a valid level, otherwise a NameError is raised.
    """
    levels = {
        1: "Error",
        2: "Info",
        3: "Debug",
        4: "Trace",
    }
    # Unknown severities are ignored silently rather than treated as errors.
    if level not in levels:
        return
    line = line.rstrip()
    if level <= debug_level:
        # The context manager closes the log file even if the write raises.
        with open("/tmp/listfix_log.txt", "a") as log_file:
            log_file.write(f"[{levels[level]}] {line}\n")
########################
## Main Program
########################
# Route uncaught exceptions through the project's handler instead of the
# default traceback printer.
sys.excepthook = error_handler

## Connect to DB (create DB if needed) and check tables.
# The database file lives in the same directory as this script (with
# symlinks resolved by realpath).
listfix_dir = os.path.dirname(os.path.realpath(__file__))
db = DB(listfix_dir + "/listfix.sqlite3")

# Everything that runs while the connection is open sits inside try/finally
# so the database is closed on every exit path: normal completion, the early
# exit for auto-replies, and any raised error.
try:
    ## Get args
    args = Args(sys.argv)
    command = args.get_command()

    ## Evaluate command
    if command == "filter":
        # Read one message from stdin and redistribute it to the list.
        list_email = args.get_list_email()
        list_name = db.get_list_name(list_email)
        list_recipients = db.get_list_recipients(list_email)

        email_in = Email(list(sys.stdin))
        if email_in.check_auto_reply():
            # Messages flagged by check_auto_reply() are not forwarded.
            # sys.exit() is used rather than the site-provided exit(), which
            # is not guaranteed to exist (e.g. under `python -S`).
            sys.exit()

        sender_email = email_in.get_sender_email()
        sender_name = email_in.get_sender_name()
        sender_is_member = sender_email in list_recipients

        email_out = Email(email_in.get_content())
        # NOTE(review): presumably strips every header except the listed
        # patterns -- confirm against Email.strip_headers.
        email_out.strip_headers(
            exclude=["To", "Cc", "Subject", "Content-[^:]+", "MIME-Version"]
        )

        # A sender who is not on the list is added to Reply-To alongside the
        # list address; list members only get the list address.
        if sender_is_member:
            email_out.add_header_prepend(f"Reply-To: {list_email}")
        else:
            email_out.add_header_prepend(
                f"Reply-To: {list_email}, {sender_email}"
            )
        email_out.add_header_prepend(
            f"From: \"{sender_name} via {list_name}\" <{list_email}>"
        )

        # Do not send the message back to its own sender.
        if sender_is_member:
            list_recipients.remove(sender_email)
        for recipient in list_recipients:
            email_out.send(recipient)

    elif command == "lists":
        # Print every known list as "address (name)".
        lists = db.get_lists()
        if not lists:
            print("No lists defined.")
        else:
            for address in lists:
                list_name = db.get_list_name(address)
                print(f"{address} ({list_name})")

    elif command == "dump":
        # Print every recipient of one list as "address (name)".
        list_email = args.get_list_email()
        recipients = db.get_list_recipients(list_email)
        if not recipients:
            print(f"No recipients defined for list {list_email}")
        else:
            for recipient in recipients:
                recipient_name = db.get_recipient_name(list_email, recipient)
                print(f"{recipient} ({recipient_name})")

    elif command == "create":
        list_email = args.get_list_email()
        list_name = args.get_list_name()
        db.create_list(list_email, list_name)
        print(f"New list ({list_email}) added.")

    elif command == "destroy":
        list_email = args.get_list_email()
        db.destroy_list(list_email)
        print(f"Email list ({list_email}) destroyed.")

    elif command == "add":
        list_email = args.get_list_email()
        recipient_email = args.get_recipient_email()
        recipient_name = args.get_recipient_name()
        db.create_recipient(list_email, recipient_email, recipient_name)
        print(f"New recipient ({recipient_email}) added to list ({list_email})")

    elif command == "remove":
        list_email = args.get_list_email()
        recipient_email = args.get_recipient_email()
        db.destroy_recipient(list_email, recipient_email)
        print(f"Recipient ({recipient_email}) removed from list ({list_email})")

    else:
        raise ValueError(f"Unknown command: {command}")
finally:
    ## Disconnect from DB
    db.close()
|