blob: 42baf1e04602aa789dea6ca1a1d51b20e4322f13 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import datetime
import re
import json
import hashlib
import plugins.utils.jsonapi
import threading
import requests.exceptions
import os
# This is the Kibble Discourse scanner plugin.
# (A comment rather than a string: it follows the imports, so it could not
# serve as the module docstring anyway.)

# Human-readable plugin name and plugin version.
title = "Scanner for Discourse Forums"
version = "0.1.0"
def accepts(source):
    """Tell the scanner framework whether this plugin handles *source*.

    Only sources whose ``type`` field equals ``'discourse'`` are accepted;
    a source without a ``type`` key raises KeyError.
    """
    return source['type'] == 'discourse'
def _discourseTimestamp(stamp):
    """Convert a Discourse ``...Z`` timestamp string to a UNIX timestamp.

    The trailing ``Z`` marks the value as UTC, so the parsed datetime is made
    timezone-aware before calling ``.timestamp()``. A naive datetime would be
    interpreted in the host's local timezone, shifting every stored value by
    the host's UTC offset.

    :param stamp: e.g. ``"2019-01-31T12:34:56.789Z"`` (fractional seconds optional)
    :return: seconds since the epoch, as a float
    :raises ValueError: if ``stamp`` matches neither accepted format
    """
    for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
        try:
            parsed = datetime.datetime.strptime(stamp, fmt)
        except ValueError:
            continue
        return parsed.replace(tzinfo=datetime.timezone.utc).timestamp()
    raise ValueError("Unrecognised Discourse timestamp: %r" % (stamp,))


def _fakePerson(source, username, fakeDomain):
    """Build a sparse 'person' document for a Discourse user.

    Discourse hides e-mail addresses, so a synthetic ``username@fakeDomain``
    address is used. The document ID is a SHA-224 of organisation, source URL
    and that address, so the same user maps to the same ID on every scan.
    """
    email = "%s@%s" % (username, fakeDomain)
    uhash = hashlib.sha224(("%s-%s-%s" % (source['organisation'], source['sourceURL'], email)).encode('ascii', errors='replace')).hexdigest()
    return {
        'id': uhash,
        'organisation': source['organisation'],
        'name': username,
        'email': email,
    }


def _scanTopic(KibbleBit, source, cat, creds, topic, allUsers, fakeDomain):
    """Store one topic and its posts, unless it is already stored and unchanged.

    ``allUsers`` (Discourse user ID -> person document) is updated in place
    with any poster not seen before.
    """
    # Calculate topic ID
    dhash = hashlib.sha224(("%s-%s-topic-%s" % (source['organisation'], source['sourceURL'], topic['id'])).encode('ascii', errors='replace')).hexdigest()

    # Figure out when topic was created and updated
    CreatedDate = _discourseTimestamp(topic['created_at'])
    if topic.get('last_posted_at'):
        UpdatedDate = _discourseTimestamp(topic['last_posted_at'])
    else:
        UpdatedDate = 0

    # Skip the topic if it already exists and has not been updated since
    # the stored copy.
    if KibbleBit.exists('forum_topic', dhash):
        fdoc = KibbleBit.get('forum_topic', dhash)
        if fdoc['updated'] >= UpdatedDate:
            return

    # The first poster is taken as the topic creator. If the poster is not
    # among the known users, store None rather than crashing the worker.
    posters = topic.get('posters') or [{}]
    creator = allUsers.get(posters[0].get('user_id'), {})

    # Assuming we need to scan this, start by making the base topic document
    topicdoc = {
        'id': dhash,
        'sourceID': source['sourceID'],
        'organisation': source['organisation'],
        'category': cat['slug'],
        'title': topic['title'],
        'creator': creator.get('id'),
        'creatorName': creator.get('name'),
        'created': CreatedDate,
        'updated': UpdatedDate,
        'solved': False,  # Discourse doesn't have this notion, but other forums might.
        'posts': topic['posts_count'],
        'views': topic['views'],
        'url': source['sourceURL'] + "/t/%s/%s" % (topic['slug'], topic['id'])
    }
    KibbleBit.pprint("%s is new or changed, scanning" % topicdoc['url'])

    # Now grab all the individual replies/posts
    # Remember to not have it count as a visit!
    pURL = "%s?track_visit=false&forceLoad=true" % topicdoc['url']
    pjson = plugins.utils.jsonapi.get(pURL, auth = creds)
    if not pjson:
        # Do not store the topic either: its 'updated' stamp would make the
        # next run skip it and the posts would never be fetched.
        KibbleBit.pprint("Could not fetch posts from %s, skipping topic" % pURL)
        return
    KibbleBit.append('forum_topic', topicdoc)

    # NOTE(review): Discourse may return only the first chunk of a long
    # topic's posts in post_stream.posts -- confirm large topics are covered.
    posts = pjson['post_stream']['posts']

    # For each post/reply, construct a post document
    KibbleBit.pprint("%s has %u posts" % (pURL, len(posts)))
    for post in posts:
        phash = hashlib.sha224(("%s-%s-post-%s" % (source['organisation'], source['sourceURL'], post['id'])).encode('ascii', errors='replace')).hexdigest()

        # Find the hash of the person who posted it
        # We may know them, or we may have to store them
        if post['user_id'] in allUsers:
            uhash = allUsers[post['user_id']]['id']
        else:
            userDoc = _fakePerson(source, post['username'], fakeDomain)
            uhash = userDoc['id']
            # Key by the *poster's* ID (previously keyed by a stale loop variable).
            allUsers[post['user_id']] = userDoc
            KibbleBit.append('person', userDoc)

        # Store the post/reply document
        pdoc = {
            'id': phash,
            'sourceID': source['sourceID'],
            'organisation': source['organisation'],
            'creator': uhash,
            'created': _discourseTimestamp(post['created_at']),
            'topic': dhash,
            'post_id': post['id'],
            'text': post['cooked'],
            'url': topicdoc['url']
        }
        # NOTE(review): document type name assumed to be 'forum_post'
        # (paired with 'forum_topic') -- confirm against the Kibble index.
        KibbleBit.append('forum_post', pdoc)


def scanJob(KibbleBit, source, cat, creds):
    """ Scans a single discourse category for activity.

    Pages through ``$sourceURL/c/$catID`` (pages 1..99) until an empty topic
    list is returned. It stores 'person' documents for the users listed on
    each page, and topic and post documents for every new or changed topic.

    :param KibbleBit: Kibble handle used for logging and document storage
    :param source: the source document (reads 'sourceURL', 'sourceID', 'organisation')
    :param cat: one category entry from the forum's category list (reads 'id', 'slug')
    :param creds: credentials passed through to plugins.utils.jsonapi.get, or None
    :return: True once the category has been paged through, False if a page fetch failed
    """
    # Get $discourseURL/c/$catID
    catURL = "%s/c/%s" % (source['sourceURL'].rstrip('/'), cat['id'])
    KibbleBit.pprint("Scanning Discourse category '%s' at %s" % (cat['slug'], catURL))

    # Since discourse hides the email (obviously!), we fake one on the
    # forum's own host name to generate accounts.
    fakeDomain = "foo.discourse"
    m = re.match(r"https?://([-a-zA-Z0-9.]+)", source['sourceURL'])
    if m:
        fakeDomain = m.group(1)

    allUsers = {}  # Discourse user ID -> person document
    # For each paginated result (pages 1-99), check for changes.
    # NOTE(review): pagination starts at page=1 as before; confirm whether
    # the Discourse API's first page is page=0.
    for page in range(1, 100):
        pcatURL = "%s?page=%u" % (catURL, page)
        catjson = plugins.utils.jsonapi.get(pcatURL, auth = creds)
        if not catjson:
            # Boo, it failed!
            KibbleBit.pprint("Fetching job data failed!")
            return False

        # If we hit an empty list (no more topics), we're done.
        topics = catjson['topic_list']['topics']
        if not topics:
            break

        # Store the users known from this page
        for user in catjson.get('users', []):
            userDoc = _fakePerson(source, user['username'], fakeDomain)
            allUsers[user['id']] = userDoc
            KibbleBit.append('person', userDoc)

        # Now, for each topic, store a topic document (and its posts)
        for topic in topics:
            _scanTopic(KibbleBit, source, cat, creds, topic, allUsers, fakeDomain)
    return True
class discourseThread(threading.Thread):
    """ Generic thread class for scheduling multiple scans at once.

    Several instances share one ``jobs`` list (Discourse categories) and one
    lock ``block``. Each thread repeatedly pops a job and passes it to
    scanJob. It stops when the list is empty, when a falsy job is popped, or
    after more than MAX_FAILURES consecutive failed scans. In the last case it
    records a failure status in ``source['steps']['forum']``.
    """

    # Consecutive failed scans tolerated before this thread gives up.
    MAX_FAILURES = 10

    def __init__(self, block, KibbleBit, source, creds, jobs):
        super(discourseThread, self).__init__()
        self.block = block  # lock guarding the shared ``jobs`` list
        self.KibbleBit = KibbleBit
        self.creds = creds
        self.source = source
        self.jobs = jobs  # shared list of pending jobs, consumed from the front

    def _nextJob(self):
        """Pop the next pending job under the lock; None when the list is empty."""
        # Check and pop atomically. Another thread may empty the list
        # between an unlocked len() check and the pop.
        with self.block:
            if not self.jobs:
                return None
            return self.jobs.pop(0)

    def run(self):
        badOnes = 0
        while True:
            job = self._nextJob()
            if not job:
                return
            try:
                ok = scanJob(self.KibbleBit, self.source, job, self.creds)
            except Exception as err:
                # An uncaught exception would silently end this worker and
                # strand the remaining jobs; count it as a failed scan.
                self.KibbleBit.pprint("[%s] Scan raised an exception: %s" % (job.get('name'), err))
                ok = False
            if ok:
                badOnes = 0
                continue
            self.KibbleBit.pprint("[%s] This borked, trying another one" % job.get('name'))
            badOnes += 1
            if badOnes > self.MAX_FAILURES:
                self.KibbleBit.pprint("Too many errors, bailing!")
                self.source['steps']['forum'] = {
                    'time': time.time(),
                    'status': 'Too many errors while parsing at ' + time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime(time.time())),
                    'running': False,
                    'good': False
                }
                return
def scan(KibbleBit, source):
    """Scan a Discourse forum source.

    Fetches the forum's category list and scans every category using four
    discourseThread workers that share one job queue. Progress is recorded in
    ``source['steps']['forum']``. Sources whose ``sourceURL`` is not an
    http(s) URL are ignored.

    :param KibbleBit: Kibble handle used for logging and document storage
    :param source: the source document; ``source['steps']`` is updated in place
    """
    # Simple URL check
    if not re.match(r"(https?://.+)", source['sourceURL']):
        return

    source['steps']['forum'] = {
        'time': time.time(),
        'status': 'Parsing Discourse topics...',
        'running': True,
        'good': True
    }
    KibbleBit.pprint("Parsing Discourse activity at %s" % source['sourceURL'])
    source['steps']['forum'] = {
        'time': time.time(),
        'status': 'Downloading changeset',
        'running': True,
        'good': True
    }

    # Discourse may need credentials (if basic auth)
    creds = None
    srcCreds = source.get('creds')
    if srcCreds and srcCreds.get('username'):
        creds = "%s:%s" % (srcCreds['username'], srcCreds.get('password', ''))

    # Get the list of categories
    sURL = source['sourceURL'].rstrip('/')
    KibbleBit.pprint("Getting categories...")
    catjs = plugins.utils.jsonapi.get("%s/categories_and_latest" % sURL, auth = creds)
    if not catjs or 'category_list' not in catjs:
        KibbleBit.pprint("Could not fetch the category list from %s" % sURL)
        source['steps']['forum'] = {
            'time': time.time(),
            'status': 'Could not fetch Discourse categories at ' + time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime(time.time())),
            'running': False,
            'good': False
        }
        return

    # Directly assign the category list as pending jobs queue, ezpz.
    pendingJobs = catjs['category_list']['categories']
    KibbleBit.pprint("Found %u categories" % len(pendingJobs))

    # Now fire off 4 threads to parse the categories, and wait for all of them.
    block = threading.Lock()
    KibbleBit.pprint("Scanning jobs using 4 sub-threads")
    threads = [discourseThread(block, KibbleBit, source, creds, pendingJobs) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    KibbleBit.pprint("Done scanning %s" % source['sourceURL'])
    # A worker that gave up has already recorded a failure status; don't
    # overwrite it with a success message.
    if not source['steps']['forum'].get('good', True):
        return

    # We're all done, yaay
    source['steps']['forum'] = {
        'time': time.time(),
        'status': 'Discourse successfully scanned at ' + time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime(time.time())),
        'running': False,
        'good': True
    }