Create module for email operations. modified: listfix.py new file: mods/listfixemail.py
Brian Barto bartobrian@gmail.com
Tue, 29 Mar 2022 13:34:34 -0400
2 files changed,
193 insertions(+),
109 deletions(-)
M
listfix.py
→
listfix.py
@@ -1,78 +1,18 @@
#!/usr/bin/python3 import sys -import os import re +import os sys.path.insert(1, sys.path[0] + "/mods") from listfixdb import ListfixDB +from listfixemail import ListfixEmail ######################## ## Function Defs ######################## -def get_header(lines, header): - rval = None - - re_header = re.compile("^" + header + ": ", re.IGNORECASE) - - header = header.rstrip() - if (header[-1] == ":"): - header = header[0:-1] - - append_next = False - in_header = True - for line in lines: - if (in_header): - if (re_header_cont.match(line) and append_next): - rval = rval + " " + line.rstrip().lstrip() - continue - - append_next = False - if (re_header.match(line)): - rval = line.rstrip() - append_next = True - elif (re_header_end.match(line)): - in_header = False - else: - break - - return rval - -def strip_headers(lines, exclude): - rval = [] - - append_next = False - in_header = True - for line in lines: - if (in_header): - if (re_header_cont.match(line) and append_next): - rval.append(line) - continue - - if (re_header_end.match(line)): - rval.append(line) - in_header = False - continue - - append_next = False - for ex in exclude: - if (re.match("^" + ex + ": ", line, re.IGNORECASE)): - rval.append(line) - append_next = True - break - else: - rval.append(line) - - return rval - -def send_email(to_email, email_contents): - p = os.popen(f"/usr/sbin/sendmail -G -i {to_email}", "w") - for line in email_contents: - p.write(line) - p.close() - def debug_line(level, line): levels = {@@ -96,20 +36,7 @@ ########################
## Main Program ######################## -re_header_cont = re.compile("^\s+\S+") -re_header_end = re.compile("^\s*$") re_email_arg = re.compile("([^<>\"\s]+)@(\S+\.[^<>\"\s]+)") -re_email_parts = re.compile("(\S+)@(\S+\.\S+)") -re_sender_info = re.compile("^From:\s+\"?([^<>\"]*?)\"?\s*<?(([^<>\"\s]+)@\S+\.[^<>\"\s]+)>?$") -re_auto_reply = re.compile("^Auto-Submitted: (auto-generated|auto-replied)", re.IGNORECASE) - -# 1 - Error -# 2 - Info -# 3 - Debug -# 4 - Trace -debug_level = 3 -command = None -rval = None ## Connect to DB (create DB if needed) and check tables.@@ -118,6 +45,7 @@ db = ListfixDB(listfix_dir + "/listfix.sqlite3")
## Get command +command = None if (len(sys.argv) >= 1): command = sys.argv[1] else:@@ -133,54 +61,38 @@ raise ValueError(f"Error while executing filter.")
else: raise ValueError(f"Error while executing filter.") - recipient = sys.argv[2] + list_email = sys.argv[2] content = [] for line in sys.stdin: content.append(line) - if (len(content) == 0): - raise ValueError(f"Error while executing filter. No content.") - sender = None - sender_name = None - from_line = get_header(content, "From") - if (from_line): - if (re_sender_info.match(from_line)): - results = re_sender_info.search(from_line) - sender_name = results.group(1) if results.group(1) else results.group(3) - sender = results.group(2) - else: - raise ValueError(f"Error while executing filter. No matching email in From header.") - else: - raise ValueError(f"Error while executing filter. No From header.") + email_in = ListfixEmail() + email_in.set_content(content) - auto_sub_line = get_header(content, "Auto-Submitted") - if (auto_sub_line and re_auto_reply.match(auto_sub_line)): + if (email_in.is_auto_reply()): exit() - list_name = db.get_list_name(recipient) - if (not list_name): - raise ValueError(f"Error while executing filter. List {recipient} not defined.") + sender_email = email_in.get_sender_email() + sender_name = email_in.get_sender_name() - list_recipients = db.get_list_recipients(recipient) - if (len(list_recipients) == 0): - raise ValueError(f"Error while executing filter. No recipients defined for list {recipient}.") + list_name = db.get_list_name(list_email) + list_recipients = db.get_list_recipients(list_email) - content_filtered = [] - content_filtered.append(f"From: \"{sender_name} via {list_name}\" <{recipient}>\n") - if (sender not in list_recipients): - content_filtered.append(f"Reply-To: {recipient}, {sender}\n") + email_out = ListfixEmail() + email_out.set_content(email_in.get_content()) + email_out.strip_headers(exclude = ["To", "Cc", "Subject", "Content-[^:]+", "MIME-Version"]) + if (sender_email not in list_recipients): + email_out.add_header_prepend(f"Reply-To: {list_email}, {sender}") else: - content_filtered.append(f"Reply-To: {recipient}\n") - exclude_headers = ["To", "Cc", "Subject", "Content-[^:]+", "MIME-Version"] - content_filtered.extend(strip_headers(content, exclude_headers)) + email_out.add_header_prepend(f"Reply-To: {list_email}") + email_out.add_header_prepend(f"From: \"{sender_name} via {list_name}\" <{list_email}>") - if sender in list_recipients: - list_recipients.remove(sender) + if sender_email in list_recipients: + list_recipients.remove(sender_email) for r in list_recipients: - #send_email(r, content_filtered) - print(f"Sent to {r}") + email_out.send(r) elif (command == "lists"):
A
mods/listfixemail.py
@@ -0,0 +1,172 @@
+import re +import os + +re_header_cont = re.compile("^\s+\S+") +re_header_end = re.compile("^\s*$") +re_email_parts = re.compile("(\S+)@(\S+\.\S+)") +re_sender_info = re.compile("^From:\s+\"?([^<>\"]*?)\"?\s*<?(([^<>\"\s]+)@\S+\.[^<>\"\s]+)>?$") +re_auto_reply = re.compile("^Auto-Submitted: (auto-generated|auto-replied)", re.IGNORECASE) + +class ListfixEmail: + + def __init__(self): + self.content = [] + return + + def set_content(self, content): + if (type(content) is list): + if (len(content) > 0): + self.content = content[:] + else: + return False + else: + return False + + return True + + def get_content(self): + if (len(self.content) == 0): + return False + + return self.content[:] + + + def get_header(self, header): + rval = None + + if (len(self.content) == 0): + return False + + header = header.rstrip() + if (header[-1] == ":"): + header = header[0:-1] + + re_header = re.compile("^" + header + ": ", re.IGNORECASE) + + append_next = False + in_header = True + for line in self.content: + if (in_header): + if (re_header_cont.match(line) and append_next): + rval = rval + " " + line.rstrip().lstrip() + continue + + append_next = False + if (re_header.match(line)): + rval = line.rstrip() + append_next = True + elif (re_header_end.match(line)): + in_header = False + else: + break + + return rval + + def get_sender_email(self): + sender_email = None + + if (len(self.content) == 0): + return False + + from_line = self.get_header("From") + if (from_line): + if (re_sender_info.match(from_line)): + results = re_sender_info.search(from_line) + sender_email = results.group(2) + else: + return False + else: + return False + + return sender_email + + def get_sender_name(self): + sender_name = None + + if (len(self.content) == 0): + return False + + from_line = self.get_header("From") + if (from_line): + if (re_sender_info.match(from_line)): + results = re_sender_info.search(from_line) + sender_name = results.group(1) if results.group(1) else results.group(3) + else: + return False + else: + return False + + return sender_name + + def is_auto_reply(self): + if (len(self.content) == 0): + return False + + auto_sub_line = self.get_header("Auto-Submitted") + if (auto_sub_line and re_auto_reply.match(auto_sub_line)): + return True + + return False + + def strip_headers(self, exclude): + if (len(self.content) == 0): + return False + + stripped_content = [] + + append_next = False + in_header = True + for line in self.content: + if (in_header): + if (re_header_cont.match(line) and append_next): + stripped_content.append(line) + continue + + if (re_header_end.match(line)): + stripped_content.append(line) + in_header = False + continue + + append_next = False + for ex in exclude: + if (re.match("^" + ex + ": ", line, re.IGNORECASE)): + stripped_content.append(line) + append_next = True + break + else: + stripped_content.append(line) + + self.content = stripped_content[:] + + return True + + def add_header(self, header): + if (len(self.content) == 0): + return False + + header = header.rstrip() + "\n" + + self.content.append(header) + + return + + def add_header_prepend(self, header): + if (len(self.content) == 0): + return False + + header = header.rstrip() + "\n" + + self.content.insert(0, header) + + return + + def send(self, recipient): + if (len(self.content) == 0): + return False + + p = os.popen(f"/usr/sbin/sendmail -G -i {recipient}", "w") + for line in self.content: + p.write(line) + p.close() + + return True