blob: a7f899362ecea2b04869851e41e3145456d4b640 [file] [log] [blame]
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Background worker: periodically expires outdated allow/block list entries and
# blocks clients whose traffic exceeds the limits set by the configured ban rules.
import asyncio
import elasticsearch_dsl
import elasticsearch
import typing
import netaddr
import time
import plugins.configuration
import plugins.lists
import datetime
MAX_DB_DAYS = 3 # Only look backwards up to three days. No sense in involving every index in our search.
# Document field the top-client aggregation buckets on (queried via its ".keyword" sub-field).
CLIENT_IP_NAME = "clientip"
# Document field the "now-<duration>" range filter is applied to.
TIMESTAMP_NAME = "@timestamp"
async def find_top_clients(
    config: plugins.configuration.BlockyConfiguration,
    aggtype: typing.Literal["bytes", "requests"] = "requests",
    duration: str = "12h",
    no_hits: int = 100,
    filters: typing.Optional[typing.List[str]] = None,
) -> typing.List[typing.Tuple[str, int]]:
    """Find the top clients (IPs) in the database, by request count or by traffic volume.

    Args:
        config: Blocky configuration; ``config.elasticsearch`` is used (awaited) for the
            index-existence checks and the search, ``config.index_pattern`` is passed to
            ``strftime`` to build the per-day index names.
        aggtype: ``"requests"`` ranks clients by document count, ``"bytes"`` by the sum of
            the ``bytes`` field.
        duration: How far back to search, used verbatim in a ``now-<duration>`` range filter.
        no_hits: Maximum number of clients (terms buckets) to return.
        filters: Optional search filters, each of the form ``"<key> <operator> <value>"``.
            Operators are ``=`` (match), ``~=`` (regexp) and ``==`` (term); prefix the
            operator with ``!`` to exclude instead of require. A single string is accepted
            and treated as a one-element list. Empty entries are skipped.

    Returns:
        A list of ``(client_ip, value)`` tuples, where ``value`` is the byte sum or the
        request count. Empty if none of the recent indices exist or the response carries
        no aggregations.

    Raises:
        ValueError: If ``aggtype`` is not supported, or a filter does not consist of
            three space-separated parts.
        TypeError: If a filter uses an unknown operator.
    """
    # Explicit check rather than ``assert``, so validation is not stripped under ``python -O``.
    if aggtype not in ("bytes", "requests"):
        raise ValueError("Only by-bytes or by-requests aggregations are supported")
    if isinstance(filters, str):
        filters = [filters]
    elif filters is None:
        filters = []

    q = elasticsearch_dsl.Search(using=config.elasticsearch)
    q = q.filter("range", **{TIMESTAMP_NAME: {"gte": f"now-{duration}"}})

    # Collect the names of the last MAX_DB_DAYS daily indices that actually exist.
    day = datetime.datetime.now(datetime.timezone.utc)
    existing_indices = []
    for _ in range(MAX_DB_DAYS):
        index_name = day.strftime(config.index_pattern)
        if await config.elasticsearch.indices.exists(index=index_name):
            existing_indices.append(index_name)
        day -= datetime.timedelta(days=1)
    if not existing_indices:
        return []
    index_list = ",".join(existing_indices)

    # Add all search filters
    operator_to_query_type = {"=": "match", "~=": "regexp", "==": "term"}
    for entry in filters:
        if not entry:
            continue
        try:
            key, operator, value = entry.split(" ", 2)
        except ValueError:
            raise ValueError(
                f"Malformed search filter (expected '<key> <operator> <value>'): {entry}"
            ) from None
        add_clause = q.query  # Default is to add as search param
        if operator.startswith("!"):  # exclude as search param?
            operator = operator[1:]
            add_clause = q.exclude
        query_type = operator_to_query_type.get(operator)
        if query_type is None:
            raise TypeError(f"Unknown operator {operator} in search filter: {entry}")
        q = add_clause(query_type, **{key: value})

    # One terms bucket per client IP; for "bytes" the buckets are ordered by summed bytes.
    ip_field = f"{CLIENT_IP_NAME}.keyword"
    if aggtype == "requests":
        q.aggs.bucket("requests_per_ip", elasticsearch_dsl.A("terms", field=ip_field, size=no_hits))
    else:
        q.aggs.bucket(
            "requests_per_ip",
            elasticsearch_dsl.A("terms", field=ip_field, size=no_hits, order={"bytes_sum": "desc"}),
        ).metric("bytes_sum", "sum", field="bytes")

    resp = await config.elasticsearch.search(index=index_list, body=q.to_dict(), size=0, timeout="30s")
    if "aggregations" not in resp:
        print(f"Could not find aggregated data. Are you sure the index pattern {config.index_pattern} exists?")
        return []

    top_ips = []
    for bucket in resp["aggregations"]["requests_per_ip"]["buckets"]:
        # Buckets only carry "bytes_sum" when the bytes metric was requested.
        if "bytes_sum" in bucket:
            value = int(bucket["bytes_sum"]["value"])
        else:
            value = int(bucket["doc_count"])
        top_ips.append((bucket["key"], value))
    return top_ips
class BanRule:
    """A ban rule: which metric to aggregate, over what period, and the limit that triggers it."""

    def __init__(self, ruledict):
        """Populate the rule from ``ruledict``.

        ``ruledict`` must provide the keys ``description``, ``aggtype``, ``limit``,
        ``duration`` and ``filters``; ``filters`` is a newline-separated string.
        """
        self.description = ruledict["description"]
        self.aggtype = ruledict["aggtype"]
        self.limit = ruledict["limit"]
        self.duration = ruledict["duration"]
        # One filter per line: trim surrounding whitespace and drop blank lines.
        trimmed_lines = (line.strip() for line in ruledict["filters"].split("\n"))
        self.filters = [line for line in trimmed_lines if line]

    async def list_offenders(self, config: plugins.configuration.BlockyConfiguration):
        """Return the top clients whose value meets or exceeds this rule's limit.

        Each returned item is a tuple as produced by ``find_top_clients``, compared on its
        second element. If the search times out, a message is printed and an empty list
        is returned.
        """
        try:
            top_clients = await find_top_clients(
                config,
                aggtype=self.aggtype,
                duration=self.duration,
                filters=self.filters,
            )
        except (asyncio.exceptions.TimeoutError, elasticsearch.exceptions.ConnectionTimeout):
            print("Offender search timed out, retrying later!")
            top_clients = []
        return [client for client in top_clients if client[1] >= self.limit]
def _ip_is_listed(ip, entries) -> bool:
    """Return True if ``ip`` equals, or lies within, the ``network`` of any entry in ``entries``."""
    for entry in entries:
        network = entry.network
        if isinstance(network, netaddr.IPNetwork) and ip in network:
            return True
        if isinstance(network, netaddr.IPAddress) and ip == network:
            return True
    return False


async def run(config: plugins.configuration.BlockyConfiguration):
    """Background loop: expire outdated list entries, then block new offenders, every 15 seconds.

    Each pass:
      1. Reads all "lists" records and removes those whose ``expires`` timestamp has passed
         (``-1`` means the entry never expires). When a block expires, a 10-minute allow
         entry is added for the same IP; conflicts raised by that add are ignored.
      2. Reads all "rules" records, evaluates each as a ``BanRule``, and adds a block for
         every offender not already covered by the allow list or the block list.

    Loops until cancelled or until an exception propagates out of it.
    """
    # Search forever, sleep a little in between
    while True:
        # Find expired rules
        now = int(time.time())
        # Materialize the records first, since entries are removed while we walk them.
        all_items = list(config.sqlite.fetch("lists", limit=0))
        for item in all_items:
            if item["expires"] == -1:
                continue  # never expires
            if item["expires"] >= now:
                continue  # not expired yet
            print(f"Expiring {item['type']} rule for {item['ip']}")
            if item["type"] == "allow":
                config.allow_list.remove(item["ip"])
            elif item["type"] == "block":
                config.block_list.remove(item["ip"])
                # Try adding a temporary whitelist entry to flush on hosts
                try:
                    config.allow_list.add(
                        ip=item["ip"],
                        timestamp=now,
                        expires=now + 600,  # Expire this rule in 10 minutes
                        reason="Temporary allow-listed by BLocky4 to unblock IP due to block expiring",
                        host=plugins.configuration.DEFAULT_HOST_BLOCK,
                        force=False,
                    )
                except plugins.lists.BlockListException:
                    pass  # If it conflicts, it should already be unblocked, so we don't care.
            else:
                # Was missing the f-prefix, so the placeholder was printed literally.
                print(f"I don't actually know items of type {item['type']}, ignoring...")

        all_rules = list(config.sqlite.fetch("rules", limit=0))
        for rule in all_rules:
            my_rule = BanRule(rule)
            for offender in await my_rule.list_offenders(config):
                off_ip = offender[0]
                off_value = offender[1]
                try:
                    off_ip_na = netaddr.IPAddress(off_ip)
                except (netaddr.AddrFormatError, ValueError):
                    # A garbage client address in the logs must not take down the worker loop.
                    print(f"Ignoring offender with unparsable IP address: {off_ip!r}")
                    continue
                # Skip clients that are explicitly allowed or already blocked.
                if _ip_is_listed(off_ip_na, config.allow_list) or _ip_is_listed(off_ip_na, config.block_list):
                    continue
                off_reason = f"{rule['description']} ({off_value} >= {rule['limit']})"
                print(f"Found new offender, {off_ip}: {off_reason}")
                blocked_at = int(time.time())
                config.block_list.add(
                    ip=off_ip,
                    timestamp=blocked_at,
                    expires=blocked_at + config.default_expire_seconds,
                    reason=off_reason,
                    host=plugins.configuration.DEFAULT_HOST_BLOCK,
                )
        await asyncio.sleep(15)