blob: 3abb9b7bcb0b1d8150eac08b8eadf571079909dc [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This script is used to export the IAM policy of a Google Cloud project to a YAML format.
# It retrieves the IAM policy bindings, parses the members, and formats the output in a structured
# YAML format, excluding service accounts and groups. The output includes usernames, emails, and
# their associated permissions, with optional conditions for roles that have conditions attached.
# You need to have the Google Cloud SDK installed and authenticated to run this script.
import argparse
import os
import sys
import yaml
import roles.generate_roles as generate_roles
from generate import export_project_iam, to_yaml_file
from google.cloud.iam_admin_v1 import GetRoleRequest, IAMClient
def migrate_permissions(data: list) -> list:
"""
Migrates permissions from the permissions to the new roles defined on beam_roles/ directory.
The rules are:
- If the user has owner role, leave it as is, remove any other role as it is redundant.
- If the user has any admin or secret related role, it will be migrated to the beam_admin role.
- If the user has an editor role or any user role but not an admin or secret related role, it will be migrated to the beam_infra_manager role.
- If the user has a role that is not only viewer, it will be migrated to the beam_committer role.
- The users with just viewer roles will be migrated to the beam_viewer role.
The rules are in a hierarchical order, meaning that if a user has a high role, it will also have the lower roles.
Args:
data: A list of dictionaries containing user permissions and details.
Returns:
A list of dictionaries with migrated permissions.
"""
migrated_data = []
for item in data:
username = item["username"]
email = item["email"]
permissions = item["permissions"]
# Initialize the new roles
new_roles = {
"beam_owner": False,
"beam_admin": False,
"beam_infra_manager": False,
"beam_committer": False,
"beam_viewer": False
}
for permission in permissions:
role = permission["role"]
# If the role is 'roles/owner', it is considered an owner role.
if role == "roles/owner":
new_roles["beam_owner"] = True
# If it ends with 'admin' or containes 'secretmanager' in the role, it is considered an admin role. Case insensitive.
elif 'admin' in role.lower() or 'secretmanager' in role.lower():
new_roles["beam_admin"] = True
new_roles["beam_infra_manager"] = True
new_roles["beam_committer"] = True
new_roles["beam_viewer"] = True
# If it is an editor role, it will be migrated to the beam_infra_manager.
elif role == "roles/editor":
new_roles["beam_infra_manager"] = True
new_roles["beam_committer"] = True
new_roles["beam_viewer"] = True
elif role != "roles/viewer":
# If it is a role that is not only viewer, it will be migrated to the beam_committer role.
new_roles["beam_committer"] = True
new_roles["beam_viewer"] = True
# If it is a viewer role, it will be migrated to the beam_viewer role.
else:
new_roles["beam_viewer"] = True
# Create the migrated entry
migrated_entry = {
"username": username,
"email": email,
"permissions": []
}
if new_roles["beam_owner"]:
migrated_entry["permissions"].append({"role": "roles/owner"})
else:
if new_roles["beam_admin"]:
migrated_entry["permissions"].append({"role": "projects/PROJECT-ID/roles/beam_admin"})
if new_roles["beam_infra_manager"]:
migrated_entry["permissions"].append({"role": "projects/PROJECT-ID/roles/beam_infra_manager"})
if new_roles["beam_committer"]:
migrated_entry["permissions"].append({"role": "projects/PROJECT-ID/roles/beam_committer"})
if new_roles["beam_viewer"]:
migrated_entry["permissions"].append({"role": "projects/PROJECT-ID/roles/beam_viewer"})
migrated_data.append(migrated_entry)
return migrated_data
def get_gcp_role_permissions(role_id: str) -> list:
"""
Retrieves the permissions associated to a google cloud role.
Args:
project_id: The ID of the Google Cloud project.
role_id: The name of the role to retrieve permissions for.
Returns:
A list of permissions associated with the specified role.
"""
client = IAMClient()
request = GetRoleRequest(name=role_id)
role = client.get_role(request=request)
return list(role.included_permissions)
def get_roles_from_file(file_path: str) -> list:
"""
Reads a YAML file containing roles and returns a list of dictionaries with user data.
Args:
file_path: The path to the YAML file containing roles.
Returns:
A list of dictionaries with user data.
"""
with open(file_path, 'r') as file:
data = yaml.safe_load(file)
roles = []
for role in data:
email = role.get("email")
username = role.get("username")
permissions = role.get("permissions", [])
roles.append({
"email": email,
"username": username,
"permissions": permissions
})
return roles
def permission_differences(project_id: str, user_email: str) -> list:
"""
Generates a list of differences between the original and migrated permissions for a user.
It gets the permission from the generated files, so it is expected that the files are already generated and up to date.
Args:
project_id: The ID of the Google Cloud project.
user_email: The email of the user to compare permissions for.
Returns:
A list of dictionaries containing the differences in permissions for the specified user.
"""
cache = {}
user_differences = {}
original = get_roles_from_file(f"{project_id}.original-roles.yaml")
migrated = get_roles_from_file(f"{project_id}.migrated-roles.yaml")
# Get the permissions on the beam_roles
beam_roles = generate_roles.get_roles()
for role_name, role_data in beam_roles.items():
permissions = role_data["permissions"]
cache[role_name] = permissions
# Get the permissions for the original roles
for user in original:
username = user["username"]
email = user["email"]
# Skip if the user email does not match the specified user_email
if user_email and email != user_email:
continue
original_roles = user["permissions"]
original_permissions = []
for role in original_roles:
if '_withcond_' in role['role']:
# Skip roles with conditions, as they are not supported in the new roles
continue
if 'organizations/' in role['role']:
# Skip organization roles, as they are not supported in the new roles
continue
if role['role'] not in cache:
permissions = get_gcp_role_permissions(role["role"])
cache[role['role']] = sorted(permissions)
original_permissions.extend(cache[role['role']])
# Initialize the user differences entry
user_differences[username] = {
"email": email,
"original_roles": original_roles,
"original_permissions": sorted(original_permissions),
"migrated_roles": [],
"migrated_permissions": [],
"differences": []
}
# Get the permissions for the migrated roles
for user in migrated:
username = user["username"]
email = user["email"]
# Skip if the user email does not match the specified user_email
if user_email and email != user_email:
continue
migrated_roles = user["permissions"]
migrated_permissions = []
for role in migrated_roles:
full_role_name = role["role"]
# Owner is a special case, it should not be migrated to any other role.
if "roles/owner" in full_role_name:
migrated_permissions.extend(get_gcp_role_permissions(full_role_name))
else:
role_name = full_role_name.split('roles/')[1]
migrated_permissions.extend(cache[role_name])
user_differences[username]["migrated_roles"] = migrated_roles
user_differences[username]["migrated_permissions"] = sorted(migrated_permissions)
# Compare original and migrated permissions
differences_list = []
for username, user_data in user_differences.items():
original_permissions = user_data["original_permissions"]
migrated_permissions = user_data["migrated_permissions"]
# Find differences in permissions
original_set = set(original_permissions)
migrated_set = set(migrated_permissions)
added_permissions = migrated_set.difference(original_set)
removed_permissions = original_set.difference(migrated_set)
if added_permissions or removed_permissions:
differences = {
"username": username,
"email": user_data["email"],
"added_permissions": sorted(list(added_permissions)),
"removed_permissions": sorted(list(removed_permissions))
}
differences_list.append(differences)
return differences_list
def main():
"""
Main function to run the script.
This function parses command-line arguments to either export IAM policies
or generate permission differences for a specified GCP project.
"""
parser = argparse.ArgumentParser(
description="Export IAM policies or generate permission differences for a GCP project."
)
parser.add_argument(
"project_id",
help="The Google Cloud project ID."
)
parser.add_argument(
"--difference",
dest="user_email",
metavar="USER_EMAIL",
help="Generate permission differences for the specified user email."
)
args = parser.parse_args()
project_id = args.project_id
user_email = args.user_email
if user_email:
# If the iam policy has not been generated yet, it will generate the original IAM policy first.
if not os.path.exists(f"{project_id}.original-roles.yaml") or not os.path.exists(f"{project_id}.migrated-roles.yaml"):
print(f"Original IAM policy for project {project_id} not found. Generating original and migrated roles first.")
print(f"Exporting IAM policy for project {project_id}...")
iam_data = export_project_iam(project_id)
original_filename = f"{project_id}.original-roles.yaml"
original_header = f"Exported original IAM policy for project {project_id}"
to_yaml_file(iam_data, original_filename, header_info=original_header)
print("Migrating permissions to new roles...")
migrated_data = migrate_permissions(iam_data)
migrated_filename = f"{project_id}.migrated-roles.yaml"
migrated_header = f"Migrated IAM policy for project {project_id} to new beam_roles"
to_yaml_file(migrated_data, migrated_filename, header_info=migrated_header)
print(f"Generated {original_filename} and {migrated_filename}")
print(f"Generating permission differences for {user_email} in project {project_id}...")
differences = permission_differences(project_id, user_email)
if differences:
output_filename = f"{project_id}.permission-differences.yaml"
header = f"Permission differences for user {user_email} in project {project_id}"
to_yaml_file(differences, output_filename, header_info=header)
print(f"Generated {output_filename}")
else:
print(f"No permission differences found for user {user_email} in project {project_id}.")
else:
print(f"Exporting IAM policy for project {project_id}...")
iam_data = export_project_iam(project_id)
original_filename = f"{project_id}.original-roles.yaml"
original_header = f"Exported original IAM policy for project {project_id}"
to_yaml_file(iam_data, original_filename, header_info=original_header)
print("Migrating permissions to new roles...")
migrated_data = migrate_permissions(iam_data)
migrated_filename = f"{project_id}.migrated-roles.yaml"
migrated_header = f"Migrated IAM policy for project {project_id} to new beam_roles"
to_yaml_file(migrated_data, migrated_filename, header_info=migrated_header)
print(f"Generated {original_filename} and {migrated_filename}")
print(f"To generate permission differences, run: python {sys.argv[0]} {project_id} --difference <user_email>")
if __name__ == "__main__":
main()