| import logging |
| import json |
| from time import timezone |
| from urllib.parse import quote |
| from datetime import datetime, timedelta, timezone |
| |
| |
| from django.conf import settings |
| from django.contrib import messages |
| from django.contrib.auth import authenticate, login, logout |
| from django.core.exceptions import ValidationError, ObjectDoesNotExist |
| from django.forms import formset_factory, models |
| from django.http import HttpResponseBadRequest |
| from django.shortcuts import render, redirect, resolve_url |
| from django.template import Context |
| from django.urls import reverse |
| from django.utils.http import urlencode |
| from django.views.decorators.debug import sensitive_variables |
| from google.protobuf.json_format import MessageToDict |
| from requests_oauthlib import OAuth2Session |
| from google.auth import jwt |
| |
| from . import utils |
| from . import models |
| from . import forms |
| from ... import identity_management_client |
| from ... import user_management_client |
| from . import backends |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| def callback(request): |
| try: |
| print("***Request**", request) |
| user = authenticate(request=request) |
| logger.debug("Saving user to session: {}".format(user)) |
| login(request, user) |
| sess = request.session |
| print(sess['ACCESS_TOKEN']) |
| token = sess['ACCESS_TOKEN'] |
| print("****User****",user) |
| code = request.GET.get('code').strip() |
| logger.info(code) |
| redirect_uri = request.build_absolute_uri(reverse('custos_portal_auth:callback')) |
| print("*****redirect uri******",redirect_uri) |
| # response = identity_management_client.token(token="Y3VzdG9zLTZud29xb2RzdHBlNW12Y3EwOWxoLTEwMDAwMTAxOkdpS3JHR1ZMVzd6RG9QWnd6Z0NpRk03V1V6M1BoSXVtVG1GeEFrcjc=", redirect_uri="http://127.0.0.1:8000/auth/callback/" |
| # , code=code,grant_type="authorization_code") |
| # print("*********response******", response) |
| print(token) |
| # return _handle_login_redirect(request) |
| # response = identity_management_client.authenticate(settings.CUSTOS_TOKEN, username, password) |
| # token = MessageToDict(response)["accessToken"] |
| response = _handle_login_redirect(request) |
| response.set_cookie('token', value=token, secure=False, httponly=True) |
| return response |
| |
| except Exception as err: |
| logger.exception("An error occurred while processing OAuth2 " |
| "callback: {}".format(request.build_absolute_uri())) |
| idp_alias = "cilogon" |
| return redirect(reverse('custos_portal_auth:callback-error', |
| args=(idp_alias,))) |
| |
| |
| def callback_error(request, idp_alias): |
| _validate_idp_alias(idp_alias) |
| # Create a filtered options object with just the given idp_alias |
| options = { |
| 'external': [] |
| } |
| for ext in settings.AUTHENTICATION_OPTIONS['external']: |
| if ext['idp_alias'] == idp_alias: |
| options['external'].append(ext.copy()) |
| |
| return render(request, 'custos_portal_auth/callback-error.html', { |
| 'idp_alias': idp_alias, |
| 'options': options, |
| }) |
| |
| |
| # def create_account(request): |
| # if request.method == 'POST': |
| # form = forms.CreateAccountForm(request.POST) |
| # if form.is_valid(): |
| # try: |
| # username = form.cleaned_data['username'] |
| # email = form.cleaned_data['email'] |
| # first_name = form.cleaned_data['first_name'] |
| # last_name = form.cleaned_data['last_name'] |
| # password = form.cleaned_data['password'] |
| # is_temp_password = False |
| # result = user_management_client.register_user(settings.CUSTOS_TOKEN, username, first_name, last_name, |
| # password, email, is_temp_password) |
| # if result.is_registered: |
| # logger.debug("User account successfully created for : {}".format(username)) |
| # _create_and_send_email_verification_link(request, username, email, first_name, last_name, |
| # settings.LOGIN_URL) |
| # messages.success( |
| # request, |
| # "Account request processed successfully. Before you " |
| # "can login you need to confirm your email address. " |
| # "We've sent you an email with a link that you should " |
| # "click on to complete the account creation process.") |
| # else: |
| # form.add_error(None, ValidationError("Failed to register the user with IAM service")) |
| # except TypeError as e: |
| # logger.exception( |
| # "Failed to create account for user", exc_info=e) |
| # form.add_error(None, ValidationError(e)) |
| # return render(request, 'custos_portal_auth/create_account.html', { |
| # 'options': settings.AUTHENTICATION_OPTIONS, |
| # 'form': form |
| # }) |
| # else: |
| # form = forms.CreateAccountForm() |
| # return render(request, 'custos_portal_auth/create_account.html', { |
| # 'options': settings.AUTHENTICATION_OPTIONS, |
| # 'form': form |
| # }) |
| |
| def create_account(request): |
| return render(request, 'custos_portal_auth/form_create_account.html', { |
| 'next': request.GET.get('next', None), |
| 'options': settings.AUTHENTICATION_OPTIONS, |
| }) |
| |
| |
| def forgot_password(request): |
| if request.method == 'POST': |
| form = forms.ForgotPasswordForm(request.POST) |
| if form.is_valid(): |
| try: |
| username = form.cleaned_data['username'] |
| if not user_management_client.is_username_available(settings.CUSTOS_TOKEN, username).is_exist: |
| if not user_management_client.is_user_enabled(settings.CUSTOS_TOKEN, username).is_exist: |
| messages.error( |
| request, |
| "Please finish creating your account before " |
| "resetting your password. Provide your username " |
| "below and we will send you another email " |
| "verification link.") |
| return redirect( |
| reverse('custos_portal_auth:resend_email_link')) |
| _create_and_send_password_reset_request_link(request, username) |
| # Always display this message even if you doesn't exist. Don't |
| # reveal whether a user with that username exists. |
| messages.success( |
| request, |
| "Reset password request processed successfully. We've " |
| "sent an email with a password reset link to the email " |
| "address associated with the username you provided. You " |
| "can use that link within the next 24 hours to set a new " |
| "password.") |
| return redirect( |
| reverse('custos_portal_auth:forgot_password')) |
| except Exception as e: |
| logger.exception( |
| "Failed to generate password reset request for user", |
| exc_info=e) |
| form.add_error(None, ValidationError(str(e))) |
| else: |
| form = forms.ForgotPasswordForm() |
| return render(request, 'custos_portal_auth/forgot_password.html', { |
| 'form': form |
| }) |
| |
| |
| def _create_and_send_password_reset_request_link(request, username): |
| password_reset_request = models.PasswordResetRequest(username=username) |
| password_reset_request.save() |
| |
| verification_uri = request.build_absolute_uri( |
| reverse( |
| 'custos_portal_auth:reset_password', kwargs={ |
| 'code': password_reset_request.reset_code})) |
| logger.debug( |
| "password reset verification_uri={}".format(verification_uri)) |
| |
| user = user_management_client.get_user(settings.CUSTOS_TOKEN, username) |
| context = Context({ |
| "username": username, |
| "email": user.email, |
| "first_name": user.first_name, |
| "last_name": user.last_name, |
| "portal_title": settings.PORTAL_TITLE, |
| "url": verification_uri, |
| }) |
| utils.send_email_to_user(models.PASSWORD_RESET_EMAIL_TEMPLATE, context) |
| |
| |
| def handle_login(request): |
| username = request.POST['username'] |
| password = request.POST['password'] |
| login_type = request.POST.get('login_type', None) |
| template = "custos_portal_auth/login.html" |
| if login_type and login_type == 'password': |
| template = "custos_portal_auth/login_username_password.html" |
| user = authenticate(username=username, password=password, request=request) |
| logger.debug("authenticated user: {}".format(user)) |
| try: |
| if user is not None: |
| login(request, user) |
| response = identity_management_client.authenticate(settings.CUSTOS_TOKEN, username, password) |
| token = MessageToDict(response)["accessToken"] |
| response = _handle_login_redirect(request) |
| response.set_cookie('token', value=token, secure=False, httponly=True) |
| return response |
| else: |
| messages.error(request, "Login failed. Please try again.") |
| except Exception as err: |
| messages.error(request, |
| "Login failed: {}. Please try again.".format(str(err))) |
| return render(request, template, { |
| 'username': username, |
| 'next': request.POST.get('next', None), |
| 'options': settings.AUTHENTICATION_OPTIONS, |
| 'login_type': login_type, |
| }) |
| |
| |
| def redirect_login(request, idp_alias): |
| _validate_idp_alias(idp_alias) |
| |
| client_id = settings.KEYCLOAK_CLIENT_ID |
| |
| auth_base_url = identity_management_client.get_oidc_configuration(settings.CUSTOS_TOKEN, client_id) |
| # auth_base_url = json.loads(auth_base_url.) |
| # print(auth_base_url) |
| base_authorize_url = settings.KEYCLOAK_AUTHORIZE_URL |
| |
| redirect_uri = request.build_absolute_uri( |
| reverse('custos_portal_auth:callback')) |
| |
| oauth2_session = OAuth2Session( |
| client_id, scope='openid', redirect_uri=redirect_uri) |
| authorization_url, state = oauth2_session.authorization_url( |
| base_authorize_url) |
| authorization_url += '&kc_idp_hint=' + quote("oidc") |
| |
| # Store state in session for later validation (see backends.py) |
| |
| request.session['OAUTH2_STATE'] = state |
| request.session['OAUTH2_REDIRECT_URI'] = redirect_uri |
| |
| logger.debug('Redirect URI: {}'.format(redirect_uri)) |
| logger.debug('Authorization URL for OpenID: {}'.format(authorization_url)) |
| return redirect(authorization_url) |
| |
| |
| def resend_email_link(request): |
| if request.method == 'POST': |
| form = forms.ResendEmailVerificationLinkForm(request.POST) |
| if form.is_valid(): |
| try: |
| username = form.cleaned_data['username'] |
| if not user_management_client.is_username_available(settings.CUSTOS_TOKEN, username).is_exist: |
| user_profile = user_management_client.get_user(settings.CUSTOS_TOKEN, username) |
| print(user_profile) |
| _create_and_send_email_verification_link( |
| request, |
| username, |
| user_profile.email, |
| user_profile.first_name, |
| user_profile.last_name, |
| settings.LOGIN_URL) |
| messages.success( |
| request, |
| "Email verification link sent successfully. Please " |
| "click on the link in the email that we sent " |
| "to your email address.") |
| else: |
| messages.error( |
| request, |
| "Unable to resend email verification link. Please " |
| "contact the website administrator for further " |
| "assistance.") |
| return redirect( |
| reverse('custos_portal_auth:resend_email_link')) |
| except Exception as e: |
| logger.exception( |
| "Failed to resend email verification link", exc_info=e) |
| form.add_error(None, ValidationError(str(e))) |
| else: |
| form = forms.ResendEmailVerificationLinkForm() |
| return render(request, 'custos_portal_auth/verify_email.html', { |
| 'form': form |
| }) |
| |
| |
| @sensitive_variables('password') |
| def reset_password(request, code): |
| try: |
| password_reset_request = models.PasswordResetRequest.objects.get( |
| reset_code=code) |
| except ObjectDoesNotExist as e: |
| messages.error( |
| request, |
| "Reset password link is invalid. Please try again.") |
| return redirect(reverse('custos_portal_auth:forgot_password')) |
| |
| now = datetime.now(timezone.utc) |
| if now - password_reset_request.created_date > timedelta(days=1): |
| password_reset_request.delete() |
| messages.error( |
| request, |
| "Reset password link has expired. Please try again.") |
| return redirect(reverse('custos_portal_auth:forgot_password')) |
| |
| if request.method == "POST": |
| form = forms.ResetPasswordForm(request.POST) |
| if form.is_valid(): |
| try: |
| password = form.cleaned_data['password'] |
| # TODO Password is not updating properly |
| success = user_management_client.reset_password(settings.CUSTOS_TOKEN, password_reset_request.username, |
| password) |
| logger.debug("Password reset result: {}".format(success)) |
| if not success: |
| messages.error( |
| request, "Failed to reset password. Please try again.") |
| return redirect( |
| reverse('custos_portal_auth:forgot_password')) |
| else: |
| password_reset_request.delete() |
| messages.success( |
| request, |
| "You may now log in with your new password.") |
| return redirect( |
| reverse('custos_portal_auth:login_with_password')) |
| except Exception as e: |
| logger.exception( |
| "Failed to reset password for user", exc_info=e) |
| form.add_error(None, ValidationError(str(e))) |
| else: |
| form = forms.ResetPasswordForm() |
| return render(request, 'custos_portal_auth/reset_password.html', { |
| 'form': form, |
| 'code': code |
| }) |
| |
| |
| def start_login(request): |
| return render(request, 'custos_portal_auth/login.html', { |
| 'next': request.GET.get('next', None), |
| 'options': settings.AUTHENTICATION_OPTIONS, |
| }) |
| |
| |
| def start_logout(request): |
| logout(request) |
| redirect_url = request.build_absolute_uri(resolve_url(settings.LOGOUT_REDIRECT_URL)) |
| response = redirect(settings.KEYCLOAK_LOGOUT_URL + "?redirect_uri=" + quote(redirect_url)) |
| response.delete_cookie('token') |
| return response |
| |
| |
| def start_username_password_login(request): |
| # return bad request if password isn't a configured option |
| if 'password' not in settings.AUTHENTICATION_OPTIONS: |
| return HttpResponseBadRequest("Username/password login is not enabled") |
| return render(request, |
| 'custos_portal_auth/login_username_password.html', |
| { |
| 'next': request.GET.get('next', None), |
| 'options': settings.AUTHENTICATION_OPTIONS, |
| 'login_type': 'password' |
| }) |
| |
| |
| def verify_email(request, code): |
| try: |
| email_verification = models.EmailVerification.objects.get(verification_code=code) |
| email_verification.verified = True |
| email_verification.save() |
| # Check if user is enabled, if so redirect to login page |
| username = email_verification.username |
| logger.debug("Email address verified for {}".format(username)) |
| login_url = reverse('custos_portal_auth:login') |
| if email_verification.next: |
| login_url += "?" + urlencode({'next': email_verification.next}) |
| |
| print(user_management_client.is_user_enabled(settings.CUSTOS_TOKEN, "shivam_testing_3")) |
| if user_management_client.is_user_enabled(settings.CUSTOS_TOKEN, username).status: |
| logger.debug("User {} is already enabled".format(username)) |
| messages.success( |
| request, |
| "Your account has already been successfully created. " |
| "Please log in now.") |
| return redirect(login_url) |
| else: |
| logger.debug("Enabling user {}".format(username)) |
| # enable user and inform admins |
| user_profile = MessageToDict(user_management_client.enable_user(settings.CUSTOS_TOKEN, username)) |
| logger.debug(user_profile) |
| email_address = user_profile["email"] |
| first_name = user_profile["firstName"] |
| last_name = user_profile["lastName"] |
| utils.send_new_user_email(request, |
| username, |
| email_address, |
| first_name, |
| last_name) |
| messages.success( |
| request, |
| "Your account has been successfully created. " |
| "Please log in now.") |
| return redirect(login_url) |
| except ObjectDoesNotExist as e: |
| # if doesn't exist, give user a form where they can enter their |
| # username to resend verification code |
| logger.exception("EmailVerification object doesn't exist for " |
| "code {}".format(code)) |
| messages.error( |
| request, |
| "Email verification failed. Please enter your username and we " |
| "will send you another email verification link.") |
| return redirect(reverse('custos_portal_auth:resend_email_link')) |
| except Exception as e: |
| logger.exception("Email verification processing failed!") |
| messages.error( |
| request, |
| "Email verification failed. Please try clicking the email " |
| "verification link again later.") |
| return redirect(reverse('custos_portal_auth:create_account')) |
| |
| |
| def _create_and_send_email_verification_link(request, username, email, first_name, last_name, next_url=None): |
| email_verification = models.EmailVerification(username=username, next=next_url) |
| email_verification.save() |
| |
| verification_uri = request.build_absolute_uri( |
| reverse('custos_portal_auth:verify_email', kwargs={'code': email_verification.verification_code})) |
| logger.debug( |
| "verification_uri={}".format(verification_uri)) |
| |
| context = Context({ |
| "username": username, |
| "email": email, |
| "first_name": first_name, |
| "last_name": last_name, |
| "portal_title": settings.PORTAL_TITLE, |
| "url": verification_uri, |
| }) |
| utils.send_email_to_user(models.VERIFY_EMAIL_TEMPLATE, context) |
| |
| |
| def _validate_idp_alias(idp_alias): |
| external_auth_options = settings.AUTHENTICATION_OPTIONS['external'] |
| valid_idp_aliases = [ext['idp_alias'] for ext in external_auth_options] |
| if idp_alias not in valid_idp_aliases: |
| raise Exception("idp_alias is not valid: {}".format(idp_alias)) |
| |
| |
| def _handle_login_redirect(request): |
| if request.is_gateway_admin: |
| return redirect(reverse('custos_portal_admin:list_requests')) |
| else: |
| return redirect(reverse('custos_portal_workspace:list_requests')) |