# blob: 1987e4b7a39e5988a5883d8f3558210656e432bd [file] [log] [blame]
"""Django Airavata Auth Backends: KeycloakBackend."""
import logging
import os
import time
import requests
from django.conf import settings
from django.contrib.auth.models import User
from django.views.decorators.debug import sensitive_variables
from oauthlib.oauth2 import InvalidGrantError, LegacyApplicationClient
from requests_oauthlib import OAuth2Session
from django_airavata.apps.auth.utils import get_authz_token
from . import models, utils
logger = logging.getLogger(__name__)
class KeycloakBackend(object):
    """Django authentication backend for Keycloak.

    ``authenticate`` supports several ways of logging a user in: username and
    password, a ``Bearer`` token in the ``Authorization`` header, a refresh
    token (from the session or supplied by the caller) and the OAuth2
    redirect (authorization code) flow. Depending on the flow, the tokens are
    stored in the session and the Django ``User`` and its ``UserProfile``
    claims are created or updated from the Keycloak userinfo response.
    """

    # mask all local variables from error emails since they contain the user's
    # password and/or client_secret. Note, we could selectively just hide
    # variables that are sensitive, but this decorator doesn't apply explicitly
    # listed variable masking to library function calls
    @sensitive_variables()
    def authenticate(self,
                     request=None,
                     username=None,
                     password=None,
                     refresh_token=None,
                     idp_alias=None):
        """Authenticate against Keycloak and return the Django user.

        The first matching flow is used, in this order:

        1. ``username`` and ``password`` given: password flow.
        2. ``HTTP_AUTHORIZATION`` header present: bearer token, validated
           implicitly by using it to fetch the userinfo.
        3. ``request.user`` is authenticated and the session refresh token
           hasn't expired: refresh using the session's refresh token.
        4. ``refresh_token`` given: refresh using the supplied token.
        5. Otherwise: OAuth2 redirect flow, using the state and redirect URI
           stored in the session. Only in this flow is ``idp_alias`` used (to
           store the external IDP's userinfo) and the username
           initialization check run.

        Note that although ``request`` defaults to ``None``, every flow reads
        from or writes to ``request``.

        Returns the authenticated ``User``, or ``None`` when the password or
        refresh grant was rejected. On success ``request.authz_token`` is
        set. Any other failure is logged as a warning and re-raised.
        """
        try:
            user = None
            access_token = None
            if username and password:
                token, userinfo = self._get_token_and_userinfo_password_flow(
                    username, password)
                if token is None:  # login failed
                    return None
                self._process_token(request, token)
                user = self._process_userinfo(request, userinfo)
                access_token = token['access_token']
            elif 'HTTP_AUTHORIZATION' in request.META:
                bearer, token = request.META.get('HTTP_AUTHORIZATION').split()
                if bearer != "Bearer":
                    raise Exception("Unexpected Authorization header")
                # implicitly validate token by using it to get userinfo
                userinfo = self._get_userinfo_from_token(request, token)
                user = self._process_userinfo(request, userinfo)
                access_token = token
            # user is already logged in and can use refresh token
            elif (request.user.is_authenticated and
                  not utils.is_refresh_token_expired(request)):
                logger.debug("Refreshing token...")
                token, userinfo = \
                    self._get_token_and_userinfo_from_refresh_token(request)
                if token is None:  # refresh failed
                    return None
                self._process_token(request, token)
                # user is already logged in
                user = request.user
                access_token = token['access_token']
            elif refresh_token:
                logger.debug("Refreshing supplied token...")
                token, userinfo = \
                    self._get_token_and_userinfo_from_refresh_token(
                        request, refresh_token=refresh_token)
                if token is None:  # refresh failed
                    return None
                self._process_token(request, token)
                user = self._process_userinfo(request, userinfo)
                access_token = token['access_token']
            else:
                token, userinfo = self._get_token_and_userinfo_redirect_flow(
                    request)
                self._process_token(request, token)
                user = self._process_userinfo(request, userinfo)
                if idp_alias is not None:
                    self._store_idp_userinfo(user, token, idp_alias)
                self._check_username_initialization(request, user)
                access_token = token['access_token']
            # authz_token_middleware has already run, so must manually add
            # the `request.authz_token` attribute
            if user is not None:
                request.authz_token = get_authz_token(
                    request, user=user, access_token=access_token)
            return user
        except Exception as e:
            logger.warning("login failed", exc_info=e)
            raise

    def get_user(self, user_id):
        """Return the ``User`` with primary key ``user_id`` or ``None``."""
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None

    @staticmethod
    def _get_verify():
        """Return the ``verify`` value to pass to requests made to Keycloak.

        This is ``settings.KEYCLOAK_VERIFY_SSL``, unless verification is
        enabled and ``settings.KEYCLOAK_CA_CERTFILE`` is defined, in which
        case the CA certificate file setting is returned instead.
        """
        verify = settings.KEYCLOAK_VERIFY_SSL
        if verify and hasattr(settings, 'KEYCLOAK_CA_CERTFILE'):
            return settings.KEYCLOAK_CA_CERTFILE
        return verify

    def _get_token_and_userinfo_password_flow(self, username, password):
        """Fetch a token and userinfo with the user's username and password.

        Returns a ``(token, userinfo)`` tuple, or ``(None, None)`` if
        Keycloak rejected the credentials (``InvalidGrantError``).
        """
        try:
            client_id = settings.KEYCLOAK_CLIENT_ID
            client_secret = settings.KEYCLOAK_CLIENT_SECRET
            token_url = settings.KEYCLOAK_TOKEN_URL
            userinfo_url = settings.KEYCLOAK_USERINFO_URL
            verify = self._get_verify()
            oauth2_session = OAuth2Session(client=LegacyApplicationClient(
                client_id=client_id))
            token = oauth2_session.fetch_token(token_url=token_url,
                                               username=username,
                                               password=password,
                                               client_id=client_id,
                                               client_secret=client_secret,
                                               verify=verify)
            # Use the same SSL verification settings as the token request
            userinfo = oauth2_session.get(userinfo_url, verify=verify).json()
            return token, userinfo
        except InvalidGrantError as e:
            # password wasn't valid, just log as a warning
            logger.warning(f"Failed to log in user {username} with "
                           f"password: {e}")
            return None, None

    def _get_token_and_userinfo_redirect_flow(self, request):
        """Complete the OAuth2 redirect flow and fetch the userinfo.

        The current request URL is used as the authorization response, and
        the ``OAUTH2_STATE`` and ``OAUTH2_REDIRECT_URI`` session values are
        used to build the OAuth2 session. Returns a ``(token, userinfo)``
        tuple.
        """
        authorization_code_url = request.build_absolute_uri()
        client_id = settings.KEYCLOAK_CLIENT_ID
        client_secret = settings.KEYCLOAK_CLIENT_SECRET
        token_url = settings.KEYCLOAK_TOKEN_URL
        userinfo_url = settings.KEYCLOAK_USERINFO_URL
        verify = self._get_verify()
        state = request.session['OAUTH2_STATE']
        redirect_uri = request.session['OAUTH2_REDIRECT_URI']
        logger.debug("state={}".format(state))
        oauth2_session = OAuth2Session(client_id,
                                       scope='openid',
                                       redirect_uri=redirect_uri,
                                       state=state)
        if (not request.is_secure() and settings.DEBUG and
                not os.environ.get('OAUTHLIB_INSECURE_TRANSPORT')):
            # For local development (DEBUG=True), allow insecure OAuth
            # redirect flow if OAUTHLIB_INSECURE_TRANSPORT isn't already set.
            # Note that this changes the environment of the whole process.
            os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = "1"
            logger.info("Adding env var OAUTHLIB_INSECURE_TRANSPORT=1 to allow "
                        "OAuth redirect flow even though request is not secure")
        token = oauth2_session.fetch_token(
            token_url, client_secret=client_secret,
            authorization_response=authorization_code_url, verify=verify)
        # Use the same SSL verification settings as the token request
        userinfo = oauth2_session.get(userinfo_url, verify=verify).json()
        return token, userinfo

    def _get_token_and_userinfo_from_refresh_token(self,
                                                   request,
                                                   refresh_token=None):
        """Fetch a new token and userinfo using a refresh token.

        Uses ``refresh_token`` when given, otherwise the ``REFRESH_TOKEN``
        stored in the session. Returns a ``(token, userinfo)`` tuple, or
        ``(None, None)`` if Keycloak rejected the refresh token
        (``InvalidGrantError``).
        """
        client_id = settings.KEYCLOAK_CLIENT_ID
        client_secret = settings.KEYCLOAK_CLIENT_SECRET
        token_url = settings.KEYCLOAK_TOKEN_URL
        userinfo_url = settings.KEYCLOAK_USERINFO_URL
        verify = self._get_verify()
        oauth2_session = OAuth2Session(client_id, scope='openid')
        refresh_token_ = (refresh_token
                          if refresh_token is not None
                          else request.session['REFRESH_TOKEN'])
        # refresh_token doesn't take client_secret kwarg, so create auth
        # explicitly
        auth = requests.auth.HTTPBasicAuth(client_id, client_secret)
        try:
            token = oauth2_session.refresh_token(token_url=token_url,
                                                 refresh_token=refresh_token_,
                                                 auth=auth,
                                                 verify=verify)
            # Use the same SSL verification settings as the token request
            userinfo = oauth2_session.get(userinfo_url, verify=verify).json()
            return token, userinfo
        except InvalidGrantError as e:
            # probably session was terminated by admin or by user logging out
            # in another client
            logger.warning(f"Failed to refresh token for user {request.user.username} "
                           f": {e}")
            return None, None

    def _get_userinfo_from_token(self, request, token):
        """Fetch the userinfo for the given access token.

        Raises an ``Exception`` when the userinfo response contains an
        ``error`` entry, which is how an invalid token is detected.
        """
        client_id = settings.KEYCLOAK_CLIENT_ID
        userinfo_url = settings.KEYCLOAK_USERINFO_URL
        verify = self._get_verify()
        oauth2_session = OAuth2Session(
            client_id, token={'access_token': token})
        userinfo = oauth2_session.get(
            userinfo_url, verify=verify).json()
        if 'error' in userinfo:
            msg = userinfo.get('error_description')
            if msg is None:
                msg = f"Error fetching userinfo: {userinfo['error']}"
            raise Exception(msg)
        return userinfo

    def _process_token(self, request, token):
        """Store the access and refresh tokens and their expiry in the session.

        Expiration times are stored as absolute timestamps (seconds since
        the epoch) computed from the token's ``expires_in`` and
        ``refresh_expires_in`` values.
        """
        # TODO validate the JWS signature
        # NOTE(review): this writes the complete token, including the access
        # and refresh tokens, to the debug log.
        logger.debug("token: {}".format(token))
        now = time.time()
        # Put access_token into session to be used for authenticating with API
        # server
        sess = request.session
        sess['ACCESS_TOKEN'] = token['access_token']
        sess['ACCESS_TOKEN_EXPIRES_AT'] = now + token['expires_in']
        sess['REFRESH_TOKEN'] = token['refresh_token']
        sess['REFRESH_TOKEN_EXPIRES_AT'] = now + token['refresh_expires_in']

    def _process_userinfo(self, request, userinfo):
        """Create or update the user from the userinfo claims.

        Every claim in ``userinfo`` is saved on the user's profile, and the
        ``User`` model's username, email, first name and last name are
        updated from the corresponding claims. Returns the saved ``User``.
        """
        logger.debug("userinfo: {}".format(userinfo))
        sub = userinfo['sub']
        username = userinfo['preferred_username']
        # These claims are optional; fall back to an empty string rather than
        # None so that the values can be saved on the User model's
        # non-nullable fields.
        email = userinfo.get('email') or ''
        first_name = userinfo.get('given_name') or ''
        last_name = userinfo.get('family_name') or ''

        user = self._get_or_create_user(sub, username)
        user_profile = user.user_profile

        # Save the user info claims
        for claim, value in userinfo.items():
            user_profile.userinfo_set.update_or_create(
                claim=claim, defaults={'value': value})

        # Update User model fields
        user = user_profile.user
        user.username = username
        user.email = email
        user.first_name = first_name
        user.last_name = last_name
        user.save()
        return user

    def _get_or_create_user(self, sub, username):
        """Return the user for the ``sub`` claim, creating it if necessary.

        The user is looked up by the stored ``sub`` claim first. For
        backwards compatibility an existing user with a matching username is
        used next, making sure its profile records the ``sub`` claim for the
        next lookup. Otherwise a new user and profile are created.
        """
        try:
            user_profile = models.UserProfile.objects.get(
                userinfo__claim='sub', userinfo__value=sub)
            return user_profile.user
        except models.UserProfile.DoesNotExist:
            pass

        try:
            # For backwards compatibility, lookup by username
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            user = User(username=username)
            user.save()
            user_profile = models.UserProfile(user=user)
            user_profile.save()
            user_profile.userinfo_set.create(claim='sub', value=sub)
            return user

        # Make sure there is a user_profile with the sub claim, which
        # will be used to do the lookup next time
        if not hasattr(user, 'user_profile'):
            user_profile = models.UserProfile(user=user)
            user_profile.save()
            user_profile.userinfo_set.create(claim='sub', value=sub)
            return user

        sub_claim = user.user_profile.userinfo_set.filter(claim='sub').first()
        if sub_claim is None:
            # Profile exists but no sub claim has been stored yet
            user.user_profile.userinfo_set.create(claim='sub', value=sub)
        else:
            logger.warning(
                f"User {username} exists but sub claims don't match: "
                f"old={sub_claim.value}, new={sub}. Updating to new "
                "sub claim.")
            sub_claim.value = sub
            sub_claim.save()
        return user

    def _store_idp_userinfo(self, user, token, idp_alias):
        """Fetch the userinfo from the external IDP and store its claims.

        The IDP's ``idp_token_url`` and ``userinfo_url`` are looked up in
        ``settings.AUTHENTICATION_OPTIONS['external']``; if either is
        missing nothing is done. This is best effort: any failure is logged
        and not propagated.
        """
        try:
            idp_token_url = None
            userinfo_url = None
            for auth_option in settings.AUTHENTICATION_OPTIONS['external']:
                if auth_option['idp_alias'] == idp_alias:
                    idp_token_url = auth_option.get('idp_token_url')
                    userinfo_url = auth_option.get('userinfo_url')
                    break
            if idp_token_url is None or userinfo_url is None:
                logger.debug(f"idp_token_url and/or userinfo_url not set for {idp_alias} "
                             "in AUTHENTICATION_OPTIONS, skipping retrieval of external IDP userinfo")
                return

            access_token = token['access_token']
            logger.debug(f"access_token={access_token} for idp_alias={idp_alias}")
            # fetch the idp's token
            headers = {'Authorization': f'Bearer {access_token}'}
            # For the following to work, in Keycloak the IDP should have 'Store
            # Tokens' and 'Stored Tokens Readable' enabled and the user needs
            # the broker/read-token role
            # NOTE(review): these requests are made without a timeout.
            r = requests.get(idp_token_url, headers=headers)
            # Fail on HTTP errors instead of treating an error payload as a
            # token (or, below, storing it as the user's claims)
            r.raise_for_status()
            idp_token = r.json()
            idp_headers = {'Authorization': f"Bearer {idp_token['access_token']}"}
            r = requests.get(userinfo_url, headers=idp_headers)
            r.raise_for_status()
            userinfo = r.json()
            logger.debug(f"userinfo={userinfo}")

            # Save the idp user info claims
            user_profile = user.user_profile
            for claim, value in userinfo.items():
                user_profile.idp_userinfo.update_or_create(
                    idp_alias=idp_alias, claim=claim,
                    defaults={'value': value})
        except Exception:
            logger.exception(f"Failed to store IDP userinfo for {user.username} from IDP {idp_alias}")

    def _check_username_initialization(self, request, user):
        """Mark the username as initialized or alert the admins about it.

        Failure to send the admin alert is logged and not propagated.
        """
        # Check if the username assigned to the user was based on the user's
        # email address or if it was assigned some random string (Keycloak's
        # sub). If the latter, we'll want to alert the admins so that they can
        # assign a proper username for the user.
        user_profile = user.user_profile
        if not user_profile.username_initialized:
            email_claim = user_profile.userinfo_set.filter(
                claim='email').first()
            username_claim = user_profile.userinfo_set.filter(
                claim='preferred_username').first()
            if (email_claim is not None and
                    username_claim is not None and
                    email_claim.value == username_claim.value):
                user_profile.username_initialized = True
                user_profile.save()

        # TODO: also check idp_userinfo.preferred_username if it exists

        if not user_profile.username_initialized and not user_profile.is_username_valid:
            try:
                utils.send_admin_alert_about_uninitialized_username(
                    request, user.username, user.email, user.first_name, user.last_name)
            except Exception:
                logger.exception(f"Failed to send alert about username being uninitialized: {user.username}", extra={'request': request})