| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import asyncio |
| |
| from elasticsearch import AsyncElasticsearch, Elasticsearch, helpers |
| from elasticsearch.helpers import async_scan |
| |
| if not __package__: |
| from plugins import generators |
| else: |
| from .plugins import generators |
| |
| import argparse |
| import base64 |
| import hashlib |
| import multiprocessing |
| import os |
| import time |
| |
| # Increment this number whenever breaking changes happen in the migration workflow: |
| MIGRATION_MAGIC_NUMBER = "2" |
| |
# Max number of parallel conversions to perform before pushing. 75-ish percent of max cores.
# os.sched_getaffinity is Linux-only, so fall back to os.cpu_count() on other platforms.
cores = len(os.sched_getaffinity(0)) if hasattr(os, "sched_getaffinity") else (os.cpu_count() or 1)
MAX_PARALLEL_OPS = max(min(int((cores + 1) * 0.75), cores - 1), 1)
| |
| |
| class MultiDocProcessor: |
| """MultiProcess document processor""" |
| |
| def __init__(self, old_es_url: str, new_es_url: str, target: callable, num_processes: int = 8): |
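        """Spawn num_processes worker processes, each with its own work queue and Elasticsearch connections."""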
| self.processes = [] |
| self.queues = [] |
| self.target = target |
| self.manager = multiprocessing.Manager() |
| self.lock = self.manager.Lock() |
| self.processed = self.manager.Value("i", 0) |
| self.processed_last_count = 0 |
| self.start_time = time.time() |
| self.queue_pointer = 0 |
| self.num_processes = num_processes |
| for i in range(0, num_processes): |
| q = multiprocessing.Queue() |
| p = multiprocessing.Process( |
| target=self.start, |
| args=( |
| q, |
| old_es_url, |
| new_es_url, |
| ), |
| ) |
| self.queues.append(q) |
| self.processes.append(p) |
| p.start() |
| |
| def feed(self, *params): |
| """Feed arguments to the next available processor""" |
| self.queues[self.queue_pointer].put(params) |
| self.queue_pointer += 1 |
| self.queue_pointer %= self.num_processes |
| |
| def sighup(self): |
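        """Ask every worker to flush any pending bulk documents without shutting down."""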
| for queue in self.queues: |
| queue.put("SIGHUP") |
| |
| def stop(self): |
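        """Tell every worker to flush remaining documents and exit, then wait for all of them to finish."""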
| for queue in self.queues: |
| queue.put("DONE") |
| for proc in self.processes: |
| proc.join() |
| |
| def status(self, total): |
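        """Print progress (documents per second and ETA) roughly every 1,000 processed documents."""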
| processed = self.processed.value |
| if processed - self.processed_last_count >= 1000: |
| self.processed_last_count = processed |
| now = time.time() |
| time_spent = now - self.start_time |
| docs_per_second = (processed / time_spent) or 1 |
| time_left = (total - processed) / docs_per_second |
| |
| # stringify time left |
| time_left_str = "%u seconds" % time_left |
| if time_left > 60: |
| time_left_str = "%u minute(s), %u second(s)" % (int(time_left / 60), time_left % 60) |
| if time_left > 3600: |
| time_left_str = "%u hour(s), %u minute(s), %u second(s)" % ( |
| int(time_left / 3600), |
| int(time_left % 3600 / 60), |
| time_left % 60, |
| ) |
| |
| print( |
| "Processed %u documents, %u remain. ETA: %s (at %u documents per second)" |
| % (processed, (total - processed), time_left_str, docs_per_second) |
| ) |
| |
| def start(self, queue, old_es_url, new_es_url): |
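        """Worker loop: pull work off the queue, run the target callable, and bulk-push results in batches of 200."""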
| old_es = Elasticsearch([old_es_url]) |
| new_es = Elasticsearch([new_es_url]) |
| bulk_array = [] |
| while True: |
| params = queue.get() |
| if params == "SIGHUP": # Push stragglers |
| if bulk_array: |
| bulk_push(bulk_array, new_es) |
| bulk_array[:] = [] |
| continue |
| elif params == "DONE": # Close up shop completely |
| if bulk_array: |
| bulk_push(bulk_array, new_es) |
| old_es.close() |
| new_es.close() |
| return |
| else: |
| as_list = list(params) |
| as_list.insert(0, old_es) |
| ret_val = self.target(*as_list) |
| if ret_val: |
| bulk_array.extend(ret_val) |
| with self.lock: |
| self.processed.value += 1 |
| if len(bulk_array) >= 200: |
| bulk_push(bulk_array, new_es) |
| bulk_array[:] = [] |
| |
| |
def bulk_push(docs, es):
    """Pushes a bunch of objects to ES in a single bulk operation"""
    js_arr = []
    for entry in docs:
| bulk_op = { |
| "_op_type": "index", |
| "_index": entry["index"], |
| "_id": entry["id"], |
| "_source": entry["body"], |
| } |
| js_arr.append(bulk_op) |
| helpers.bulk(es, js_arr) |
| |
| |
| def process_document(old_es, doc, old_dbname, dbname_source, dbname_mbox, do_dkim): |
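    """Convert an old mbox document and its source into the new Foal format, returning bulk entries for both indices."""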
| now = time.time() |
| list_id = doc["_source"]["list_raw"].strip("<>") |
| try: |
| source = old_es.get(index=old_dbname, doc_type="mbox_source", id=doc["_id"]) |
    # If we can't fetch the source (e.g. a 404), we have to fake an empty document, as we don't know the source.
    except Exception:
| print("Source for %s was not found, faking it..." % doc["_id"]) |
| source = {"_source": {"source": ""}} |
| source_text: str = source["_source"]["source"] |
| if ":" not in source_text: # Base64 |
| source_text = base64.b64decode(source_text) |
| else: # bytify |
| source_text = source_text.encode("utf-8", "ignore") |
| if do_dkim: |
| dkim_id = generators.dkimid(None, None, list_id, None, source_text) |
| old_id = doc["_id"] |
| doc["_source"]["mid"] = dkim_id |
| doc["_source"]["permalinks"] = [dkim_id, old_id] |
| else: |
| doc["_source"]["permalinks"] = [doc["_id"]] |
| |
| source["_source"]["permalinks"] = doc["_source"]["permalinks"] |
| doc["_source"]["dbid"] = hashlib.sha3_256(source_text).hexdigest() |
| |
| # Append migration details to notes field in doc |
| notes = doc["_source"].get("_notes", []) |
| # We want a list, not a single string |
| if isinstance(notes, str): |
        notes = [notes]
| notes.append( |
| "MIGRATE: Document migrated from Pony Mail to Pony Mail Foal at %u, " |
| "using foal migrator v/%s" % (now, MIGRATION_MAGIC_NUMBER) |
| ) |
| # If we re-indexed the document, make a note of that as well. |
| if do_dkim: |
| notes.append("REINDEX: Document re-indexed with DKIM_ID at %u, " "from %s to %s" % (now, dkim_id, old_id)) |
| doc["_source"]["_notes"] = notes |
| |
| # Copy to new DB |
| return ( |
| {"index": dbname_mbox, "id": doc["_id"], "body": doc["_source"]}, |
| {"index": dbname_source, "id": doc["_source"]["dbid"], "body": source["_source"]}, |
| ) |
| |
| |
| def process_attachment(old_es, doc, dbname_attachment): |
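    """Wrap an old attachment document as a bulk entry for the new attachment index."""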
| return ({"index": dbname_attachment, "id": doc["_id"], "body": doc["_source"]},) |
| |
| |
| async def main(no_jobs): |
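    """Interactive entry point: prompts for old/new ES URLs, then migrates emails, sources and attachments."""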
| print("Welcome to the Apache Pony Mail -> Foal migrator.") |
| print("This will copy your old database, adjust the structure, and insert the emails into your new foal database.") |
| print("We will be utilizing %u cores for this operation." % no_jobs) |
| print("------------------------------------") |
| old_es_url = input("Enter the full URL (including http/https) of your old ES server: ") or "http://localhost:9200/" |
| new_es_url = input("Enter the full URL (including http/https) of your NEW ES server: ") or "http://localhost:9200/" |
| if old_es_url == new_es_url: |
| print("Old and new DB should not be the same, assuming error in input and exiting!") |
| return |
    # Async client used for counting and scanning the old DB; worker processes open their own synchronous clients.
    old_es_async = AsyncElasticsearch([old_es_url])
| |
| old_dbname = input("What is the database name for the old Pony Mail emails? [ponymail]: ") or "ponymail" |
| new_dbprefix = input("What is the database prefix for the new Pony Mail emails? [ponymail]: ") or "ponymail" |
| |
| do_dkim = True |
| dkim_txt = ( |
| input( |
| "Do you wish to perform DKIM re-indexing of all emails? This will still preserve old permalinks " |
| "(y/n) [y]: " |
| ) |
| or "y" |
| ) |
| if dkim_txt.lower() == "n": |
| do_dkim = False |
| |
| # Define index names for new ES |
| dbname_mbox = new_dbprefix + "-mbox" |
| dbname_source = new_dbprefix + "-source" |
| dbname_attachment = new_dbprefix + "-attachment" |
| |
| # Let's get started..! |
| start_time = time.time() |
    count = await old_es_async.count(index=old_dbname, doc_type="mbox")
| no_emails = count["count"] |
| |
| print("------------------------------------") |
| print("Starting migration of %u emails, this may take quite a while..." % no_emails) |
| |
| processes = MultiDocProcessor(old_es_url, new_es_url, process_document, no_jobs) |
| |
    processed = 0
    docs_read = 0
| async for doc in async_scan( |
        client=old_es_async,
| query={"query": {"match_all": {}}}, |
| doc_type="mbox", |
| index=old_dbname, |
| ): |
| docs_read += 1 |
| processes.feed(doc, old_dbname, dbname_source, dbname_mbox, do_dkim) |
| # Don't speed too far ahead of processing... |
| processed = processes.processed.value |
| while docs_read - processed > 100 * no_jobs: |
| await asyncio.sleep(0.01) |
            processed = processes.processed.value
| |
| processes.status(no_emails) |
| |
| # There may be some docs left over to push |
| processes.sighup() |
| while processed < no_emails: # Wait for all documents to have been processed. |
| await asyncio.sleep(1) |
| print(f"Waiting for bulk push to complete ({processed} out of {no_emails} done...)") |
| processed = processes.processed.value |
| |
| processes.stop() |
| |
| # Process attachments |
| start_time = time.time() |
| processes = MultiDocProcessor(old_es_url, new_es_url, process_attachment, no_jobs) |
    docs_read = 0
    processed = 0
    count = await old_es_async.count(index=old_dbname, doc_type="attachment")
| no_att = count["count"] |
| print("Transferring %u attachments..." % no_att) |
| async for doc in async_scan( |
        client=old_es_async,
| query={"query": {"match_all": {}}}, |
| doc_type="attachment", |
| index=old_dbname, |
| ): |
| processes.feed(doc, dbname_attachment) |
| docs_read += 1 |
| |
        # Don't speed ahead
        processed = processes.processed.value
        while docs_read - processed > 10 * no_jobs:
            await asyncio.sleep(0.01)
            processed = processes.processed.value
| |
| processes.status(no_att) |
| |
| # There may be some docs left over to push |
| processes.sighup() |
| while processed < no_att: # Wait for all attachments to have been processed. |
| await asyncio.sleep(1) |
| print(f"Waiting for bulk push to complete ({processed} out of {no_att} done...)") |
| processed = processes.processed.value |
| |
| processes.stop() |
    await old_es_async.close()
| print("All done, enjoy!") |
| |
| |
| if __name__ == "__main__": |
| parser = argparse.ArgumentParser() |
| parser.add_argument( |
| "--jobs", |
| "-j", |
| help="Number of concurrent processing jobs to run. Default is %u." % MAX_PARALLEL_OPS, |
| type=int, |
| default=MAX_PARALLEL_OPS, |
| ) |
| args = parser.parse_args() |
    asyncio.run(main(args.jobs))