| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # This script generates roles based on what Apache Beam uses in GCP. |
| # The roles are defined in a YAML file. |
| |
| import yaml |
| import datetime |
| import os |
| from google.cloud import iam_admin_v1 |
| from google.api_core import exceptions |
| |
| # Permissions cache to avoid repeated API calls. |
| permissions_cache = {} |
| |
| ASF_LICENSE_HEADER = """# Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the \"License\"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an \"AS IS\" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # This file is auto-generated by generate_roles.py. |
| # Do not edit manually. |
| \n""" |
| |
| def get_permission_stage(permission_name: str, project_id: str) -> str: |
| """ |
| Finds the support level of a specific IAM permission for a given project. This function caches the results to avoid repeated API calls. |
| |
| Args: |
| permission_name: The name of the permission to check, e.g., 'storage.buckets.create'. |
| project_id: The ID of the GCP project to check against. |
| Returns: |
| The support level of the permission as a string, or "" if the permission is not found. |
| """ |
| global permissions_cache |
| |
| try: |
| if f"{project_id}-stage" in permissions_cache: |
| return permissions_cache[f"{project_id}-stage"].get(permission_name, "") |
| else: |
| permissions_cache[f"{project_id}-stage"] = {} |
| |
| client = iam_admin_v1.IAMClient() |
| resource = f"//cloudresourcemanager.googleapis.com/projects/{project_id}" |
| |
| request = iam_admin_v1.QueryTestablePermissionsRequest( |
| full_resource_name=resource, |
| page_size=1000 |
| ) |
| |
| for permission in client.query_testable_permissions(request=request): |
| permissions_cache[f"{project_id}-stage"][permission.name] = permission.custom_roles_support_level |
| |
| return permissions_cache[f"{project_id}-stage"].get(permission_name, "") |
| |
| except exceptions.PermissionDenied as e: |
| print(f"Error: Permission denied. Ensure you have 'resourcemanager.projects.get' on project '{project_id}'.") |
| print(f"Details: {e}") |
| return "" |
| except exceptions.NotFound as e: |
| print(f"Error: Project '{project_id}' not found.") |
| print(f"Details: {e}") |
| return "" |
| except Exception as e: |
| print(f"An unexpected error occurred while fetching permissions: {e}") |
| return "" |
| |
| def get_role_permissions(role_name: str, project_id: str = "") -> list[str]: |
| """ |
| Gets the permissions included in a predefined or custom IAM role, filtered to only GA permissions. |
| |
| Args: |
| role_name: The full name of the role. |
| For predefined roles, e.g., 'roles/secretmanager.viewer'. |
| For custom roles, e.g., 'projects/your-project-id/roles/your-custom-role'. |
| |
| project_id: Optional, used for permission metadata lookup. |
| Returns: |
| A list of GA permissions associated with the role. |
| """ |
| |
| global permissions_cache |
| print(f"Fetching permissions for role: {role_name} in project: {project_id}") |
| |
| try: |
| if f"{project_id}-role" in permissions_cache and role_name in permissions_cache[f"{project_id}-role"]: |
| return permissions_cache[f"{project_id}-role"].get(role_name, []) |
| else: |
| if f"{project_id}-role" not in permissions_cache: |
| permissions_cache[f"{project_id}-role"] = {} |
| |
| client = iam_admin_v1.IAMClient() |
| request = iam_admin_v1.GetRoleRequest( |
| name=role_name, |
| ) |
| role = client.get_role(request=request) |
| all_perms = list(role.included_permissions) |
| ga_perms = [] |
| for perm in all_perms: |
| stage = get_permission_stage(perm, project_id) |
| if stage == iam_admin_v1.Permission.CustomRolesSupportLevel.SUPPORTED: |
| ga_perms.append(perm) |
| |
| permissions_cache[f"{project_id}-role"][role_name] = ga_perms |
| return ga_perms |
| except exceptions.NotFound: |
| print(f"Error: The role '{role_name}' was not found.") |
| return [] |
| except Exception as e: |
| print(f"An unexpected error occurred: {e}") |
| return [] |
| |
| def filter_permissions(permissions: list[str], allowed_prefixes: list[str] = [], denied_suffixes: list[str] = []) -> set[str]: |
| """ |
| Filters permissions based on the provided services. |
| |
| Args: |
| permissions: A list of permissions to filter. |
| allowed_prefixes: A list of strings that permissions must contain to be included. |
| denied_suffixes: A list of strings that permissions must not contain to be included. |
| Returns: |
| A list of permissions that match the specified services. |
| """ |
| |
| filtered_permissions = set() |
| |
| for perm in permissions: |
| if any(perm.startswith(prefix) for prefix in allowed_prefixes): |
| if not any(perm.endswith(suffix) for suffix in denied_suffixes): |
| filtered_permissions.add(perm) |
| |
| return filtered_permissions |
| |
| def generate_role(role_name: str , perms: set[str]) -> dict: |
| return { |
| "role_id": f"{role_name}", |
| "title": f"{role_name}", |
| "stage": "GA", |
| "description": f"This is the {role_name} role", |
| "permissions": sorted(list(perms)), |
| } |
| |
| def write_role_yaml(filename, role_data): |
| if not role_data.get("permissions"): |
| print(f"No permissions to write for {filename}. Skipping.") |
| return |
| with open(filename, "w") as f: |
| f.write(ASF_LICENSE_HEADER) |
| f.write(f"# This file was generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n") |
| yaml.dump(role_data, f, default_flow_style=False) |
| |
| def get_config(): |
| """ |
| Reads the roles configuration from the YAML file and returns it as a dictionary. |
| The configuration includes services, roles, and suffixes for filtering permissions. |
| """ |
| script_dir = os.path.dirname(os.path.abspath(__file__)) |
| config_path = os.path.join(script_dir, "roles_config.yaml") |
| with open(config_path, "r") as f: |
| config = yaml.safe_load(f) |
| |
| # Each role inherits permissions from the previous role. |
| # This means that the viewer role has all the permissions of the committer role, and so on. |
| # The roles are defined in the order of viewer, committer, infra_manager, and admin. |
| # The viewer role is the base role, so its file contains all its |
| |
| response = { |
| "project_id": config.get("project_id", "apache-beam-testing"), |
| "roles_prefix": config.get("roles_prefix", "beam"), |
| "role": {} |
| } |
| |
| # Add suffixes to the response |
| suffixes = {} |
| for suffix in config.get("suffixes", []): |
| suffixes[suffix["name"]] = suffix["values"] |
| |
| services = set() |
| roles = set() |
| |
| # Sort roles by hierarchy to ensure they are processed in the correct order. |
| config["roles"].sort(key=lambda x: int(x.get("hierarchy", 0))) |
| |
| for role in config["roles"]: |
| services.update(role.get("services", [])) |
| roles.update(role.get("roles", [])) |
| |
| response["role"][role["name"]] = { |
| "name": role["name"], |
| "description": role.get("description", f"This is the {role['name']} role"), |
| "services": services.copy(), |
| "roles": roles.copy(), |
| "except_suffixes": [], |
| } |
| |
| # If the role has except_suffixes, add them to the response |
| suffix_set = set() |
| for except_suffix in role.get("except_suffixes", []): |
| if except_suffix in suffixes: |
| suffix_set.update(suffixes[except_suffix]) |
| else: |
| raise ValueError(f"Unknown suffix '{except_suffix}' in role '{role['name']}'") |
| if suffix_set: |
| response["role"][role["name"]]["except_suffixes"] = list(suffix_set) |
| |
| return response |
| |
| def get_roles(): |
| """ |
| Generates the roles based on the predefined services and permissions. |
| This function creates roles for Beam Viewer, Committer, Infra Manager, and Admin. |
| It filters permissions based on the allowed and denied strings defined in the configuration. |
| """ |
| |
| config = get_config() |
| response = {} |
| |
| project_id = config["project_id"] |
| |
| permissions_added = set() |
| |
| for role in config["role"].values(): |
| print(f"Generating role: {config['roles_prefix']}_{role['name']} with services: {role['services']} and roles: {role['roles']}") |
| # Get the permissions for each base role. |
| role_permissions = set() |
| for role_name in role["roles"]: |
| role_permissions.update(get_role_permissions(role_name, project_id)) |
| role["permissions"] = filter_permissions( |
| permissions=list(role_permissions), |
| allowed_prefixes=list(role["services"]), |
| denied_suffixes=role.get("except_suffixes", []) |
| ) |
| # Remove already added permissions to avoid duplicates. |
| role["permissions"] = role["permissions"].difference(permissions_added) |
| permissions_added.update(role["permissions"]) |
| response[f"{config['roles_prefix']}_{role['name']}"] = generate_role(f"{config['roles_prefix']}_{role['name']}", role["permissions"]) |
| |
| return response |
| |
| def main(): |
| """ |
| Main function to generate the roles and write them to YAML files. |
| It creates a directory for the roles if it doesn't exist and writes each role to its respective file. |
| """ |
| |
| roles = get_roles() |
| |
| for role_name, role_data in roles.items(): |
| filename = f"{role_name}.role.yaml" |
| write_role_yaml(filename, role_data) |
| print(f"Generated {filename} with {len(role_data['permissions'])} permissions.") |
| |
| if __name__ == "__main__": |
| main() |