blob: c4c65c7c679c881146ffd2a44ddf00e79895cfa6 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import datetime
import logging
import os
import sys
import yaml
from google.api_core import exceptions
from google.cloud import resourcemanager_v3
from typing import Optional, List, Dict, Tuple
from sending import SendingClient
CONFIG_FILE = "config.yml"
class IAMPolicyComplianceChecker:
def is_project_service_account_email(self, email: Optional[str]) -> bool:
"""
Returns True if the email is not a service account, or if it is a service account and the email contains the project_id.
"""
if email and email.endswith('.gserviceaccount.com'):
return self.project_id in email
return True
def __init__(self, project_id: str, users_file: str, logger: logging.Logger, sending_client: Optional[SendingClient] = None):
self.project_id = project_id
self.users_file = users_file
self.client = resourcemanager_v3.ProjectsClient()
self.logger = logger
self.sending_client = sending_client
def _parse_member(self, member: str) -> tuple[str, Optional[str], str]:
"""Parses an IAM member string to extract type, email, and a derived username.
Args:
member: The IAM member string
Returns:
A tuple containing:
- username: The derived username from the member string.
- email: The email address if available, otherwise None.
- member_type: The type of the member (e.g., user, serviceAccount, group).
"""
email = None
username = member
# Split the member string to determine type and identifier
parts = member.split(':', 1)
member_type = parts[0] if len(parts) > 1 else "unknown"
identifier = parts[1] if len(parts) > 1 else member
if member_type in ["user", "serviceAccount", "group"]:
email = identifier
if '@' in identifier:
username = identifier.split('@')[0]
else:
username = identifier
else:
username = identifier
member_type = "unknown"
email = None
return username, email, member_type
def _export_project_iam(self) -> List[Dict]:
"""Exports the IAM policy for a given project to YAML format.
Returns:
A list of dictionaries containing the IAM policy details.
"""
try:
policy = self.client.get_iam_policy(resource=f"projects/{self.project_id}")
self.logger.debug(f"Retrieved IAM policy for project {self.project_id}")
except exceptions.NotFound as e:
self.logger.error(f"Project {self.project_id} not found: {e}")
raise
except exceptions.PermissionDenied as e:
self.logger.error(f"Permission denied for project {self.project_id}: {e}")
raise
except Exception as e:
self.logger.error(f"An error occurred while retrieving IAM policy for project {self.project_id}: {e}")
raise
members_data = {}
for binding in policy.bindings:
role = binding.role
for member_str in binding.members:
if member_str not in members_data:
username, email_address, member_type = self._parse_member(member_str)
# Skip service accounts not matching the project_id
if member_type == "serviceAccount" and not self.is_project_service_account_email(email_address):
self.logger.debug(f"Skipping service account not matching project_id ({self.project_id}): {email_address}")
continue
if member_type == "unknown":
self.logger.warning(f"Skipping member {member_str} with no email address")
continue # Skip if no email address is found, probably a malformed member
members_data[member_str] = {
"username": username,
"email": email_address,
"member_type": member_type,
"permissions": []
}
# Skip permissions that have a condition
if "withcond" in role:
continue
permission_entry = {}
permission_entry["role"] = role
members_data[member_str]["permissions"].append(permission_entry)
output_list = []
for data in members_data.values():
data["permissions"] = sorted(data["permissions"], key=lambda p: p["role"])
output_list.append({
"username": data["username"],
"email": data["email"],
"member_type": data["member_type"],
"permissions": data["permissions"]
})
output_list.sort(key=lambda x: x["username"])
return output_list
def _read_project_iam_file(self) -> List[Dict]:
"""Reads the IAM policy from a YAML file.
Returns:
A list of dictionaries containing the IAM policy details.
"""
try:
with open(self.users_file, "r") as file:
iam_policy = yaml.safe_load(file)
self.logger.debug(f"Retrieved IAM policy from file for project {self.project_id}")
return iam_policy
except FileNotFoundError:
self.logger.error(f"IAM policy file not found for project {self.project_id}")
return []
except Exception as e:
self.logger.error(f"An error occurred while reading IAM policy file for project {self.project_id}: {e}")
return []
def _to_yaml_file(self, data: List[Dict], output_file: str, header_info: str = "") -> None:
"""
Writes a list of dictionaries to a YAML file.
Include the apache license header on the files
Args:
data: A list of dictionaries containing user permissions and details.
output_file: The file path where the YAML output will be written.
header_info: A string containing the header information to be included in the YAML file.
"""
apache_license_header = """# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
# Prepare the header with the Apache license
header = f"{apache_license_header}\n# {header_info}\n# Generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n"
try:
with open(output_file, "w") as file:
file.write(header)
yaml.dump(data, file, sort_keys=False, default_flow_style=False, indent=2)
self.logger.info(f"Successfully wrote IAM policy data to {output_file}")
except IOError as e:
self.logger.error(f"Failed to write to {output_file}: {e}")
raise
def check_compliance(self) -> List[str]:
"""
Checks the compliance of the IAM policy against the defined policies.
Returns:
A list of strings describing any compliance issues found.
"""
current_users = {user['email']: user for user in self._export_project_iam() if self.is_project_service_account_email(user.get('email'))}
existing_users = {user['email']: user for user in self._read_project_iam_file() if self.is_project_service_account_email(user.get('email'))}
if not existing_users:
error_msg = f"No IAM policy found in the {self.users_file}."
self.logger.info(error_msg)
raise RuntimeError(error_msg)
differences = []
all_emails = set(current_users.keys()) | set(existing_users.keys())
for email in sorted(list(all_emails)):
current_user = current_users.get(email)
existing_user = existing_users.get(email)
if current_user and not existing_user:
differences.append(f"User {email} not found in existing policy.")
elif not current_user and existing_user:
differences.append(f"User {email} found in policy file but not in GCP.")
elif current_user and existing_user:
if current_user.get("member_type") != existing_user.get("member_type"):
differences.append(f"User {email} has different member type. In GCP: {current_user.get('member_type')}, in file: {existing_user.get('member_type')}")
if current_user["permissions"] != existing_user["permissions"]:
msg = f"\nPermissions for user {email} differ."
msg += f"\nIn GCP: {current_user['permissions']}"
msg += f"\nIn {self.users_file}: {existing_user['permissions']}"
self.logger.info(msg)
differences.append(msg)
return differences
def create_announcement(self, recipient: str) -> None:
"""
Creates an announcement about compliance issues using the SendingClient.
Args:
recipient (str): The email address of the announcement recipient.
"""
if not self.sending_client:
raise ValueError("SendingClient is required for creating announcements")
diff = self.check_compliance()
if not diff:
self.logger.info("No compliance issues found, no announcement will be created.")
return
title = f"IAM Policy Non-Compliance Detected"
body = f"IAM policy for project {self.project_id} is not compliant with the defined policies on {self.users_file}\n\n"
for issue in diff:
body += f"- {issue}\n"
announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the IAM policy for project {self.project_id}.\n\n"
announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n"
announcement += f"\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations."
self.sending_client.create_announcement(title, body, recipient, announcement)
def print_announcement(self, recipient: str) -> None:
"""
Prints announcement details instead of sending them (for testing purposes).
Args:
recipient (str): The email address of the announcement recipient.
"""
if not self.sending_client:
raise ValueError("SendingClient is required for printing announcements")
diff = self.check_compliance()
if not diff:
self.logger.info("No compliance issues found, no announcement will be printed.")
return
title = f"IAM Policy Non-Compliance Detected"
body = f"IAM policy for project {self.project_id} is not compliant with the defined policies on {self.users_file}\n\n"
for issue in diff:
body += f"- {issue}\n"
announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the IAM policy for project {self.project_id}.\n\n"
announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n"
announcement += f"\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations."
self.sending_client.print_announcement(title, body, recipient, announcement)
def generate_compliance(self) -> None:
"""
Modifies the users file to match the current IAM policy.
If no changes are needed, no file will be written.
"""
try:
diff = self.check_compliance()
except RuntimeError:
self.logger.info("No existing IAM policy found.")
diff = ["No existing policy found"]
if not diff or (len(diff) == 1 and "No existing policy found" not in diff[0]):
self.logger.info("No compliance issues found, no changes will be made.")
return
current_policy = self._export_project_iam()
header_info = f"IAM policy for project {self.project_id}"
self._to_yaml_file(current_policy, self.users_file, header_info)
self.logger.info(f"Generated new compliance file: {self.users_file}")
def config_process() -> Dict[str, str]:
with open(CONFIG_FILE, "r") as file:
config = yaml.safe_load(file)
if not config:
raise ValueError("Configuration file is empty or invalid.")
config_res = dict()
config_res["project_id"] = config.get("project_id", "apache-beam-testing")
config_res["logging_level"] = config.get("logging", {}).get("level", "INFO")
config_res["logging_format"] = config.get("logging", {}).get("format", "[%(asctime)s] %(levelname)s: %(message)s")
config_res["users_file"] = config.get("users_file", "../iam/users.yml")
config_res["action"] = config.get("action", "check")
# SendingClient configuration
config_res["github_token"] = os.getenv("GITHUB_TOKEN", "")
config_res["github_repo"] = os.getenv("GITHUB_REPOSITORY", "apache/beam")
config_res["smtp_server"] = os.getenv("SMTP_SERVER", "")
config_res["smtp_port"] = os.getenv("SMTP_PORT", 587)
config_res["email"] = os.getenv("EMAIL_ADDRESS", "")
config_res["password"] = os.getenv("EMAIL_PASSWORD", "")
config_res["recipient"] = os.getenv("EMAIL_RECIPIENT", "")
return config_res
def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description="IAM Policy Compliance Checker")
parser.add_argument("--action", choices=["check", "announce", "print", "generate"],
help="Action to perform: check compliance, create announcement, print announcement, or generate new compliance")
args = parser.parse_args()
config = config_process()
# Command line argument takes precedence over config file
action = args.action if args.action else config.get("action", "check")
logging.basicConfig(level=getattr(logging, config["logging_level"].upper(), logging.INFO),
format=config["logging_format"])
logger = logging.getLogger("IAMPolicyComplianceChecker")
# Create SendingClient if needed for announcement actions
sending_client = None
if action in ["announce", "print"]:
try:
# Provide default values for testing, especially for print action
github_token = config["github_token"] or "dummy-token"
github_repo = config["github_repo"] or "dummy/repo"
smtp_server = config["smtp_server"] or "dummy-server"
smtp_port = int(config["smtp_port"]) if config["smtp_port"] else 587
email = config["email"] or "dummy@example.com"
password = config["password"] or "dummy-password"
sending_client = SendingClient(
logger=logger,
github_token=github_token,
github_repo=github_repo,
smtp_server=smtp_server,
smtp_port=smtp_port,
email=email,
password=password
)
except Exception as e:
logger.error(f"Failed to initialize SendingClient: {e}")
return 1
logger.info(f"Starting IAM policy compliance check with action: {action}")
iam_checker = IAMPolicyComplianceChecker(config["project_id"], config["users_file"], logger, sending_client)
try:
if action == "check":
compliance_issues = iam_checker.check_compliance()
if compliance_issues:
logger.warning("IAM policy compliance issues found:")
for issue in compliance_issues:
logger.warning(issue)
else:
logger.info("IAM policy is compliant.")
elif action == "announce":
logger.info("Creating announcement for compliance violations...")
recipient = config["recipient"] or "admin@example.com"
iam_checker.create_announcement(recipient)
elif action == "print":
logger.info("Printing announcement for compliance violations...")
recipient = config["recipient"] or "admin@example.com"
iam_checker.print_announcement(recipient)
elif action == "generate":
logger.info("Generating new compliance based on current IAM policy...")
iam_checker.generate_compliance()
else:
logger.error(f"Unknown action: {action}")
return 1
except Exception as e:
logger.error(f"Error executing action '{action}': {e}")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())