blob: f72df764203ddd17af340ec550ae35e43d2207f8 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import configparser
from airavata_custos import utils
from airavata_custos.settings import ProfileSettings
logger = logging.getLogger(__name__)
class IAMAdminClient(object):
def __init__(self, configuration_file_location):
"""
constructor for IAMAdminClient class
:param configuration_file_location: takes the location of the ini file containing server configuration
"""
self.profile_settings = ProfileSettings()
self._load_settings(configuration_file_location)
self.iamadmin_client_pool = utils.initialize_iamadmin_client_pool(self.profile_settings.PROFILE_SERVICE_HOST,
self.profile_settings.PROFILE_SERVICE_PORT)
def is_username_available(self, authz_token, username):
"""
This method validates if the username is available or not
:param authz_token: Object of AuthzToken class containing access token, username, gatewayId of the active user
:param username: The username whose availability needs to be verified
:return: boolean
"""
return self.iamadmin_client_pool.isUsernameAvailable(authz_token, username)
def register_user(self, authz_token, username, email_address, first_name, last_name, password):
"""
This method registers the user with the keycloak instance returns true if successful, false if the registration fails
:param authz_token: Object of AuthzToken class containing access token, username, gatewayId of the active user
:param username: The username of the user that needs to be registered
:param email_address: The email address of the user that needs to be registered
:param first_name: The first name of the user that needs to be registered
:param last_name: The last name of the user that needs to be registered
:param password: The password of the user that needs to be registered
:return: boolean
"""
return self.iamadmin_client_pool.registerUser(
authz_token,
username,
email_address,
first_name,
last_name,
password)
def is_user_enabled(self, authz_token, username):
"""
Checks the user is enabled/disabled in keycloak. Only the enabled user can login
:param authz_token: Object of AuthzToken class containing access token, username, gatewayId of the active user
:param username: The username of the user
:return: boolean
"""
return self.iamadmin_client_pool.isUserEnabled(authz_token, username)
def enable_user(self, authz_token, username):
"""
The method to enable a disabled user
:param authz_token: Object of AuthzToken class containing access token, username, gatewayId of the active user
:param username: The username of the user
:return: Object of UserProfile class, containing user details
"""
return self.iamadmin_client_pool.enableUser(authz_token, username)
def delete_user(self, authz_token, username):
"""
This method deleted the user from keycloak. Returns true if delete is successful
:param authz_token: Object of AuthzToken class containing access token, username, gatewayId of the active user
:param username: The username of the user
:return: boolean
"""
return self.iamadmin_client_pool.deleteUser(authz_token, username)
def is_user_exist(self, authz_token, username):
"""
This method checks if the user exists in keycloak. Returns true if the user exists otherwise returns false
:param authz_token: Object of AuthzToken class containing access token, username, gatewayId of the active user
:param username: The username of the user
:return: boolean
"""
try:
return self.iamadmin_client_pool.isUserExist(authz_token, username)
except Exception:
return None
def get_user(self, authz_token, username):
"""
:param authz_token: Object of AuthzToken class containing access token, username, gatewayId of the active user
:param username: username of the user
:return: object of class UserProfile
"""
try:
return self.iamadmin_client_pool.getUser(authz_token, username)
except Exception:
return None
def get_users(self, authz_token, offset=0, limit=-1, search=None):
"""
:param authz_token: Object of AuthzToken class containing access token, username, gatewayId of the active user
:param offset: start index
:param limit: end index
:param search: search criteria for filtering users
:return: list of UserProfile class objects
"""
try:
return self.iamadmin_client_pool.getUsers(authz_token, offset, limit, search)
except Exception:
return None
def reset_user_password(self, authz_token, username, new_password):
"""
:param authz_token: Object of AuthzToken class containing access token, username, gatewayId of the active user
:param username: username of the user
:param new_password: new password for the user
:return:
"""
try:
return self.iamadmin_client_pool.resetUserPassword(
authz_token, username, new_password)
except Exception:
return None
def _load_settings(self, configuration_file_location):
config = configparser.ConfigParser()
config.read(configuration_file_location)
settings = config['ProfileServerSettings']
self.profile_settings.PROFILE_SERVICE_HOST = settings['PROFILE_SERVICE_HOST']
self.profile_settings.PROFILE_SERVICE_PORT = settings['PROFILE_SERVICE_PORT']