# blob: 4913970936e7c4cc20214c0adbb457206901d9fe [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from pylons import request
from pylons import tmpl_context as c
from allura.lib import helpers as h
from allura.lib import utils
from allura.lib.spam import SpamFilter
# akismet is an optional dependency: remember whether the import worked so
# that AkismetSpamFilter can fail with a clear error at construction time
# instead of this module failing at import time.
try:
    import akismet
except ImportError:
    AKISMET_AVAILABLE = False
else:
    AKISMET_AVAILABLE = True

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class AkismetSpamFilter(SpamFilter):
    """Spam checking implementation via Akismet service.

    To enable Akismet spam filtering in your Allura instance, first
    enable the entry point in setup.py::

        [allura.spam]
        akismet = allura.lib.spam.akismetfilter:AkismetSpamFilter

    Then include the following parameters in your .ini file::

        spam.method = akismet
        spam.key = <your Akismet key here>

    """

    def __init__(self, config):
        """Create the Akismet client from the application config.

        :param config: object read with ``.get()``; its ``spam.key`` and
            ``base_url`` entries are handed to ``akismet.Akismet``.
        :raises ImportError: if the ``akismet`` package could not be
            imported when this module was loaded.
        """
        if not AKISMET_AVAILABLE:
            raise ImportError('akismet not available')
        self.service = akismet.Akismet(
            config.get('spam.key'), config.get('base_url'))
        # NOTE(review): the result of verify_key() is not inspected here --
        # confirm whether an invalid key should be reported to the operator.
        self.service.verify_key()

    def get_data(self, text, artifact=None, user=None, content_type='comment', **kw):
        """Build the dict of fields sent to Akismet alongside the text.

        :param text: content being classified; stored as ``comment_content``.
        :param artifact: optional object whose ``url()`` becomes ``permalink``.
        :param user: author of the content; falls back to ``c.user`` when falsy.
        :param content_type: stored as ``comment_type``.
        :param kw: extra fields to include; the fields set by this method
            overwrite same-named entries.
        :returns: dict of field name -> UTF-8 encoded value.
        """
        kw['comment_content'] = text
        kw['comment_type'] = content_type
        if artifact:
            kw['permalink'] = artifact.url()
        user = user or c.user
        if user:
            kw['comment_author'] = user.display_name or user.username
            emails = user.email_addresses
            kw['comment_author_email'] = emails[0] if emails else ''
        # These fields come from the current request, not from the user.
        kw['user_ip'] = utils.ip_address(request)
        kw['user_agent'] = request.headers.get('USER_AGENT')
        kw['referrer'] = request.headers.get('REFERER')
        # kw will be urlencoded, need to utf8-encode
        return {k: h.really_unicode(v).encode('utf8') for k, v in kw.items()}

    def _call_service(self, method_name, text, artifact, user, content_type):
        """Call ``self.service.<method_name>`` with the payload from get_data.

        Shared by check/submit_spam/submit_ham, which differ only in the
        service method they invoke.  ``build_data=False`` is passed because
        the payload is already assembled by :meth:`get_data`.
        """
        payload = self.get_data(text=text,
                                artifact=artifact,
                                user=user,
                                content_type=content_type)
        service_method = getattr(self.service, method_name)
        return service_method(text, data=payload, build_data=False)

    def check(self, text, artifact=None, user=None, content_type='comment', **kw):
        """Ask Akismet to classify ``text`` and log the outcome.

        Extra keyword arguments are accepted but are not forwarded to the
        service.

        :returns: whatever ``comment_check`` returns (logged as
            ``spam=<result>``).
        """
        res = self._call_service(
            'comment_check', text, artifact, user, content_type)
        # Pass arguments to the logger instead of pre-formatting the message,
        # so the string is only built when the record is actually emitted.
        log.info("spam=%s (akismet): %s", res, text)
        return res

    def submit_spam(self, text, artifact=None, user=None, content_type='comment'):
        """Report ``text`` to Akismet as spam."""
        self._call_service('submit_spam', text, artifact, user, content_type)

    def submit_ham(self, text, artifact=None, user=None, content_type='comment'):
        """Report ``text`` to Akismet as ham (not spam)."""
        self._call_service('submit_ham', text, artifact, user, content_type)