shorten by using the List object for handling additions
diff --git a/endpoints/block.py b/endpoints/block.py
index 3069796..33c018d 100644
--- a/endpoints/block.py
+++ b/endpoints/block.py
@@ -17,7 +17,7 @@
import ahapi
import plugins.configuration
-import netaddr
+import plugins.lists
import time
""" Generic add-block endpoint for Blocky/4"""
@@ -25,69 +25,18 @@
async def process(state: plugins.configuration.BlockyConfiguration, request, formdata: dict) -> dict:
now = int(time.time())
- force = formdata.get("force", False)
+ force = bool(formdata.get("force", False))
ip = formdata.get("ip")
reason = formdata.get("reason", "no reason specified")
expires = int(formdata.get("expires", 0))
if not expires:
expires = now + state.default_expire_seconds
host = formdata.get("host", plugins.configuration.DEFAULT_HOST_BLOCK)
- ip_as_network = netaddr.IPNetwork(ip)
- # Check if IP address conflicts with an entry on the allow list
- to_remove = []
- for network in state.allow_list:
- if ip_as_network in network.network or network.network in ip_as_network:
- if force:
- to_remove.append(network)
- else:
- return {
- "success": False,
- "status": "failure",
- "message": f"IP entry {ip} conflicts with allow list entry {network.network}. "
- "Please address this or use force=true to override.",
- }
-
- # Check if already blocked
- for network in state.block_list:
- if ip_as_network in network.network or network.network in ip_as_network:
- if force:
- to_remove.append(network)
- else:
- return {
- "success": False,
- "status": "failure",
- "message": f"IP entry {ip} conflicts with block list entry {network.network}. "
- "Please address this or use force=true to override.",
- }
- # If force=true and a conflict was found, remove the conflicting entry
- for entry in to_remove:
- if entry in state.allow_list:
- state.sqlite.delete("allowlist", ip=entry["ip"])
- state.allow_list.remove(entry)
- if entry in state.block_list:
- state.sqlite.delete("blocklist", ip=entry["ip"])
- state.block_list.remove(entry)
-
- # Now add the block
- new_block = plugins.configuration.BlockyBlock(
- ip=ip,
- timestamp=now,
- expires=expires,
- reason=reason,
- host=host,
- )
- state.block_list.append(new_block)
- state.sqlite.insert(
- "blocklist",
- new_block,
- )
-
- # Add to audit log
- state.sqlite.insert(
- "auditlog",
- {"ip": ip, "timestamp": int(time.time()), "event": f"Blocked IP {ip}: {reason}"},
- )
+ try:
+ state.block_list.add(ip=ip, expires=expires, reason=reason, host=host, force=force)
+ except plugins.lists.BlockListException as e:
+ return {"success": False, "status": "failure", "message": str(e)}
# All good!
return {"success": True, "status": "blocked", "message": f"IP {ip} added to block list"}