blob: 04d208dfe5df8fabfd72a8f779d3350a2cc00835 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import configparser
from requests_oauthlib import OAuth2Session
from oauthlib.oauth2 import LegacyApplicationClient
from airavata.model.security.ttypes import AuthzToken
from airavata_sdk.transport.settings import KeycloakConfiguration
import os
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
class Authenticator(object):
def __init__(self, configuration_file_location=None):
self.keycloak_settings = KeycloakConfiguration(configuration_file_location)
self._load_settings(configuration_file_location)
def get_token_and_user_info_password_flow(self, username, password, gateway_id):
client_id = self.keycloak_settings.CLIENT_ID
client_secret = self.keycloak_settings.CLIENT_SECRET
token_url = self.keycloak_settings.TOKEN_URL
userinfo_url = self.keycloak_settings.USER_INFO_URL
verify_ssl = self.keycloak_settings.VERIFY_SSL
oauth2_session = OAuth2Session(client=LegacyApplicationClient(
client_id=client_id))
token = oauth2_session.fetch_token(token_url=token_url,
username=username,
password=password,
client_id=client_id,
client_secret=client_secret,
verify=self.keycloak_settings.KEYCLOAK_CA_CERTIFICATE if verify_ssl else False)
claimsMap = {
"userName": username,
"gatewayID": gateway_id
}
return AuthzToken(accessToken=token['access_token'], claimsMap=claimsMap)
def _load_settings(self, configuration_file_location):
if configuration_file_location is not None:
config = configparser.ConfigParser()
config.read(configuration_file_location)
self.keycloak_settings.KEYCLOAK_CA_CERTIFICATE = config.get("KeycloakServer", 'CERTIFICATE_FILE_PATH')
self.keycloak_settings.CLIENT_ID = config.get('KeycloakServer', 'CLIENT_ID')
self.keycloak_settings.CLIENT_SECRET = config.get('KeycloakServer', 'CLIENT_SECRET')
self.keycloak_settings.TOKEN_URL = config.get('KeycloakServer', 'TOKEN_URL')
self.keycloak_settings.USER_INFO_URL = config.get('KeycloakServer', 'USER_INFO_URL')
self.keycloak_settings.VERIFY_SSL = config.getboolean('KeycloakServer', 'VERIFY_SSL')