all repos — listfix @ 47294f7859dc239ee9a911059da2810f0ddebaa2

Postfix Mailing List Software; Maintained on behalf of the Agency Economy Incorporated NFP.

listfix.py (view raw)

 1
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
#!/usr/bin/python3

import sys
import os
from listfix import Args, DB, Email, Errors, Log, Test

## Setup

# Directory that holds this script. realpath() resolves symlinks first, so the
# log and database files below are looked up next to the real script file.
listfix_dir = os.path.dirname(os.path.realpath(__file__))

# Log file lives alongside the script.
log = Log(os.path.join(listfix_dir, "listfix.log"))

# Error reporting is tied to the log, with debug enabled.
# NOTE(review): set_exception_handler() presumably installs a global exception
# hook that reports through `log` -- confirm in Errors.
er = Errors(log, debug=True)
er.set_exception_handler()

# List/recipient database, stored as a JSON file alongside the script.
db = DB(os.path.join(listfix_dir, "listfix.json"))

# The command to run is taken from the command-line arguments.
args = Args(sys.argv)
command = args.get_command()

## Evaluate command

# Dispatch on the command name obtained from the command line. Each branch
# pulls the extra arguments it needs from `args` and works against `db`.

if command == "filter":
    # Relay one incoming message, read from stdin, to the recipients of a list.
    list_email = args.get_list_email()
    list_name = db.get_list_name(list_email)
    list_recipients = db.get_list_recipients(list_email)

    email_in = Email(list(sys.stdin))
    if email_in.check_auto_reply():
        # Nothing is relayed when check_auto_reply() flags the message.
        # sys.exit() is used instead of the site-provided exit() helper, which
        # is intended for interactive sessions and is missing under
        # `python -S`. Exiting here also skips the db.save() at the end of
        # the script.
        sys.exit()
    sender_email = email_in.get_sender_email()
    sender_name = email_in.get_sender_name()

    log.info(f"Email from: {sender_email}")
    log.info(f"Email list: {list_email}")

    # Build the outgoing message from the incoming content, keeping only the
    # headers matched by the patterns below.
    email_out = Email(email_in.get_content())
    email_out.strip_headers(exclude=["To", "Cc", "Subject", "Content-[^:]+", "MIME-Version"])

    # Replies always go to the list; a sender who is not a recipient of the
    # list is added to Reply-To as well, since they would not otherwise
    # receive the replies.
    if sender_email in list_recipients:
        reply_to = f"Reply-To: {list_email}"
    else:
        reply_to = f"Reply-To: {list_email}, {sender_email}"
    email_out.add_header_prepend(reply_to)
    # NOTE(review): sender_name comes from the incoming message and is placed
    # inside a quoted display name without escaping -- confirm that
    # Email.get_sender_name() cannot return quotes or line breaks.
    email_out.add_header_prepend(f"From: \"{sender_name} via {list_name}\" <{list_email}>")

    # The sender does not get a copy of their own message. A filtered copy is
    # built instead of calling .remove() on the object returned by the DB:
    # db.save() runs at the end of the script, so mutating a list the DB may
    # still own could drop the sender from the stored list.
    delivery_targets = [r for r in list_recipients if r != sender_email]

    for recipient in delivery_targets:
        log.info(f"Recipient: {recipient}")
        email_out.send(recipient)

elif command == "lists":
    # Print every list address with its name.
    lists = db.get_lists()
    if not lists:
        print("No lists defined.")
    else:
        for list_address in lists:
            list_name = db.get_list_name(list_address)
            print(f"{list_address} ({list_name})")

elif command == "dump":
    # Print every recipient of one list with their name.
    list_email = args.get_list_email()
    recipients = db.get_list_recipients(list_email)
    if not recipients:
        print(f"No recipients defined for list {list_email}")
    else:
        for recipient in recipients:
            recipient_name = db.get_recipient_name(list_email, recipient)
            print(f"{recipient} ({recipient_name})")

elif command == "create":
    # Register a new list under the given address and name.
    list_email = args.get_list_email()
    list_name = args.get_list_name()
    db.create_list(list_email, list_name)
    print(f"New list ({list_email}) added.")

elif command == "destroy":
    # Delete an existing list.
    list_email = args.get_list_email()
    db.destroy_list(list_email)
    print(f"Email list ({list_email}) destroyed.")

elif command == "add":
    # Add a recipient (address and name) to a list.
    list_email = args.get_list_email()
    recipient_email = args.get_recipient_email()
    recipient_name = args.get_recipient_name()
    db.create_recipient(list_email, recipient_email, recipient_name)
    print(f"New recipient ({recipient_email}) added to list ({list_email})")

elif command == "remove":
    # Remove a recipient from a list.
    list_email = args.get_list_email()
    recipient_email = args.get_recipient_email()
    db.destroy_recipient(list_email, recipient_email)
    print(f"Recipient ({recipient_email}) removed from list ({list_email})")

elif command == "test":
    # Run the project's Test helper.
    test = Test()
    test.run_test()

else:
    # Any other command name is rejected.
    raise ValueError(f"Unknown command: {command}")

## Disconnect from DB

# Call db.save() once the selected command has finished (presumably writing
# any changes back to the JSON database file -- confirm in DB.save). Code paths
# that exit or raise earlier in the script never reach this line.
db.save()