all repos — listfix @ 0f9c6214979820ecc97b282bd766d5fce2573646

Postfix Mailing List Software; Maintained on behalf of the Agency Economy Incorporated NFP.

listfix/db.py (view raw)

 1
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
import sqlite3

class DB:
    """SQLite-backed store of mailing lists and their recipients.

    Two tables are used: ``lists`` (id, name, email) and ``recipients``
    (id, list_id, name, email). The public API addresses lists and
    recipients by e-mail address; looking up an unknown address raises
    ``ValueError``.

    Every write runs inside ``with self.db:`` so it is committed on success
    and rolled back if a statement fails. Instances may also be used as
    context managers, in which case the connection is closed on exit.
    """

    def __init__(self, path):
        """Open the SQLite database at *path* and make sure the tables exist."""
        self.db = sqlite3.connect(path)
        try:
            self.check_tables()
        except Exception:
            # Do not leak the open connection when schema setup fails.
            self.db.close()
            raise

    def __enter__(self):
        """Return this instance so it can be bound by a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the ``with`` body propagate."""
        self.close()

    def _fetch_value(self, sql, params, missing_message):
        """Return the first column of the first row matched by *sql*.

        Raises:
            ValueError: with *missing_message* when the query matches no row.
        """
        row = self.db.execute(sql, params).fetchone()
        if row is None:
            raise ValueError(missing_message)
        return row[0]

    def check_tables(self):
        """Create the ``lists`` and ``recipients`` tables if they are missing.

        Safe to call repeatedly: existing tables and their data are untouched.
        """
        with self.db:
            self.db.execute(
                "create table if not exists lists("
                "id INTEGER primary key autoincrement, name text, email text)"
            )
            self.db.execute(
                "create table if not exists recipients("
                "id INTEGER primary key autoincrement, list_id int, "
                "name text, email text)"
            )

    def check_list_exists(self, list_email):
        """Return True if a list with address *list_email* exists."""
        try:
            self.get_list_id(list_email)
        except ValueError:
            return False
        return True

    def get_lists(self):
        """Return the e-mail addresses of all lists."""
        return [row[0] for row in self.db.execute("SELECT email FROM lists")]

    def get_list_id(self, list_email):
        """Return the row id of the list with address *list_email*.

        Raises:
            ValueError: if no such list exists.
        """
        return self._fetch_value(
            "SELECT id FROM lists WHERE email = ?",
            (list_email,),
            f"Email list does not exist: {list_email}",
        )

    def get_list_name(self, list_email):
        """Return the display name of the list with address *list_email*.

        Raises:
            ValueError: if no such list exists.
        """
        return self._fetch_value(
            "SELECT name FROM lists WHERE email = ?",
            (list_email,),
            f"Email list does not exist: {list_email}",
        )

    def get_list_recipients(self, list_email):
        """Return the e-mail addresses of every recipient of *list_email*.

        Raises:
            ValueError: if the list does not exist.
        """
        list_id = self.get_list_id(list_email)
        rows = self.db.execute(
            "SELECT email FROM recipients WHERE list_id = ?", (list_id,)
        )
        return [row[0] for row in rows]

    def create_list(self, list_email, list_name):
        """Create a list named *list_name* with address *list_email*.

        Raises:
            ValueError: if a list with that address already exists.
        """
        if self.check_list_exists(list_email):
            raise ValueError(f"Email list already exists: {list_email}")
        with self.db:
            self.db.execute(
                "INSERT INTO lists (name, email) VALUES (?,?)",
                (list_name, list_email),
            )

    def destroy_list(self, list_email):
        """Delete the list *list_email* together with all of its recipients.

        Raises:
            ValueError: if the list does not exist.
        """
        list_id = self.get_list_id(list_email)
        # Both deletes share one transaction so a failure cannot leave a
        # list without its recipients removed (or the other way around).
        with self.db:
            self.db.execute("DELETE FROM recipients WHERE list_id = ?", (list_id,))
            self.db.execute("DELETE FROM lists WHERE id = ?", (list_id,))

    def check_recipient_exists(self, list_email, recipient_email):
        """Return True if *recipient_email* is a recipient of *list_email*.

        Returns False both when the recipient is missing and when the list
        itself does not exist.
        """
        try:
            self.get_recipient_id(list_email, recipient_email)
        except ValueError:
            return False
        return True

    def get_recipient_id(self, list_email, recipient_email):
        """Return the row id of *recipient_email* within list *list_email*.

        Raises:
            ValueError: if the list or the recipient does not exist.
        """
        list_id = self.get_list_id(list_email)
        return self._fetch_value(
            "SELECT id FROM recipients WHERE list_id = ? and email = ?",
            (list_id, recipient_email),
            f"Recipient does not exist: {recipient_email}",
        )

    def get_recipient_name(self, list_email, recipient_email):
        """Return the display name of *recipient_email* within *list_email*.

        Raises:
            ValueError: if the list or the recipient does not exist.
        """
        list_id = self.get_list_id(list_email)
        return self._fetch_value(
            "SELECT name FROM recipients WHERE list_id = ? and email = ?",
            (list_id, recipient_email),
            f"Recipient does not exist: {recipient_email}",
        )

    def create_recipient(self, list_email, recipient_email, recipient_name):
        """Add *recipient_email* (named *recipient_name*) to list *list_email*.

        Raises:
            ValueError: if the list does not exist, or if the recipient is
                already on the list.
        """
        # Resolve the list once; this also reports a missing list directly.
        list_id = self.get_list_id(list_email)
        duplicate = self.db.execute(
            "SELECT 1 FROM recipients WHERE list_id = ? and email = ?",
            (list_id, recipient_email),
        ).fetchone()
        if duplicate is not None:
            raise ValueError(f"Recipient already exists: {recipient_email}")
        with self.db:
            self.db.execute(
                "INSERT INTO recipients (list_id, name, email) VALUES (?,?,?)",
                (list_id, recipient_name, recipient_email),
            )

    def destroy_recipient(self, list_email, recipient_email):
        """Remove *recipient_email* from list *list_email*.

        Raises:
            ValueError: if the list or the recipient does not exist.
        """
        recipient_id = self.get_recipient_id(list_email, recipient_email)
        with self.db:
            self.db.execute("DELETE FROM recipients WHERE id = ?", (recipient_id,))

    def close(self):
        """Close the underlying SQLite connection."""
        self.db.close()