| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """ |
| This is the mbox library for Pony Mail. |
| It handles fetching (the right) emails for |
| pages that need it. |
| """ |
| |
| |
| import base64 |
| import binascii |
| import datetime |
| import email.utils |
| import hashlib |
| |
| # Main imports |
| import re |
| import typing |
| |
| |
| import plugins.aaa |
| import plugins.session |
| import plugins.database |
| |
| PYPONY_RE_PREFIX = re.compile(r"^([a-zA-Z]+:\s*)+") # Prefixes on subjects, such as Re: Fwd:, etc. |
| DATABASE_NOT_CONNECTED = "Database not connected!" |
| OLD_SHORTENED_ID_LENGTH = 18 # Thread IDs of 18 char length (deprecated) need special care in searches |
| NEEDS_QUOTES = re.compile(r'[][\\()<>@,:;".]') # If these characters are present in an email display name, quote it |
| ESCAPES_RE = re.compile(r'[\\"]') # Characters to escape with backslash in make_address() |
| |
| mbox_cache_privacy: typing.Dict[str, bool] = {} |
| |
| # This is used to detect if the '...' truncation marker is to be added |
| SHORT_BODY_MAX_LEN = 200 # This must be the same as Archiver.SHORT_BODY_MAXLEN |
| |
| # Only these fields are returned by the API: |
| # (keep this list sorted) |
| USED_UI_FIELDS = [ |
| "attachments", |
| "body", |
| "children", |
| "epoch", |
| "from", |
| "gravatar", |
| "id", |
| "in-reply-to", |
| "list_raw", |
| "list", |
| "message-id", |
| "mid", |
| "private", |
| "subject", |
| ] |
| # The following fields are currently excluded: |
| # cc, date, dbid, forum, html_source_only, from_raw, permalinks, references, size, to |
| # The body_short contents may replace the body contents, but it is not returned separately |
| |
| # must always fetch private and deleted |
| # list_raw is needed for authentication |
| MUST_HAVE = [ 'private', 'deleted', 'list_raw'] |
| |
| |
| def trim_email(doc: dict, external: bool = False) -> None: |
| """Trims away document fields not used by the UI""" |
| for header in list(doc.keys()): |
| # Remove meta data fields which start with an underscore |
| if header.startswith("_"): |
| del doc[header] |
| |
| # Remove other fields not used by the UI, if for external consumption |
| elif external and header not in USED_UI_FIELDS: |
| del doc[header] |
| |
| |
| # Format an email address given a name (optional) and an email address. |
| # Same as email.utils.formataddr except no Unicode escaping happens. |
| def make_address(name: str, email: str) -> str: |
| if name and email: |
| quotes = '' |
| if NEEDS_QUOTES.search(name): |
| quotes = '"' |
| name = ESCAPES_RE.sub(r'\\\g<0>', name) |
| return f'{quotes}{name}{quotes} <{email}>' |
| elif email: |
| return email |
| else: |
| return "" |
| |
| |
| # anonymise a string of email entries |
| def anonymize_mail_address(emailstring: str) -> str: |
| out = [] |
| if not emailstring: |
| return "" |
| # split the email list into individual entries |
| for real, addr in email.utils.getaddresses([emailstring]): |
| # generate the anonymised entries |
| anon = re.sub(r"(\S{1,2})\S*@([-a-zA-Z0-9_.]+)", "\\1...@\\2", addr) |
| out.append(make_address(real, anon)) |
| |
| # rejoin one per line |
| return ",\n ".join(out) |
| |
| def anonymize(doc: dict) -> dict: |
| """ Anonymizes an email, hiding author email addresses.""" |
| # ES direct hit? |
| ptr: typing.Dict[str, str] = doc |
| if "_source" in doc: |
| ptr = doc["_source"] |
| |
| if "from" in ptr: |
| ptr["from"] = anonymize_mail_address(ptr["from"]) |
| if "to" in ptr: |
| ptr["to"] = anonymize_mail_address(ptr["to"]) |
| if "cc" in ptr: |
| ptr["cc"] = anonymize_mail_address(ptr["cc"]) |
| if "body" in ptr and ptr["body"]: |
| ptr["body"] = re.sub( |
| r"<(\S{1,2})\S*@([-a-zA-Z0-9_.]+)>", "<\\1...@\\2>", ptr["body"] |
| ) |
| return doc |
| |
| |
| async def find_parent(session, doc: typing.Dict[str, str]): |
| """ |
| Locates the first email in a thread by going back through all the |
| in-reply-to headers and finding their ancestor. |
| """ |
| step = 0 |
| # max 50 steps up in the hierarchy |
| while step < 50: |
| step = step + 1 |
| irt: typing.Optional[str] = doc["in-reply-to"] if "in-reply-to" in doc else None |
| if not irt: |
| break # Shouldn't happen because irt is always present currently |
| # Extract the reference, if any |
| m = re.search(r"(<[^>]+>)", irt) |
| if not m: |
| break |
| ref = m.group(1) |
| newdoc = await get_email(session, messageid=ref) |
| # Did we find something accessible? |
| if not newdoc: |
| break |
| doc = newdoc |
| return doc |
| |
| |
| async def fetch_children(session: plugins.session.SessionObject, |
| pdoc: dict, |
| counter: int = 0, |
| pdocs: dict = None, |
| short: bool = False) -> typing.Tuple[list,list,dict]: |
| """ |
| Fetches all accessible child messages of a parent email |
| """ |
| if pdocs is None: |
| pdocs = {} |
| counter = counter + 1 |
| if counter > 250: |
| return [], [], {} |
| docs: typing.List[dict] = await get_email_irt(session, pdoc["message-id"]) |
| |
| thread = [] |
| emails = [] |
| for doc in docs: |
| if doc["mid"] not in pdocs: |
| mykids, _myemails, pdocs = await fetch_children( |
| session, doc, counter, pdocs, short=short |
| ) |
| if short: |
| xdoc = { |
| "tid": doc["mid"], |
| "mid": doc["mid"], |
| "message-id": doc["message-id"], |
| "subject": doc["subject"], |
| "from": doc["from"], |
| "id": doc["mid"], |
| "epoch": doc["epoch"], |
| "children": mykids, |
| "irt": doc["in-reply-to"], |
| "list_raw": doc["list_raw"], |
| } |
| thread.append(xdoc) |
| pdocs[doc["mid"]] = xdoc |
| else: |
| thread.append(doc) |
| pdocs[doc["mid"]] = doc |
| for kid in mykids: |
| if kid["mid"] not in pdocs: |
| pdocs[kid["mid"]] = kid |
| emails.append(kid) |
| return thread, emails, pdocs |
| |
| |
| async def get_email( |
| session: plugins.session.SessionObject, |
| permalink: str = None, |
| messageid: str = None, |
| listid: str = None, |
| ) -> typing.Optional[dict]: |
| """ |
| Returns a single matching mbox document or None |
| """ |
| assert session.database, DATABASE_NOT_CONNECTED |
| doctype = session.database.dbs.db_mbox |
| doc = None |
| if permalink: |
| try: |
| doc = await session.database.get(index=doctype, id=permalink) |
| # Email not found through primary ID, look for other permalinks? |
| except plugins.database.DBError: |
| # If using old shortened hex IDs, regexp for them instead of a direct match |
| if len(permalink) == OLD_SHORTENED_ID_LENGTH and re.match(r"^[a-f0-9]+$", permalink): |
| permalink += ".+" |
| aggtype = "regexp" |
| else: |
| aggtype = "term" |
| res = await session.database.search( |
| index=doctype, |
| size=1, |
| body={ |
| "query": {"bool": {"must": [{aggtype: {"permalinks": permalink}}]}} |
| }, |
| ) |
| if len(res["hits"]["hits"]) == 1: |
| doc = res["hits"]["hits"][0] |
| elif messageid: |
| bquery = {"query": {"bool": {"must": [{"term": {"message-id": messageid}}]}}} |
| if listid: # Specific List ID required? |
| bquery = {"query": {"bool": {"must": [{"term": {"message-id": messageid}}, {"term": {"list_raw": listid}}]}}} |
| res = await session.database.search( |
| index=doctype, |
| size=1, |
| body=bquery, |
| ) |
| if len(res["hits"]["hits"]) == 1: |
| doc = res["hits"]["hits"][0] |
| |
| # Did we find a single doc? |
| if doc and isinstance(doc, dict): |
| doc = doc["_source"] |
| doc["id"] = doc["mid"] |
| # If deleted by UI, only return if session is admin |
| is_admin = session.credentials and session.credentials.admin |
| if doc.get("deleted", False) and not is_admin: |
| return None |
| if doc and plugins.aaa.can_access_email(session, doc): |
| trim_email(doc) |
| if not session.credentials: |
| doc = anonymize(doc) |
| return doc |
| |
| # no doc? |
| return None |
| |
| async def get_email_irt( |
| session: plugins.session.SessionObject, |
| irt: str, |
| ) -> typing.List[dict]: |
| """ |
| Returns a list of mbox document(s) that are related, |
| i.e. where the parameter matches 'in-reply-to' or 'references' |
| May be empty. |
| Docs have been checked for accessibility. |
| """ |
| assert session.database, DATABASE_NOT_CONNECTED |
| doctype = session.database.dbs.db_mbox |
| |
| xirt = '"%s"' % irt.replace('"', '\\"') |
| res = await session.database.search( |
| index=doctype, |
| size=250, |
| body={"query": {"bool": {"must": [ {"simple_query_string": { "query": xirt, "fields":["in-reply-to", "references"]}}]}}}, |
| ) |
| docs = res["hits"]["hits"] |
| |
| is_admin = session.credentials and session.credentials.admin |
| docs_returned = [] |
| for doc in docs: |
| doc = doc["_source"] |
| doc["id"] = doc["mid"] |
| if doc.get("deleted", False) and not is_admin: |
| continue |
| if doc and plugins.aaa.can_access_email(session, doc): |
| trim_email(doc) |
| if not session.credentials: |
| doc = anonymize(doc) |
| docs_returned.append(doc) |
| return docs_returned |
| |
| |
| async def get_source(session: plugins.session.SessionObject, permalink: str, raw=False): |
| """ |
| Get the source document for an email, or None |
| """ |
| assert session.database, DATABASE_NOT_CONNECTED |
| try: |
| doc = await session.database.get(index=session.database.dbs.db_source, id=permalink) |
| except plugins.database.DBError: |
| doc = None |
| if doc: |
| # If hidden, only return source if session is admin |
| is_admin = session.credentials and session.credentials.admin |
| if doc["_source"].get("deleted", False) and not is_admin: |
| return None |
| if raw: |
| return doc |
| # Check for base64-encoded source |
| if ":" not in doc["_source"]["source"]: |
| try: |
| doc["_source"]["source"] = base64.standard_b64decode( |
| doc["_source"]["source"] |
| ).decode("utf-8", "replace") |
| except binascii.Error: |
| pass # If it wasn't base64 after all, just return as is |
| return doc |
| return None |
| |
| |
| async def query_batch( |
| session: plugins.session.SessionObject, |
| query_defuzzed: dict, |
| metadata_only: bool = False, |
| epoch_order: str = "desc", |
| source_fields: typing.Optional[typing.List[str]] = None |
| ): |
| """ |
| Advanced query and grab for stats.py |
| Also called by mbox.py (using metadata_only=True) |
| Yields batches of scan results, filtered to remove inaccessible mails |
| """ |
| assert session.database, DATABASE_NOT_CONNECTED |
| preserve_order = True if epoch_order == "asc" else False |
| es_query = { |
| "query": {"bool": query_defuzzed}, |
| "sort": [{"epoch": {"order": epoch_order}}], |
| } |
| if metadata_only: # Only doc IDs and AAA fields. |
| es_query["_source"] = ["deleted", "private", "mid", "dbid", "list_raw"] |
| elif not source_fields is None: |
| temp = source_fields.copy() |
| for hdr in MUST_HAVE: |
| if not hdr in source_fields: |
| temp.append(hdr) |
| es_query["_source"] = temp |
| else: |
| es_query["_source"] = { "excludes": ["body"] } |
| async for hits in session.database.scan( |
| query=es_query, |
| preserve_order=preserve_order |
| ): |
| is_admin = session.credentials and session.credentials.admin |
| docs = [] |
| for hit in hits: |
| doc = hit["_source"] |
| # If email was delete/hidden and we're not doing an admin query, ignore it |
| if doc.get("deleted", False) and not is_admin: |
| continue |
| if plugins.aaa.can_access_email(session, doc): |
| if "mid" in doc: # might be missing when using source_fields |
| doc["id"] = doc["mid"] |
| # Calculate gravatars if not present in _source |
| if not metadata_only and source_fields is None and "gravatar" not in doc: |
| doc["gravatar"] = gravatar(doc) |
| if not session.credentials: |
| doc = anonymize(doc) |
| if "body_short" in doc: |
| # The body_short field is set to SHORT_BODY_MAX_LEN+1 if the body is longer |
| # than SHORT_BODY_MAX_LEN, so we know if it has been truncated |
| if len(doc["body_short"] or "") > SHORT_BODY_MAX_LEN: |
| doc["body"] = doc["body_short"][:SHORT_BODY_MAX_LEN] + '...' |
| else: |
| doc["body"] = doc["body_short"] |
| # stats.py is expecting doc['body'], not body_short |
| del doc["body_short"] |
| trim_email(doc) |
| # drop any added fields |
| if not source_fields is None: |
| for hdr in MUST_HAVE: |
| if not hdr in source_fields and hdr in doc: |
| del doc[hdr] |
| docs.append(doc) |
| if len(docs) > 0: |
| yield docs |
| |
| |
| async def query( |
| session: plugins.session.SessionObject, |
| query_defuzzed: dict, |
| query_limit: int, |
| metadata_only: bool = False, |
| epoch_order: str = "desc", |
| source_fields: typing.Optional[typing.List[str]] = None |
| ) -> typing.List[dict]: |
| """ |
| Advanced query and grab for stats.py |
| Also called by mbox.py (using metadata_only=True) |
| """ |
| docs = [] |
| hits = 0 |
| async for batch in query_batch( |
| session, |
| query_defuzzed, |
| metadata_only=metadata_only, |
| epoch_order=epoch_order, |
| source_fields=source_fields |
| ): |
| for doc in batch: |
| docs.append(doc) |
| hits += 1 |
| if hits > query_limit: |
| break |
| if hits > query_limit: |
| break |
| return docs |
| |
| |
| async def wordcloud(session: plugins.session.SessionObject, query_defuzzed: dict) -> dict: |
| """ |
| Wordclouds via significant terms query in ES |
| The query must include a private mail filter if necessary |
| """ |
| wc = {} |
| try: |
| |
| assert session.database, DATABASE_NOT_CONNECTED |
| res = await session.database.search( |
| body={ |
| "size": 0, |
| "query": {"bool": query_defuzzed}, |
| "aggregations": { |
| "cloud": {"significant_terms": {"field": "subject", "size": 10}} |
| }, |
| } |
| ) |
| |
| for hit in res["aggregations"]["cloud"]["buckets"]: |
| wc[hit["key"]] = hit["doc_count"] |
| |
| except plugins.database.Timeout: # If we time out, just return empty WC. |
| pass |
| return wc |
| |
| async def get_accessible_filter(session: plugins.session.SessionObject, query_defuzzed: dict) -> typing.Optional[list]: |
| """ |
| Return a filter to be applied to the query to exclude inaccessible mails. |
| If no filter is needed, return None |
| e.g. |
| query_filter = get_accessible_filter(session, query) |
| if query_filter: |
| query['filter'] = query_filter |
| """ |
| if not session.credentials: |
| # if no credentials, only need to search public mails |
| return [{"term": {"private": False}}] |
| # which private lists might be involved in the search? |
| fuzz_private_only = dict(query_defuzzed) |
| fuzz_private_only["filter"] = [{"term": {"private": True}}] |
| assert session.database, DATABASE_NOT_CONNECTED |
| max_lists = session.database.config.max_lists |
| res = await session.database.search( |
| index=session.database.dbs.db_mbox, |
| size=0, |
| body={ |
| "query": {"bool": fuzz_private_only}, |
| "aggs": {"listnames": {"terms": {"field": "list_raw", "size": max_lists}}}, |
| }, |
| ) |
| private_lists_found = [] |
| for entry in res["aggregations"]["listnames"]["buckets"]: |
| listname = entry["key"].lower() |
| private_lists_found.append(listname) |
| |
| # If we can access all private lists found, or if no private lists, we can do a complete search. |
| # If not, we adjust the search here to only include public emails OR private lists we can access. |
| private_lists_accessible = [] |
| for listname in private_lists_found: |
| if plugins.aaa.can_access_list(session, listname): |
| private_lists_accessible.append(listname) |
| |
| # If we can't access all private lists found, either only public emails or lists we can access. |
| if not private_lists_accessible: # No private lists accessible, just filter for public |
| return [{"term": {"private": False}}] |
| elif private_lists_found != private_lists_accessible: # Some private lists, search for public OR those.. |
| return [ |
| {"bool": {"should": [{"term": {"private": False}}, {"terms": {"list_raw": private_lists_accessible}}]}} |
| ] |
| |
| return None |
| |
| |
| async def get_activity_span(session: plugins.session.SessionObject, query_defuzzed: dict) -> typing.Tuple[datetime.datetime, datetime.datetime, dict]: |
| """ |
| Fetches the activity span of a search as well as active months within that span |
| The query must include a private filter if necessary |
| """ |
| |
| # Get oldest and youngest doc in single scan, as well as a monthly histogram |
| assert session.database, DATABASE_NOT_CONNECTED |
| res = await session.database.search( |
| index=session.database.dbs.db_mbox, |
| size=0, |
| body={"query": {"bool": query_defuzzed}, |
| "aggs": { |
| "first": {"min": {"field": "epoch"}}, |
| "last": {"max": {"field": "epoch"}}, |
| "active_months": { |
| "date_histogram": { |
| "field": "date", |
| "calendar_interval": "month", |
| "format": "yyyy-MM" |
| } |
| } |
| }, |
| } |
| ) |
| |
| oldest = datetime.datetime.fromtimestamp(0) |
| youngest = datetime.datetime.fromtimestamp(0) |
| monthly_activity = {} |
| if res["aggregations"]: |
| aggs = res["aggregations"] |
| oldest = datetime.datetime.fromtimestamp(aggs["first"]["value"] or 0) |
| youngest = datetime.datetime.fromtimestamp(aggs["last"]["value"] or 0) |
| for bucket in aggs["active_months"].get("buckets", []): |
| if bucket["doc_count"]: |
| monthly_activity[bucket["key_as_string"]] = bucket["doc_count"] |
| |
| return oldest, youngest, monthly_activity |
| |
| |
| class ThreadConstructor: |
| def __init__(self, emails: typing.List[typing.Dict]): |
| self.emails = emails |
| self.threads: typing.List[dict] = [] |
| # this now includes the gravatar, to avoid issues with address anonymisation |
| self.authors: typing.Dict[str, list] = {} |
| self.hashed_by_msg_id: typing.Dict[str, dict] = {} |
| self.hashed_by_subject: typing.Dict[str, dict] = {} |
| |
| def construct(self): |
| """Turns a flat array of emails into a nested structure of threads""" |
| for cur_email in sorted(self.emails, key=lambda x: x["epoch"]): |
| author = cur_email.get("from") |
| if author not in self.authors: |
| self.authors[author] = [0, cur_email.get("gravatar", "")] |
| self.authors[author][0] += 1 |
| subject = cur_email.get("subject", "").replace( |
| "\n", "" |
| ) # Crop multi-line subjects |
| tsubject = ( |
| PYPONY_RE_PREFIX.sub("", subject) |
| + "_" |
| + cur_email.get("list_raw", "<a.b.c.d>") |
| ) |
| parent = self.find_root_subject(cur_email, tsubject) |
| xemail = { |
| "children": [], |
| "tid": cur_email.get("mid"), |
| "subject": subject, |
| "tsubject": tsubject, |
| "epoch": cur_email.get("epoch"), |
| "nest": 1, |
| } |
| if not parent: |
| self.threads.append(xemail) |
| else: |
| xemail["nest"] = parent["nest"] + 1 |
| parent["children"].append(xemail) |
| self.hashed_by_msg_id[cur_email.get("message-id", "??")] = xemail |
| if tsubject not in self.hashed_by_subject: |
| self.hashed_by_subject[tsubject] = xemail |
| return self.threads, self.authors |
| |
| |
| def find_root_subject(self, root_email: typing.Dict[str, str], osubject: str = None) -> typing.Optional[dict]: |
| """Finds the discussion origin of an email, if present""" |
| irt = root_email.get("in-reply-to",'') |
| subject = root_email.get("subject",'') |
| subject = subject.replace("\n", "").strip() # Crop multi-line subjects and surrounding whitespace |
| subject = PYPONY_RE_PREFIX.sub("", subject) + "_" + root_email.get("list_raw",'') |
| |
| # First, the obvious - look for an in-reply-to in our existing dict with a matching subject |
| if irt and irt in self.hashed_by_msg_id: |
| if self.hashed_by_msg_id[irt].get("subject") == subject: |
| return self.hashed_by_msg_id[irt] |
| |
| # If that failed, we break apart our subject |
| if osubject: |
| rsubject = osubject |
| else: |
| rsubject = subject |
| if rsubject and rsubject in self.hashed_by_subject: |
| return self.hashed_by_subject[rsubject] |
| return None |
| |
| |
| |
| def gravatar(eml: typing.Union[str, dict]) -> str: |
| """Generates a gravatar hash from an email address""" |
| if isinstance(eml, str): |
| header_from = eml |
| else: |
| header_from = eml.get("from", "??") |
| mailaddr = email.utils.parseaddr(header_from)[1] |
| ghash = hashlib.md5(mailaddr.encode("utf-8")).hexdigest() |
| return ghash |