strip out list handling, call plugins.lists.List instead
also seed a newly created DB with the default list, so we only have to munge the List object once.
diff --git a/plugins/configuration.py b/plugins/configuration.py
index 66b5f46..8f04b02 100644
--- a/plugins/configuration.py
+++ b/plugins/configuration.py
@@ -21,7 +21,7 @@
import asfpy.sqlite
import elasticsearch
import plugins.db_create
-import netaddr
+import plugins.lists
DEFAULT_EXPIRE = 86400 * 30 * 4 # Default expiry of auto-bans = 4 months
@@ -36,30 +36,6 @@
]
-class BlockyBlock(dict):
- def __init__(self, ip: str, timestamp: int, expires: int, reason: str = None, host: str = "*"):
- dict.__init__(self,
- ip=ip,
- timestamp=timestamp,
- expires=expires,
- reason=reason,
- host=host
- )
- self.network = netaddr.IPNetwork(ip)
-
-
-class BlockyAllow(dict):
- def __init__(self, ip: str, timestamp: int, expires: int, reason: str = None, host: str = "*"):
- dict.__init__(self,
- ip=ip,
- timestamp=timestamp,
- expires=expires,
- reason=reason,
- host=host
- )
- self.network = netaddr.IPNetwork(ip)
-
-
class BlockyConfiguration:
def __init__(self, yml):
self.database_filepath = yml.get("database", "blocky.sqlite")
@@ -68,47 +44,33 @@
self.index_pattern = yml.get("index_pattern", DEFAULT_INDEX_PATTERN)
self.elasticsearch_url = yml.get("elasticsearch_url")
self.elasticsearch = elasticsearch.AsyncElasticsearch(hosts=[self.elasticsearch_url])
- self.block_list = []
- self.allow_list = []
self.http_ip = yml.get('bind_ip', '127.0.0.1')
self.http_port = int(yml.get('bind_port', 8080))
# Create table if not there yet
+ new_db = False
if not self.sqlite.table_exists("rules"):
print(f"Database file {self.database_filepath} is empty, initializing tables")
self.sqlite.run(plugins.db_create.CREATE_DB_RULES)
- self.sqlite.run(plugins.db_create.CREATE_DB_BANS)
- self.sqlite.run(plugins.db_create.CREATE_DB_ALLOW)
+ self.sqlite.run(plugins.db_create.CREATE_DB_LISTS)
self.sqlite.run(plugins.db_create.CREATE_DB_AUDIT)
print(f"Database file {self.database_filepath} has been successfully initialized")
+ new_db = True
- # Fetch existing blocks and allows
- for entry in self.sqlite.fetch("blocklist", limit=0):
- block = BlockyBlock(
- ip=entry["ip"],
- timestamp=entry["timestamp"],
- expires=entry["expires"],
- reason=entry["reason"],
- host=entry.get("host", "*")
- )
- self.block_list.append(block)
- for entry in DEFAULT_ALLOW_LIST:
- allow = BlockyAllow(
- ip=entry,
- timestamp=0,
- expires=-1,
- reason="Default allowed ranges (local network)",
- host="*",
- )
- self.allow_list.append(allow)
- for entry in self.sqlite.fetch("allowlist", limit=0):
- allow = BlockyAllow(
- ip=entry["ip"],
- expires=entry["expires"],
- reason=entry["reason"],
- host=entry.get("host", "*")
- )
- self.allow_list.append(allow)
+ # Init and fetch existing blocks and allows
+ self.block_list = plugins.lists.List(self, 'block')
+ self.allow_list = plugins.lists.List(self, 'allow')
+
+ # Seed new DB with default allows if needed
+ if new_db:
+ for entry in DEFAULT_ALLOW_LIST:
+ self.allow_list.add(
+ ip=entry,
+ timestamp=0,
+ expires=-1,
+ reason="Default allowed ranges (local network)",
+ host="*",
+ )
async def test_es(self):
i = await self.elasticsearch.info()