| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import io |
| import json |
| import urllib.parse |
| |
| import aiohttp.web |
| import multipart |
| |
| PYPONY_MAX_PAYLOAD_KB = 256 |
| PYPONY_MAX_PAYLOAD = PYPONY_MAX_PAYLOAD_KB * 1024 |
| ERRONEOUS_PAYLOAD = "Erroneous payload received" |
| |
| |
| async def parse_formdata(body_type, request: aiohttp.web.BaseRequest) -> dict: |
| # Start with query string data for seeding our data dictionary |
| indata = {k: v for k, v in request.query.items()} |
| |
| # PUT/POST form data? |
| if request.method in ["PUT", "POST"]: |
| if request.can_read_body: |
| try: |
| if ( |
| request.content_length |
| and request.content_length > PYPONY_MAX_PAYLOAD |
| ): |
| raise ValueError("Form data payload too large, max %dkb allowed" % PYPONY_MAX_PAYLOAD_KB) |
| body = await request.text() |
| if body_type == "json": |
| try: |
| js = json.loads(body) |
| assert isinstance( |
| js, dict |
| ) # json data MUST be an dictionary object, {...} |
| indata.update(js) |
| except ValueError as e: |
| raise ValueError(ERRONEOUS_PAYLOAD) from e |
| elif body_type == "form": |
| if ( |
| request.headers.get("content-type", "").lower() |
| == "application/x-www-form-urlencoded" |
| ): |
| try: |
| for key, val in urllib.parse.parse_qsl(body): |
| indata[key] = val |
| except ValueError as e: |
| raise ValueError(ERRONEOUS_PAYLOAD) from e |
| # If multipart, turn our body into a BytesIO object and use multipart on it |
| elif ( |
| "multipart/form-data" |
| in request.headers.get("content-type", "").lower() |
| ): |
| fh = request.headers.get("content-type", "" ) |
| fb = fh.find("boundary=") |
| if fb > 0: |
| boundary = fh[fb + 9 :] |
| if boundary: |
| try: |
| for part in multipart.MultipartParser( |
| io.BytesIO(body.encode("utf-8")), |
| boundary, |
| len(body), |
| ): |
| indata[part.name] = part.value |
| except ValueError as e: |
| raise ValueError(ERRONEOUS_PAYLOAD) from e |
| finally: |
| pass |
| return indata |