all repos — listfix @ 80140a14ba732d5196dab4cd988b862f8cad3980

Postfix Mailing List Software; Maintained on behalf the of Agency Economy Incorporated NFP.

listfix/db.py (view raw)

 1
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
import sqlite3

class DB:

    def __init__(self, path):
        try:
            self.db = sqlite3.connect(path)
        except sqlite3.OperationalError:
            print(f"Could not open database file {path}")
            exit()
        self.check_tables()

    def check_tables(self):
        row = self.db.execute("select count(*) from sqlite_master where type = 'table' and name = 'lists'").fetchone()
        if (not row[0]):
            self.db.execute("create table lists(id INTEGER primary key autoincrement, name text, email text)")
            self.db.commit()
        row = self.db.execute("select count(*) from sqlite_master where type = 'table' and name = 'recipients'").fetchone()
        if (not row[0]):
            self.db.execute("create table recipients(id INTEGER primary key autoincrement, list_id int, name text, email text)")
            self.db.commit()

        return

    def check_list_exists(self, list_email):

        lists = self.get_lists()

        if (list_email in lists):
            return True
        else:
            return False

    def create_list(self, list_email, list_name):
        if (self.check_list_exists(list_email)):
            return False

        self.db.execute("INSERT INTO lists (name, email) VALUES (?,?)", [list_name, list_email])
        self.db.commit()

        return True

    def destroy_list(self, list_email):
        list_id = self.get_list_id(list_email)
        if (not list_id):
            return False

        self.db.execute("DELETE FROM lists WHERE id = ?", [list_id])
        self.db.execute("DELETE FROM recipients WHERE list_id = ?", [list_id])
        self.db.commit()

        return True

    def get_lists(self):
        lists = []

        rows = self.db.execute("SELECT email FROM lists")

        if (rows.rowcount == 0):
            return False

        for row in rows:
            lists.append(row[0])

        return lists

    def get_list_id(self, list_email):
        row = self.db.execute("SELECT id FROM lists WHERE email = ?", [list_email]).fetchone()
        if (not row):
            return None
        else:
            return row[0]

    def get_list_name(self, list_email):
        row = self.db.execute("SELECT name FROM lists WHERE email = ?", [list_email]).fetchone()
        if (not row):
            return None
        else:
            return row[0]
    
    def get_list_recipients(self, list_email):
        recipients = []
        
        list_id = self.get_list_id(list_email)
        if (not list_id):
            return None

        rows = self.db.execute("SELECT email FROM recipients WHERE list_id = ?", [list_id])
        for row in rows:
            recipients.append(row[0])

        return recipients

    def check_recipient_exists(self, list_email, recipient_email):
        recipients = self.get_list_recipients(list_email)

        if (recipient_email in recipients):
            return True
        else:
            return False

    def create_recipient(self, list_email, recipient_email, recipient_name):

        if (not self.check_list_exists(list_email)):
            return False

        if (self.check_recipient_exists(list_email, recipient_email)):
            return False

        list_id = self.get_list_id(list_email)
        if (list_id == None):
            return False

        self.db.execute("INSERT INTO recipients (list_id, name, email) VALUES (?,?,?)", [list_id, recipient_name, recipient_email])
        self.db.commit()

        return True

    def destroy_recipient(self, list_email, recipient_email):

        if (not self.check_list_exists(list_email)):
            return False

        if (not self.check_recipient_exists(list_email, recipient_email)):
            return False

        list_id = self.get_list_id(list_email)
        if (list_id == None):
            return False

        recipient_id = self.get_recipient_id(list_email, recipient_email)
        if (recipient_id == None):
            return False

        self.db.execute("DELETE FROM recipients WHERE id = ?", [recipient_id])
        self.db.commit()

        return True

    def get_recipient_name(self, list_email, recipient_email):
        if (not self.check_list_exists(list_email)):
            return None

        list_id = self.get_list_id(list_email)
        if (not list_id):
            return None

        row = self.db.execute("SELECT name FROM recipients WHERE list_id = ? and email = ?", [list_id, recipient_email]).fetchone()
        if (not row):
            return None
        else:
            return row[0]

    def get_recipient_id(self, list_email, recipient_email):
        if (not self.check_list_exists(list_email)):
            return None

        list_id = self.get_list_id(list_email)
        if (not list_id):
            return None

        row = self.db.execute("SELECT id FROM recipients WHERE list_id = ? and email = ?", [list_id, recipient_email]).fetchone()
        if (not row):
            return None
        else:
            return row[0]

    def close(self):
        self.db.close()