#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import asyncio
from elasticsearch import AsyncElasticsearch, Elasticsearch, helpers
from elasticsearch.helpers import async_scan
if not __package__:
from plugins import generators, textlib
else:
from .plugins import generators, textlib
import argparse
import base64
import email.utils
import hashlib
import multiprocessing
import time
import sys
import typing
import archiver
# Increment this number whenever breaking changes happen in the migration workflow:
MIGRATION_MAGIC_NUMBER = "2"
# Max number of parallel worker processes to use: roughly 75% of the available cores, but always at least one.
cores = multiprocessing.cpu_count()
MAX_PARALLEL_OPS = max(min(int((cores + 1) * 0.75), cores - 1), 1)
class MultiDocProcessor:
"""MultiProcess document processor"""
def __init__(self, old_es_url: str, new_es_url: str, target: typing.Callable, num_processes: int = 8, graceful: bool = False):
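        """Spin up num_processes worker processes, each with its own work queue and its own ES connections"""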
self.processes = []
self.queues = []
self.target = target
self.graceful = graceful
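        # Shared state via a multiprocessing.Manager: a lock plus a counter of processed documents,
        # used for progress reporting and for throttling the feeder.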
self.manager = multiprocessing.Manager()
self.lock = self.manager.Lock()
self.processed = self.manager.Value("i", 0)
self.processed_last_count = 0
self.start_time = time.time()
self.queue_pointer = 0
self.num_processes = num_processes
for _ in range(0, num_processes):
q: multiprocessing.Queue = multiprocessing.Queue()
p = multiprocessing.Process(
target=self.start,
args=(
q,
old_es_url,
new_es_url,
),
)
self.queues.append(q)
self.processes.append(p)
p.start()
def feed(self, *params):
"""Feed arguments to the next available processor"""
self.queues[self.queue_pointer].put(params)
self.queue_pointer += 1
self.queue_pointer %= self.num_processes
def sighup(self):
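        """Ask every worker to push any documents still waiting in its bulk buffer"""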
for queue in self.queues:
queue.put("SIGHUP")
def stop(self):
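        """Ask every worker to push its remaining documents and shut down, then wait for the processes to exit"""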
for queue in self.queues:
queue.put("DONE")
for proc in self.processes:
proc.join()
def status(self, total):
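        """Print a progress report with throughput and ETA for roughly every 1,000 documents processed"""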
processed = self.processed.value
if processed - self.processed_last_count >= 1000:
self.processed_last_count = processed
now = time.time()
time_spent = now - self.start_time
docs_per_second = (processed / time_spent) or 1
time_left = (total - processed) / docs_per_second
# stringify time left
time_left_str = "%u seconds" % time_left
if time_left > 60:
time_left_str = "%u minute(s), %u second(s)" % (int(time_left / 60), time_left % 60)
if time_left > 3600:
time_left_str = "%u hour(s), %u minute(s), %u second(s)" % (
int(time_left / 3600),
int(time_left % 3600 / 60),
time_left % 60,
)
print(
"Processed %u documents, %u remain. ETA: %s (at %u documents per second)"
% (processed, (total - processed), time_left_str, docs_per_second)
)
def start(self, queue, old_es_url, new_es_url):
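        """Worker loop: pull work items off the queue, run the target callable on each, and bulk-push the results in batches"""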
old_es = Elasticsearch([old_es_url])
new_es = Elasticsearch([new_es_url])
bulk_array = []
while True:
params = queue.get()
if params == "SIGHUP": # Push stragglers
if bulk_array:
bulk_push(bulk_array, new_es, self.graceful)
bulk_array[:] = []
elif params == "DONE": # Close up shop completely
if bulk_array:
bulk_push(bulk_array, new_es, self.graceful)
old_es.close()
new_es.close()
return
else:
as_list = list(params)
as_list.insert(0, old_es)
                ret_val = None
                try:
                    ret_val = self.target(*as_list)
                except Exception:
                    print("Unexpected error:", sys.exc_info()[0])
                    if not self.graceful:
                        raise
if ret_val:
bulk_array.extend(ret_val)
with self.lock:
self.processed.value += 1
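                # Push to the new ES in batches of 200 documents to keep each bulk request a reasonable size.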
if len(bulk_array) >= 200:
bulk_push(bulk_array, new_es, self.graceful)
bulk_array[:] = []
def bulk_push(docs, es, graceful=False):
    """Pushes a bunch of objects to ES in a bulk operation"""
    js_arr = []
    for entry in docs:
bulk_op = {
"_op_type": "index",
"_index": entry["index"],
"_id": entry["id"],
"_source": entry["body"],
}
js_arr.append(bulk_op)
try:
helpers.bulk(es, js_arr)
except helpers.errors.BulkIndexError as e:
if graceful:
print("Bulk index error: %s" % e)
else:
raise
def process_document(old_es, doc, old_dbname, dbname_source, dbname_mbox, do_dkim):
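    """Convert a single Pony Mail mbox document and its source into the Foal format, returning bulk entries for the new mbox and source indices"""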
now = time.time()
list_id = textlib.normalize_lid(doc["_source"]["list_raw"])
    try:
        source = old_es.get(index=old_dbname, doc_type="mbox_source", id=doc["_id"])
    except Exception:
        # If we hit a 404 on the source, fake an empty document, as we don't know the source.
        print("Source for %s was not found, faking it..." % doc["_id"])
        source = {"_source": {"source": ""}}
source_text: str = source["_source"]["source"]
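    # A raw RFC 822 message always contains a colon in its headers, so a stored source without one is assumed to be base64-encoded.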
if ":" not in source_text: # Base64
source_text = base64.b64decode(source_text)
else: # bytify
source_text = source_text.encode("utf-8", "ignore")
archive_as_id = doc["_id"]
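    # When DKIM re-indexing is enabled, the document gets a new DKIM-based ID; both the new and the old ID are kept as permalinks.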
if do_dkim:
dkim_id = generators.dkimid(None, None, list_id, None, source_text)
old_id = doc["_id"]
archive_as_id = dkim_id
doc["_source"]["mid"] = dkim_id
doc["_source"]["permalinks"] = [dkim_id, old_id]
else:
doc["_source"]["permalinks"] = [doc["_id"]]
doc["_source"]["dbid"] = hashlib.sha3_256(source_text).hexdigest()
# Add in shortened body for search aggs
# We add +1 to know whether to use ellipsis in reports.
doc["_source"]["body_short"] = doc["_source"]["body"][:archiver.SHORT_BODY_MAX_LEN+1]
# Add in gravatar
header_from = doc["_source"]["from"]
mailaddr = email.utils.parseaddr(header_from)[1]
ghash = hashlib.md5(mailaddr.encode("utf-8")).hexdigest()
doc["_source"]["gravatar"] = ghash
# Append migration details to notes field in doc
notes = doc["_source"].get("_notes", [])
# We want a list, not a single string
    if isinstance(notes, str):
        notes = [notes]
notes.append(
"MIGRATE: Document migrated from Pony Mail to Pony Mail Foal at %u, "
"using foal migrator v/%s" % (now, MIGRATION_MAGIC_NUMBER)
)
# If we re-indexed the document, make a note of that as well.
if do_dkim:
notes.append("REINDEX: Document re-indexed with DKIM_ID at %u, " "from %s to %s" % (now, dkim_id, old_id))
doc["_source"]["_notes"] = notes
# Copy to new DB
return (
{"index": dbname_mbox, "id": archive_as_id, "body": doc["_source"]},
{"index": dbname_source, "id": doc["_source"]["dbid"], "body": source["_source"]},
)
def process_attachment(old_es, doc, dbname_attachment):
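    """Pass an attachment document through unchanged, re-targeted at the new attachment index"""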
return ({"index": dbname_attachment, "id": doc["_id"], "body": doc["_source"]},)
async def main(args):
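    """Entry point: ask for connection details, then migrate all emails followed by all attachments"""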
no_jobs = args.jobs
graceful = args.graceful
print("Welcome to the Apache Pony Mail -> Foal migrator.")
print("This will copy your old database, adjust the structure, and insert the emails into your new foal database.")
print("We will be utilizing %u cores for this operation." % no_jobs)
print("------------------------------------")
old_es_url = args.old_url or input("Enter the full URL (including http/https) of your old ES server: ") or "http://localhost:9200/"
new_es_url = args.new_url or input("Enter the full URL (including http/https) of your NEW ES server: ") or "http://localhost:9200/"
if old_es_url == new_es_url:
print("Old and new DB should not be the same, assuming error in input and exiting!")
return
    old_es_async = AsyncElasticsearch([old_es_url])
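    # The old database is scanned with an async client here; each worker process opens its own synchronous clients.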
old_dbname = args.old_name or input("What is the database name for the old Pony Mail emails? [ponymail]: ") or "ponymail"
new_dbprefix = args.new_prefix or input("What is the database prefix for the new Pony Mail emails? [ponymail]: ") or "ponymail"
do_dkim = True
dkim_txt = (
input(
"Do you wish to perform DKIM re-indexing of all emails? This will NOT preserve all old permalinks currently "
"(y/n) [y]: "
)
or "y"
)
if dkim_txt.lower() == "n":
do_dkim = False
# Define index names for new ES
dbname_mbox = new_dbprefix + "-mbox"
dbname_source = new_dbprefix + "-source"
dbname_attachment = new_dbprefix + "-attachment"
# Let's get started..!
# start_time = time.time()
    count = await old_es_async.count(index=old_dbname, doc_type="mbox")
no_emails = count["count"]
print("------------------------------------")
print("Starting migration of %u emails, this may take quite a while..." % no_emails)
processes = MultiDocProcessor(old_es_url, new_es_url, process_document, no_jobs)
docs_read = 0
async for doc in async_scan(
        client=old_es_async,
query={"query": {"match_all": {}}},
doc_type="mbox",
index=old_dbname,
):
docs_read += 1
processes.feed(doc, old_dbname, dbname_source, dbname_mbox, do_dkim)
# Don't speed too far ahead of processing...
processed = processes.processed.value
while docs_read - processed > 100 * no_jobs:
await asyncio.sleep(0.01)
processed = processes.processed.value + 0
processes.status(no_emails)
# There may be some docs left over to push
processes.sighup()
while processed < no_emails: # Wait for all documents to have been processed.
await asyncio.sleep(1)
print(f"Waiting for bulk push to complete ({processed} out of {no_emails} done...)")
processed = processes.processed.value
processes.stop()
# Process attachments
# start_time = time.time()
processes = MultiDocProcessor(old_es_url, new_es_url, process_attachment, no_jobs, graceful)
docs_read = 0
    count = await old_es_async.count(index=old_dbname, doc_type="attachment")
no_att = count["count"]
print("Transferring %u attachments..." % no_att)
async for doc in async_scan(
        client=old_es_async,
query={"query": {"match_all": {}}},
doc_type="attachment",
index=old_dbname,
):
processes.feed(doc, dbname_attachment)
docs_read += 1
# Don't speed ahead
processed = processes.processed.value + 0
while docs_read - processed > 10 * no_jobs:
await asyncio.sleep(0.01)
processed = processes.processed.value + 0
processes.status(no_att)
# There may be some docs left over to push
processes.sighup()
while processed < no_att: # Wait for all attachments to have been processed.
await asyncio.sleep(1)
print(f"Waiting for bulk push to complete ({processed} out of {no_att} done...)")
processed = processes.processed.value
processes.stop()
    await old_es_async.close()
print("All done, enjoy!")
if __name__ == "__main__":
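    # Example invocation (script name and values are illustrative):
    #   python3 migrate.py --jobs 4 --old_url http://localhost:9200/ --old_name ponymail \
    #       --new_url http://localhost:9201/ --new_prefix ponymail --graceful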
parser = argparse.ArgumentParser()
parser.add_argument(
"--jobs",
"-j",
help="Number of concurrent processing jobs to run. Default is %u." % MAX_PARALLEL_OPS,
type=int,
default=MAX_PARALLEL_OPS,
)
parser.add_argument(
"--graceful",
"-g",
help="Fail gracefully and continue if a processing error occurs",
action='store_true'
)
    # The default start method on macOS is "spawn", but this fails with:
    #   ForkingPickler(file, protocol).dump(obj)
    #   TypeError: cannot pickle 'weakref' object
    # Workaround: allow the start method to be overridden
parser.add_argument(
"--start_method",
help="Override start method (e.g. fork on macos)",
type=str
)
parser.add_argument(
"--old_url",
help="Provide input database URL",
type=str
)
parser.add_argument(
"--old_name",
help="Provide input database name",
type=str
)
parser.add_argument(
"--new_url",
help="Provide output database URL",
type=str
)
parser.add_argument(
"--new_prefix",
help="Provide output database prefix",
type=str
)
args = parser.parse_args()
if args.start_method:
multiprocessing.set_start_method(args.start_method)
asyncio.run(main(args))