blob: 6106626ae12d86e51adc270f80c899e83ce3e563 [file] [log] [blame]
"""
Wrapper around the IAM Admin Services client.
"""
import logging
from urllib.parse import urlparse
import requests
from django.conf import settings
from django_airavata.utils import iamadmin_client_pool
from . import utils
logger = logging.getLogger(__name__)
def is_username_available(username):
    """Ask the IAM Admin Services client whether ``username`` is available.

    The call is authorized with the token obtained from
    ``utils.get_service_account_authz_token()``; the client's answer is
    returned unchanged.
    """
    service_token = utils.get_service_account_authz_token()
    availability = iamadmin_client_pool.isUsernameAvailable(
        service_token, username)
    return availability
def register_user(username, email_address, first_name, last_name, password):
    """Register a user through the IAM Admin Services client.

    All five arguments are forwarded, in order, to the client's
    ``registerUser`` call after the token obtained from
    ``utils.get_service_account_authz_token()``. The client's result is
    returned unchanged.
    """
    service_token = utils.get_service_account_authz_token()
    registration_args = (
        service_token,
        username,
        email_address,
        first_name,
        last_name,
        password,
    )
    return iamadmin_client_pool.registerUser(*registration_args)
def is_user_enabled(username):
    """Return the client's ``isUserEnabled`` result for ``username``.

    The call is authorized with the token obtained from
    ``utils.get_service_account_authz_token()``.
    """
    service_token = utils.get_service_account_authz_token()
    enabled = iamadmin_client_pool.isUserEnabled(service_token, username)
    return enabled
def enable_user(username):
    """Call the client's ``enableUser`` for ``username`` and return its result.

    The call is authorized with the token obtained from
    ``utils.get_service_account_authz_token()``.
    """
    service_token = utils.get_service_account_authz_token()
    outcome = iamadmin_client_pool.enableUser(service_token, username)
    return outcome
def delete_user(username):
    """Call the client's ``deleteUser`` for ``username`` and return its result.

    The call is authorized with the token obtained from
    ``utils.get_service_account_authz_token()``.
    """
    service_token = utils.get_service_account_authz_token()
    outcome = iamadmin_client_pool.deleteUser(service_token, username)
    return outcome
def is_user_exist(username):
    """Return the client's ``isUserExist`` result for ``username``.

    The call is authorized with the token obtained from
    ``utils.get_service_account_authz_token()``.
    """
    service_token = utils.get_service_account_authz_token()
    exists = iamadmin_client_pool.isUserExist(service_token, username)
    return exists
def get_user(username):
    """Fetch ``username`` via the client's ``getUser`` call.

    The call is authorized with the token obtained from
    ``utils.get_service_account_authz_token()``; whatever the client
    returns is handed back unchanged.
    """
    service_token = utils.get_service_account_authz_token()
    user_profile = iamadmin_client_pool.getUser(service_token, username)
    return user_profile
def get_users(offset, limit, search=None):
    """Fetch users via the client's ``getUsers`` call.

    ``offset``, ``limit`` and ``search`` (``None`` when omitted) are passed
    through to the client in that order, after the token obtained from
    ``utils.get_service_account_authz_token()``. The client's result is
    returned unchanged.
    """
    service_token = utils.get_service_account_authz_token()
    users = iamadmin_client_pool.getUsers(
        service_token, offset, limit, search)
    return users
def reset_user_password(username, new_password):
    """Call the client's ``resetUserPassword`` and return its result.

    ``username`` and ``new_password`` are forwarded to the client after the
    token obtained from ``utils.get_service_account_authz_token()``.
    """
    service_token = utils.get_service_account_authz_token()
    reset_args = (service_token, username, new_password)
    return iamadmin_client_pool.resetUserPassword(*reset_args)
# Upper bound, in seconds, on each Keycloak admin REST call. Without an
# explicit timeout ``requests`` waits indefinitely for an unresponsive server.
_KEYCLOAK_REQUEST_TIMEOUT = 30


def _keycloak_users_url():
    """Return the admin REST URL of the users collection.

    The scheme and host come from ``settings.KEYCLOAK_AUTHORIZE_URL`` and the
    realm is ``settings.GATEWAY_ID``.
    """
    parsed = urlparse(settings.KEYCLOAK_AUTHORIZE_URL)
    return (f"{parsed.scheme}://{parsed.netloc}"
            f"/auth/admin/realms/{settings.GATEWAY_ID}/users")


def _keycloak_auth_headers():
    """Return bearer-token headers built from the service account token."""
    authz_token = utils.get_service_account_authz_token()
    return {'Authorization': f'Bearer {authz_token.accessToken}'}


def _fetch_keycloak_user(username, headers):
    """Return the user representation whose username equals ``username``.

    Raises:
        requests.RequestException: if the request fails, times out, or the
            server answers with an error status.
        Exception: if none of the returned users has exactly this username.
    """
    response = requests.get(
        _keycloak_users_url(),
        # 'exact' asks Keycloak to match the username exactly so the wanted
        # user cannot be pushed out of a truncated page of partial matches.
        # NOTE(review): servers that predate this parameter are expected to
        # ignore it, which is why the result is still filtered below.
        params={'username': username, 'exact': 'true'},
        headers=headers,
        timeout=_KEYCLOAK_REQUEST_TIMEOUT)
    response.raise_for_status()
    # The users search can return partial matches; keep only the exact one.
    for candidate in response.json():
        if candidate['username'] == username:
            return candidate
    raise Exception(f"Could not find user {username}")


def _save_keycloak_user(user, headers):
    """PUT the (modified) user representation back to Keycloak.

    Raises:
        requests.RequestException: if the request fails, times out, or the
            server answers with an error status.
    """
    response = requests.put(
        f"{_keycloak_users_url()}/{user['id']}",
        json=user,
        headers=headers,
        timeout=_KEYCLOAK_REQUEST_TIMEOUT)
    response.raise_for_status()


def update_username(username, new_username):
    """Rename the user ``username`` to ``new_username``.

    Args:
        username: current username; must match a user exactly.
        new_username: desired username; must pass ``is_username_available``.

    Raises:
        Exception: if ``new_username`` is not available, or if no user with
            exactly ``username`` is found.
        requests.RequestException: if a Keycloak request fails, times out, or
            returns an error status.
    """
    # make sure that new_username is available
    if not is_username_available(new_username):
        raise Exception(f"Can't change username of {username} to {new_username} because it is not available")
    headers = _keycloak_auth_headers()
    user = _fetch_keycloak_user(username, headers)
    user['username'] = new_username
    _save_keycloak_user(user, headers)


def update_user(username, first_name=None, last_name=None, email=None):
    """Update the first name, last name and/or email of user ``username``.

    Arguments left as ``None`` are not modified. The user representation is
    written back even when no field was changed.

    Raises:
        Exception: if no user with exactly ``username`` is found.
        requests.RequestException: if a Keycloak request fails, times out, or
            returns an error status.
    """
    headers = _keycloak_auth_headers()
    user = _fetch_keycloak_user(username, headers)
    if first_name is not None:
        user['firstName'] = first_name
    if last_name is not None:
        user['lastName'] = last_name
    if email is not None:
        user['email'] = email
    _save_keycloak_user(user, headers)