blob: a1036bf88a47d4f5b4b205ef348ff5909a5b7758 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import json
import time
from typing import List,Optional
from google.cloud import iam_admin_v1
from google.cloud.iam_admin_v1 import types
from google.oauth2 import service_account
from google.auth.transport.requests import Request
from google.api_core import exceptions
class ServiceAccountManagerLoggerAdapter(logging.LoggerAdapter):
"""Logger adapter that adds a prefix to all log messages."""
def process(self, msg, kwargs):
return f"[ServiceAccountManager] {msg}", kwargs
class ServiceAccountManager:
def __init__(self, project_id: str, logger: logging.Logger, max_retries: int = 3) -> None:
self.project_id = project_id
self.client = iam_admin_v1.IAMClient()
self.logger = ServiceAccountManagerLoggerAdapter(logger, {})
self.max_retries = max_retries
self.logger.info(f"Initialized ServiceAccountManager for project: {self.project_id}")
def _normalize_account_email(self, account_id: str) -> str:
"""
Normalizes the account identifier to a full email format.
Args:
account_id (str): The unique identifier or email of the service account.
Returns:
str: The full service account email address.
"""
# Handle both account ID and full email formats
if "@" in account_id and account_id.endswith(".iam.gserviceaccount.com"):
# account_id is already a full email
return account_id
else:
# account_id is just the account name
return f"{account_id}@{self.project_id}.iam.gserviceaccount.com"
def _get_service_accounts(self) -> List[iam_admin_v1.ServiceAccount]:
"""
Retrieves all service accounts in the specified project.
Returns:
List[iam_admin_v1.ServiceAccount]: A list of service account objects.
"""
request = types.ListServiceAccountsRequest()
request.name = f"projects/{self.project_id}"
accounts = self.client.list_service_accounts(request=request)
self.logger.debug(f"Listed service accounts: {[account.email for account in accounts.accounts]}")
return list(accounts.accounts)
def _service_account_exists(self, account_id: str) -> bool:
"""
Checks if a service account with the given account_id exists in the project.
Args:
account_id (str): The unique identifier or email of the service account.
Returns:
bool: True if the service account exists, False otherwise.
"""
try:
self.get_service_account(account_id)
return True
except exceptions.NotFound:
return False
def _service_account_is_enabled(self, account_id: str) -> bool:
"""
Checks if a service account is enabled.
Args:
account_id (str): The unique identifier or email of the service account.
Returns:
bool: True if the service account is enabled, False otherwise.
"""
try:
service_account = self.get_service_account(account_id)
return not service_account.disabled
except exceptions.NotFound:
self.logger.error(f"Service account {account_id} not found")
return False
def create_service_account(self, account_id: str, display_name: Optional[str] = None) -> types.ServiceAccount:
"""
Creates a service account in the specified project.
If the service account already exists, returns the existing account (idempotent operation).
Args:
account_id (str): The unique identifier for the service account.
display_name (Optional[str]): A human-readable name for the service account.
Returns:
types.ServiceAccount: The created or existing service account object.
"""
request = types.CreateServiceAccountRequest()
request.account_id = account_id
request.name = f"projects/{self.project_id}"
service_account = types.ServiceAccount()
service_account.display_name = display_name or account_id
request.service_account = service_account
try:
account = self.client.create_service_account(request=request)
# Wait for the service account to be created
delay = 1
for _ in range(self.max_retries):
if self._service_account_exists(account_id):
break
time.sleep(delay)
delay *= 2
else:
self.logger.error(f"Service account {account_id} creation timed out after {self.max_retries} retries.")
raise exceptions.DeadlineExceeded(f"Service account {account_id} creation timed out.")
self.logger.info(f"Created service account: {account.email}")
return account
except exceptions.Conflict:
existing_account = self.get_service_account(account_id)
self.logger.info(f"Service account already exists: {existing_account.email}")
return existing_account
def get_service_account(self, account_id: str) -> types.ServiceAccount:
"""
Retrieves a service account by its unique identifier or email.
Args:
account_id (str): The unique identifier or email of the service account.
Returns:
types.ServiceAccount: The service account object.
"""
service_account_email = self._normalize_account_email(account_id)
request = types.GetServiceAccountRequest()
request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
try:
service_account = self.client.get_service_account(request=request)
self.logger.info(f"Retrieved service account: {service_account.email}")
return service_account
except exceptions.NotFound:
self.logger.error(f"Service account {account_id} not found")
raise
def enable_service_account(self, account_id: str) -> None:
"""
Enables a service account in the specified project.
Args:
account_id (str): The unique identifier or email of the service account to enable.
"""
service_account_email = self._normalize_account_email(account_id)
request = types.EnableServiceAccountRequest()
request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
self.client.enable_service_account(request=request)
# Wait for the service account to be enabled
delay = 1
for _ in range(self.max_retries):
if self._service_account_is_enabled(account_id):
break
time.sleep(delay)
delay *= 2
else:
self.logger.error(f"Service account {account_id} enabling timed out after {self.max_retries} retries.")
raise exceptions.DeadlineExceeded(f"Service account {account_id} enabling timed out.")
self.logger.info(f"Enabled service account: {account_id}")
def disable_service_account(self, account_id: str) -> None:
"""
Disables a service account in the specified project.
Args:
account_id (str): The unique identifier or email of the service account to disable.
"""
service_account_email = self._normalize_account_email(account_id)
request = types.DisableServiceAccountRequest()
request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
self.client.disable_service_account(request=request)
# Wait for the service account to be disabled
delay = 1
for _ in range(self.max_retries):
if not self._service_account_is_enabled(account_id):
break
time.sleep(delay)
delay *= 2
else:
self.logger.error(f"Service account {account_id} disabling timed out after {self.max_retries} retries.")
raise exceptions.DeadlineExceeded(f"Service account {account_id} disabling timed out.")
self.logger.info(f"Disabled service account: {account_id}")
def delete_service_account(self, account_id: str) -> None:
"""
Deletes a service account in the specified project.
Args:
account_id (str): The unique identifier or email of the service account to delete.
"""
service_account_email = self._normalize_account_email(account_id)
request = types.DeleteServiceAccountRequest()
request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
self.client.delete_service_account(request=request)
# Wait for the service account to be deleted
delay = 1
for _ in range(self.max_retries):
if not self._service_account_exists(account_id):
break
time.sleep(delay)
delay *= 2
else:
self.logger.error(f"Service account {account_id} deletion timed out after {self.max_retries} retries.")
raise exceptions.DeadlineExceeded(f"Service account {account_id} deletion timed out.")
self.logger.info(f"Deleted service account: {account_id}")
def _get_service_account_keys(self, account_id: str) -> List[iam_admin_v1.ServiceAccountKey]:
"""
Retrieves all keys for the specified service account.
Args:
account_id (str): The unique identifier or email of the service account.
Returns:
List[iam_admin_v1.ServiceAccountKey]: A list of service account key objects.
"""
service_account_email = self._normalize_account_email(account_id)
request = types.ListServiceAccountKeysRequest()
request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
response = self.client.list_service_account_keys(request=request)
self.logger.debug(f"Listed keys for service account: {account_id}")
return list(response.keys)
def _service_account_key_exists(self, account_id: str, key_id: str) -> bool:
"""
Checks if a service account key exists for the specified service account.
Args:
account_id (str): The unique identifier or email of the service account.
key_id (str): The ID of the service account key to check.
Returns:
bool: True if the key exists, False otherwise.
"""
keys = self._get_service_account_keys(account_id)
return any(key.name.split('/')[-1] == key_id for key in keys)
def create_service_account_key(self, account_id: str) -> types.ServiceAccountKey:
"""
Creates a key for the specified service account.
Remember the private key ID is only returned once.
If the service account is disabled, it will be enabled first.
Includes retry logic to handle service account propagation delays.
Args:
account_id (str): The unique identifier or email of the service account.
Returns:
types.ServiceAccountKey: The created service account key object.
str: The private key ID of the created key.
"""
service_account_email = self._normalize_account_email(account_id)
# Retry logic for service account access and key creation
delay = 1
for attempt in range(self.max_retries):
try:
# Check if service account exists and get its state
get_request = types.GetServiceAccountRequest()
get_request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
service_account = self.client.get_service_account(request=get_request)
if service_account.disabled:
self.logger.info(f"Service account {account_id} is disabled. Enabling it first.")
self.enable_service_account(account_id)
# Create the key
request = types.CreateServiceAccountKeyRequest()
request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
key = self.client.create_service_account_key(request=request)
# Wait for the key to be created and available
key_delay = 1
for _ in range(self.max_retries):
if self._service_account_key_exists(account_id, key.name.split('/')[-1]):
break
time.sleep(key_delay)
key_delay *= 2
else:
self.logger.error(f"Service account key creation for {account_id} timed out after {self.max_retries} retries.")
raise exceptions.DeadlineExceeded(f"Service account key creation for {account_id} timed out.")
self.logger.info(f"Created service account key for {account_id}")
return key
except exceptions.NotFound as e:
if attempt < self.max_retries - 1:
self.logger.warning(f"Service account {account_id} not found (attempt {attempt + 1}/{self.max_retries}), retrying in {delay}s. This may be due to propagation delay.")
time.sleep(delay)
delay *= 2
else:
self.logger.error(f"Service account {account_id} not found after {self.max_retries} attempts")
raise
except Exception as e:
# For other exceptions, don't retry
self.logger.error(f"Error creating service account key for {account_id}: {e}")
raise
# This should not be reached due to the raise in the except block
raise exceptions.NotFound(f"Service account {account_id} not found after {self.max_retries} attempts")
def delete_service_account_key(self, account_id: str, key_id: str) -> None:
"""
Deletes a key for the specified service account.
Args:
account_id (str): The unique identifier or email of the service account.
key_id (str): The ID of the key to delete.
Raises:
exceptions.NotFound: If the key does not exist.
exceptions.FailedPrecondition: If the key cannot be deleted due to constraints.
"""
service_account_email = self._normalize_account_email(account_id)
request = types.DeleteServiceAccountKeyRequest()
request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}/keys/{key_id}"
try:
self.client.delete_service_account_key(request=request)
except exceptions.NotFound:
self.logger.warning(f"Service account key {key_id} not found for account: {account_id} (may have been already deleted)")
raise
except exceptions.FailedPrecondition as e:
self.logger.warning(f"Failed to delete service account key {key_id} for account: {account_id}. Error: {e}")
raise
except Exception as e:
self.logger.error(f"Unexpected error deleting service account key {key_id} for account: {account_id}. Error: {e}")
raise
# Wait for the key to be deleted
delay = 1
for _ in range(self.max_retries):
if not self._service_account_key_exists(account_id, key_id):
break
time.sleep(delay)
delay *= 2
else:
self.logger.error(f"Service account key deletion for {account_id} timed out after {self.max_retries} retries.")
raise exceptions.DeadlineExceeded(f"Service account key deletion for {account_id} timed out.")
self.logger.info(f"Deleted service account key: {key_id} for account: {account_id}")
def test_service_account_key(self, key_data: bytes) -> bool:
"""
Tests if a service account key is valid by attempting to authenticate and make an API call.
Includes retry logic to handle key propagation delays.
Args:
key_data (bytes): The private key data from the service account key.
Returns:
bool: True if the key is valid and can authenticate, False otherwise.
"""
try:
key_info = json.loads(key_data.decode('utf-8'))
except json.JSONDecodeError as json_error:
self.logger.error(f"Invalid JSON in service account key: {json_error}")
return False
delay = 1
for attempt in range(self.max_retries):
try:
credentials = service_account.Credentials.from_service_account_info(
key_info,
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
request = Request()
credentials.refresh(request)
self.logger.info(f"Service account key is valid and can authenticate")
return True
except Exception as auth_error:
if attempt < self.max_retries - 1: # Don't log on the last attempt
delay *= 2
self.logger.warning(f"Authentication attempt {attempt + 1} failed (will retry in {delay}s): {auth_error}")
time.sleep(delay)
else:
self.logger.error(f"Authentication failed with service account key after {self.max_retries} attempts: {auth_error}")
return False
return False