blob: 4c3a8190d23f32c998a6e61eb94fc6f2f8652de8 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
import sys
import yaml
import argparse
import os
from typing import List, Dict, TypedDict, Optional
from google.cloud import secretmanager
from google.cloud import iam_admin_v1
from google.cloud.iam_admin_v1 import types
from sending import SendingClient
SECRET_MANAGER_LABEL = "beam-infra-secret-manager"
class AuthorizedUser(TypedDict):
email: str
class ServiceAccount(TypedDict):
account_id: str
display_name: str
authorized_users: List[AuthorizedUser]
class ServiceAccountsConfig(TypedDict):
service_accounts: List[ServiceAccount]
CONFIG_FILE = "config.yml"
class AccountKeysPolicyComplianceCheck:
def __init__(self, project_id: str, service_account_keys_file: str, logger: logging.Logger, sending_client: Optional[SendingClient] = None):
self.project_id = project_id
self.service_account_keys_file = service_account_keys_file
self.logger = logger
self.sending_client = sending_client
self.secret_client = secretmanager.SecretManagerServiceClient()
self.service_account_client = iam_admin_v1.IAMClient()
def _normalize_account_email(self, account_id: str) -> str:
"""
Normalizes the account identifier to a full email format.
Args:
account_id (str): The unique identifier or email of the service account.
Returns:
str: The full service account email address.
"""
if "@" in account_id:
return account_id
else:
return f"{account_id}@{self.project_id}.iam.gserviceaccount.com"
def _denormalize_account_email(self, email: str) -> str:
"""
Denormalizes the full service account email address to its unique identifier.
Args:
email (str): The full service account email address.
Returns:
str: The unique identifier for the service account.
"""
if email.endswith(f"@{self.project_id}.iam.gserviceaccount.com"):
return email.split("@")[0]
return email
def _normalize_username(self, username: str) -> str:
"""
Normalizes the username to a consistent format.
Args:
username (str): The username to normalize.
Returns:
str: The normalized username.
"""
if not username.startswith("user:"):
return f"user:{username.strip().lower()}"
return username
def _denormalize_username(self, username: str) -> str:
"""
Denormalizes the username from the consistent format.
Args:
username (str): The normalized username.
Returns:
str: The denormalized username.
"""
if username.startswith("user:"):
return username.split(":", 1)[1].strip().lower()
return username
def _get_all_live_service_accounts(self) -> List[str]:
"""
Retrieves all service accounts that are currently active (not disabled) in the project.
Returns:
List[str]: A list of email addresses for all live service accounts.
"""
request = types.ListServiceAccountsRequest()
request.name = f"projects/{self.project_id}"
try:
accounts = self.service_account_client.list_service_accounts(request=request)
self.logger.debug(f"Retrieved {len(accounts.accounts)} service accounts for project {self.project_id}")
if not accounts:
self.logger.warning(f"No service accounts found in project {self.project_id}.")
return []
return [self._normalize_account_email(account.email) for account in accounts.accounts if not account.disabled]
except Exception as e:
self.logger.error(f"Failed to retrieve service accounts for project {self.project_id}: {e}")
raise
def _get_all_live_managed_secrets(self) -> List[str]:
"""
Retrieves the list of secrets from the Secret Manager that where created by the beam-secret-service
Returns:
List[str]: A list of secret ids
"""
try:
secrets = list(self.secret_client.list_secrets(request={"parent": f"projects/{self.project_id}"}))
self.logger.debug(f"Retrieved {len(secrets)} secrets for project {self.project_id}")
if not secrets:
self.logger.warning(f"No secrets found in project {self.project_id}.")
return []
return [secret.name.split("/")[-1] for secret in secrets if "created_by" in secret.labels and secret.labels["created_by"] == SECRET_MANAGER_LABEL]
except Exception as e:
self.logger.error(f"Failed to retrieve secrets for project {self.project_id}: {e}")
raise
def _get_all_secret_authorized_users(self, secret_id: str) -> List[str]:
"""
Retrieves a list of all users who have access to the secrets in the Secret Manager.
Args:
secret_id (str): The ID of the secret to check access for.
Returns:
List[str]: A list of email addresses for all users authorized to access the secrets.
"""
accessor_role = "roles/secretmanager.secretAccessor"
resource_name = self.secret_client.secret_path(self.project_id, secret_id)
try:
policy = self.secret_client.get_iam_policy(request={"resource": resource_name})
self.logger.debug(f"Retrieved IAM policy for secret '{secret_id}': {policy}")
if not policy.bindings:
self.logger.warning(f"No IAM bindings found for secret '{secret_id}'.")
return []
authorized_users = []
for binding in policy.bindings:
if binding.role == accessor_role:
for user in binding.members:
authorized_users.append(self._normalize_username(user))
return authorized_users
except Exception as e:
self.logger.error(f"Failed to get IAM policy for secret '{secret_id}': {e}")
raise
def _read_service_account_keys(self) -> ServiceAccountsConfig:
"""
Reads the service account keys from a YAML file and returns a list of ServiceAccount objects.
Returns:
List[ServiceAccount]: A list of service account declarations.
"""
try:
with open(self.service_account_keys_file, "r") as file:
keys = yaml.safe_load(file)
if not keys or keys.get("service_accounts") is None:
return {"service_accounts": []}
return keys
except FileNotFoundError:
self.logger.info(f"Service account keys file {self.service_account_keys_file} not found, starting with empty configuration")
return {"service_accounts": []}
except IOError as e:
error_msg = f"Failed to read service account keys from {self.service_account_keys_file}: {e}"
self.logger.error(error_msg)
raise
def _to_yaml_file(self, data: List[ServiceAccount], output_file: str, header_info: str = "") -> None:
"""
Writes a list of dictionaries to a YAML file.
Include the apache license header on the files
Args:
data: A list of dictionaries containing user permissions and details.
output_file: The file path where the YAML output will be written.
header_info: A string containing the header information to be included in the YAML file.
"""
apache_license_header = """# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
# Prepare the header with the Apache license
header = f"{apache_license_header}\n# {header_info}\n# Generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n"
try:
with open(output_file, "w") as file:
file.write(header)
yaml_data = {"service_accounts": data}
yaml.dump(yaml_data, file, sort_keys=False, default_flow_style=False, indent=2)
self.logger.info(f"Successfully wrote Service Account Keys policy data to {output_file}")
except IOError as e:
self.logger.error(f"Failed to write to {output_file}: {e}")
def check_compliance(self) -> List[str]:
"""
Checks the compliance of service account keys with the defined policies.
Returns:
List[str]: A list of compliance issue messages.
"""
service_account_data = self._read_service_account_keys()
file_service_accounts = service_account_data.get("service_accounts")
if not file_service_accounts:
file_service_accounts = []
self.logger.info(f"No service account keys found in the {self.service_account_keys_file}.")
compliance_issues = []
# Check that all service accounts that exist are declared
for service_account in self._get_all_live_service_accounts():
if self._denormalize_account_email(service_account) not in [account["account_id"] for account in file_service_accounts]:
msg = f"Service account '{service_account}' is not declared in the service account keys file."
compliance_issues.append(msg)
self.logger.warning(msg)
managed_secrets = self._get_all_live_managed_secrets()
extracted_secrets = [f"{self._denormalize_account_email(account['account_id'])}-key" for account in file_service_accounts]
# Check for managed secrets that are not declared
for secret in managed_secrets:
if secret not in extracted_secrets:
msg = f"Managed secret '{secret}' is not declared in the service account keys file."
compliance_issues.append(msg)
self.logger.warning(msg)
# Check for each managed secret if it has the correct permissions
for account in file_service_accounts:
secret_name = f"{self._denormalize_account_email(account['account_id'])}-key"
if secret_name not in managed_secrets:
# Skip accounts that don't have managed secrets
continue
authorized_users = [user["email"] for user in account["authorized_users"]]
actual_users = [self._denormalize_username(user) for user in self._get_all_secret_authorized_users(secret_name)]
# Sort both lists for proper comparison
authorized_users.sort()
actual_users.sort()
if authorized_users != actual_users:
msg = f"Managed secret '{account['account_id']}' does not have the correct permissions. Expected: {authorized_users}, Actual: {actual_users}"
compliance_issues.append(msg)
self.logger.warning(msg)
return compliance_issues
def create_announcement(self, recipient: str) -> None:
"""
Creates an announcement about compliance issues using the SendingClient.
Args:
recipient (str): The email address of the announcement recipient.
"""
if not self.sending_client:
raise ValueError("SendingClient is required for creating announcements")
diff = self.check_compliance()
if not diff:
self.logger.info("No compliance issues found, no announcement will be created.")
return
title = f"Account Keys Compliance Issue Detected"
body = f"Account keys for project {self.project_id} are not compliant with the defined policies on {self.service_account_keys_file}\n\n"
for issue in diff:
body += f"- {issue}\n"
announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the Account Keys policy for project {self.project_id}.\n\n"
announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n"
announcement += f"\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations."
self.sending_client.create_announcement(title, body, recipient, announcement)
def print_announcement(self, recipient: str) -> None:
"""
Prints announcement details instead of sending them (for testing purposes).
Args:
recipient (str): The email address of the announcement recipient.
"""
if not self.sending_client:
raise ValueError("SendingClient is required for printing announcements")
diff = self.check_compliance()
if not diff:
self.logger.info("No compliance issues found, no announcement will be printed.")
return
title = f"Account Keys Compliance Issue Detected"
body = f"Account keys for project {self.project_id} are not compliant with the defined policies on {self.service_account_keys_file}\n\n"
for issue in diff:
body += f"- {issue}\n"
announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the Account Keys policy for project {self.project_id}.\n\n"
announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n"
announcement += f"\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations."
self.sending_client.print_announcement(title, body, recipient, announcement)
def generate_compliance(self) -> None:
"""
Modifies the service account keys file to match the current state of service accounts and secrets.
It will just add the non managed service accounts.
"""
service_account_data = self._read_service_account_keys()
file_service_accounts = service_account_data.get("service_accounts", [])
# Ensure file_service_accounts is a list
if file_service_accounts is None:
file_service_accounts = []
self.logger.info(f"Found {len(file_service_accounts)} existing service accounts in the keys file")
# Check that all service accounts that exist are declared, if not, add them
for service_account in self._get_all_live_service_accounts():
if self._denormalize_account_email(service_account) not in [account["account_id"] for account in file_service_accounts]:
self.logger.info(f"Service account '{service_account}' is not declared in the service account keys file, adding it")
file_service_accounts.append({
"account_id": self._denormalize_account_email(service_account),
"display_name": service_account,
"authorized_users": []
})
managed_secrets = self._get_all_live_managed_secrets()
extracted_secrets = [f"{self._denormalize_account_email(account['account_id'])}-key" for account in file_service_accounts]
# Check for managed secrets that are not declared, if not, add them
for secret in managed_secrets:
if secret not in extracted_secrets:
self.logger.info(f"Managed secret '{secret}' is not declared in the service account keys file, adding it")
file_service_accounts.append({
"account_id": secret.strip("-key"),
"display_name": self._normalize_account_email(secret.strip("-key")),
"authorized_users": []
})
# Check for each managed secret if it has the correct permissions
for account in file_service_accounts:
secret_name = f"{self._denormalize_account_email(account['account_id'])}-key"
if secret_name not in managed_secrets:
continue
authorized_users = sorted([user["email"] for user in account["authorized_users"]])
if not authorized_users:
self.logger.info(f"Managed secret '{account}' is new, skipping permission check")
continue
actual_users_normalized = sorted(self._get_all_secret_authorized_users(secret_name))
actual_users = sorted([self._denormalize_username(user) for user in actual_users_normalized])
if authorized_users != actual_users:
self.logger.info(f"Managed secret '{account}' does not have the correct permissions, updating it")
account["authorized_users"] = [{"email": user} for user in actual_users]
# Remove duplicates based on account_id
seen_accounts = set()
deduplicated_accounts = []
for account in file_service_accounts:
if account["account_id"] not in seen_accounts:
seen_accounts.add(account["account_id"])
deduplicated_accounts.append(account)
else:
self.logger.info(f"Removing duplicate entry for account '{account['account_id']}'")
self._to_yaml_file(deduplicated_accounts, self.service_account_keys_file, header_info="Service Account Keys")
def config_process() -> Dict[str, str]:
with open(CONFIG_FILE, "r") as file:
config = yaml.safe_load(file)
if not config:
raise ValueError("Configuration file is empty or invalid.")
config_res = dict()
config_res["project_id"] = config.get("project_id", "apache-beam-testing")
config_res["logging_level"] = config.get("logging", {}).get("level", "INFO")
config_res["logging_format"] = config.get("logging", {}).get("format", "[%(asctime)s] %(levelname)s: %(message)s")
config_res["service_account_keys_file"] = config.get("service_account_keys_file", "../keys/keys.yaml")
config_res["action"] = config.get("action", "check")
# SendingClient configuration
config_res["github_token"] = os.getenv("GITHUB_TOKEN", "")
config_res["github_repo"] = os.getenv("GITHUB_REPOSITORY", "apache/beam")
config_res["smtp_server"] = os.getenv("SMTP_SERVER", "")
config_res["smtp_port"] = os.getenv("SMTP_PORT", 587)
config_res["email"] = os.getenv("EMAIL_ADDRESS", "")
config_res["password"] = os.getenv("EMAIL_PASSWORD", "")
config_res["recipient"] = os.getenv("EMAIL_RECIPIENT", "")
return config_res
def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description="Account Keys Compliance Checker")
parser.add_argument("--action", choices=["check", "announce", "print", "generate"],
help="Action to perform: check compliance, create announcement, print announcement, or generate new compliance")
args = parser.parse_args()
config = config_process()
# Command line argument takes precedence over config file
action = args.action if args.action else config.get("action", "check")
logging.basicConfig(level=getattr(logging, config["logging_level"].upper(), logging.INFO),
format=config["logging_format"])
logger = logging.getLogger("AccountKeysPolicyComplianceCheck")
# Create SendingClient if needed for announcement actions
sending_client = None
if action in ["announce", "print"]:
try:
# Provide default values for testing, especially for print action
github_token = config["github_token"] or "dummy-token"
github_repo = config["github_repo"] or "dummy/repo"
smtp_server = config["smtp_server"] or "dummy-server"
smtp_port = int(config["smtp_port"]) if config["smtp_port"] else 587
email = config["email"] or "dummy@example.com"
password = config["password"] or "dummy-password"
sending_client = SendingClient(
logger=logger,
github_token=github_token,
github_repo=github_repo,
smtp_server=smtp_server,
smtp_port=smtp_port,
email=email,
password=password
)
except Exception as e:
logger.error(f"Failed to initialize SendingClient: {e}")
return 1
logger.info(f"Starting Account Keys policy compliance check with action: {action}")
account_keys_checker = AccountKeysPolicyComplianceCheck(config["project_id"], config["service_account_keys_file"], logger, sending_client)
try:
if action == "check":
compliance_issues = account_keys_checker.check_compliance()
if compliance_issues:
logger.warning("Account Keys policy compliance issues found:")
for issue in compliance_issues:
logger.warning(issue)
else:
logger.info("Account Keys policy is compliant.")
elif action == "announce":
logger.info("Creating announcement for compliance violations...")
recipient = config["recipient"] or "admin@example.com"
account_keys_checker.create_announcement(recipient)
elif action == "print":
logger.info("Printing announcement for compliance violations...")
recipient = config["recipient"] or "admin@example.com"
account_keys_checker.print_announcement(recipient)
elif action == "generate":
logger.info("Generating new compliance based on current Account Keys policy...")
account_keys_checker.generate_compliance()
else:
logger.error(f"Unknown action: {action}")
return 1
except Exception as e:
logger.error(f"Error executing action '{action}': {e}")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())