blob: dcdb2cafdf2ffaae6b5de351f59dcaa943438cc9 [file] [log] [blame]
import json
import logging
import time
from pprint import pprint
from django.conf import settings
from django.contrib.auth.backends import ModelBackend
from django.contrib.auth.models import User
from django.views.decorators.debug import sensitive_variables
from google.auth import jwt
from google.protobuf.json_format import MessageToJson, MessageToDict
from custos_portal import identity_management_client
logger = logging.getLogger(__name__)
class CustosAuthBackend(ModelBackend):
"""Django authentication backend for custos admin portal"""
@sensitive_variables('password')
def authenticate(self, request=None, username=None, password=None, refresh_token=None):
try:
if username and password:
print("Entered username and password")
token = self._get_token_and_userinfo_password_flow(username, password)
request.session["ACCESS_TOKEN"] = token
userinfo = self._get_userinfo_from_token(token)
self._get_user_groups(request, token)
# user login using CIlogon
else:
token = self._get_token_and_userinfo_redirect_flow(request)
# the custos api returns different token responses for 'authenticate' and 'token' methods
userinfo = self._get_userinfo_from_token(token["access_token"])
self._process_token(request, token)
self._get_user_groups(request, token["access_token"])
return self._process_userinfo(request, userinfo)
except Exception as e:
logger.exception("login failed")
return None
def get_user(self, user_id):
try:
logger.debug("Checking for user: {}".format(user_id))
return User.objects.get(pk=user_id)
except User.DoesNotExist:
return None
def _get_token_and_userinfo_password_flow(self, username, password):
response = identity_management_client.authenticate(settings.CUSTOS_TOKEN, username, password)
token = MessageToDict(response)["accessToken"]
logger.debug("Token: {}".format(token))
return token
def _get_token_and_userinfo_redirect_flow(self, request):
code = request.GET.get('code')
state = request.GET.get('state')
session_state = request.GET.get('session_state')
saved_state = request.session['OAUTH2_STATE']
redirect_uri = request.session['OAUTH2_REDIRECT_URI']
logger.debug("Code: {}, State: {}, Saved_state: {}, session_state: {}".format(code, state, saved_state,
session_state))
if state == saved_state:
response = identity_management_client.token(settings.CUSTOS_TOKEN, redirect_uri, code)
logger.debug(response)
token = MessageToDict(response)
logger.debug(token["access_token"])
return token
return
def _process_token(self, request, token):
# TODO validate the JWS signature
now = time.time()
# Put access_token into session to be used for authenticating with API
# server
sess = request.session
sess['ACCESS_TOKEN'] = token['access_token']
sess['ACCESS_TOKEN_EXPIRES_AT'] = now + token['expires_in']
sess['REFRESH_TOKEN'] = token['refresh_token']
sess['REFRESH_TOKEN_EXPIRES_AT'] = now + token['refresh_expires_in']
def _get_userinfo_from_token(self, token):
userinfo = {}
decoded_id_token = jwt.decode(token, verify=False)
userinfo["username"] = decoded_id_token["preferred_username"]
userinfo["first_name"] = decoded_id_token["given_name"]
userinfo["last_name"] = decoded_id_token["family_name"]
userinfo["email"] = decoded_id_token["email"]
return userinfo
def _get_user_groups(self, request, access_token):
decoded_id_token = jwt.decode(access_token, verify=False)
user_groups = decoded_id_token["realm_access"]["roles"]
request.session["GATEWAY_GROUPS"] = user_groups
request.is_gateway_admin = 'admin' in user_groups
def _process_userinfo(self, request, userinfo):
logger.debug("Userinfo: {}".format(userinfo))
username = userinfo['username']
email = userinfo['email']
first_name = userinfo['first_name']
last_name = userinfo['last_name']
request.session['USERINFO'] = userinfo
try:
user = User.objects.get(username=username)
# Update these fields each time, in case they have changed
user.email = email
user.first_name = first_name
user.last_name = last_name
logger.debug("User already exists, updating it now")
# Save the user locally in Django database
user.save()
except User.DoesNotExist:
user = User(username=username,
first_name=first_name,
last_name=last_name,
email=email)
logger.debug("User does not already exists, adding it now")
user.save()
return user