listfix.py (view raw)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
#!/usr/bin/python3
import sys
import os
from listfix import Args, DB, Email, Errors, Log, Test
## Setup
# Resolve the directory that holds this script (realpath follows symlinks);
# the log file and the database file are addressed relative to it.
script_path = os.path.realpath(__file__)
listfix_dir = os.path.dirname(script_path)
log_path = listfix_dir + "/listfix.log"
db_path = listfix_dir + "/listfix.json"

# Logging comes first so the error helper can be handed the logger before
# anything else runs.
log = Log(log_path)
er = Errors(log, debug=False)
er.set_exception_handler()

# Open the database file and read the requested command from the
# command-line arguments.
db = DB(db_path)
args = Args(sys.argv)
command = args.get_command()
## Evaluate command
if command == "filter":
    # Read one message from stdin and relay it to the other members of the
    # list named on the command line.
    list_email = args.get_list_email()
    list_name = db.get_list_name(list_email)
    list_recipients = db.get_list_recipients(list_email)
    email_in = Email(list(sys.stdin))

    # Messages flagged by check_auto_reply() are not relayed. Exiting here
    # also skips the final db.save().
    if email_in.check_auto_reply():
        sys.exit()

    sender_email = email_in.get_sender_email()
    sender_name = email_in.get_sender_name()
    # Membership is checked against the lower-cased sender address.
    sender_email = sender_email.lower()
    if sender_email not in list_recipients:
        log.info(f"Email dropped: sender {sender_email} not a member of list {list_email}.")
        sys.exit()

    log.info(f"Email from: {sender_email}")
    log.info(f"Email list: {list_email}")

    # Build the outgoing message: keep only the listed headers, then prepend
    # the list's own Reply-To / List-Id / From headers.
    email_out = Email(email_in.get_content())
    email_out.strip_headers(exclude = ["To", "Cc", "Subject", "Content-[^:]+", "MIME-Version", "Date", "Message-ID", "References"])
    email_out.add_header_prepend(f"Reply-To: {list_email}")
    email_out.add_header_prepend(f"List-Id: {list_email}")
    # NOTE(review): sender_name comes from the incoming message and is
    # interpolated unescaped into a quoted header value -- confirm that
    # Email.get_sender_name() strips quotes and line breaks.
    email_out.add_header_prepend(f"From: \"{sender_name} via {list_name}\" <{list_email}>")

    # Remove the sender from a *copy* of the recipient list. db.save() runs
    # after this branch, so mutating the object returned by the DB could
    # persist the sender's removal from the list if that object is the DB's
    # own storage -- TODO confirm what get_list_recipients() returns.
    outgoing = list(list_recipients)
    outgoing.remove(sender_email)
    for recipient in outgoing:
        log.info(f"Recipient: {recipient}")
        email_out.send(recipient)
elif command == "lists":
    # Print every list address together with its name.
    lists = db.get_lists()
    if not lists:
        print("No lists defined.")
    else:
        for address in lists:
            list_name = db.get_list_name(address)
            print(f"{address} ({list_name})")
elif command == "dump":
    # Print every recipient of one list together with the recipient's name.
    list_email = args.get_list_email()
    recipients = db.get_list_recipients(list_email)
    if not recipients:
        print(f"No recipients defined for list {list_email}")
    else:
        for recipient in recipients:
            recipient_name = db.get_recipient_name(list_email, recipient)
            print(f"{recipient} ({recipient_name})")
elif command == "create":
    list_email = args.get_list_email()
    list_name = args.get_list_name()
    db.create_list(list_email, list_name)
    print(f"New list ({list_email}) added.")
elif command == "destroy":
    list_email = args.get_list_email()
    db.destroy_list(list_email)
    print(f"Email list ({list_email}) destroyed.")
elif command == "add":
    list_email = args.get_list_email()
    recipient_email = args.get_recipient_email()
    recipient_name = args.get_recipient_name()
    db.create_recipient(list_email, recipient_email, recipient_name)
    print(f"New recipient ({recipient_email}) added to list ({list_email})")
elif command == "remove":
    list_email = args.get_list_email()
    recipient_email = args.get_recipient_email()
    db.destroy_recipient(list_email, recipient_email)
    print(f"Recipient ({recipient_email}) removed from list ({list_email})")
elif command == "lower":
    db.lower_emails()
    print("Completed.")
elif command == "test":
    test = Test()
    test.run_test()
else:
    raise ValueError(f"Unknown command: {command}")

## Save the state of the DB
# Reached by every command that did not exit or raise above.
db.save()
|