General code cleanup modified: listfix_filter.py
Brian Barto bartobrian@gmail.com
Fri, 11 Mar 2022 15:55:45 -0500
1 files changed,
145 insertions(+),
93 deletions(-)
jump to
M
listfix_filter.py
→
listfix_filter.py
@@ -4,111 +4,163 @@ import sys
import os import re -sender = "None" -recipient = "None" -if (len(sys.argv) > 2): +######################## +## Function Defs +######################## + +def args_ok(): + if (len(sys.argv) > 2): + if (not re_email.match(sys.argv[1]) or not re_email.match(sys.argv[2])): + return False + else: + return True + else: + return False + +def get_header(lines, header): + rval = None + + re_header = re.compile("^" + header + ": ") + + header = header.rstrip() + if (header[-1] == ":"): + header = header[0:-1] + + append_next = False + in_header = True + for line in lines: + if (in_header): + if (re_header_cont.match(line) and append_next): + rval = rval + " " + line.rstrip().lstrip() + continue + + append_next = False + if (re_header.match(line)): + rval = line.rstrip() + append_next = True + elif (re_header_end.match(line)): + in_header = False + else: + break + + return rval + +def strip_headers(lines, exclude): + rval = [] + + append_next = False + in_header = True + for line in lines: + if (in_header): + if (re_header_cont.match(line) and append_next): + rval.append(line) + continue + + if (re_header_end.match(line)): + rval.append(line) + in_header = False + continue + + append_next = False + for ex in exclude: + if (re.match("^" + ex + ": ", line)): + rval.append(line) + append_next = True + break + else: + rval.append(line) + + return rval + +def send_email(to_email, email_contents): + p = os.popen(f"/usr/sbin/sendmail -G -i {to_email}", "w") + for line in email_contents: + p.write(line) + p.close() + + +######################## +## Main Program +######################## + +re_header_cont = re.compile("^\s+\S+") +re_header_end = re.compile("^\s*$") +re_email = re.compile("([^<>\"\s]+)@(\S+\.[^<>\"\s]+)") +re_sender_name = re.compile("^From:\s+\"?([^<>\"]*)\"?\s*<?(\S*)@\S+\.\S+>?$") +re_auto_reply = re.compile("^Auto-Submitted: (auto-generated|auto-replied)", re.IGNORECASE) + +sender = None +recipient = None +to_line = None +from_line = None +sender_name = None +list_email = None +list_name = None +list_domain = None +email = [] +email_filtered = [] + +## Get/check args + +if (args_ok()): sender = sys.argv[1] recipient = sys.argv[2] +else: + raise ValueError('Missing or Invalid Arguments') -#print(f"Sender: {sender}") -#print(f"Recipient: {recipient}") -#f = open("/tmp/filter.txt", "a") -#f.write(f"Sender: {sender}\n") -#f.write(f"Recipient: {recipient}\n") -#f.close() -orig_to = "" -orig_from = "" -list_email = "" -list_name = "" -list_domain = "" +## Get email content -stdin= [] for line in sys.stdin: - if (list_email == "" and line[0:4] == "To: "): - orig_to = line[4:].rstrip() - list_email = re.search("\S+@\S+\.\S+", orig_to).group() - if (list_email[-1] == ","): - list_email = list_email[0:-1] - if (list_email[0] == "\"" and list_email[-1] == "\""): - list_email = list_email[1:-1] - elif (list_email[0] == "<" and list_email[-1] == ">"): - list_email = list_email[1:-1] - list_name = re.search("^[^@]+", list_email).group() - list_domain = re.search("^[^@]+@(.+)$", list_email).group(1) - stdin.append(line) + email.append(line) +if (len(email) == 0): + raise ValueError('Missing Piped Data') -sender_name = "" -for line in stdin: - if (sender_name == "" and line[0:6] == "From: "): - orig_from = line[6:].rstrip() - if (re.search("^\"[^\"]+\"", orig_from)): - sender_name = re.search("^\"([^\"]+)\"", orig_from).group(1) - elif (re.search("^[^<]+ <", orig_from)): - sender_name = re.search("^([^<]+) <", orig_from).group(1) - else: - sender_name = re.search("(\S+)@\S+\.\S+", orig_from).group(1) - if (sender_name[0] == "\"" or sender_name[0] == "<"): - sender_name = sender_name[1:] +## Check if this is an auto-reply -for line in stdin: - if (re.search("^Auto-Submitted: auto-generated", line, re.IGNORECASE)): - exit() - if (re.search("^Auto-Submitted: auto-replied", line, re.IGNORECASE)): - exit() - if (re.search("^\s*$", line)): - break +auto_sub_line = get_header(email, "Auto-Submitted") +if (auto_sub_line and re_auto_reply.match(auto_sub_line)): + exit() + +## Get To: and From: lines + +to_line = get_header(email, "To") +from_line = get_header(email, "From") +if (not to_line or not from_line): + raise ValueError('Can not find To or From value in headers.') -## We add the filter skip header after the from line had been changed -## and reinjected. This is necessary to get the dkim milter to run -## for the new "From" header. Otherwise it doesn't run. -contents = [] -if (re.search(list_email, orig_from)): - contents.append(f"List-Skip-Filter: yes\n") - for line in stdin: - contents.append(line) +## Get list info + +if (re_email.search(to_line)): + results = re_email.search(to_line) + list_email = results.group(0) + list_name = results.group(1) + list_domain = results.group(2) +else: + raise ValueError('Can not determine email list.') + +## Get Sender Name + +if (re_sender_name.match(from_line)): + results = re_sender_name.search(from_line) + sender_name = results.group(1) if results.group(1) else results.group(2) else: - contents.append(f"To: {list_email}\n") - contents.append(f"From: \"{sender_name} via {list_name}\" <{list_email}>\n") - contents.append(f"Reply-To: {list_email}, {sender}\n") + raise ValueError('Can Not Determine Sender Name') - in_header = 1 - included = 0 - for line in stdin: - if (in_header == 1): - if (re.search("^\s+\S+", line) and included == 1): - contents.append(line) - continue +## If this is already filtered, add skip header and resubmit + +if (re.search(list_email, from_line)): + email.insert(0, "List-Skip-Filter: yes\n") + send_email(list_email, email) + exit() - included = 0 - if (re.search("^Subject: ", line)): - contents.append(line) - included = 1 - elif (re.search("^Content-", line)): - contents.append(line) - included = 1 - elif (re.search("^\s*$", line)): - contents.append(line) - in_header = 0 - else: - contents.append(line) +## Costruct Filtered Email - #found_from = 0 - #found_to = 0 - #for line in stdin: - # if (found_from == 0 and line[0:6] == "From: "): - # line = f"From: \"{sender_name} via {list_name}\" <{list_email}>\n" - # contents.append(f"Reply-To: {sender}, {list_email}\n") - # found_from = 1 - # elif (found_to == 0 and line[0:4] == "To: "): - # line = f"To: {list_email}\n" - # found_to = 1 - # contents.append(line) +email_filtered.append(f"To: {list_email}\n") +email_filtered.append(f"From: \"{sender_name} via {list_name}\" <{list_email}>\n") +email_filtered.append(f"Reply-To: {list_email}, {sender}\n") -#for line in contents: -# print(line, end = "") +exclude_headers = ["Subject", "Content-[^:]+", "MIME-Version"] +email_filtered.extend(strip_headers(email, exclude_headers)) -p = os.popen(f"/usr/sbin/sendmail -G -i {list_email}", "w") -for line in contents: - p.write(line) -p.close() +send_email(list_email, email_filtered)