blob: 1b5ab08ce3d6b04d34318910fd2c3a9e7132ddb4 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Blocky4 client application"""
import netaddr
import time
import iptables
import asyncio
import yaml
import asfpy.syslog
import asfpy.whoami
import aiohttp
import json
MAX_BLOCK_SIZE_IPV4 = (2 ** 16) # Max a /16 block in IPv4 space (32 - 16 == /16)
MAX_BLOCK_SIZE_IPV6 = (2 ** 72) # Max a /56 block in IPv6 space (128 - 72 == /56)
# Redirect print to asfpy's syslog printer, duplicate to stdout
print = asfpy.syslog.Printer(stdout=True, identity="blocky")
async def upload_iptables(config, chains):
# Turn all rules in all chains into dicts, add to a big list
rules_as_dict = []
for chain in chains:
await chain.refresh()
for rule in chain.items:
rules_as_dict.append(rule.to_dict())
# print("Uploading iptables list (%u entries) to Blocky server" % len(rules_as_dict))
try:
js = {"hostname": config["whoami"], "iptables": rules_as_dict}
timeout = aiohttp.ClientTimeout(total=15)
api_url = "%s/upload" % config["api_host"]
async with aiohttp.request("PUT", api_url, json=js, timeout=timeout) as resp:
response = await resp.json()
assert resp.status == 200, f"{resp.status}: {resp.reason}"
# print(response)
except AssertionError as status:
print(f"Server responded with code {status}")
except aiohttp.ClientConnectorError as e:
print("Could not send my iptables list to server at %s - server down or wrong URL?" % api_url)
except aiohttp.ClientTimeout as e:
print("Could not send my iptables list to server at %s - server response timed out" % api_url)
def find_block(chains, ip):
"""Find out if an IP matches any rule in any chain we have"""
for chain in chains:
rule = chain.is_blocked(ip)
if rule:
return rule, chain
return None, None
async def process_changes(config, chains, allow=[], block=[]):
"""Process allows and blocks"""
allow_blocks = []
if allow or block:
print("Processing upstream change-set (%u entries)" % (len(allow) + len(block)))
processed = 0
# First process any new allows
if allow and chains: # If we have an INPUT default chain, refresh before unblock.
await chains[0].refresh()
for entry in allow:
ip = entry["ip"]
if ip:
if "/" in ip:
as_block = netaddr.IPNetwork(ip)
else:
if ":" in ip:
as_block = netaddr.IPNetwork("%s/128" % ip) # IPv6
else:
as_block = netaddr.IPNetwork("%s/32" % ip) # IPv4
allow_blocks.append(as_block)
rule, chain = find_block(chains, as_block)
while rule:
print("Removing %s from block list (found at line %s as %s)" % (ip, rule.line_number, rule.source))
rv = await chain.remove(rule)
if not rv:
print("Could not remove ban for %s from iptables!" % ip)
else:
rule, chain = find_block(chains, as_block)
# Then process blocks
for entry in block:
ip = entry["ip"]
if ip:
processed += 1
if (processed % 500) == 0:
print("Processed %u entries..." % processed)
# Only apply blocks if host is * or our specific name
if entry.get('host', '*') not in [config['whoami'], '*']:
continue
banit = True
if "/" in ip:
as_block = netaddr.IPNetwork(ip)
# We never ban larger than a /8 on ipv4 and /56 on ipv6
if (as_block.version == 4 and as_block.size > MAX_BLOCK_SIZE_IPV4) or (
as_block.version == 6 and as_block.size > MAX_BLOCK_SIZE_IPV6
):
print("%s was requested banned but the net block is too large (%u IPs)" % (as_block, as_block.size))
continue
else:
if ":" in ip:
as_block = netaddr.IPNetwork("%s/128" % ip) # IPv6
else:
as_block = netaddr.IPNetwork("%s/32" % ip) # IPv4
for wblock in allow_blocks:
if as_block in wblock or wblock in as_block:
print("%s was requested banned but %s is allow-listed, ignoring ban" % (as_block, wblock))
banit = False
if banit:
rule, chain = find_block(chains, as_block)
if not rule:
print("Adding %s to block list" % ip)
rv = await chains[0].add(ip, reason=entry["reason"])
if not rv:
print("Could not add ban for %s in iptables!" % ip)
async def loop(config):
uri = config["api_host"]
# Get current list of bans in iptables, upload it to blocky server
chains_to_check = config.get("chains") or ["INPUT"]
chains = []
for chain_name in chains_to_check:
chain = iptables.Chain(chain_name)
await chain.refresh()
chains.append(chain)
last_upload = 0
while True:
async with aiohttp.ClientSession() as session:
try:
rv = await session.get(f"{uri}/all")
assert rv.status == 200, f"API host responded with bad status: {rv.status}"
js = await rv.json()
await process_changes(config, chains, allow=js["allow"], block=js["block"])
# Attach to pubsub and listen for new blocks/allows
async with session.get(config["pubsub_host"], timeout=None) as pubsub_conn:
buffer = b""
async for data, end_of_http_chunk in pubsub_conn.content.iter_chunks():
buffer += data
if end_of_http_chunk:
chunk = buffer.decode("utf-8").strip()
buffer = b""
if chunk:
try:
payload = json.loads(chunk)
if "blocky" in payload.get("pubsub_topics", []):
if "block" in payload:
await process_changes(config, chains, block=[payload["block"]])
elif "allow" in payload:
await process_changes(config, chains, allow=[payload["allow"]])
except json.JSONDecodeError as e:
print(e)
if last_upload + config.get("upload_interval", 300) < time.time():
await upload_iptables(config, chains)
last_upload = time.time()
except Exception as e:
print("[%u] Connection failed (%s: %s), reconnecting in 30 seconds" % (time.time(), type(e), e))
await asyncio.sleep(30)
def main():
# Load YAML
config = yaml.safe_load(open("blocky.yaml").read())
if "whoami" not in config:
config["whoami"] = asfpy.whoami.whoami()
config["api_host"] = config["api_host"].rstrip("/")
# Start async loop
asyncio.get_event_loop().run_until_complete(loop(config))
if __name__ == "__main__":
main()