| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import asyncio |
| import datetime |
| import re |
| import sys |
| import time |
| |
| from elasticsearch_dsl import Search |
| from elasticsearch import VERSION as ES_VERSION |
| |
| import plugins.configuration |
| import plugins.server |
| import plugins.database |
| |
| PYPONY_RE_PREFIX = re.compile(r"^([a-zA-Z]+:\s*)+") |
| ACTIVITY_TIMESPAN = "now-90d" # How far back to look for "current" activity in lists |
| |
| |
| class ProgTimer: |
| start: float |
| title: str |
| |
| def __init__(self, title): |
| self.title = title |
| |
| async def __aenter__(self): |
| sys.stdout.write( |
| "[%s] %s..." % (datetime.datetime.now().strftime("%H:%M:%S"), self.title) |
| ) |
| self.start = time.time() |
| |
| async def __aexit__(self, exc_type, exc_val, exc_tb): |
| print("Done in %.2f seconds" % (time.time() - self.start)) |
| |
| |
| async def get_lists(database: plugins.configuration.DBConfig) -> dict: |
| """ |
| |
| :param database: a Pony Mail database configuration |
| :return: A dictionary of all mailing lists found, and whether they are considered |
| public or private |
| """ |
| lists = {} |
| db = plugins.database.Database(database) |
| limit = database.max_lists |
| |
| # Fetch aggregations of all private emails |
| # Do this first, so mixed lists are not marked private |
| s = Search(using=db.client, index=db.dbs.db_mbox).filter( |
| "term", private=True |
| ) |
| s.aggs.bucket("per_list", "terms", field="list_raw", size=limit) |
| |
| res = await db.search( |
| index=db.dbs.db_mbox, body=s.to_dict(), size=0 |
| ) |
| |
| for ml in res["aggregations"]["per_list"]["buckets"]: |
| list_name = ml["key"].strip("<>").replace(".", "@", 1) |
| lists[list_name] = { |
| "count": 0, # Sorting later |
| "private": True, |
| } |
| |
| # Fetch aggregations of all public emails |
| s = Search(using=db.client, index=db.dbs.db_mbox).filter( |
| "term", private=False |
| ) |
| s.aggs.bucket("per_list", "terms", field="list_raw", size=limit) |
| |
| res = await db.search( |
| index=db.dbs.db_mbox, body=s.to_dict(), size=0 |
| ) |
| |
| for ml in res["aggregations"]["per_list"]["buckets"]: |
| list_name = ml["key"].strip("<>").replace(".", "@", 1) |
| lists[list_name] = { |
| "count": 0, # We'll sort this later |
| "private": False, |
| } |
| |
| # Get 90 day activity, if any |
| s = Search(using=db.client, index=db.dbs.db_mbox) |
| s = s.filter('range', date = {'gte': ACTIVITY_TIMESPAN}) |
| s.aggs.bucket("per_list", "terms", field="list_raw", size=limit) |
| |
| res = await db.search( |
| index=db.dbs.db_mbox, body=s.to_dict(), size=0 |
| ) |
| |
| for ml in res["aggregations"]["per_list"]["buckets"]: |
| list_name = ml["key"].strip("<>").replace(".", "@", 1) |
| if list_name in lists: |
| lists[list_name]["count"] = ml["doc_count"] |
| |
| await db.client.close() |
| |
| return lists |
| |
| |
| async def get_public_activity(database: plugins.configuration.DBConfig) -> dict: |
| """ |
| |
| :param database: a PyPony database configuration |
| :return: A dictionary with activity stats |
| """ |
| db = plugins.database.Database(database) |
| |
| # Fetch aggregations of all public emails |
| s = ( |
| Search(using=db, index=db.dbs.db_mbox) |
| .query("match", private=False) |
| .filter("range", date={"lt": "now+1d", "gt": "now-14d"}) |
| ) |
| |
| s.aggs.bucket("number_of_lists", "cardinality", field="list_raw") |
| s.aggs.bucket("number_of_senders", "cardinality", field="from_raw") |
| s.aggs.bucket( |
| "daily_emails", "date_histogram", field="date", calendar_interval="1d" |
| ) |
| |
| res = await db.search( |
| index=db.dbs.db_mbox, body=s.to_dict(), size=0 |
| ) |
| |
| no_emails = res["hits"]["total"]["value"] |
| no_lists = res["aggregations"]["number_of_lists"]["value"] |
| no_senders = res["aggregations"]["number_of_senders"]["value"] |
| daily_emails = [] |
| for entry in res["aggregations"]["daily_emails"]["buckets"]: |
| daily_emails.append((entry["key"], entry["doc_count"])) |
| |
| # Now the nitty gritty thread count |
| seen_emails = {} |
| seen_topics = [] |
| thread_count = 0 |
| |
| s = ( |
| Search(using=db.client, index=db.dbs.db_mbox) |
| .query("match", private=False) |
| .filter("range", date={"lt": "now+1d", "gt": "now-14d"}) |
| ) |
| async for docs in db.scan( |
| index=db.dbs.db_mbox, |
| query=s.to_dict(), |
| _source_includes=[ |
| "message-id", |
| "in-reply-to", |
| "subject", |
| "references", |
| "epoch", |
| "list_raw", |
| ], |
| ): |
| |
| for doc in docs: |
| found = False |
| |
| message_id = doc["_source"].get("message-id") |
| irt = doc["_source"].get("in-reply-to") |
| references = doc["_source"].get("references") |
| list_raw = doc["_source"].get("list_raw", "_") |
| subject = doc["_source"].get("subject", "_") |
| if irt and irt in seen_emails: |
| seen_emails[message_id] = irt |
| found = True |
| elif references: |
| for refid in re.split(r"\s+", references): |
| if refid in seen_emails: |
| seen_emails[message_id] = refid |
| found = True |
| if not found: |
| subject = PYPONY_RE_PREFIX.sub("", subject) |
| subject += list_raw |
| if subject in seen_topics: |
| seen_emails[message_id] = subject |
| else: |
| seen_topics.append(subject) |
| thread_count += 1 |
| |
| await db.client.close() |
| |
| activity = { |
| "hits": no_emails, |
| "no_threads": thread_count, |
| "no_active_lists": no_lists, |
| "participants": no_senders, |
| "activity": daily_emails, |
| } |
| |
| return activity |
| |
| async def get_data(server: plugins.server.BaseServer): |
| """ |
| Fetches the data once. |
| This is a separate function so it can be invoked on demand. |
| """ |
| async with ProgTimer("Gathering list of archived mailing lists"): |
| try: |
| server.data.lists = await get_lists(server.config.database) |
| print(f"Found {len(server.data.lists)} lists") |
| except plugins.database.DBError as e: |
| print("Could not fetch lists - database down or not connected: %s" % e) |
| async with ProgTimer("Gathering bi-weekly activity stats"): |
| try: |
| server.data.activity = await get_public_activity(server.config.database) |
| except plugins.database.DBError as e: |
| print( |
| "Could not fetch activity data - database down or not connected: %s" |
| % e |
| ) |
| |
| async def run_tasks(server: plugins.server.BaseServer) -> None: |
| """ |
| Runs long-lived background data gathering tasks such as gathering statistics about email activity and the list |
| of archived mailing lists, for populating the pony mail main index. |
| |
| Generally runs every 2½ minutes, or whatever is set in tasks/refresh_rate in ponymail.yaml |
| """ |
| |
| # Initial setup |
| server.library_version = ".".join([str(v) for v in ES_VERSION]) |
| db = plugins.database.Database(server.config.database) |
| server.engine_version = (await db.info())['version']['number'] |
| await db.client.close() |
| |
| while True: |
| await get_data(server) |
| try: |
| await asyncio.wait_for(server.background_event.wait(), timeout=server.config.tasks.refresh_rate) |
| break # if the event is set, then we have been asked to stop |
| except asyncio.TimeoutError: |
| pass # This is normal |