blob: 3ec7a5c6896e84773ae2b2721644dec3156e1f08 [file]
#!/usr/bin/env python3
"""Miscellaneous helpers for ASFQuart"""
import os.path
import io
import functools
import asyncio
import logging
import quart
import werkzeug.routing
LOGGER = logging.getLogger(__name__)
DEFAULT_MAX_CONTENT_LENGTH = 102400
async def formdata():
"""Catch-all form data converter. Converts form data of any form (json, urlencoded, mime, etc) to a dict"""
form_data = dict()
form_data.update(quart.request.args.to_dict()) # query string args
xform = await quart.request.form # POST form data
# Pre-parse check for form data size
if quart.request.content_type and any(
x in quart.request.content_type
for x in (
"multipart/form-data",
"application/x-www-form-urlencoded",
"application/x-url-encoded",
)
):
# If the content is too large for us to handle, we need to silently ignore every chunk, so we can return with a
# cleared buffer, lest bad things happen.
max_size = quart.current_app.config.get("MAX_CONTENT_LENGTH", DEFAULT_MAX_CONTENT_LENGTH)
if quart.request.content_length > max_size:
async for _data in quart.request.body:
pass
return quart.Response(
status=413,
response=f"Request content length ({quart.request.content_length} bytes) is larger than what is permitted for form data ({max_size} bytes)!",
)
if xform:
form_data.update(xform.to_dict())
if quart.request.is_json: # JSON data from a PUT?
xjson = await quart.request.json
form_data.update(xjson)
return form_data
class FilenameConverter(werkzeug.routing.BaseConverter):
"""Simple converter that splits a filename into a basename and an extension. Only deals with filenames, not
full paths. Thus, <filename> will match foo.txt, but not /foo/bar.baz"""
regex = r"^[^/.]*(\.[A-Za-z0-9]+)?$"
part_isolating = False
def to_python(self, filename): # pylint: disable=arguments-renamed
return os.path.splitext(filename) # superclass function uses 'value'
#
# Decorator to use a template in order to generate a webpage with some
# provided data.
#
# EXAMPLE:
#
# @use_template(T_MAIN)
# def main_page():
# ...
# data = {
# 'title': 'Main Page',
# 'count': 42,
# }
# return data
#
# The data dictionary will be provided to the EZT template for
# rendering the page.
#
def use_template(template):
# The @use_template(T_MAIN) example is actually a function call
# to *produce* a decorator function. This is that decorator. It
# takes a function to wrap (FUNC), and produces a wrapping function
# that will be used during operation (WRAPPER).
def decorator(func):
# .wraps() copies name/etc from FUNC onto the wrapper function
# that we return.
@functools.wraps(func)
async def wrapper(*args, **kw):
# Get the data dictionary from the page endpoint.
data = await func(*args, **kw)
# Render that page, and return it to Quart.
return render(template, data)
return wrapper
return decorator
def render(t, data):
"Simple function to render a template into a string."
buf = io.StringIO()
t.generate(buf, data)
return buf.getvalue()
class CancellableTask:
"Wrapper for a task that does not propagate its cancellation."
def __init__(self, coro, *, loop=None, name=None):
"Create a task for CORO in LOOP, named NAME."
if loop is None:
loop = asyncio.get_event_loop()
async def absorb_cancel():
try:
await coro
except asyncio.CancelledError:
LOGGER.debug(f'TASK CANCELLED: {self.task}')
self.task = loop.create_task(absorb_cancel(), name=name)
def cancel(self):
"Cancel the task, and clean up its CancelledError exception."
self.task.cancel()