blob: fa249d839404f389ec7adac0d44e9f8c8cdfcdce [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Endpoint for returning emails in mbox format as a single archive"""
import asyncio
import plugins.server
import plugins.session
import plugins.messages
import plugins.defuzzer
import re
import typing
import aiohttp.web
import sys
if sys.version_info >= (3,8):
from asyncio.exceptions import CancelledError
elif sys.version_info >= (3,7):
from asyncio import CancelledError
import email.utils as eutils
import datetime
def convert_source(source) -> str:
if source:
source_as_text = source["_source"]["source"]
# Ensure it starts with "From "...or fake it
if not source_as_text.startswith("From "):
from_line = "From MAILER-DAEMON Thu Jan 1 00:00:00 1970\n" # Fallback in case no date found
# If we have any Received: headers, we can extrapolate an approximate time from the last (top) one.
from_match = re.search(r"(?:[\r\n]|^)Received:\s+from[^;]+?;\s+(.+?)[\r\n]", source_as_text)
if from_match:
recv_time = eutils.parsedate_tz(from_match.group(1))
if recv_time:
dt_tuple = datetime.datetime(*recv_time[:7])
if recv_time[9]: # If we have a timezone offset, apply via timedelta
dt_tuple += datetime.timedelta(seconds=recv_time[9])
# Set using ctime, as per https://datatracker.ietf.org/doc/html/rfc4155#appendix-A
from_line = "From MAILER-DAEMON %s\n" % dt_tuple.ctime()
source_as_text = from_line + source_as_text
# Convert to mboxrd format
mboxrd_source = ""
line_no = 0
for line in source_as_text.split("\n"):
line_no += 1
if line_no > 1 and re.match(r"^>*From\s+", line):
line = ">" + line
mboxrd_source += line + "\n"
return mboxrd_source
return ""
async def process(
server: plugins.server.BaseServer,
request: aiohttp.web.BaseRequest,
session: plugins.session.SessionObject,
indata: dict,
) -> typing.Union[dict, aiohttp.web.Response, aiohttp.web.StreamResponse]:
lid = indata.get("list", "_")
if lid == '*':
lid = 'all'
domain = indata.get("domain", "_")
if domain == '*':
domain = 'all'
# may be provided as d= or date=
yyyymm = indata.get("d") or indata.get("date") # e.g. 2019-9; can also be lte=1M etc
q = indata.get("q")
try:
query_defuzzed = plugins.defuzzer.defuzz(indata, list_override="@" in lid and lid or None)
except ValueError as ve: # If defuzzer encounters syntax errors, it will throw a ValueError
return aiohttp.web.Response(headers={"content-type": "text/plain",}, status=400, text=str(ve))
except AssertionError as ae: # If defuzzer encounters internal errors, it will throw an AssertionError
return aiohttp.web.Response(headers={"content-type": "text/plain",}, status=500, text=str(ae))
dlstem = f"{lid}_{domain}"
if yyyymm:
if len(yyyymm) == 6 and yyyymm[4] == '-': # assume yyyy-m, convert to yyyy-mm
yyyymm = yyyymm[0:-1] + "0" + yyyymm[-1]
dlstem = f"{dlstem}_{yyyymm}"
if q:
dlstem = f"{dlstem}_{q}"
# Figure out a sane filename stem (don't keep '.')
dlstem = re.sub(r"[^-_a-zA-Z0-9]+", "_", dlstem)
headers = {"Content-Type": "application/mbox", "Content-Disposition": f"attachment; filename={dlstem}.mbox"}
# Return mbox archive with filename as a stream
response = aiohttp.web.StreamResponse(status=200, headers=headers)
response.enable_chunked_encoding()
await response.prepare(request)
async for emails in plugins.messages.query_batch(
session,
query_defuzzed,
metadata_only=True,
epoch_order="asc"
):
for email in emails:
source = await plugins.messages.get_source(session, permalink=email.get("dbid"))
mboxrd_source = convert_source(source)
# Ensure each non-empty source ends with a blank line
if not mboxrd_source.endswith("\n\n"):
mboxrd_source += "\n"
try:
async with server.streamlock:
await asyncio.wait_for(response.write(mboxrd_source.encode("utf-8")), timeout=5)
except (TimeoutError, RuntimeError, CancelledError):
break # Writing stream failed, break it off.
return response
def register(server: plugins.server.BaseServer):
# Note that this is a StreamingEndpoint!
return plugins.server.StreamingEndpoint(process)