| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import json |
| import ssl |
| import yaml |
| import logging |
| import smtplib |
| import os |
| from dataclasses import dataclass |
| from datetime import datetime, timedelta, timezone |
| from google.cloud import logging_v2 |
| from google.cloud import storage |
| from typing import List, Dict, Any |
| import argparse |
| |
| REPORT_SUBJECT = "Weekly IAM Security Events Report" |
| REPORT_BODY_TEMPLATE = """ |
| Hello Team, |
| |
| Please find below the summary of IAM security events for the past week: |
| |
| {event_summary} |
| |
| Best Regards, |
| Automated GitHub Action |
| """ |
| |
| @dataclass |
| class SinkCls: |
| name: str |
| description: str |
| filter_methods: List[str] |
| excluded_principals: List[str] |
| |
| class LogAnalyzer(): |
| def __init__(self, project_id: str, gcp_bucket: str, logger: logging.Logger, sinks: List[SinkCls]): |
| self.project_id = project_id |
| self.bucket = gcp_bucket |
| self.logger = logger |
| self.sinks = sinks |
| |
| def _construct_filter(self, sink: SinkCls) -> str: |
| """ |
| Constructs a filter string for a given sink. |
| |
| Args: |
| sink (Sink): The sink object containing filter information. |
| |
| Returns: |
| str: The constructed filter string. |
| """ |
| |
| method_filters = [] |
| for method in sink.filter_methods: |
| method_filters.append(f'protoPayload.methodName="{method}"') |
| |
| exclusion_filters = [] |
| for principal in sink.excluded_principals: |
| exclusion_filters.append(f'protoPayload.authenticationInfo.principalEmail != "{principal}"') |
| |
| if method_filters and exclusion_filters: |
| filter_ = f"({' OR '.join(method_filters)}) AND ({' AND '.join(exclusion_filters)})" |
| elif method_filters: |
| filter_ = f"({' OR '.join(method_filters)})" |
| elif exclusion_filters: |
| filter_ = f"({' AND '.join(exclusion_filters)})" |
| else: |
| filter_ = "" |
| |
| return filter_ |
| |
| def _create_log_sink(self, sink: SinkCls) -> None: |
| """ |
| Creates a log sink in GCP if it doesn't already exist. |
| If it already exists, it updates the sink with the new filter in case the filter has changed. |
| |
| Args: |
| sink (Sink): The sink object to create. |
| """ |
| logging_client = logging_v2.Client(project=self.project_id) |
| filter_ = self._construct_filter(sink) |
| destination = "storage.googleapis.com/{bucket}".format(bucket=self.bucket) |
| |
| sink_client = logging_client.sink(sink.name, filter_=filter_, destination=destination) |
| |
| if sink_client.exists(): |
| self.logger.debug(f"Sink {sink.name} already exists.") |
| sink_client.reload() |
| if sink_client.filter_ != filter_: |
| sink_client.filter_ = filter_ |
| sink_client.update() |
| self.logger.info(f"Updated sink {sink.name}'s filter.") |
| else: |
| sink_client.create() |
| self.logger.info(f"Created sink {sink.name}.") |
| # Reload the sink to get the writer_identity, this may take a few moments |
| sink_client.reload() |
| |
| self._grant_bucket_permissions(sink_client) |
| |
| logging_client.close() |
| |
| def _grant_bucket_permissions(self, sink: logging_v2.Sink) -> None: |
| """ |
| Grants a log sink's writer identity permissions to write to the bucket. |
| """ |
| logging_client = logging_v2.Client(project=self.project_id) |
| storage_client = storage.Client(project=self.project_id) |
| |
| sink.reload() |
| writer_identity = sink.writer_identity |
| if not writer_identity: |
| self.logger.warning(f"Could not retrieve writer identity for sink {sink.name}. " |
| f"Manual permission granting might be required.") |
| return |
| |
| bucket = storage_client.get_bucket(self.bucket) |
| policy = bucket.get_iam_policy(requested_policy_version=3) |
| iam_role = "roles/storage.objectCreator" |
| |
| # Workaround for projects where the writer_identity is not a valid service account. |
| if writer_identity == "serviceAccount:cloud-logs@system.gserviceaccount.com": |
| member = "group:cloud-logs@google.com" |
| else: |
| member = f"serviceAccount:{writer_identity}" |
| |
| # Check if the policy is already configured |
| if any(member in b.get("members", []) and b.get("role") == iam_role for b in policy.bindings): |
| self.logger.debug(f"Sink {sink.name} already has the necessary permissions.") |
| return |
| |
| policy.bindings.append({ |
| "role": iam_role, |
| "members": {member} |
| }) |
| |
| bucket.set_iam_policy(policy) |
| self.logger.info(f"Granted {iam_role} to {member} on bucket {self.bucket} for sink {sink.name}.") |
| |
| def initialize_sinks(self) -> None: |
| for sink in self.sinks: |
| self._create_log_sink(sink) |
| self.logger.info(f"Initialized sink: {sink.name}") |
| |
| def get_event_logs(self, days: int = 7) -> List[Dict[str, Any]]: |
| """ |
| Reads and retrieves log events from the specified time range from the GCP Cloud Storage bucket. |
| |
| Args: |
| days (int): The number of days to look back for log analysis. |
| |
| Returns: |
| List[Dict[str, Any]]: A list of log entries that match the specified time range. |
| """ |
| found_events = [] |
| storage_client = storage.Client(project=self.project_id) |
| |
| now = datetime.now(timezone.utc) |
| end_time = now.replace(minute=0, second=0, microsecond=0) - timedelta(minutes=30) |
| start_time = end_time - timedelta(days=days) |
| |
| blobs = storage_client.list_blobs(self.bucket) |
| for blob in blobs: |
| if not (start_time <= blob.time_created < end_time): |
| continue |
| |
| self.logger.debug(f"Processing blob: {blob.name}") |
| content = blob.download_as_string().decode("utf-8") |
| |
| for num, line in enumerate(content.splitlines(), 1): |
| try: |
| log_entry = json.loads(line) |
| payload = log_entry.get("protoPayload") |
| if not payload: |
| self.logger.warning(f"Skipping log in blob {blob.name}, line {num}: no protoPayload found.") |
| continue |
| |
| event_details = { |
| "timestamp": log_entry.get("timestamp", "N/A"), |
| "principal": payload.get("authenticationInfo", {}).get("principalEmail", "N/A"), |
| "method": payload.get("methodName", "N/A"), |
| "resource": payload.get("resourceName", "N/A"), |
| "project_id": log_entry.get("resource", {}).get("labels", {}).get("project_id", "N/A"), |
| "file_name": blob.name |
| } |
| found_events.append(event_details) |
| except json.JSONDecodeError: |
| self.logger.warning(f"Skipping invalid JSON log in blob {blob.name}, line {num}.") |
| continue |
| |
| storage_client.close() |
| return found_events |
| |
| def create_weekly_email_report(self, dry_run: bool = False) -> None: |
| """ |
| Creates an email report based on the events found this week. |
| If `dry_run` is True, it will print the report to the console instead of sending it. |
| """ |
| events = self.get_event_logs(days=7) |
| if not events: |
| self.logger.info("No events found for the weekly report.") |
| return |
| |
| events.sort(key=lambda x: x['timestamp'], reverse=True) |
| event_summary = "\n".join( |
| f"Timestamp: {event['timestamp']}, Principal: {event['principal']}, Method: {event['method']}, Resource: {event['resource']}, Project ID: {event['project_id']}, File: {event['file_name']}" |
| for event in events |
| ) |
| |
| report_subject = REPORT_SUBJECT |
| report_body = REPORT_BODY_TEMPLATE.format(event_summary=event_summary) |
| |
| if dry_run: |
| self.logger.info("Dry run: printing email report to console.") |
| print(f"Subject: {report_subject}\n") |
| print(f"Body:\n{report_body}") |
| return |
| |
| self.send_email(report_subject, report_body) |
| |
| def send_email(self, subject: str, body: str) -> None: |
| """ |
| Sends an email with the specified subject and body. |
| If email configuration is not fully set, it prints the email instead. |
| |
| Args: |
| subject (str): The subject of the email. |
| body (str): The body of the email. |
| """ |
| smtp_server = os.getenv("SMTP_SERVER") |
| smtp_port_str = os.getenv("SMTP_PORT") |
| recipient = os.getenv("EMAIL_RECIPIENT") |
| email = os.getenv("EMAIL_ADDRESS") |
| password = os.getenv("EMAIL_PASSWORD") |
| |
| if not all([smtp_server, smtp_port_str, recipient, email, password]): |
| self.logger.warning("Email configuration is not fully set. Printing email instead.") |
| print(f"Subject: {subject}\n") |
| print(f"Body:\n{body}") |
| return |
| |
| assert smtp_server is not None |
| assert smtp_port_str is not None |
| assert recipient is not None |
| assert email is not None |
| assert password is not None |
| |
| message = f"Subject: {subject}\n\n{body}" |
| context = ssl.create_default_context() |
| |
| try: |
| smtp_port = int(smtp_port_str) |
| with smtplib.SMTP_SSL(smtp_server, smtp_port, context=context) as server: |
| server.login(email, password) |
| server.sendmail(email, recipient, message) |
| self.logger.info(f"Successfully sent email report to {recipient}") |
| except Exception as e: |
| self.logger.error(f"Failed to send email report: {e}") |
| |
| def load_config_from_yaml(config_path: str) -> Dict[str, Any]: |
| with open(config_path, 'r') as file: |
| config = yaml.safe_load(file) |
| |
| c = { |
| "project_id": config.get("project_id"), |
| "gcp_bucket": config.get("bucket_name"), |
| "sinks": [], |
| "logger": logging.getLogger(__name__) |
| } |
| |
| for sink_config in config.get("sinks", []): |
| sink = SinkCls( |
| name=sink_config["name"], |
| description=sink_config["description"], |
| filter_methods=sink_config.get("filter_methods", []), |
| excluded_principals=sink_config.get("excluded_principals", []) |
| ) |
| c["sinks"].append(sink) |
| |
| logging_config = config.get("logging", {}) |
| log_level = logging_config.get("level", "INFO") |
| log_format = logging_config.get("format", "[%(asctime)s] %(levelname)s: %(message)s") |
| |
| c["logger"].setLevel(log_level) |
| logging.basicConfig(level=log_level, format=log_format) |
| |
| return c |
| |
| def main(): |
| """ |
| Main entry point for the script. |
| """ |
| parser = argparse.ArgumentParser(description="GCP IAM Log Analyzer") |
| parser.add_argument("--config", required=True, help="Path to the configuration YAML file.") |
| |
| subparsers = parser.add_subparsers(dest="command", required=True) |
| |
| subparsers.add_parser("initialize", help="Initialize/update log sinks in GCP.") |
| report_parser = subparsers.add_parser("generate-report", help="Generate and send the weekly IAM security report.") |
| report_parser.add_argument("--dry-run", action="store_true", help="Do not send email, print report to console.") |
| |
| args = parser.parse_args() |
| |
| config = load_config_from_yaml(args.config) |
| log_analyzer = LogAnalyzer( |
| project_id=config["project_id"], |
| gcp_bucket=config["gcp_bucket"], |
| logger=config["logger"], |
| sinks=config["sinks"] |
| ) |
| |
| if args.command == "initialize": |
| log_analyzer.initialize_sinks() |
| log_analyzer.logger.info("Sinks initialized successfully.") |
| elif args.command == "generate-report": |
| log_analyzer.create_weekly_email_report(dry_run=args.dry_run) |
| log_analyzer.logger.info("Weekly report generation process completed.") |
| |
| if __name__ == "__main__": |
| main() |