blob: c7e2f633546a07cc726e60484700f3a8d0a5158e [file] [log] [blame]
import logging
from pylons import g, request
from tg import flash, redirect, session, config
from openid.consumer import consumer
from allura import model as M
log = logging.getLogger(__name__)
# openid.oidutil outputs all logging to STDERR unless otherwise configured.
# We follow the openid.oidutil instructions to install our own logging hook.
from openid import oidutil
oidutil.log = log.info
def verify_oid(oid_url, failure_redirect=None, return_to=None,
**kw):
'''Step 1 of OID verification -- redirect to provider site'''
log.info('Trying to login via %s', oid_url)
realm = config.get('openid.realm', 'http://localhost:8080/')
return_to = realm + 'auth/' + return_to
oidconsumer = consumer.Consumer(g.oid_session(), g.oid_store)
try:
req = oidconsumer.begin(oid_url)
except consumer.DiscoveryFailure, ex:
log.exception('Error in openid login')
flash(str(ex[0]), 'error')
redirect(failure_redirect)
if req is None: # pragma no cover
flash('No openid services found for <code>%s</code>' % oid_url,
'error')
redirect(failure_redirect)
if req.shouldSendRedirect():
redirect_url = req.redirectURL(
realm, return_to, False)
log.info('Redirecting to %r', redirect_url)
session.save()
redirect(redirect_url)
else:
return dict(kw, form=req.formMarkup(realm, return_to=return_to))
def process_oid(failure_redirect=None):
oidconsumer = consumer.Consumer(g.oid_session(), g.oid_store)
info = oidconsumer.complete(request.params, request.url)
display_identifier = info.getDisplayIdentifier() or info.identity_url
if info.status == consumer.FAILURE and display_identifier:
# In the case of failure, if info is non-None, it is the
# URL that we were verifying. We include it in the error
# message to help the user figure out what happened.
fmt = "Verification of %s failed: %s"
flash(fmt % (display_identifier, info.message), 'error')
redirect(failure_redirect)
elif info.status == consumer.SUCCESS:
# Success means that the transaction completed without
# error. If info is None, it means that the user cancelled
# the verification.
css_class = 'alert'
# This is a successful verification attempt. If this
# was a real application, we would do our login,
# comment posting, etc. here.
fmt = "You have successfully verified %s as your identity."
message = fmt % display_identifier
if info.endpoint.canonicalID:
# You should authorize i-name users by their canonicalID,
# rather than their more human-friendly identifiers. That
# way their account with you is not compromised if their
# i-name registration expires and is bought by someone else.
message += (" This is an i-name, and its persistent ID is %s"
% info.endpoint.canonicalID )
flash(message, 'info')
elif info.status == consumer.CANCEL:
# cancelled
message = 'Verification cancelled'
flash(message, 'error')
redirect(failure_redirect)
elif info.status == consumer.SETUP_NEEDED:
if info.setup_url:
message = '<a href=%s>Setup needed</a>' % info.setup_url
else:
# This means auth didn't succeed, but you're welcome to try
# non-immediate mode.
message = 'Setup needed'
flash(message, 'error')
redirect(failure_redirect)
else:
# Either we don't understand the code or there is no
# openid_url included with the error. Give a generic
# failure message. The library should supply debug
# information in a log.
message = 'Verification failed.'
flash(message, 'error')
redirect(failure_redirect)
session.save()
oid_obj = M.OpenId.upsert(info.identity_url, display_identifier=display_identifier)
return oid_obj