| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """ Publish notifications about mails to pony mail. |
| |
| Copy this file to $mailman_plugin_dir/mailman_ponymail/__init__.py |
| Also copy ponymail.cfg to that dir. |
| Enable the module by adding the following to your mailman.cfg file:: |
| |
| [archiver.ponymail] |
| # The class implementing the IArchiver interface. |
| class: mailman_ponymail_plugin.Archiver |
| enable: yes |
| |
| and by adding the following to archiver.yaml: |
| |
| [mailman] |
| plugin: true |
| |
| OR, to use the STDIN version (non-MM3 mailing list managers), |
| sub someone to the list(s) and add this to their .forward file: |
| "|/usr/bin/env python3 /path/to/archiver.py" |
| |
| """ |
| |
| import argparse |
| import base64 |
| import collections |
| import email.header |
| import email.utils |
| import email.policy |
| import email.headerregistry |
| import fnmatch |
| import hashlib |
| import json |
| import logging |
| import os |
| import re |
| import sys |
| import time |
| import traceback |
| import typing |
| import uuid |
| import mimetypes |
| |
| import elasticsearch |
| import formatflowed |
| import netaddr |
| |
| if not __package__: |
| from plugins import ponymailconfig |
| from plugins import generators, textlib |
| from plugins.elastic import Elastic |
| else: |
| from .plugins import ponymailconfig |
| from .plugins import generators, textlib |
| from .plugins.elastic import Elastic |
| |
| # This is what we will default to if we are presented with emails without character sets and US-ASCII doesn't work. |
| DEFAULT_CHARACTER_SET = 'utf-8' |
| |
| # Standard "short body" max length for email aggregations |
| SHORT_BODY_MAX_LEN = 200 # This must be the same as server.plugins.messages.SHORT_BODY_MAX_LEN |
| |
| # Fetch config from same dir as archiver.py |
| config = ponymailconfig.PonymailConfig() |
| |
| # Set some vars before we begin |
| logger = None |
| |
| normalize_lid = textlib.normalize_lid # Unit test fallback |
| |
| # If MailMan is enabled, import and set it up |
| if config.has_option("mailman", "plugin"): |
| from mailman.interfaces.archiver import ArchivePolicy, IArchiver |
| from zope.interface import implementer |
| |
| logger = logging.getLogger("mailman.archiver") |
| |
| # Access URL once archived |
| aURL = config.get("archiver", "baseurl") |
| |
| # Get/Set email parsing policy (primarily ascii/7bit vs native utf8) |
| # (used by import-mbox) |
| policy_choice = config.get("archiver", "policy", fallback="default") |
| policy: typing.Any |
| if policy_choice == "compat32": |
| policy = email.policy.compat32 # 7bit lines |
| else: |
| if policy_choice == "smtputf8": |
| policy = email.policy.SMTPUTF8.clone() # 8bit/unicode lines |
| else: |
| policy = email.policy.default.clone() # Default (8bit) lines in Python >=3.3 |
| # email parsing is currently too strict; override the classes |
| policy.header_factory.map_to_type('references', email.headerregistry.UnstructuredHeader) |
| policy.header_factory.map_to_type('message-id', email.headerregistry.UnstructuredHeader) |
| |
| |
| def encode_base64(buff: bytes) -> str: |
| """ Convert bytes to base64 as text string (no newlines) """ |
| return base64.standard_b64encode(buff).decode("ascii", "ignore") |
| |
| |
| def mbox_source(b: bytes) -> str: |
| # Common method shared with import-mbox |
| try: |
| # Can we store as ASCII? |
| return b.decode("ascii", errors="strict") |
| except UnicodeError: |
| # No, so must use base64 to avoid corruption on re-encoding |
| return encode_base64(b) |
| |
| # Common method shared with import-mbox to ensure consistency |
| def parse_message(raw_message): |
| return email.message_from_bytes(raw_message, policy=policy) |
| |
| def parse_attachment( |
| part: email.message.Message, |
| ) -> typing.Tuple[typing.Optional[dict], typing.Optional[str]]: |
| """ |
| Parses an attachment in an email, turns it into a dict with a content-type, sha256 digest, file size and file name. |
| Also returns the attachment contents as base64 encoded string. |
| :param part: The message part to parse |
| :return: attachment info and contents as b64 string |
| """ |
| cd = part.get("Content-Disposition", None) |
| if cd: |
| # Use str() in case the name is not in ASCII. |
| # In such cases, the get() method returns a Header not a string |
| dispositions = str(cd).strip().split(";") |
| cdtype = dispositions[0].lower() |
| if cdtype in {"attachment", "inline"}: |
| fd = part.get_payload(decode=True) |
| filename = part.get_filename() |
| # If attachment is without a name, invent it. |
| ctype = part.get_content_type() |
| if ctype and not filename: |
| ext = mimetypes.guess_extension(ctype) |
| if not ext: # dunno this extension, fake .txt |
| ext = ".txt" |
| filename = f"{cdtype}{ext}" |
| if not fd and cdtype == "inline": # If inline, convert to source |
| fd = part.as_bytes() |
| # Allow for empty string |
| if fd is None: |
| return None, None |
| if filename: |
| attachment = { |
| "content_type": part.get_content_type(), |
| "size": len(fd), |
| "filename": filename, |
| } |
| h = hashlib.sha256(fd).hexdigest() |
| b64 = encode_base64(fd) |
| attachment["hash"] = h |
| return attachment, b64 # Return meta data and contents separately |
| return None, None |
| |
| |
| def message_attachments(msg: email.message.Message) -> typing.Tuple[list, dict]: |
| """ |
| Parses an email and returns all attachments found as a tuple of metadata and contents |
| :param msg: The email to parse |
| :return: a tuple of attachment metadata and their content |
| """ |
| attachments = [] |
| contents = {} |
| for part in msg.walk(): |
| part_meta, part_file = parse_attachment(part) |
| if part_meta: |
| attachments.append(part_meta) |
| contents[part_meta["hash"]] = part_file |
| return attachments, contents |
| |
| |
| class Body: |
| def __init__(self, part: email.message.Message): |
| self.content_type = part.get_content_type() |
| self.charsets = [part.get_content_charset()] # Part's charset |
| parent_charset = part.get_charsets()[0] |
| if parent_charset and parent_charset != self.charsets[0]: |
| self.charsets.append( |
| parent_charset |
| ) # Parent charset as fallback if any/different |
| self.character_set = None |
| self.string: typing.Optional[str] = None |
| self.flowed = "format=flowed" in part.get("content-type", "") |
| self.bytes = part.get_payload(decode=True) |
| self.html_as_source = False |
| if self.bytes is not None: |
| valid_encodings = [x for x in self.charsets if x] |
| if valid_encodings: |
| for cs in valid_encodings: |
| try: |
| self.string = self.bytes.decode(cs) |
| self.character_set = str(cs) |
| break |
| except UnicodeDecodeError: |
| pass |
| # If no character set was defined, the email MUST be US-ASCII by RFC822 defaults |
| # This isn't always the case, as we're about to discover. |
| if not self.string: |
| try: |
| self.string = self.bytes.decode("us-ascii", errors="strict") |
| if valid_encodings: |
| self.character_set = "us-ascii" |
| except UnicodeDecodeError: |
| # If us-ascii strict fails, it's probably undeclared UTF-8 (it happens!). |
| # Set the .string, but not a character set, as we don't know it for sure. |
| # This is mainly so the older generators won't barf, as the generator will |
| # be fed the message body as a bytes object if no encoding is set, while |
| # the resulting metadoc will always use the string version. |
| self.string = self.bytes.decode(DEFAULT_CHARACTER_SET, "replace") |
| |
| def __repr__(self): |
| return self.string |
| |
| def __len__(self): |
| return len(self.string or "") |
| |
| def assign(self, new_string): |
| self.string = new_string |
| |
| def encode(self, encoding=DEFAULT_CHARACTER_SET, errors="strict"): |
| return self.string.encode(encoding=encoding, errors=errors) |
| |
| def unflow(self, convert_lf=False): |
| """Unflows text of type format=flowed. |
| By default, lines ending in LF (mbox imports) are not converted to CRLF, and thus |
| not unflowed. This is to be consistent with previous versions of Pony Mail, and |
| can be enabled for any new installations that that not reimaging their database. |
| """ |
| if self.string: |
| if self.flowed: |
| # Use provider character set or fall back to our sane default. |
| character_set = self.character_set or DEFAULT_CHARACTER_SET |
| # Convert lone LF to CRLF if found |
| if convert_lf: |
| fixed_string = "\r\n".join( |
| [x.rstrip("\r") for x in self.string.split("\n")] |
| ) |
| conversion_was_needed = fixed_string != self.string |
| else: |
| fixed_string = self.string |
| flow_fixed = formatflowed.convertToWrapped( |
| fixed_string.encode(character_set, errors="ignore"), |
| wrap_fixed=False, |
| character_set=character_set, |
| ) |
| # If we "upconverted" from LF to CRLF, convert back after flow decoding |
| if convert_lf and conversion_was_needed: |
| flow_fixed = "\n".join( |
| [x.rstrip("\r") for x in self.string.split("\n")] |
| ) |
| return flow_fixed |
| return self.string |
| |
| |
| def message_identifiers(header, reverse=False): |
| if "<" not in header: |
| return [] |
| parts = header.split("<") |
| identifier_junks = parts[1:] |
| identifiers = [] |
| for identifier_junk in identifier_junks: |
| identifier = identifier_junk.split(">").pop(0) |
| identifiers.append("<" + identifier + ">") |
| if reverse is True: |
| identifiers = list(reversed(identifiers)) |
| return identifiers |
| |
| |
| def get_parent_identifiers(ojson): |
| identifiers = [] |
| mirt = ojson.get("in-reply-to", "") |
| for irt in message_identifiers(mirt, reverse=True): |
| identifiers.append(irt) |
| mref = ojson.get("references", "") |
| for ref in message_identifiers(mref, reverse=True): |
| identifiers.append(ref) |
| return identifiers |
| |
| |
| def get_by_message_id(elastic, msgid, timeout="5s"): |
| data = elastic.search(index=elastic.db_mbox, body={ |
| "query": { |
| "bool": { |
| "must": {"term": {"message-id": msgid}} |
| } |
| } |
| }, timeout=timeout) |
| if data["hits"]["total"]["value"] == 1: |
| return data["hits"]["hits"][0]["_source"] |
| return None |
| |
| |
| def get_parent_info(elastic, ojson, timeout=5, limit=10): |
| parent_identifiers = get_parent_identifiers(ojson) |
| if not parent_identifiers: |
| return None |
| for parent_identifier in parent_identifiers: |
| parent_info = get_by_message_id(elastic, parent_identifier, timeout) |
| if parent_info is not None: |
| return parent_info |
| limit -= 1 |
| if limit < 1: |
| break |
| return None |
| |
| |
| def get_previous_mid(elastic, ojson, timeout="5s"): |
| forum = ojson["forum"] |
| latest = ojson.get("epoch", 1) - 1 |
| data = elastic.search(index=elastic.db_mbox, body={ |
| "query": { |
| "bool": { |
| "must": [ |
| {"range": {"epoch": {"lte": latest}}}, |
| {"term": {"forum": forum}}, |
| {"term": {"top": True}} |
| ] |
| } |
| }, |
| "sort": [{"epoch": "desc"}], |
| "size": 1, |
| "_source": "mid", |
| }, timeout=timeout) |
| for hit in data["hits"]["hits"]: |
| return hit["_source"]["mid"] |
| return None |
| |
| |
| def add_thread_properties(elastic, ojson, timeout="5s", limit=5): |
| parent_info = get_parent_info(elastic, ojson, timeout, limit) |
| if parent_info is None: |
| top = True |
| thread = ojson["mid"] |
| previous = get_previous_mid(elastic, ojson, timeout) |
| else: |
| top = False |
| thread = parent_info.get("thread") |
| previous = parent_info["mid"] |
| |
| ojson["top"] = top |
| ojson["thread"] = thread |
| ojson["previous"] = previous |
| return ojson |
| |
| |
| class Archiver(object): # N.B. Also used by import-mbox.py |
| """The general archiver class. Compatible with MailMan3 archiver classes.""" |
| |
| if config.has_option("mailman", "plugin"): |
| implementer(IArchiver) |
| name = "foal" |
| |
| # This is a list of headers which are stored in msg_metadata |
| HDR_KEYS = [ |
| "archived-at", |
| "from", |
| "cc", |
| "to", |
| "date", |
| "in-reply-to", |
| "message-id", |
| "subject", |
| "references", |
| ] |
| # keys that need to be decoded |
| HDR_KEYS_DECODE = ["to", "from", "subject", "message-id"] |
| |
| def __init__( |
| self, generator=None, parse_html=False, ignore_body=None, verbose=False |
| ): |
| """ Just initialize ES. """ |
| self.html = parse_html |
| # Fall back to full hashing if nothing is set. |
| self.generator = generator or config.get( |
| "archiver", "generator", fallback="full" |
| ) |
| self.cropout = config.get("debug", "cropout") |
| self.verbose = verbose |
| self.ignore_body = ignore_body |
| if self.html: |
| import html2text |
| |
| self.html2text = html2text.html2text |
| |
| def message_body(self, msg: email.message.Message) -> typing.Optional[Body]: |
| """ |
| Fetches the proper text body from an email as an archiver.Body object |
| :param msg: The email or part of it to examine for proper body |
| :return: archiver.Body object |
| """ |
| body = None |
| first_html = None |
| for part in msg.walk(): |
| # can be called from importer |
| if self.verbose: |
| print("Content-Type: %s" % part.get_content_type()) |
| """ |
| Find the first body part and the first HTML part |
| Note: cannot use break here because firstHTML is needed if len(body) <= 1 |
| """ |
| try: |
| if body is None and part.get_content_type() in [ |
| "text/plain", |
| "text/enriched", |
| ]: |
| body = Body(part) |
| elif ( |
| not first_html |
| and part.get_content_type() == "text/html" |
| ): |
| first_html = Body(part) |
| except Exception as err: |
| print(err) |
| |
| # this requires a GPL lib, user will have to install it themselves |
| if first_html and ( |
| body is None |
| or len(body) <= 1 |
| or (self.ignore_body and str(body).find(str(self.ignore_body)) != -1) |
| ): |
| body = first_html |
| body.html_as_source = True |
| |
| # Convert HTML to text if mod is installed and enabled, otherwise keep the source as-is |
| if self.html: |
| body.assign(self.html2text(str(body))) |
| body.html_as_source = False |
| return body |
| |
| # N.B. this is also called by import-mbox.py |
| def compute_updates( |
| self, |
| lid: typing.Optional[str], |
| private: bool, |
| msg: email.message.Message, |
| raw_msg: bytes, |
| default_epoch: typing.Union[None, str, int] = None |
| ) -> typing.Tuple[typing.Optional[dict], dict, dict, typing.Optional[str], bool]: |
| """Determine what needs to be sent to the archiver. |
| :param lid: The list id |
| :param private: Whether privately archived email or not (bool) |
| :param msg: The message object |
| :param raw_msg: The raw message bytes |
| |
| :return None if the message could not be parsed, otherwise a four-tuple consisting of: |
| the digested email as a dict, its attachments, its metadata fields and any |
| in-reply-to data found. |
| """ |
| notes = [] # Put debug notes in here, for later... |
| |
| if not lid: |
| lid = textlib.normalize_lid(msg.get("list-id"), strict=True) |
| if lid is None: |
| raise ValueError(f"Invalid list-id {lid} provided") |
| if self.cropout: |
| crops = self.cropout.split(" ") |
| # Regex replace? |
| if len(crops) == 2: |
| lid = re.sub(crops[0], crops[1], lid) |
| # Standard crop out? |
| else: |
| lid = lid.replace(self.cropout, "") |
| |
| def default_empty_string(value): |
| return str(value) if value else "" |
| msg_metadata = dict([(k, default_empty_string(msg.get(k))) for k in self.HDR_KEYS]) |
| mid = ( |
| hashlib.sha224( |
| str("%s-%s" % (lid, msg_metadata["archived-at"])).encode("utf-8") |
| ).hexdigest() |
| + "@" |
| + (lid if lid else "none") |
| ) |
| for key in self.HDR_KEYS_DECODE: |
| try: |
| hval = "" |
| if msg_metadata.get(key): |
| for t in email.header.decode_header(msg_metadata[key]): |
| if t[1] is None or t[1].find("8bit") != -1: |
| hval += str( |
| t[0].decode("utf-8") if type(t[0]) is bytes else t[0] |
| ) |
| else: |
| hval += t[0].decode(t[1], errors="ignore") |
| msg_metadata[key] = hval.strip() |
| except Exception as err: |
| print("Could not decode headers, ignoring..: %s" % err) |
| message_date = None |
| try: |
| message_date = email.utils.parsedate_tz(str(msg_metadata.get("date"))) |
| except ValueError: |
| pass |
| if not message_date and msg_metadata.get("archived-at"): |
| message_date = email.utils.parsedate_tz( |
| str(msg_metadata.get("archived-at")) |
| ) |
| if not message_date: |
| print("No message date could be derived from the Date: header, looking elsewhere.") |
| bad_date_original = str(msg_metadata.get("date")) |
| if bad_date_original: |
| notes.append(["BADDATE: Original email Date: header was set to invalid value: %s" % bad_date_original]) |
| # See if we have a "From" header line in the raw email, we can use |
| first_line = raw_msg.split(b"\n", 1)[0].decode("us-ascii") |
| if first_line.startswith("From "): |
| # If we have one, the date must be the third element when splitting by single space. |
| env_from_date = first_line.split(" ", 2)[-1] # Split twice, grab last element. |
| message_date = email.utils.parsedate_tz(env_from_date) |
| if message_date: |
| print("Found date in envelope FROM header: %s" % env_from_date) |
| notes.append(["BADDATE: Used envelope FROM header for email date: %s" % env_from_date]) |
| # Otherwise, look for a Received: header we can scan |
| if not message_date: |
| for recv_from in msg.get_all('received', []): # We may have multiple of these, not all have "from". |
| m = re.match(r"from[^;]+?;\s+(.+?)(?:$|[\r\n])", recv_from) |
| if m: |
| message_date = email.utils.parsedate_tz(m.group(1)) |
| if message_date: |
| print("Found date in Received header: %s" % m.group(1)) |
| notes.append(["BADDATE: Used Received header for email date: %s" % m.group(1)]) |
| break |
| if not message_date: |
| # Current time makes most sense for live archiving. |
| # If --defaultepoch is defined, use that instead. |
| if default_epoch is not None: |
| if default_epoch == "skip": # If we are to skip emails with bad dates... |
| return {"foo": "bar"}, {}, {}, None, True # return fake set with skipit == True |
| else: |
| print("Could not find any valid dates in email headers, using --defaultepoch parameter %s" % default_epoch) |
| epoch = int(default_epoch) |
| notes.append(["BADDATE: Falling back to default epoch specified by --defaultepoch: %s" % default_epoch]) |
| else: |
| print("Could not find any valid dates in email headers, using current time") |
| notes.append(["BADDATE: Falling back to default UNIX epoch"]) |
| epoch = int(time.time()) |
| else: |
| epoch = int(email.utils.mktime_tz(message_date)) |
| else: |
| epoch = int(email.utils.mktime_tz(message_date)) |
| # message_date calculations are all done, prepare the index entry |
| date_as_string = time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime(epoch)) |
| body = self.message_body(msg) |
| attachments, contents = message_attachments(msg) |
| irt = "" |
| |
| output_json = None |
| if body is not None or attachments: |
| pmid = mid |
| id_set = list() |
| # The body used for generators differ from the body put into the meta doc, |
| # for historical reasons. In the older generators where it is actively used, |
| # it would be UTF-8 bytes in cases of charset-less message bodies. It would |
| # also be nothing in case of html-only emails where html2text is not enabled. |
| generator_body = body if body and body.character_set else body and body.bytes or "" |
| if body and body.html_as_source: |
| generator_body = "" |
| for generator in self.generator.split(" "): |
| if generator: |
| try: |
| mid = generators.generate( |
| generator, |
| msg, |
| generator_body, |
| lid, |
| attachments, |
| raw_msg, |
| ) |
| except Exception as err: |
| if logger: |
| # N.B. use .get just in case there is no message-id |
| logger.info( |
| "Could not generate MID: %s. MSGID: %s", |
| err, |
| msg_metadata.get("message-id", "?").strip(), |
| ) |
| mid = pmid |
| if mid not in id_set: |
| id_set.append(mid) |
| |
| if "in-reply-to" in msg_metadata: |
| try: |
| irt_original = msg_metadata["in-reply-to"] |
| if isinstance(irt_original, list): |
| irt = "".join(irt_original) |
| else: |
| irt = str(irt_original) |
| if irt: |
| irt = irt.strip() |
| except ValueError: |
| irt = "" |
| document_id = id_set[0] |
| |
| # Pre-calculate gravatar |
| mailaddr = email.utils.parseaddr(msg_metadata["from"])[1] |
| ghash = hashlib.md5(mailaddr.encode("utf-8")).hexdigest() |
| |
| notes.append(["ARCHIVE: Email archived as %s at %u" % (document_id, time.time())]) |
| body_unflowed = body.unflow() if body else "" |
| body_shortened = body_unflowed[:SHORT_BODY_MAX_LEN+1] # +1 so that we can tell if larger than std short body. |
| |
| output_json = { |
| "from_raw": msg_metadata["from"], |
| "from": msg_metadata["from"], |
| "gravatar": ghash, |
| "to": msg_metadata["to"], |
| "subject": msg_metadata["subject"], |
| "message-id": msg_metadata["message-id"], |
| "mid": document_id, |
| "permalinks": id_set, |
| "dbid": hashlib.sha3_256(raw_msg).hexdigest(), |
| "cc": msg_metadata.get("cc"), |
| "epoch": epoch, |
| "list": lid, |
| "list_raw": lid, |
| "date": date_as_string, |
| "private": private, |
| "references": msg_metadata["references"], |
| "in-reply-to": irt, |
| "body": body_unflowed, |
| "body_short": body_shortened, |
| "html_source_only": body and body.html_as_source or False, |
| "attachments": attachments, |
| "forum": (lid or "").strip("<>").replace(".", "@", 1), |
| "size": len(raw_msg), |
| "_notes": notes, |
| "_archived_at": int(time.time()), |
| } |
| |
| return output_json, contents, msg_metadata, irt, False |
| |
| def archive_message(self, mlist, msg, raw_message=None, dry=False, dump=None, defaultepoch=None, digest=False): |
| """Send the message to the archiver. |
| |
| :param mlist: The IMailingList object. |
| :param msg: The message object. |
| :param raw_message: Raw message bytes |
| :param dry: Whether or not to actually run |
| :param dump: Optional path for dump on fail |
| |
| :return (lid, mid) |
| """ |
| |
| lid = textlib.normalize_lid(mlist.list_id, strict=True) |
| if lid is None: |
| raise ValueError(f"Invalid list id {lid}") |
| |
| private = False |
| if hasattr(mlist, "archive_public") and mlist.archive_public is True: |
| private = False |
| elif hasattr(mlist, "archive_public") and mlist.archive_public is False: |
| private = True |
| elif ( |
| hasattr(mlist, "archive_policy") |
| and mlist.archive_policy is not ArchivePolicy.public |
| ): |
| private = True |
| |
| if raw_message is None: |
| raw_message = msg.as_bytes() |
| ojson, contents, msg_metadata, irt, skipit = self.compute_updates( |
| lid, private, msg, raw_message, defaultepoch |
| ) |
| if not ojson: |
| _id = msg.get("message-id") or msg.get("Subject") or msg.get("Date") |
| raise Exception("Could not parse message %s for %s" % (_id, lid)) |
| if skipit: |
| print("Skipping archiving of email due to invalid date and default date set to skip") |
| return lid, "(skipped)" |
| if digest: |
| return lid, ojson["mid"] |
| if dry: |
| print("**** Dry run, not saving message to database *****") |
| return lid, ojson["mid"] |
| |
| if dump: |
| try: |
| elastic = Elastic() |
| except elasticsearch.exceptions.ElasticsearchException as e: |
| print(e) |
| print( |
| "ES connection failed, but dumponfail specified, dumping to %s" |
| % dump |
| ) |
| else: |
| elastic = Elastic() |
| |
| if config.get("archiver", "threadinfo"): |
| try: |
| timeout = int(config.get("archiver", "threadtimeout") or 5) |
| timeout = str(timeout) + "s" |
| limit = int(config.get("archiver", "threadparents") or 10) |
| ojson = add_thread_properties(elastic, ojson, timeout, limit) |
| except Exception as err: |
| print("Could not add thread info", err) |
| if logger: |
| logger.info("Could not add thread info %s" % (err,)) |
| else: |
| print("Added thread info successfully", ojson["mid"]) |
| if logger: |
| logger.info("Added thread info successfully %s" % (ojson["mid"],)) |
| |
| try: |
| if contents: |
| for key in contents: |
| elastic.index( |
| index=elastic.db_attachment, |
| id=key, |
| body={"source": contents[key]}, |
| ) |
| |
| elastic.index( |
| index=elastic.db_mbox, id=ojson["mid"], body=ojson, |
| ) |
| |
| elastic.index( |
| index=elastic.db_source, |
| id=ojson["dbid"], |
| body={ |
| "message-id": msg_metadata["message-id"], |
| "source": mbox_source(raw_message), |
| }, |
| ) |
| # Write to audit log |
| try: |
| auditlog_exists = elastic.indices.exists(index=elastic.db_auditlog) |
| except elasticsearch.exceptions.AuthorizationException: |
| auditlog_exists = False |
| if auditlog_exists: |
| elastic.index( |
| index=elastic.db_auditlog, |
| body={ |
| "date": time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime(time.time())), |
| "action": "index", |
| "remote": "internal", |
| "author": "archiver.py", |
| "target": ojson["mid"], |
| "lid": lid, |
| "log": f"Indexed email {ojson['message-id']} for {lid} as {ojson['mid']}", |
| } |
| ) |
| |
| # If we have a dump dir and ES failed, push to dump dir instead as a JSON object |
| # We'll leave it to another process to pick up the slack. |
| except Exception as err: |
| print(err) |
| if dump: |
| print( |
| "Pushing to ES failed, but dumponfail specified, dumping JSON docs" |
| ) |
| uid = uuid.uuid4() |
| mbox_path = os.path.join(dump, "%s.json" % uid) |
| with open(mbox_path, "w") as f: |
| json.dump( |
| { |
| "id": ojson["mid"], |
| "mbox": ojson, |
| "mbox_source": { |
| "id": ojson["dbid"], |
| "permalink": ojson["mid"], |
| "message-id": msg_metadata["message-id"], |
| "source": mbox_source(raw_message), |
| }, |
| "attachments": contents, |
| }, |
| f, |
| indent=2, |
| ) |
| f.close() |
| sys.exit(0) # We're exiting here, the rest can't be done without ES |
| # otherwise fail as before |
| raise err |
| |
| if logger: |
| logger.info("Pony Mail archived message %s successfully", ojson["mid"]) |
| oldrefs = [] |
| |
| # Is this a direct reply to a pony mail email? |
| if irt != "": |
| dm = re.search(r"pony-([a-f0-9]+)-([a-f0-9]+)@", irt) |
| if dm: |
| cid = dm.group(1) |
| mid = dm.group(2) |
| if elastic.exists(index=elastic.db_account, id=cid): |
| doc = elastic.get(index=elastic.db_account, id=cid) |
| if doc: |
| oldrefs.append(cid) |
| # N.B. no index is supplied, so ES will generate one |
| elastic.index( |
| index=elastic.db_notification, |
| body={ |
| "type": "direct", |
| "recipient": cid, |
| "list": lid, |
| "private": private, |
| "date": ojson["date"], |
| "from": msg_metadata["from"], |
| "to": msg_metadata["to"], |
| "subject": msg_metadata["subject"], |
| "message-id": msg_metadata["message-id"], |
| "in-reply-to": irt, |
| "epoch": ojson["epoch"], |
| "mid": mid, |
| "seen": 0, |
| }, |
| ) |
| if logger: |
| logger.info("Notification sent to %s for %s", cid, mid) |
| |
| # Are there indirect replies to pony emails? |
| if msg_metadata.get("references"): |
| for im in re.finditer( |
| r"pony-([a-f0-9]+)-([a-f0-9]+)@", msg_metadata.get("references") |
| ): |
| cid = im.group(1) |
| mid = im.group(2) |
| # TODO: Fix this to work with pibbles |
| if elastic.exists(index=elastic.db_mbox, id=cid): |
| doc = elastic.get(index=elastic.db_mbox, id=cid) |
| |
| # does the user want to be notified of indirect replies? |
| if ( |
| doc |
| and "preferences" in doc["_source"] |
| and doc["_source"]["preferences"].get("notifications") |
| == "indirect" |
| and cid not in oldrefs |
| ): |
| oldrefs.append(cid) |
| # N.B. no index mapping is supplied, so ES will generate one |
| elastic.index( |
| index=elastic.db_notification, |
| body={ |
| "type": "indirect", |
| "recipient": cid, |
| "list": lid, |
| "private": private, |
| "date": ojson["date"], |
| "from": msg_metadata["from"], |
| "to": msg_metadata["to"], |
| "subject": msg_metadata["subject"], |
| "message-id": msg_metadata["message-id"], |
| "in-reply-to": irt, |
| "epoch": ojson["epoch"], |
| "mid": mid, |
| "seen": 0, |
| }, |
| ) |
| if logger: |
| logger.info("Notification sent to %s for %s", cid, mid) |
| return lid, ojson["mid"] |
| |
| def list_url(self, _mlist): |
| """ Required by MM3 plugin API |
| """ |
| return None |
| |
| def permalink(self, _mlist, _msg): |
| """ Required by MM3 plugin API |
| """ |
| return None |
| |
| |
| def main(): |
| parser = argparse.ArgumentParser(description="Command line options.") |
| parser.add_argument( |
| "--lid", dest="lid", type=str, nargs=1, help="Alternate specific list ID" |
| ) |
| parser.add_argument( |
| "--digest", |
| dest="digest", |
| action="store_true", |
| help="Only digest the email and spit out the generated ID, do not archive", |
| ) |
| parser.add_argument( |
| "--altheader", |
| dest="altheader", |
| type=str, |
| nargs=1, |
| help="Alternate header for list ID", |
| ) |
| parser.add_argument( |
| "--allowfrom", |
| dest="allowfrom", |
| type=str, |
| nargs=1, |
| help="(optional) source IP (mail server) to allow posts from, ignore if no match", |
| ) |
| parser.add_argument( |
| "--ignore", |
| dest="ignorefrom", |
| type=str, |
| nargs=1, |
| help="Sender/list to ignore input from (owner etc)", |
| ) |
| parser.add_argument( |
| "--private", |
| dest="private", |
| action="store_true", |
| help="This is a private archive", |
| ) |
| parser.add_argument( |
| "--makedate", |
| dest="makedate", |
| action="store_true", |
| help="Use the archive timestamp as the email date instead of the Date header", |
| ) |
| parser.add_argument( |
| "--quiet", |
| dest="quiet", |
| action="store_true", |
| help="Do not exit -1 if the email could not be parsed", |
| ) |
| parser.add_argument( |
| "--verbose", |
| dest="verbose", |
| action="store_true", |
| help="Output additional log messages", |
| ) |
| parser.add_argument( |
| "--html2text", |
| dest="html2text", |
| action="store_true", |
| help="Try to convert HTML to text if no text/plain message is found", |
| ) |
| parser.add_argument( |
| "--dry", |
| dest="dry", |
| action="store_true", |
| help="Do not save emails to elasticsearch, only test parsing", |
| ) |
| parser.add_argument( |
| "--ignorebody", |
| dest="ibody", |
| type=str, |
| nargs=1, |
| help="Optional email bodies to treat as empty (in conjunction with --html2text)", |
| ) |
| parser.add_argument( |
| "--dumponfail", |
| dest="dump", |
| help="If pushing to ElasticSearch fails, dump documents in JSON format to this directory and " |
| "fail silently.", |
| ) |
| parser.add_argument( |
| "--defaultepoch", |
| dest="defaultepoch", |
| help="If no date could be found in the email, use this epoch. Set to 'skip' to skip importing on bad date", |
| ) |
| parser.add_argument("--generator", dest="generator", help="Override the generator.") |
| args = parser.parse_args() |
| |
| if args.verbose: |
| logging.basicConfig(stream=sys.stdout, level=logging.INFO) |
| |
| archie = Archiver( |
| generator=args.generator, |
| parse_html=args.html2text, |
| ignore_body=args.ibody, |
| verbose=args.verbose, |
| ) |
| # use binary input so parser can use appropriate charset |
| input_stream = sys.stdin.buffer |
| |
| try: |
| raw_message = input_stream.read() |
| |
| try: |
| msg = parse_message(raw_message) |
| except Exception as err: |
| print("STDIN parser exception: %s" % err) |
| sys.exit(-1) |
| |
| if args.altheader: |
| alt_header = args.altheader[0] |
| if alt_header in msg: |
| try: |
| msg.replace_header("List-ID", msg.get(alt_header)) |
| except KeyError: |
| msg.add_header("list-id", msg.get(alt_header)) |
| elif "altheader" in sys.argv: |
| alt_header = sys.argv[len(sys.argv) - 1] |
| if alt_header in msg: |
| try: |
| msg.replace_header("List-ID", msg.get(alt_header)) |
| except KeyError: |
| msg.add_header("list-id", msg.get(alt_header)) |
| |
| # Set specific LID? |
| if args.lid and len(args.lid[0]) > 3: |
| try: |
| msg.replace_header("List-ID", args.lid[0]) |
| except KeyError: |
| msg.add_header("list-id", args.lid[0]) |
| |
| # Ignore based on --ignore flag? |
| if args.ignorefrom: |
| ignore_from = args.ignorefrom[0] |
| if fnmatch.fnmatch(msg.get("from"), ignore_from) or ( |
| msg.get("list-id") and fnmatch.fnmatch(msg.get("list-id"), ignore_from) |
| ): |
| print("Ignoring message as instructed by --ignore flag") |
| sys.exit(0) |
| |
| # Check CIDR if need be |
| if args.allowfrom: |
| |
| c = netaddr.IPNetwork(args.allowfrom[0]) |
| good = False |
| for line in msg.get_all("received") or []: |
| m = re.search(r"from .+\[(.+)]", line) |
| if m: |
| try: |
| ip = netaddr.IPAddress(m.group(1)) |
| if ip in c: |
| good = True |
| msg.add_header("ip-whitelisted", "yes") |
| break |
| except ValueError: |
| pass |
| except netaddr.AddrFormatError: |
| pass |
| if not good: |
| print("No whitelisted IP found in message, aborting") |
| sys.exit(-1) |
| # Replace date header with $now? |
| if args.makedate: |
| msg.replace_header("date", email.utils.formatdate()) |
| is_public = True |
| if args.private: |
| is_public = False |
| if "list-id" in msg: |
| list_data = collections.namedtuple( |
| "importmsg", |
| [ |
| "list_id", |
| "archive_public", |
| "archive_policy", |
| "list_name", |
| "description", |
| ], |
| )( |
| list_id=msg.get("list-id"), |
| archive_public=is_public, |
| archive_policy=None, |
| list_name=msg.get("list-id"), |
| description=msg.get("list-id"), |
| ) |
| |
| try: |
| lid, mid = archie.archive_message(list_data, msg, raw_message, args.dry, args.dump, args.defaultepoch, args.digest) |
| if args.digest: |
| print(mid) |
| else: |
| print( |
| "%s: Done archiving to %s as %s!" |
| % (email.utils.formatdate(), lid, mid) |
| ) |
| except Exception as err: |
| if args.verbose: |
| traceback.print_exc() |
| print("Archiving failed!: %s" % err) |
| raise Exception("Archiving to ES failed") from err |
| else: |
| print("Nothing to import (no list-id found!)") |
| except Exception as err: |
| # extract the len number without using variables (which may cause issues?) |
| # last traceback 1st entry, 2nd field |
| line = traceback.extract_tb(sys.exc_info()[2])[0][1] |
| if args.quiet: |
| print( |
| "Could not parse email, but exiting quietly as --quiet is on: %s (@ %s)" |
| % (err, line) |
| ) |
| else: |
| print("Could not parse email: %s (@ %s)" % (err, line)) |
| sys.exit(-1) |
| |
| |
| if __name__ == "__main__": |
| main() |