| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| #the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
import time
import datetime
import re
import hashlib
import threading

import plugins.utils.jsonapi
| |
| """ |
| This is the Kibble Discourse scanner plugin. |
| """ |
| |
| title = "Scanner for Discourse Forums" |
| version = "0.1.0" |
| |
| def accepts(source): |
| """ Determines whether we want to handle this source """ |
| if source['type'] == 'discourse': |
| return True |
| return False |
| |
| |
| def scanJob(KibbleBit, source, cat, creds): |
| """ Scans a single discourse category for activity """ |
    # Get $discourseURL/c/$catID
    # (plain string join; os.path.join would use backslashes on Windows)
    catURL = "%s/c/%s" % (source['sourceURL'].rstrip('/'), cat['id'])
| KibbleBit.pprint("Scanning Discourse category '%s' at %s" % (cat['slug'], catURL)) |
| |
    page = 0  # Discourse pagination starts at page 0
    allUsers = {}

    # For each paginated result (up to 100 pages), check for changes
    while page < 100:
        pcatURL = "%s?page=%u" % (catURL, page)
        catjson = plugins.utils.jsonapi.get(pcatURL, auth=creds)
        page += 1

        if catjson:
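            # The bits of the category JSON we rely on look roughly like:
            #   {"users": [{"id": ..., "username": ...}, ...],
            #    "topic_list": {"topics": [{"id": ..., "slug": ..., "title": ...,
            #                               "created_at": ..., "last_posted_at": ...,
            #                               "posters": [...], "posts_count": ...,
            #                               "views": ...}, ...]}}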
| # If we hit an empty list (no more topics), just break the loop. |
| if not catjson['topic_list']['topics']: |
| break |
| |
| # First (if we have data), we should store the known users |
| # Since discourse hides the email (obviously!), we'll have to |
| # fake one to generate an account. |
| fakeDomain = "foo.discourse" |
| m = re.match(r"https?://([-a-zA-Z0-9.]+)", source['sourceURL']) |
| if m: |
| fakeDomain = m.group(1) |
| for user in catjson['users']: |
| # Fake email address, compute deterministic ID |
| email = "%s@%s" % (user['username'], fakeDomain) |
| dhash = hashlib.sha224( ("%s-%s-%s" % (source['organisation'], source['sourceURL'], email) ).encode('ascii', errors='replace')).hexdigest() |
| |
| # Construct a very sparse user document |
| userDoc = { |
| 'id': dhash, |
| 'organisation': source['organisation'], |
| 'name': user['username'], |
| 'email': email, |
| } |
| |
| # Store user-ID-to-username mapping for later |
| allUsers[user['id']] = userDoc |
| |
| # Store it (or, queue storage) |
| KibbleBit.append('person', userDoc) |
| |
| # Now, for each topic, we'll store a topic document |
| for topic in catjson['topic_list']['topics']: |
| |
| # Calculate topic ID |
| dhash = hashlib.sha224( ("%s-%s-topic-%s" % (source['organisation'], source['sourceURL'], topic['id']) ).encode('ascii', errors='replace')).hexdigest() |
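                # (Hashing organisation + source URL + topic ID gives a
                # deterministic document ID, so re-scans update existing
                # docs instead of creating duplicates.)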
| |
                # Figure out when the topic was created and updated;
                # the API returns UTC timestamps, so pin the timezone explicitly.
                CreatedDate = datetime.datetime.strptime(topic['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=datetime.timezone.utc).timestamp()
                if topic.get('last_posted_at'):
                    UpdatedDate = datetime.datetime.strptime(topic['last_posted_at'], "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=datetime.timezone.utc).timestamp()
                else:
                    UpdatedDate = 0
| |
                # Determine whether we should scan this topic or skip to the next one:
                # if it already exists and hasn't changed since, there's nothing to do.
| if KibbleBit.exists('forum_topic', dhash): |
| fdoc = KibbleBit.get('forum_topic', dhash) |
| # If update in the old doc was >= current update timestamp, skip the topic |
| if fdoc['updated'] >= UpdatedDate: |
                        continue

| # Assuming we need to scan this, start by making the base topic document |
| topicdoc = { |
| 'id': dhash, |
| 'sourceID': source['sourceID'], |
| 'organisation': source['organisation'], |
| |
| 'category': cat['slug'], |
| 'title': topic['title'], |
| 'creator': allUsers[topic['posters'][0]['user_id']]['id'], |
| 'creatorName': allUsers[topic['posters'][0]['user_id']]['name'], |
| 'created': CreatedDate, |
| 'updated': UpdatedDate, |
| 'solved': False, # Discourse doesn't have this notion, but other forums might. |
| 'posts': topic['posts_count'], |
| 'views': topic['views'], |
                    'url': "%s/t/%s/%s" % (source['sourceURL'].rstrip('/'), topic['slug'], topic['id'])
| } |
| |
| KibbleBit.append('forum_topic', topicdoc) |
| KibbleBit.pprint("%s is new or changed, scanning" % topicdoc['url']) |
| |
| # Now grab all the individual replies/posts |
| # Remember to not have it count as a visit! |
                pURL = "%s?track_visit=false&forceLoad=true" % topicdoc['url']
                pjson = plugins.utils.jsonapi.get(pURL, auth=creds)
                if not pjson:
                    KibbleBit.pprint("Couldn't fetch posts for %s, skipping" % pURL)
                    continue

                posts = pjson['post_stream']['posts']
| |
| # For each post/reply, construct a forum_entry document |
| KibbleBit.pprint("%s has %u posts" % (pURL, len(posts))) |
| for post in posts: |
| phash = hashlib.sha224( ("%s-%s-post-%s" % (source['organisation'], source['sourceURL'], post['id']) ).encode('ascii', errors='replace')).hexdigest() |
| |
| # Find the hash of the person who posted it |
| # We may know them, or we may have to store them |
| if post['user_id'] in allUsers: |
| uhash = allUsers[post['user_id']]['id'] |
| else: |
| # Same as before, fake email, store... |
| email = "%s@%s" % (post['username'], fakeDomain) |
| uhash = hashlib.sha224( ("%s-%s-%s" % (source['organisation'], source['sourceURL'], email) ).encode('ascii', errors='replace')).hexdigest() |
| |
| # Construct a very sparse user document |
| userDoc = { |
| 'id': uhash, |
| 'organisation': source['organisation'], |
| 'name': post['username'], |
| 'email': email, |
| } |
| |
                        # Store user-ID-to-username mapping for later
                        allUsers[post['user_id']] = userDoc
| |
| # Store it (or, queue storage) |
| KibbleBit.append('person', userDoc) |
| |
                    # Get post date; again, pin it to UTC
                    CreatedDate = datetime.datetime.strptime(post['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=datetime.timezone.utc).timestamp()
| |
| # Store the post/reply document |
| pdoc = { |
| 'id': phash, |
| 'sourceID': source['sourceID'], |
| 'organisation': source['organisation'], |
| |
| 'created': CreatedDate, |
| 'topic': dhash, |
| 'post_id': post['id'], |
| 'text': post['cooked'], |
| 'url': topicdoc['url'] |
| } |
| KibbleBit.append('forum_post', pdoc) |
| |
        else:
            # Boo, the fetch failed! Bail out on this category.
            KibbleBit.pprint("Fetching job data failed!")
            return False

    # Ran out of topics (or hit the page cap); all good.
    return True
| |
| |
| class discourseThread(threading.Thread): |
| """ Generic thread class for scheduling multiple scans at once """ |
| def __init__(self, block, KibbleBit, source, creds, jobs): |
| super(discourseThread, self).__init__() |
| self.block = block |
| self.KibbleBit = KibbleBit |
| self.creds = creds |
| self.source = source |
| self.jobs = jobs |
| |
| def run(self): |
| badOnes = 0 |
        # Pull jobs off the shared queue until it is drained; the error
        # counter below bails out early if too many categories fail.
        while len(self.jobs) > 0:
            self.block.acquire()
            try:
                job = self.jobs.pop(0)
            except IndexError:
                # Another thread drained the queue before we got the lock
                self.block.release()
                return
| if not job: |
| self.block.release() |
| return |
| self.block.release() |
| if not scanJob(self.KibbleBit, self.source, job, self.creds): |
| self.KibbleBit.pprint("[%s] This borked, trying another one" % job['name']) |
| badOnes += 1 |
| if badOnes > 10: |
| self.KibbleBit.pprint("Too many errors, bailing!") |
| self.source['steps']['forum'] = { |
| 'time': time.time(), |
| 'status': 'Too many errors while parsing at ' + time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime(time.time())), |
| 'running': False, |
| 'good': False |
| } |
| self.KibbleBit.updateSource(self.source) |
| return |
| else: |
| badOnes = 0 |
| |
def scan(KibbleBit, source):
    """ Scans all categories of a Discourse forum """
    # Simple URL check
| discourse = re.match(r"(https?://.+)", source['sourceURL']) |
| if discourse: |
| |
        KibbleBit.pprint("Parsing Discourse activity at %s" % source['sourceURL'])

        # Mark the forum step as running
        source['steps']['forum'] = {
            'time': time.time(),
            'status': 'Parsing Discourse topics...',
            'running': True,
            'good': True
        }
        KibbleBit.updateSource(source)
| |
        # Discourse may need credentials (if basic auth)
        creds = None
        if source.get('creds') and source['creds'].get('username'):
            creds = "%s:%s" % (source['creds']['username'], source['creds']['password'])
| |
        # Get the list of categories
        sURL = source['sourceURL']
        KibbleBit.pprint("Getting categories...")
        catjs = plugins.utils.jsonapi.get("%s/categories_and_latest" % sURL, auth=creds)
        if not catjs:
            KibbleBit.pprint("Couldn't fetch the category list, bailing!")
            return

        # Use the category list directly as the pending jobs queue, ezpz.
        pendingJobs = catjs['category_list']['categories']
| |
| KibbleBit.pprint("Found %u categories" % len(pendingJobs)) |
| |
| # Now fire off 4 threads to parse the categories |
| threads = [] |
| block = threading.Lock() |
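        # The lock guards concurrent pops from the shared pendingJobs list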
| KibbleBit.pprint("Scanning jobs using 4 sub-threads") |
        for _ in range(4):
| t = discourseThread(block, KibbleBit, source, creds, pendingJobs) |
| threads.append(t) |
| t.start() |
| |
| for t in threads: |
| t.join() |
| |
| # We're all done, yaay |
| KibbleBit.pprint("Done scanning %s" % source['sourceURL']) |
| |
| source['steps']['forum'] = { |
| 'time': time.time(), |
| 'status': 'Discourse successfully scanned at ' + time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime(time.time())), |
| 'running': False, |
| 'good': True |
| } |
| KibbleBit.updateSource(source) |
| |