blob: a7cc8ed1fe92d20cf5ad665e567c7eaf974d4e37 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ahapi
import plugins.configuration
import time
""" Generic add/remove-allow endpoint for Blocky/4"""
async def process(state: plugins.configuration.BlockyConfiguration, request, formdata: dict) -> dict:
now = int(time.time())
force = formdata.get("force", False)
ip = formdata.get("ip")
if request.method in ["PUT", "POST"]:
reason = formdata.get("reason", "no reason specified")
expires = int(formdata.get("expires", -1)) # Different from block. We generally wish to allow forever, so -1 here.
if not expires:
expires = now + state.default_expire_seconds
host = formdata.get("host", plugins.configuration.DEFAULT_HOST_BLOCK)
try:
state.allow_list.add(ip=ip, expires=expires, reason=reason, host=host, force=force)
except plugins.lists.BlockListException as e:
return {"success": False, "status": "failure", "message": str(e)}
# All good!
return {"success": True, "status": "allowed", "message": f"IP {ip} added to allow list"}
elif request.method == "DELETE":
for entry in state.allow_list:
if entry['ip'] == ip:
state.allow_list.remove(entry)
return {"success": True, "status": "removed", "message": f"IP {ip} removed from allow list"}
return {"success": False, "status": "not found", "message": f"IP {ip} does not exist in the allow list"}
def register(config: plugins.configuration.BlockyConfiguration):
return ahapi.endpoint(process)