| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Endpoint for returning emails in mbox format as a single archive""" |
| import asyncio |
| import plugins.server |
| import plugins.session |
| import plugins.messages |
| import plugins.defuzzer |
| import re |
| import typing |
| import aiohttp.web |
| import sys |
| if sys.version_info >= (3,8): |
| from asyncio.exceptions import CancelledError |
| elif sys.version_info >= (3,7): |
| from asyncio import CancelledError |
| import email.utils as eutils |
| import datetime |
| |
| |
| def convert_source(source) -> str: |
| if source: |
| source_as_text = source["_source"]["source"] |
| # Ensure it starts with "From "...or fake it |
| if not source_as_text.startswith("From "): |
| from_line = "From MAILER-DAEMON Thu Jan 1 00:00:00 1970\n" # Fallback in case no date found |
| # If we have any Received: headers, we can extrapolate an approximate time from the last (top) one. |
| from_match = re.search(r"(?:[\r\n]|^)Received:\s+from[^;]+?;\s+(.+?)[\r\n]", source_as_text) |
| if from_match: |
| recv_time = eutils.parsedate_tz(from_match.group(1)) |
| if recv_time: |
| dt_tuple = datetime.datetime(*recv_time[:7]) |
| if recv_time[9]: # If we have a timezone offset, apply via timedelta |
| dt_tuple += datetime.timedelta(seconds=recv_time[9]) |
| # Set using ctime, as per https://datatracker.ietf.org/doc/html/rfc4155#appendix-A |
| from_line = "From MAILER-DAEMON %s\n" % dt_tuple.ctime() |
| source_as_text = from_line + source_as_text |
| # Convert to mboxrd format |
| mboxrd_source = "" |
| line_no = 0 |
| for line in source_as_text.split("\n"): |
| line_no += 1 |
| if line_no > 1 and re.match(r"^>*From\s+", line): |
| line = ">" + line |
| mboxrd_source += line + "\n" |
| return mboxrd_source |
| return "" |
| |
| |
| async def process( |
| server: plugins.server.BaseServer, |
| request: aiohttp.web.BaseRequest, |
| session: plugins.session.SessionObject, |
| indata: dict, |
| ) -> typing.Union[dict, aiohttp.web.Response, aiohttp.web.StreamResponse]: |
| |
| lid = indata.get("list", "_") |
| if lid == '*': |
| lid = 'all' |
| domain = indata.get("domain", "_") |
| if domain == '*': |
| domain = 'all' |
| # may be provided as d= or date= |
| yyyymm = indata.get("d") or indata.get("date") # e.g. 2019-9; can also be lte=1M etc |
| q = indata.get("q") |
| |
| try: |
| query_defuzzed = plugins.defuzzer.defuzz(indata, list_override="@" in lid and lid or None) |
| except ValueError as ve: # If defuzzer encounters syntax errors, it will throw a ValueError |
| return aiohttp.web.Response(headers={"content-type": "text/plain",}, status=400, text=str(ve)) |
| except AssertionError as ae: # If defuzzer encounters internal errors, it will throw an AssertionError |
| return aiohttp.web.Response(headers={"content-type": "text/plain",}, status=500, text=str(ae)) |
| |
| dlstem = f"{lid}_{domain}" |
| if yyyymm: |
| if len(yyyymm) == 6 and yyyymm[4] == '-': # assume yyyy-m, convert to yyyy-mm |
| yyyymm = yyyymm[0:-1] + "0" + yyyymm[-1] |
| dlstem = f"{dlstem}_{yyyymm}" |
| if q: |
| dlstem = f"{dlstem}_{q}" |
| # Figure out a sane filename stem (don't keep '.') |
| dlstem = re.sub(r"[^-_a-zA-Z0-9]+", "_", dlstem) |
| headers = {"Content-Type": "application/mbox", "Content-Disposition": f"attachment; filename={dlstem}.mbox"} |
| |
| # Return mbox archive with filename as a stream |
| response = aiohttp.web.StreamResponse(status=200, headers=headers) |
| response.enable_chunked_encoding() |
| await response.prepare(request) |
| |
| async for emails in plugins.messages.query_batch( |
| session, |
| query_defuzzed, |
| metadata_only=True, |
| epoch_order="asc" |
| ): |
| for email in emails: |
| source = await plugins.messages.get_source(session, permalink=email.get("dbid")) |
| mboxrd_source = convert_source(source) |
| # Ensure each non-empty source ends with a blank line |
| if not mboxrd_source.endswith("\n\n"): |
| mboxrd_source += "\n" |
| try: |
| async with server.streamlock: |
| await asyncio.wait_for(response.write(mboxrd_source.encode("utf-8")), timeout=5) |
| except (TimeoutError, RuntimeError, CancelledError): |
| break # Writing stream failed, break it off. |
| return response |
| |
| |
| def register(server: plugins.server.BaseServer): |
| # Note that this is a StreamingEndpoint! |
| return plugins.server.StreamingEndpoint(process) |