| #!/usr/bin/env python3.4 |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
# the License. You may obtain a copy of the License at
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import sys |
| import random, time |
| import hashlib |
| import os |
| from threading import Thread, Lock |
| import mailbox |
| import email.errors, email.utils, email.header |
| from urllib.request import urlopen |
| import re |
| import chardet |
| import datetime |
| import configparser |
| import argparse |
| from os import listdir |
| from os.path import isfile, join, isdir |
| import glob |
| import codecs |
| import multiprocessing |
| import tempfile |
| import gzip |
| |
# Third-party requirements; fail fast with a helpful message if absent.
# Catch only ImportError so genuine errors (or Ctrl-C) aren't masked.
try:
    from elasticsearch import Elasticsearch, helpers
    from formatflowed import convertToWrapped
except ImportError:
    print("Sorry, you need to install the elasticsearch and formatflowed modules from pip first.")
    sys.exit(-1)
| |
# --- Shared state for the importer threads ---
y = 0                    # total records imported so far (all threads)
baddies = 0              # records skipped as unparseable
block = Lock()           # guards pops from the shared work queue `lists`
lists = []               # work queue of [source, list-id override] pairs
start = time.time()      # wall-clock start, for the final summary
quickmode = False        # --quick: only grab the first file(s) found
lid_override = None      # NOTE(review): appears unused; list_override below is the live one
private = False          # --private: mark imported mail as private
appender = "apache.org"  # --domain: used when reconstructing MIDs / list IDs


source = "./"            # --source: file path or http(s) URL to import from
list_override = None     # --lid: forced List-ID for every imported message
project = ""             # --project: limit mod_mbox scan to this project
recursive = False        # --recursive: descend into subdirectories
filebased = False        # set when importing local files (or pipermail downloads)
fileToLID = {}           # per-directory List-ID overrides (interactive mode)
interactive = False      # --interactive: prompt for per-directory List-IDs
attachments = False      # --attachments: also import attached files
extension = "*.mbox"     # --ext: filename pattern for file based imports
piperWeirdness = False   # pipermail mangles From: headers; enables the un-mangler
parseHTML = False        # --html2text: fall back to HTML->text conversion
iBody = None             # --ignorebody: body text to treat as empty
resendTo = None          # --resend: re-deliver mail to this address instead

# Fetch config
config = configparser.RawConfigParser()
config.read('ponymail.cfg')
auth = None
if config.has_option('elasticsearch', 'user'):
    auth = (config.get('elasticsearch','user'), config.get('elasticsearch','password'))



ssl = False
dbname = config.get("elasticsearch", "dbname")
if config.has_option("elasticsearch", "ssl") and config.get("elasticsearch", "ssl").lower() == 'true':
    ssl = True

# Optional substring to strip out of list IDs (debug aid).
cropout = None
if config.has_option("debug", "cropout") and config.get("debug", "cropout") != "":
    cropout = config.get("debug", "cropout")

uri = ""
if config.has_option("elasticsearch", "uri") and config.get("elasticsearch", "uri") != "":
    uri = config.get("elasticsearch", "uri")
# Single shared ElasticSearch client; retries make the import resilient
# to transient timeouts.
es = Elasticsearch([
    {
        'host': config.get("elasticsearch", "hostname"),
        'port': int(config.get("elasticsearch", "port")),
        'use_ssl': ssl,
        'url_prefix': uri,
        'http_auth': auth
    }],
    max_retries=5,
    retry_on_timeout=True
)

rootURL = ""
| |
def parse_attachment(part):
    """Extract attachment metadata and contents from a MIME part.

    Returns (metadata, base64_contents) where metadata is a dict with
    content_type, size, filename and sha256 hash, or (None, None) if the
    part is not an attachment, carries no filename, or cannot be decoded.
    """
    cd = part.get("Content-Disposition", None)
    if not cd:
        return None, None
    dispositions = cd.strip().split(";")
    if dispositions[0].lower() != "attachment":
        return None, None
    try:
        fd = part.get_payload(decode=True)
        if fd:
            attachment = {}
            attachment['content_type'] = part.get_content_type()
            attachment['size'] = len(fd)
            attachment['filename'] = None
            h = hashlib.sha256(fd).hexdigest()
            b64 = codecs.encode(fd, "base64").decode('ascii')
            attachment['hash'] = h
            for param in dispositions[1:]:
                # Skip malformed parameters (no '=') instead of crashing
                # out of the whole attachment scan with a ValueError.
                if "=" not in param:
                    continue
                key, val = param.split("=", 1)
                if key.lower().strip() == "filename":
                    val = val.strip(' "')
                    print("Found attachment: %s" % val)
                    attachment['filename'] = val
            if attachment['filename']:
                return attachment, b64  # Return meta data and contents separately
    except Exception:
        # Best effort: a broken part counts as "no attachment" rather than
        # aborting the import of the whole message.
        pass
    return None, None
| |
def msgfiles(msg):
    """Collect attachment metadata and contents from a message.

    Returns (metadata_list, {sha256_hash: base64_contents}); both are
    empty for non-multipart messages.
    """
    found = []
    blobs = {}
    if msg.is_multipart():
        for piece in msg.walk():
            meta, payload = parse_attachment(piece)
            if meta:
                found.append(meta)
                blobs[meta['hash']] = payload
    return found, blobs
| |
def pm_charsets(msg):
    """Return the distinct, non-None charsets declared by a message's parts."""
    return {c for c in msg.get_charsets() if c is not None}
| |
def msgbody(msg):
    """Extract a plain-text body from a message, best effort.

    Prefers the first text/plain part found while walking multiparts; if
    --html2text was given (parseHTML), falls back to converting the first
    text/html part. Returns str, bytes, or None if nothing usable exists.
    """
    global parseHTML, iBody
    firstHTML = None
    body = None
    if msg.is_multipart():
        for part in msg.walk():
            try:
                if part.is_multipart():
                    for subpart in part.walk():
                        # First text/plain wins; first text/html kept as fallback.
                        if subpart.get_content_type() == 'text/plain' and not body:
                            body = subpart.get_payload(decode=True)
                        elif subpart.get_content_type() == 'text/html' and parseHTML and not firstHTML:
                            firstHTML = subpart.get_payload(decode=True)

                elif part.get_content_type() == 'text/plain' and not body:
                    body = part.get_payload(decode=True)
                elif part.get_content_type() == 'text/html' and parseHTML and not firstHTML:
                    firstHTML = part.get_payload(decode=True)
            except Exception as err:
                print("Body parser error: %s" % err)
    elif msg.get_content_type() == 'text/plain':
        body = msg.get_payload(decode=True)
    elif msg.get_content_type() == 'text/html' and parseHTML and not firstHTML:
        firstHTML = msg.get_payload(decode=True)

    # this requires a GPL lib, user will have to install it themselves
    # (html2text is only imported when --html2text is passed, so the
    # parseHTML guard above protects this reference).
    # iBody (--ignorebody) marks boilerplate bodies as effectively empty.
    if firstHTML and (not body or len(body) <= 1 or (iBody and str(body).find(str(iBody)) != -1)):
        print("No body found as text/plain, converting HTML to text")
        body = html2text.html2text(firstHTML.decode("utf-8", errors='replace') if type(firstHTML) is bytes else firstHTML)

    # Best-effort decode: try each declared charset; after the first
    # success body is str and subsequent iterations are no-ops.
    for charset in pm_charsets(msg):
        try:
            body = body.decode(charset) if type(body) is bytes else body
        except:
            body = body.decode('utf-8', errors='replace') if type(body) is bytes else body

    return body
| |
| |
def msgfactory(fp):
    """Parse one message from a file object for the mailbox reader.

    On failure the error is printed and None is returned. NOTE(review):
    the original comment claimed None should be avoided because it stops
    the mailbox iterator, yet None was returned anyway — behavior kept
    as-is.
    """
    msg = None
    try:
        msg = email.message_from_file(fp)
    except Exception as err:
        print("hmm: %s" % err)
    return msg
| |
| |
class BulkThread(Thread):
    """Helper that bulk-inserts prepared JSON documents into ElasticSearch.

    Although this subclasses Thread, callers in this file call insert()
    synchronously after assign(); run() is never overridden.
    """

    def assign(self, json, xes, dtype = 'mbox'):
        """Stash the documents, the ES connection and the doc type to use."""
        self.json = json
        self.xes = xes
        self.dtype = dtype

    def insert(self):
        """Bulk-index all assigned documents, creating the index if missing."""
        global config
        sys.stderr.flush()
        iname = config.get("elasticsearch", "dbname")
        # Lazily create the index on first use.
        if not self.xes.indices.exists(iname):
            self.xes.indices.create(index = iname)

        js_arr = []
        for entry in self.json:
            js = entry
            js['@version'] = 1
            js_arr.append({
                '_op_type': 'index',
                '_index': iname,
                '_type': self.dtype,
                '_id': js['mid'],
                'doc': js,
                '_source': js
            })
        try:
            helpers.bulk(self.xes, js_arr)
        except Exception as err:
            # Log and continue; one failed batch should not kill the import.
            print("Warning: Could not bulk insert: %s" % err)
| |
| |
class SlurpThread(Thread):
    """Worker thread: pops sources off the shared `lists` queue and imports
    every message they contain into ElasticSearch via BulkThread batches."""

    def run(self):
        global block, y, es, lists, baddies, config, resendTo
        ja = []   # accumulated mbox documents awaiting bulk insert
        jas = []  # accumulated raw-source documents awaiting bulk insert
        print("Thread started")
        mla = None
        ml = ""
        mboxfile = ""
        filename = ""
        xlist_override = None

        while len(lists) > 0:
            print("%u elements left to slurp" % len(lists))
            # Pop the next work item under the lock; failure or an empty
            # item ends this worker.
            block.acquire()
            try:
                mla = lists.pop(0)
            except Exception as err:
                print("Could not pop list: %s" % err)
                block.release()
                return
            if not mla:
                print("Nothing more to do here")
                block.release()
                return
            block.release()
            y += 1
            EY = 1980  # fallback year/month for messages with broken dates
            EM = 1
            stime = time.time()
            if filebased:

                tmpname = mla[0]
                filename = mla[0]
                xlist_override = mla[1]
                print("Slurping %s" % filename)
            else:
                # mod_mbox style: download the month's mbox into a temp file
                # named after a hash, then parse it like a local file.
                ml = mla[0]
                mboxfile = mla[1]
                xlist_override = list_override
                print("Slurping %s/%s" % (ml, mboxfile))
                m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile)
                EY = 1997
                EM = 1
                if m:
                    EY = int(m.group(1))
                    EM = int(m.group(2))
                ctx = urlopen("%s%s/%s" % (source, ml, mboxfile ))
                inp = ctx.read().decode(ctx.headers.get_content_charset() or 'utf-8', errors='ignore')

                tmpname = hashlib.sha224(("%f-%f-%s-%s.mbox" % (random.random(), time.time(), ml, mboxfile)).encode('utf-8') ).hexdigest()
                with open(tmpname, "w") as f:
                    f.write(inp)
                    f.close()

            count = 0
            LEY = EY  # last evidenced year, refined as messages are parsed
            for message in mailbox.mbox(tmpname):
                if resendTo:
                    # --resend mode: forward via local MTA instead of indexing.
                    # NOTE(review): due to operator precedence this prints just
                    # '??' (without the prefix text) when message-id is absent.
                    print("Delivering message %s via MTA" % message['message-id'] if 'message-id' in message else '??')
                    s = SMTP('localhost')
                    try:
                        if list_override:
                            message.replace_header('List-ID', list_override)
                        message.replace_header('To', resendTo)
                    except:
                        if list_override:
                            message['List-ID'] = list_override
                    message['cc'] = None
                    s.send_message(message, from_addr=None, to_addrs=(resendTo))
                    continue
                # Safety valve against pathological mbox files.
                if (time.time() - stime > 120):
                    print("Whoa, this is taking way too long, ignoring %s for now" % tmpname)
                    break
                if 'subject' in message:
                    subject = message['subject'] # Could possibly be None.
                mid = message['message-id']

                lid = message['list-id']
                if not lid or lid == "": # Guess list name in absence
                    lid = '.'.join(reversed(ml.split("-"))) + "." + appender

                # Compact LID to <foo@domain>, discard rest
                m = re.search(r"(<.+>)", lid)
                if m:
                    lid = m.group(1)
                if xlist_override and len(xlist_override) > 3:
                    lid = xlist_override
                lid = lid.replace("@",".") # we want foo.bar.org, not foo@bar.org
                lid = "<%s>" % lid.strip("<>") # We need <> around it!
                if cropout:
                    lid = lid.replace(cropout, "")
                date = message['date']
                fro = message['from']
                to = message['to']
                body = msgbody(message)
                # Normalise the body to utf-8 bytes, trying progressively
                # cruder fallbacks (chardet, latin-1) before giving up.
                try:
                    if 'content-type' in message and message['content-type'].find("flowed") != -1:
                        body = convertToWrapped(body, character_set="utf-8")
                    if isinstance(body, str):
                        body = body.encode('utf-8')
                except Exception as err:
                    try:
                        body = body.decode(chardet.detect(body)['encoding'])
                    except Exception as err:
                        try:
                            body = body.decode('latin-1')
                        except:
                            try:
                                if isinstance(body, str):
                                    body = body.encode('utf-8')
                            except:
                                body = None

                okay = True
                dheader = {}
                # Decode RFC 2047 encoded-word headers into plain strings.
                for key in ['to','from','subject','message-id']:
                    try:
                        hval = ""
                        if message.get(key):
                            for t in email.header.decode_header(message[key]):
                                if t[1] == None or t[1].find("8bit") != -1:
                                    hval += t[0].decode('utf-8', errors='replace') if type(t[0]) is bytes else t[0]
                                else:
                                    hval += t[0].decode(t[1],errors='ignore')
                            dheader[key] = hval
                        else:
                            dheader[key] = "(Unknown)"
                    except Exception as err:
                        print("Could not decode headers, ignoring..: %s" % err)
                        okay = False
                # Work out a usable date: Date header, else Received header,
                # else fall back to the year/month implied by the mbox name.
                mdt = ""
                if not 'date' in message and 'received' in message:
                    m = re.search(r"(\d+ \S+ \d{4} \d\d:\d\d:\d\d ([-+]\d{4})?)", message['received'])
                    if m:
                        mdt = m.group(1)
                else:
                    mdt = message['date']
                mdate = None
                try:
                    mdate = email.utils.parsedate_tz(mdt)
                except:
                    pass
                if not mdate or mdate[0] < (LEY-1):
                    print("Date is wrong or missing here, setting to %s" % ( LEY))
                    mdate = datetime.datetime(LEY, EM, 1).timetuple()
                else:
                    LEY = mdate[0] # Gather evidence 'n'stuff!
                mdatestring = ""
                try:
                    mdatestring = time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(email.utils.mktime_tz(mdate)))
                except:
                    okay = False
                if body and okay and mdate and {'from','subject'} <= set(dheader):
                    # Pipermail transforms from: to something weird - reset that!
                    if piperWeirdness:
                        m = re.match(r"(.+) at ([^(]+) \((.+)\)$", dheader['from'])
                        # Try just 'foo at bar.tld' if 'foo at bar.tld (foo bar)' isn't working
                        if not m:
                            m = re.match(r"(.+) at ([^(]+)$", dheader['from'])
                        if m:
                            dheader['from'] = "%s <%s@%s>" % (m.group(3), m.group(1), m.group(2))

                    attachments, contents = msgfiles(message)
                    # Synthesise a message-id when the mail lacks one.
                    if mid == None or not mid:
                        try:
                            mid = hashlib.sha256(body if type(body) is bytes else body.encode('ascii', errors='ignore')).hexdigest() + "@" + lid + "@" + appender
                        except:
                            if filebased:
                                mid = hashlib.sha256("%f-%f-%s" % (random.random(), time.time(), filename) ).hexdigest()+ "@" + appender
                            else:
                                mid = hashlib.sha256("%f-%f-%s-%s" % (random.random(), time.time(), ml, mboxfile) ).hexdigest()+ "@" + appender
                        print("No MID found, setting to %s" % mid)
                    # mid2 is the dedup/document key: body hash + epoch + list id.
                    mid2 = "%s@%s@%s" % (hashlib.sha224(body if type(body) is bytes else body.encode('ascii', errors='ignore')).hexdigest(), email.utils.mktime_tz(mdate), lid)
                    count += 1
                    mr = ""
                    if 'references' in message:
                        mr = message['references']
                    irt = ""
                    if 'in-reply-to' in message:
                        try:
                            irt = "".join(message['in-reply-to'])
                        except:
                            irt = message.get('in-reply-to').__str__()

                    json = {
                        'from_raw': dheader['from'],
                        'from': dheader['from'],
                        'to': dheader['to'],
                        'subject': dheader['subject'],
                        'cc': message.get('cc'),
                        'message-id': mid,
                        'mid': mid2,
                        'epoch': email.utils.mktime_tz(mdate),
                        'list': lid,
                        'list_raw': lid,
                        'date': mdatestring,
                        'private': private,
                        'references': mr,
                        'in-reply-to': irt,
                        'body': body.decode('utf-8', errors='replace') if type(body) is bytes else body,
                        'attachments': attachments
                    }
                    json_source = {
                        'mid': mid2,
                        'message-id': mid,
                        'source': message.as_bytes().decode('utf-8', errors='replace')
                    }
                    ja.append(json)
                    jas.append(json_source)
                    # Attachment contents are indexed individually, keyed by hash.
                    if contents:
                        iname = config.get("elasticsearch", "dbname")
                        if not args.dry:
                            for key in contents:
                                es.index(
                                    index=iname,
                                    doc_type="attachment",
                                    id=key,
                                    body = {
                                        'source': contents[key]
                                    }
                                )
                    # Flush to ES in batches of 40 documents.
                    if len(ja) >= 40:
                        if not args.dry:
                            bulk = BulkThread()
                            bulk.assign(ja, es, 'mbox')
                            bulk.insert()
                        ja = []

                        if not args.dry:
                            bulks = BulkThread()
                            bulks.assign(jas, es, 'mbox_source')
                            bulks.insert()
                        jas = []
                else:
                    baddies += 1
            if filebased:
                print("Parsed %u records from %s" % (count, filename))
            else:
                print("Parsed %s/%s: %u records from %s" % (ml, mboxfile, count, tmpname))
                os.unlink(tmpname)

            y += count
            # Flush whatever is left over for this source.
            if not args.dry:
                bulk = BulkThread()
                bulk.assign(ja, es)
                bulk.insert()
            ja = []

            if not args.dry:
                bulks = BulkThread()
                bulks.assign(jas, es, 'mbox_source')
                bulks.insert()
            jas = []
        print("Done, %u elements left to slurp" % len(lists))
| |
tlpname = "foo"  # NOTE(review): appears unused; kept in case something external reads it

# Command line options; parse_args() itself happens further down.
parser = argparse.ArgumentParser(description='Command line options.')
parser.add_argument('--source', dest='source', type=str, nargs=1,
                    help='Source to scan (either http(s):// or file path)')
parser.add_argument('--recursive', dest='recursive', action='store_true',
                    help='Do a recursive scan (sub dirs etc)')
parser.add_argument('--interactive', dest='interactive', action='store_true',
                    help='Ask for help when possible')
parser.add_argument('--quick', dest='quick', action='store_true',
                    help='Only grab the first file you can find')
parser.add_argument('--mod-mbox', dest='modmbox', action='store_true',
                    help='This is mod_mbox, derive list-id and files from it')
parser.add_argument('--pipermail', dest='pipermail', action='store_true',
                    help='This is pipermail, derive files from it (list ID has to be set!)')
parser.add_argument('--lid', dest='listid', type=str, nargs=1,
                    help='Optional List-ID to override source with.')
parser.add_argument('--project', dest='project', type=str, nargs=1,
                    help='Optional project to look for ($project-* will be imported as well)')
parser.add_argument('--ext', dest='ext', type=str, nargs=1,
                    help='Optional file extension (or call it with no args to not care)')
parser.add_argument('--domain', dest='domain', type=str, nargs=1,
                    help='Optional domain extension for MIDs and List ID reconstruction')
parser.add_argument('--private', dest='private', action='store_true',
                    help='This is a privately archived list. Filter through auth proxy.')
parser.add_argument('--attachments', dest='attachments', action='store_true',
                    help='Also import attached files in emails')
parser.add_argument('--dry', dest='dry', action='store_true',
                    help='Do not save emails to elasticsearch, only test importing')
parser.add_argument('--html2text', dest='html2text', action='store_true',
                    help='If no text/plain is found, try to parse HTML using html2text')
parser.add_argument('--ignorebody', dest='ibody', type=str, nargs=1,
                    help='Optional email bodies to treat as empty (in conjunction with --html2text)')
parser.add_argument('--resend', dest='resend', type=str, nargs=1,
                    help='DANGER ZONE: Resend every read email to this recipient as a new email')
| |
args = parser.parse_args()

# Bail out with usage info when invoked with too few arguments.
# NOTE(review): <= 2 counts the script name, so this also rejects a
# single-flag invocation — presumably intentional, since an import
# needs at least a --source plus a mode.
if len(sys.argv) <= 2:
    parser.print_help()
    sys.exit(-1)
| |
| |
| |
# Copy the single-value (nargs=1) options over their defaults, keeping the
# default whenever the option was not supplied.
source = args.source[0] if args.source else source
list_override = args.listid[0] if args.listid else list_override
project = args.project[0] if args.project else project
appender = args.domain[0] if args.domain else appender
extension = args.ext[0] if args.ext else extension
iBody = args.ibody[0] if args.ibody else iBody

# Boolean switches only ever flip their default (False) to True.
recursive = recursive or args.recursive
interactive = interactive or args.interactive
quickmode = quickmode or args.quick
private = private or args.private
attachments = attachments or args.attachments

# html2text is GPL and optional, so it is only imported on demand.
if args.html2text:
    import html2text
    parseHTML = True
# Likewise, SMTP is only needed for the --resend danger zone.
if args.resend:
    resendTo = args.resend[0]
    from smtplib import SMTP

baddies = 0
| |
| |
def globDir(d):
    """Recursively queue every mbox file under d for import.

    Appends [path, list-id-override] pairs to the global `lists` queue.
    In interactive mode the user may type a per-directory List-ID, which
    is remembered in fileToLID and used for every file in that directory.
    """
    subdirs = [entry for entry in listdir(d) if isdir(join(d, entry))]
    found = [path for path in glob.glob(join(d, "*" + extension)) if isfile(path)]

    if interactive and len(found) > 0 and d not in fileToLID:
        print("Would you like to set a list-ID override for %s?:" % d)
        answer = sys.stdin.readline()
        if answer and len(answer) > 3:
            fileToLID[d] = "<" + answer.strip("\r\n<>") + ">"
            print("Righto, setting it to %s." % fileToLID[d])
        else:
            print("alright, I'll try to figure it out myself!")

    for mbox_path in sorted(found):
        lists.append([mbox_path, fileToLID.get(d) or list_override])

    for sub in sorted(subdirs):
        globDir(join(d, sub))
| |
| |
# File based import??
# Local paths (absolute or relative) are scanned on disk.
if source[0] == "/" or source[0] == ".":
    print("Doing file based import")
    filebased = True
    globDir(source)


# HTTP(S) based import?
elif source[0] == "h":
    data = urlopen(source).read().decode('utf-8')
    print("Fetched %u bytes of main data, parsing month lists" % len(data))

    # Pattern for list directories on a mod_mbox index page. A project
    # name already containing '-' is treated as the exact list name.
    ns = r"<a href='(%s[-a-z0-9]+)/'" % project
    if project.find("-") != -1:
        ns = r"<a href='(%s)/'" % project

    if args.modmbox:
        # Scrape each list's page for monthly NNNNNN.mbox files.
        for mlist in re.finditer(ns, data):
            ml = mlist.group(1)
            mldata = urlopen("%s%s/" % (source, ml)).read().decode('utf-8')
            present = re.search(r"<th colspan=\"3\">Year 20[\d]{2}</th>", mldata) # Check that year 2014-2017 exists, otherwise why keep it?
            if present:
                qn = 0
                for mbox in re.finditer(r"(\d+\.mbox)/thread", mldata):
                    qn += 1
                    mboxfile = mbox.group(1)
                    lists.append([ml, mboxfile])
                    print("Adding %s/%s to slurp list" % (ml, mboxfile))
                    # --quick: stop after the first couple of months.
                    if quickmode and qn >= 2:
                        break

    if args.pipermail:
        # Pipermail archives are plain (possibly gzipped) text files, so
        # download each one to a temp file and treat it as a local import.
        filebased = True
        piperWeirdness = True
        if not list_override:
            print("You need to specify a list ID with --lid when importing from Pipermail!")
            sys.exit(-1)
        ns = r"href=\"(\d+-[a-zA-Z]+\.txt(\.gz)?)\""
        qn = 0
        for mlist in re.finditer(ns, data):
            ml = mlist.group(1)
            mldata = urlopen("%s%s" % (source, ml)).read()
            tmpfile = tempfile.NamedTemporaryFile(mode='w+b', buffering=1, delete=False)
            try:
                # Decompress on a best-effort basis; plain .txt passes through.
                if ml.find(".gz") != -1:
                    mldata = gzip.decompress(mldata)
            except Exception as err:
                print("This wasn't a gzip file: %s" % err )
            print(len(mldata))
            tmpfile.write(mldata)
            tmpfile.flush()
            tmpfile.close()
            lists.append([tmpfile.name, list_override])
            print("Adding %s/%s to slurp list as %s" % (source, ml, tmpfile.name))
            qn += 1
            if quickmode and qn >= 2:
                break
| |
# Spin up one slurper per CPU core; they drain the shared `lists` queue.
threads = []
print("Starting up to %u threads to fetch the %u %s lists" % (multiprocessing.cpu_count(), len(lists), project))
for n in range(multiprocessing.cpu_count()):
    worker = SlurpThread()
    threads.append(worker)
    worker.start()
    print("Started no. %u" % (n + 1))

# Wait for every worker to finish before printing the summary.
for worker in threads:
    worker.join()

print("All done! %u records inserted/updated after %u seconds. %u records were bad and ignored" % (y, int(time.time() - start), baddies))