| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import argparse |
| import datetime |
| import logging |
| import os |
| import sys |
| import yaml |
| from google.api_core import exceptions |
| from google.cloud import resourcemanager_v3 |
| from typing import Optional, List, Dict, Tuple |
| from sending import SendingClient |
| |
| CONFIG_FILE = "config.yml" |
| |
| class IAMPolicyComplianceChecker: |
| |
| def is_project_service_account_email(self, email: Optional[str]) -> bool: |
| """ |
| Returns True if the email is not a service account, or if it is a service account and the email contains the project_id. |
| """ |
| if email and email.endswith('.gserviceaccount.com'): |
| return self.project_id in email |
| return True |
| |
| def __init__(self, project_id: str, users_file: str, logger: logging.Logger, sending_client: Optional[SendingClient] = None): |
| self.project_id = project_id |
| self.users_file = users_file |
| self.client = resourcemanager_v3.ProjectsClient() |
| self.logger = logger |
| self.sending_client = sending_client |
| |
| def _parse_member(self, member: str) -> tuple[str, Optional[str], str]: |
| """Parses an IAM member string to extract type, email, and a derived username. |
| |
| Args: |
| member: The IAM member string |
| Returns: |
| A tuple containing: |
| - username: The derived username from the member string. |
| - email: The email address if available, otherwise None. |
| - member_type: The type of the member (e.g., user, serviceAccount, group). |
| """ |
| email = None |
| username = member |
| |
| # Split the member string to determine type and identifier |
| parts = member.split(':', 1) |
| member_type = parts[0] if len(parts) > 1 else "unknown" |
| identifier = parts[1] if len(parts) > 1 else member |
| |
| if member_type in ["user", "serviceAccount", "group"]: |
| email = identifier |
| if '@' in identifier: |
| username = identifier.split('@')[0] |
| else: |
| username = identifier |
| else: |
| username = identifier |
| member_type = "unknown" |
| email = None |
| |
| return username, email, member_type |
| |
| def _export_project_iam(self) -> List[Dict]: |
| """Exports the IAM policy for a given project to YAML format. |
| |
| Returns: |
| A list of dictionaries containing the IAM policy details. |
| """ |
| |
| try: |
| policy = self.client.get_iam_policy(resource=f"projects/{self.project_id}") |
| self.logger.debug(f"Retrieved IAM policy for project {self.project_id}") |
| except exceptions.NotFound as e: |
| self.logger.error(f"Project {self.project_id} not found: {e}") |
| raise |
| except exceptions.PermissionDenied as e: |
| self.logger.error(f"Permission denied for project {self.project_id}: {e}") |
| raise |
| except Exception as e: |
| self.logger.error(f"An error occurred while retrieving IAM policy for project {self.project_id}: {e}") |
| raise |
| |
| members_data = {} |
| |
| for binding in policy.bindings: |
| role = binding.role |
| |
| for member_str in binding.members: |
| if member_str not in members_data: |
| username, email_address, member_type = self._parse_member(member_str) |
| # Skip service accounts not matching the project_id |
| if member_type == "serviceAccount" and not self.is_project_service_account_email(email_address): |
| self.logger.debug(f"Skipping service account not matching project_id ({self.project_id}): {email_address}") |
| continue |
| if member_type == "unknown": |
| self.logger.warning(f"Skipping member {member_str} with no email address") |
| continue # Skip if no email address is found, probably a malformed member |
| members_data[member_str] = { |
| "username": username, |
| "email": email_address, |
| "member_type": member_type, |
| "permissions": [] |
| } |
| |
| # Skip permissions that have a condition |
| if "withcond" in role: |
| continue |
| |
| permission_entry = {} |
| permission_entry["role"] = role |
| |
| members_data[member_str]["permissions"].append(permission_entry) |
| |
| output_list = [] |
| for data in members_data.values(): |
| data["permissions"] = sorted(data["permissions"], key=lambda p: p["role"]) |
| output_list.append({ |
| "username": data["username"], |
| "email": data["email"], |
| "member_type": data["member_type"], |
| "permissions": data["permissions"] |
| }) |
| |
| output_list.sort(key=lambda x: x["username"]) |
| return output_list |
| |
| def _read_project_iam_file(self) -> List[Dict]: |
| """Reads the IAM policy from a YAML file. |
| |
| Returns: |
| A list of dictionaries containing the IAM policy details. |
| """ |
| try: |
| with open(self.users_file, "r") as file: |
| iam_policy = yaml.safe_load(file) |
| |
| |
| self.logger.debug(f"Retrieved IAM policy from file for project {self.project_id}") |
| return iam_policy |
| except FileNotFoundError: |
| self.logger.error(f"IAM policy file not found for project {self.project_id}") |
| return [] |
| except Exception as e: |
| self.logger.error(f"An error occurred while reading IAM policy file for project {self.project_id}: {e}") |
| return [] |
| |
| def _to_yaml_file(self, data: List[Dict], output_file: str, header_info: str = "") -> None: |
| """ |
| Writes a list of dictionaries to a YAML file. |
| Include the apache license header on the files |
| |
| Args: |
| data: A list of dictionaries containing user permissions and details. |
| output_file: The file path where the YAML output will be written. |
| header_info: A string containing the header information to be included in the YAML file. |
| """ |
| |
| apache_license_header = """# Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """ |
| |
| # Prepare the header with the Apache license |
| header = f"{apache_license_header}\n# {header_info}\n# Generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n" |
| |
| try: |
| with open(output_file, "w") as file: |
| file.write(header) |
| yaml.dump(data, file, sort_keys=False, default_flow_style=False, indent=2) |
| self.logger.info(f"Successfully wrote IAM policy data to {output_file}") |
| except IOError as e: |
| self.logger.error(f"Failed to write to {output_file}: {e}") |
| raise |
| |
| def check_compliance(self) -> List[str]: |
| """ |
| Checks the compliance of the IAM policy against the defined policies. |
| |
| Returns: |
| A list of strings describing any compliance issues found. |
| """ |
| |
| current_users = {user['email']: user for user in self._export_project_iam() if self.is_project_service_account_email(user.get('email'))} |
| existing_users = {user['email']: user for user in self._read_project_iam_file() if self.is_project_service_account_email(user.get('email'))} |
| |
| if not existing_users: |
| error_msg = f"No IAM policy found in the {self.users_file}." |
| self.logger.info(error_msg) |
| raise RuntimeError(error_msg) |
| |
| differences = [] |
| |
| all_emails = set(current_users.keys()) | set(existing_users.keys()) |
| |
| for email in sorted(list(all_emails)): |
| current_user = current_users.get(email) |
| existing_user = existing_users.get(email) |
| |
| if current_user and not existing_user: |
| differences.append(f"User {email} not found in existing policy.") |
| elif not current_user and existing_user: |
| differences.append(f"User {email} found in policy file but not in GCP.") |
| elif current_user and existing_user: |
| if current_user.get("member_type") != existing_user.get("member_type"): |
| differences.append(f"User {email} has different member type. In GCP: {current_user.get('member_type')}, in file: {existing_user.get('member_type')}") |
| if current_user["permissions"] != existing_user["permissions"]: |
| msg = f"\nPermissions for user {email} differ." |
| msg += f"\nIn GCP: {current_user['permissions']}" |
| msg += f"\nIn {self.users_file}: {existing_user['permissions']}" |
| self.logger.info(msg) |
| differences.append(msg) |
| |
| return differences |
| |
| def create_announcement(self, recipient: str) -> None: |
| """ |
| Creates an announcement about compliance issues using the SendingClient. |
| |
| Args: |
| recipient (str): The email address of the announcement recipient. |
| """ |
| if not self.sending_client: |
| raise ValueError("SendingClient is required for creating announcements") |
| |
| diff = self.check_compliance() |
| |
| if not diff: |
| self.logger.info("No compliance issues found, no announcement will be created.") |
| return |
| |
| title = f"IAM Policy Non-Compliance Detected" |
| body = f"IAM policy for project {self.project_id} is not compliant with the defined policies on {self.users_file}\n\n" |
| for issue in diff: |
| body += f"- {issue}\n" |
| |
| announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the IAM policy for project {self.project_id}.\n\n" |
| announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n" |
| announcement += f"\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations." |
| |
| self.sending_client.create_announcement(title, body, recipient, announcement) |
| |
| def print_announcement(self, recipient: str) -> None: |
| """ |
| Prints announcement details instead of sending them (for testing purposes). |
| |
| Args: |
| recipient (str): The email address of the announcement recipient. |
| """ |
| if not self.sending_client: |
| raise ValueError("SendingClient is required for printing announcements") |
| |
| diff = self.check_compliance() |
| |
| if not diff: |
| self.logger.info("No compliance issues found, no announcement will be printed.") |
| return |
| |
| title = f"IAM Policy Non-Compliance Detected" |
| body = f"IAM policy for project {self.project_id} is not compliant with the defined policies on {self.users_file}\n\n" |
| for issue in diff: |
| body += f"- {issue}\n" |
| |
| announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the IAM policy for project {self.project_id}.\n\n" |
| announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n" |
| announcement += f"\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations." |
| |
| self.sending_client.print_announcement(title, body, recipient, announcement) |
| |
| def generate_compliance(self) -> None: |
| """ |
| Modifies the users file to match the current IAM policy. |
| If no changes are needed, no file will be written. |
| """ |
| |
| try: |
| diff = self.check_compliance() |
| except RuntimeError: |
| self.logger.info("No existing IAM policy found.") |
| diff = ["No existing policy found"] |
| |
| if not diff or (len(diff) == 1 and "No existing policy found" not in diff[0]): |
| self.logger.info("No compliance issues found, no changes will be made.") |
| return |
| |
| current_policy = self._export_project_iam() |
| header_info = f"IAM policy for project {self.project_id}" |
| |
| self._to_yaml_file(current_policy, self.users_file, header_info) |
| self.logger.info(f"Generated new compliance file: {self.users_file}") |
| |
| def config_process() -> Dict[str, str]: |
| with open(CONFIG_FILE, "r") as file: |
| config = yaml.safe_load(file) |
| |
| if not config: |
| raise ValueError("Configuration file is empty or invalid.") |
| |
| config_res = dict() |
| |
| config_res["project_id"] = config.get("project_id", "apache-beam-testing") |
| config_res["logging_level"] = config.get("logging", {}).get("level", "INFO") |
| config_res["logging_format"] = config.get("logging", {}).get("format", "[%(asctime)s] %(levelname)s: %(message)s") |
| config_res["users_file"] = config.get("users_file", "../iam/users.yml") |
| config_res["action"] = config.get("action", "check") |
| |
| # SendingClient configuration |
| config_res["github_token"] = os.getenv("GITHUB_TOKEN", "") |
| config_res["github_repo"] = os.getenv("GITHUB_REPOSITORY", "apache/beam") |
| config_res["smtp_server"] = os.getenv("SMTP_SERVER", "") |
| config_res["smtp_port"] = os.getenv("SMTP_PORT", 587) |
| config_res["email"] = os.getenv("EMAIL_ADDRESS", "") |
| config_res["password"] = os.getenv("EMAIL_PASSWORD", "") |
| config_res["recipient"] = os.getenv("EMAIL_RECIPIENT", "") |
| |
| return config_res |
| |
| def main(): |
| # Parse command line arguments |
| parser = argparse.ArgumentParser(description="IAM Policy Compliance Checker") |
| parser.add_argument("--action", choices=["check", "announce", "print", "generate"], |
| help="Action to perform: check compliance, create announcement, print announcement, or generate new compliance") |
| args = parser.parse_args() |
| |
| config = config_process() |
| |
| # Command line argument takes precedence over config file |
| action = args.action if args.action else config.get("action", "check") |
| |
| logging.basicConfig(level=getattr(logging, config["logging_level"].upper(), logging.INFO), |
| format=config["logging_format"]) |
| logger = logging.getLogger("IAMPolicyComplianceChecker") |
| |
| # Create SendingClient if needed for announcement actions |
| sending_client = None |
| if action in ["announce", "print"]: |
| try: |
| # Provide default values for testing, especially for print action |
| github_token = config["github_token"] or "dummy-token" |
| github_repo = config["github_repo"] or "dummy/repo" |
| smtp_server = config["smtp_server"] or "dummy-server" |
| smtp_port = int(config["smtp_port"]) if config["smtp_port"] else 587 |
| email = config["email"] or "dummy@example.com" |
| password = config["password"] or "dummy-password" |
| |
| sending_client = SendingClient( |
| logger=logger, |
| github_token=github_token, |
| github_repo=github_repo, |
| smtp_server=smtp_server, |
| smtp_port=smtp_port, |
| email=email, |
| password=password |
| ) |
| except Exception as e: |
| logger.error(f"Failed to initialize SendingClient: {e}") |
| return 1 |
| |
| logger.info(f"Starting IAM policy compliance check with action: {action}") |
| iam_checker = IAMPolicyComplianceChecker(config["project_id"], config["users_file"], logger, sending_client) |
| |
| try: |
| if action == "check": |
| compliance_issues = iam_checker.check_compliance() |
| if compliance_issues: |
| logger.warning("IAM policy compliance issues found:") |
| for issue in compliance_issues: |
| logger.warning(issue) |
| else: |
| logger.info("IAM policy is compliant.") |
| elif action == "announce": |
| logger.info("Creating announcement for compliance violations...") |
| recipient = config["recipient"] or "admin@example.com" |
| iam_checker.create_announcement(recipient) |
| elif action == "print": |
| logger.info("Printing announcement for compliance violations...") |
| recipient = config["recipient"] or "admin@example.com" |
| iam_checker.print_announcement(recipient) |
| elif action == "generate": |
| logger.info("Generating new compliance based on current IAM policy...") |
| iam_checker.generate_compliance() |
| else: |
| logger.error(f"Unknown action: {action}") |
| return 1 |
| except Exception as e: |
| logger.error(f"Error executing action '{action}': {e}") |
| return 1 |
| |
| return 0 |
| |
| if __name__ == "__main__": |
| |
| sys.exit(main()) |