blob: dd2bc37e18d4575d4e46873f62cab30f8772dc3c [file] [log] [blame]
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""ASF Infrastructure Reporting Dashboard - Quart Middleware"""
# Refuse to start under `python -O`/`-OO`: this module relies on `assert`
# statements for runtime sanity checks (see glued() below), and optimized
# mode strips asserts out entirely.
if not __debug__:
    raise RuntimeError("This code requires assert statements to be enabled")
import sys
import traceback
import typing
import uuid
import quart
from . import config
import werkzeug.routing
import asyncio
import functools
import aiohttp
import time
import json
async def consume_body():
    """Drain the remainder of the current request body, discarding every chunk.

    Needed so that httpd does not answer with a 502 when we bail out of a
    request early. See: https://bz.apache.org/bugzilla/show_bug.cgi?id=55433
    """
    async for _chunk in quart.request.body:
        pass
def glued(func: typing.Callable) -> typing.Callable:
    """Middleware that collects all form data (except file uploads!) and joins as one dict.

    The wrapped endpoint is invoked as ``func(form_data, **args)``, where
    ``form_data`` merges (in increasing precedence) the query string, any
    urlencoded/multipart form body, and any JSON object body. Oversized form
    bodies are rejected with a 413 response; exceptions raised by the
    endpoint are converted to 500 responses.
    """
    async def call(**args):
        form_data = dict()
        form_data.update(quart.request.args.to_dict())
        # Pre-parse check for form data size. This must run BEFORE the form is
        # awaited (and thereby parsed), otherwise the limit serves no purpose.
        if quart.request.content_type and any(
            x in quart.request.content_type
            for x in (
                "multipart/form-data",
                "application/x-www-form-urlencoded",
                "application/x-url-encoded",
            )
        ):
            # content_length is None when no Content-Length header was sent
            # (e.g. chunked transfer encoding) - treat that as 0.
            if (quart.request.content_length or 0) > config.server.max_form_size:
                await consume_body()
                return quart.Response(
                    status=413,
                    response=f"Request content length ({quart.request.content_length} bytes) is larger than what is permitted for form data ({config.server.max_form_size} bytes)!",
                )
        xform = await quart.request.form
        if xform:
            form_data.update(xform.to_dict())
        if quart.request.is_json:
            xjson = await quart.request.json
            # Only a JSON object can be merged into the dict; a top-level
            # array or scalar body is ignored rather than crashing to a 500.
            if isinstance(xjson, dict):
                form_data.update(xjson)
        try:
            resp = await func(form_data, **args)
            assert resp, "No response was provided by the underlying endpoint!"
        except Exception:  # Catch and spit out errors
            exc_type, exc_value, exc_traceback = sys.exc_info()
            err = "\n".join(traceback.format_exception(exc_type, exc_value, exc_traceback))
            headers = {
                "Server": "ASF Infra Reporting Dashboard",
                "Content-Type": "text/plain",
            }
            # By default, we print the traceback to the user, for easy debugging.
            # NOTE(review): the setting value is "json" but the payload is
            # text/plain - confirm whether a JSON body was intended here.
            if config.server.error_reporting == "json":
                error_text = "API error occurred: \n" + err
                return quart.Response(headers=headers, status=500, response=error_text)
            # If client traceback is disabled, we print it to stderr instead, but leave an
            # error ID for the client to report back to the admin. Every line of the traceback
            # will have this error ID at the beginning of the line, for easy grepping.
            else:
                # We only need a short ID here, let's pick 18 chars.
                eid = str(uuid.uuid4())[:18]
                sys.stderr.write("API Endpoint %s got into trouble (%s): \n" % (quart.request.path, eid))
                for line in err.split("\n"):
                    sys.stderr.write("%s: %s\n" % (eid, line))
                return quart.Response(
                    headers=headers,
                    status=500,
                    response="API error occurred. The application journal will have information. Error ID: %s" % eid,
                )
        # If an error is thrown before the request body has been consumed, eat it quietly.
        # NOTE(review): _complete is a private quart attribute - verify on quart upgrades.
        if not quart.request.body._complete.is_set():
            await consume_body()
        return resp
    # Quart will, if no rule name is specified, default to calling the rule "call" here,
    # which leads to carps about duplicate rule definitions. So, given the fact that call()
    # is dynamically made from within this function, we simply adjust its internal name to
    # refer to the calling module and function, thus providing Quart with a much better
    # name for the rule, which will also aid in debugging.
    call.__name__ = func.__module__ + "." + func.__name__
    return call
def auth_failed():
    """Returns the appropriate authorization failure response, depending on auth mechanism supplied."""
    prompt = "Please authenticate yourself first!\n"
    if "x-artifacts-webui" in quart.request.headers:
        # Request came from the Web UI: omit the WWW-Authenticate header so the
        # browser does not pop up its native credentials dialog.
        return quart.Response(status=401, response=prompt)
    # Standard client: issue an HTTP Basic challenge with our realm.
    basic_challenge = {"WWW-Authenticate": 'Basic realm="infra-reports-ec2-va.apache.org"'}
    return quart.Response(status=401, headers=basic_challenge, response=prompt)
class FilenameConverter(werkzeug.routing.BaseConverter):
    """Simple converter that splits a filename into a basename and an extension"""
    regex = r"^[^/.]*(\.[A-Za-z0-9]+)?$"
    part_isolating = False
    def to_python(self, filename):
        # "foo.bar" becomes ("foo", "bar"). A name whose only dot is the
        # leading one (".htaccess" etc.) stays whole, with an empty extension.
        if "." not in filename[1:]:
            return filename, ""
        base, _dot, extension = filename.partition(".")
        return base, extension
async def reset_rate_limits():
    """Background task that wipes the anonymous-lookup rate limit counters once per day."""
    one_day = 86400  # seconds
    while True:
        await asyncio.sleep(one_day)
        config.rate_limits.clear()
def rate_limited(func):
    """Decorator for calls that are rate-limited for anonymous users.
    Once the number of requests per day has been exceeded, this decorator
    will return a 429 HTTP response to the client instead.
    """
    @functools.wraps(func)
    async def session_wrapper(*args, **kwargs):
        # Rate-limit key: last entry of X-Forwarded-For (the proxy-appended,
        # hardest-to-spoof one), falling back to the socket peer address.
        ip = quart.request.headers.get("X-Forwarded-For", quart.request.remote_addr).split(",")[-1].strip()
        usage = config.rate_limits.get(ip, 0) + 1
        if config.server.rate_limit_per_ip and usage > config.server.rate_limit_per_ip:
            return quart.Response(
                status=429, response="Your request has been rate-limited. Please check back tomorrow!"
            )
        config.rate_limits[ip] = usage
        print(ip, usage)  # NOTE(review): debug leftover - consider logging or removing
        # Pass keyword arguments through as well; the original wrapper only
        # forwarded positional args, breaking views invoked with kwargs.
        return await func(*args, **kwargs)
    return session_wrapper
class CachedJson:
    """A simple JSON URL fetcher with a built-in cache. Once a cached JSON element expires,
    any subsequent reads will cause the class to re-fetch the object. If the object cannot be
    fetched, a stale version will be returned instead.
    Usage example:
    foo = CachedJson("https://www.apache.org/foo.json")
    jsondata = await foo.json
    # do stuff here....
    jsondata = await foo.json # <- will either use cache or fetch again if out of date
    """
    def __init__(self, url: str, expiry: int = 30):
        self.url = url  # URL to fetch JSON from
        self.expiry = expiry  # cache lifetime in seconds
        self.last = 0  # epoch timestamp of the last successful fetch
        self.timeout = aiohttp.ClientTimeout(total=30)
        self._cache = None  # last successfully fetched payload, None until first success
    @property
    async def json(self):
        """Return the cached JSON payload, refreshing it first if expired.

        On any fetch, parse, or timeout failure the previous (stale) payload
        is returned instead; before the first successful fetch this is None.
        """
        now = time.time()
        if now > (self.last + self.expiry):  # Cache expired?
            try:
                async with aiohttp.ClientSession(timeout=self.timeout) as hc:
                    async with hc.get(self.url) as req:
                        if req.status == 200:
                            source_json = await req.json()
                            if source_json:
                                self._cache = source_json
                                self.last = now
            # asyncio.TimeoutError must be handled too: aiohttp's total-timeout
            # raises it directly, and it is NOT an aiohttp.ClientError subclass,
            # so without it a slow upstream would propagate instead of serving
            # the stale cache.
            except (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError) as e:
                print(f"Could not fetch URL {self.url} for caching, will use stale checkout: {e}.")
        return self._cache