blob: 6b30df73321071c4549296e0eca5bb1ffd6c397c [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oauthlib.oauth2 import LegacyApplicationClient
import requests
import configparser
from airavata_custos.settings import IAMSettings
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
from custos.commons.model.security.ttypes import AuthzToken
from urllib.parse import quote
from airavata_custos.security.client_credentials import IdpCredentials, UserCredentials, ClientCredentials
class KeycloakBackend(object):
def __init__(self, configuration_file_location):
"""
constructor for KeycloakBackend class
:param configuration_file_location: takes the location of the ini file containing server configuration
"""
self.keycloak_settings = IAMSettings()
self._load_settings(configuration_file_location)
def authenticate_using_user_details(self, user_credentials):
"""
Method to authenticate a gateway user with keycloak
:param user_credentials: object of UserCredentials class. To get instance of this class use prepare_user_credentials
:return: openid token, openid user information
"""
try:
token, user_info = self._get_token_and_user_info_password_flow(user_credentials)
return token, user_info
except Exception as e:
return None
def prepare_user_credentials(self, client_id, client_secret, username, password):
"""
:param client_id: client identifier received after registering the tenant
:param client_secret: client password received after registering the tenant
:param username: username of the user which needs to be authenticated
:param password: password of the user which needs to be authenticated
:return: UserCredentials object
"""
return UserCredentials(client_id, client_secret, username, password)
def authenticate_using_idp(self, idp_credentials):
"""
:param idp_credentials: object of IdpCredentials class. To get an instance of this class use prepare_idp_credentials
:return: openid token, openid user information
"""
try:
token, user_info = self._get_token_and_user_info_redirect_flow(idp_credentials)
return token, user_info
except Exception as e:
return None
def prepare_idp_credentials(self, client_id, client_secret, redirect_uri, idp_alias):
"""
:param client_id: client identifier received after registering the tenant
:param client_secret: client password received after registering the tenant
:param redirect_uri: URI for the callback entry point of the client
:param idp_alias: name of the idp
:return: object of class IdpCredentials
"""
base_authorize_url = self.keycloak_settings.KEYCLOAK_AUTHORIZE_URL
oauth2_session = OAuth2Session(client_id, scope='openid', redirect_uri=redirect_uri)
authorization_url, state = oauth2_session.authorization_url(base_authorize_url)
authorization_url += '&kc_idp_hint=' + quote(idp_alias)
return IdpCredentials(client_id, client_secret, authorization_url, state,
redirect_uri)
def authenticate_using_refresh_token(self, client_id, client_secret, refresh_token):
"""
:param client_id: client identifier received after registering the tenant
:param client_secret: client password received after registering the tenant
:param refresh_token: openid connect refresh token
:return: openid token
"""
try:
token = self._get_token_from_refresh_token(ClientCredentials(client_id, client_secret), refresh_token)
return token
except Exception as e:
return None
def get_authorization_token(self, client_credentials, tenant_id, username=None):
"""
This method created a authorization token for the user or a service account
In case of a service account username will be null
:param client_credentials: object of class client_credentials
:param tenant_id: gateway id of the client
:param username: username of the user for which authorization token is being created
:return: AuthzToken
"""
client = BackendApplicationClient(client_id=client_credentials.client_id)
oauth = OAuth2Session(client=client)
token = oauth.fetch_token(
token_url=self.keycloak_settings.KEYCLOAK_TOKEN_URL,
client_id=client_credentials.client_id,
client_secret=client_credentials.client_secret,
verify=client_credentials.verify_ssl)
access_token = token.get('access_token')
return AuthzToken(
accessToken=access_token,
claimsMap={'gatewayID': tenant_id, 'userName': username})
def _get_token_and_user_info_password_flow(self, client_credentials):
oauth2_session = OAuth2Session(client=LegacyApplicationClient(client_id=client_credentials.client_id))
token = oauth2_session.fetch_token(token_url=self.keycloak_settings.KEYCLOAK_TOKEN_URL,
username=client_credentials.username,
password=client_credentials.password,
client_id=client_credentials.client_id,
client_secret=client_credentials.client_secret,
verify=self.keycloak_settings.VERIFY_SSL)
user_info = oauth2_session.get(self.keycloak_settings.KEYCLOAK_USERINFO_URL).json()
return token, user_info
def _get_token_and_user_info_redirect_flow(self, client_credentials):
oauth2_session = OAuth2Session(client_credentials.client_id,
scope='openid',
redirect_uri=client_credentials.redirect_uri,
state=client_credentials.state)
token = oauth2_session.fetch_token(self.keycloak_settings.KEYCLOAK_TOKEN_URL,
client_secret=client_credentials.client_secret,
authorization_response=client_credentials.authorization_code_url,
verify=self.keycloak_settings.VERIFY_SSL)
user_info = oauth2_session.get(self.keycloak_settings.KEYCLOAK_USERINFO_URL).json()
return token, user_info
def _get_token_from_refresh_token(self, client_credentials, refresh_token):
oauth2_session = OAuth2Session(client_credentials.client_id, scope='openid')
auth = requests.auth.HTTPBasicAuth(client_credentials.client_id, client_credentials.client_secret)
token = oauth2_session.refresh_token(token_url=self.keycloak_settings.KEYCLOAK_TOKEN_URL,
refresh_token=refresh_token,
auth=auth,
verify=self.keycloak_settings.VERIFY_SSL)
return token
def _load_settings(self, configuration_file_location):
config = configparser.ConfigParser()
config.read(configuration_file_location)
settings = config['IAMServerSettings']
self.keycloak_settings.KEYCLOAK_AUTHORIZE_URL = settings['KEYCLOAK_AUTHORIZE_URL']
self.keycloak_settings.KEYCLOAK_LOGOUT_URL = settings['KEYCLOAK_LOGOUT_URL']
self.keycloak_settings.KEYCLOAK_TOKEN_URL = settings['KEYCLOAK_TOKEN_URL']
self.keycloak_settings.KEYCLOAK_USERINFO_URL = settings['KEYCLOAK_USERINFO_URL']
self.keycloak_settings.VERIFY_SSL = settings.getboolean('VERIFY_SSL')