blob: 370216653267b711873c0c238e15d087d4011642 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import logging
import unittest
import time
from unittest import mock
from service_account import ServiceAccountManager
from google.cloud.iam_admin_v1 import types
from google.api_core import exceptions
class TestServiceAccountManagerUnit(unittest.TestCase):
    """Unit tests for ServiceAccountManager with mocked Google Cloud IAM client.

    ``service_account.iam_admin_v1.IAMClient`` is patched for the duration of
    every test, so ``self.mock_iam_client`` stands in for the client instance
    the manager creates and no real IAM call is made.
    """

    # Minimal JSON payload passed to ``test_service_account_key`` in the key tests.
    KEY_DATA = b'{"type": "service_account", "project_id": "test-project"}'

    # Patch target for the credentials factory used by the key-validation tests.
    CREDENTIALS_FACTORY = 'service_account.service_account.Credentials.from_service_account_info'

    def setUp(self):
        """Patch the IAM client class and build the manager under test."""
        self.project_id = "test-project-123"
        self.test_account_id = "test-service-account"
        self.test_display_name = "Test Service Account"

        # Patch the IAM client. The stop is registered through addCleanup
        # (instead of tearDown) so the patch is undone even when a later line
        # of setUp raises; tearDown is not invoked in that situation.
        self.iam_client_patcher = mock.patch('service_account.iam_admin_v1.IAMClient')
        self.mock_iam_client_class = self.iam_client_patcher.start()
        self.addCleanup(self.iam_client_patcher.stop)
        self.mock_iam_client = self.mock_iam_client_class.return_value

        # Create a mock logger and the service account manager.
        self.mock_logger = mock.MagicMock()
        self.manager = ServiceAccountManager(self.project_id, self.mock_logger)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _account_email(self, account_id: str) -> str:
        """Return the full service account e-mail for ``account_id`` in the test project."""
        return f"{account_id}@{self.project_id}.iam.gserviceaccount.com"

    def _account_resource_name(self, account_id: str) -> str:
        """Return the ``projects/.../serviceAccounts/...`` resource name for ``account_id``."""
        return f"projects/{self.project_id}/serviceAccounts/{self._account_email(account_id)}"

    @staticmethod
    def _single_request(mocked_rpc):
        """Assert ``mocked_rpc`` was called exactly once and return its ``request`` keyword argument."""
        mocked_rpc.assert_called_once()
        _args, kwargs = mocked_rpc.call_args
        return kwargs['request']

    def _create_mock_service_account(self, account_id: str, disabled: bool = False) -> types.ServiceAccount:
        """Helper method to create a mock service account."""
        mock_account = types.ServiceAccount()
        mock_account.name = self._account_resource_name(account_id)
        mock_account.email = self._account_email(account_id)
        mock_account.display_name = account_id
        mock_account.disabled = disabled
        mock_account.project_id = self.project_id
        mock_account.unique_id = f"123456789{account_id}"
        return mock_account

    def _create_mock_service_account_key(self, account_id: str, key_id: str = "test-key-id") -> types.ServiceAccountKey:
        """Helper method to create a mock service account key."""
        mock_key = types.ServiceAccountKey()
        mock_key.name = f"{self._account_resource_name(account_id)}/keys/{key_id}"
        mock_key.private_key_data = b'{"type": "service_account", "project_id": "test-project"}'
        return mock_key

    # ------------------------------------------------------------------
    # Construction
    # ------------------------------------------------------------------

    def test_init(self):
        """Test ServiceAccountManager initialization."""
        self.assertEqual(self.manager.project_id, self.project_id)
        self.mock_iam_client_class.assert_called_once()

    # ------------------------------------------------------------------
    # Service account create / enable / disable / delete / list
    # ------------------------------------------------------------------

    def test_create_service_account_success(self):
        """Test successful service account creation."""
        expected_account = self._create_mock_service_account(self.test_account_id)
        self.mock_iam_client.create_service_account.return_value = expected_account

        with mock.patch.object(self.manager, '_service_account_exists', return_value=True):
            result = self.manager.create_service_account(self.test_account_id, self.test_display_name)

        self.assertEqual(result, expected_account)
        # Verify the request structure
        request = self._single_request(self.mock_iam_client.create_service_account)
        self.assertEqual(request.account_id, self.test_account_id)
        self.assertEqual(request.name, f"projects/{self.project_id}")
        self.assertEqual(request.service_account.display_name, self.test_display_name)

    def test_create_service_account_already_exists(self):
        """Test service account creation when account already exists."""
        existing_account = self._create_mock_service_account(self.test_account_id)
        # Creation reports a conflict; the subsequent get returns the existing account.
        self.mock_iam_client.create_service_account.side_effect = exceptions.Conflict("Account already exists")
        self.mock_iam_client.get_service_account.return_value = existing_account

        result = self.manager.create_service_account(self.test_account_id, self.test_display_name)

        self.assertEqual(result, existing_account)
        self.mock_iam_client.create_service_account.assert_called_once()
        self.mock_iam_client.get_service_account.assert_called_once()

    def test_enable_service_account(self):
        """Test enabling a service account."""
        with mock.patch.object(self.manager, '_service_account_is_enabled', return_value=True):
            self.manager.enable_service_account(self.test_account_id)

        # Verify the request structure
        request = self._single_request(self.mock_iam_client.enable_service_account)
        self.assertEqual(request.name, self._account_resource_name(self.test_account_id))

    def test_disable_service_account(self):
        """Test disabling a service account."""
        with mock.patch.object(self.manager, '_service_account_is_enabled', return_value=False):
            self.manager.disable_service_account(self.test_account_id)

        # Verify the request structure
        request = self._single_request(self.mock_iam_client.disable_service_account)
        self.assertEqual(request.name, self._account_resource_name(self.test_account_id))

    def test_delete_service_account(self):
        """Test deleting a service account."""
        with mock.patch.object(self.manager, '_service_account_exists', return_value=False):
            self.manager.delete_service_account(self.test_account_id)

        # Verify the request structure
        request = self._single_request(self.mock_iam_client.delete_service_account)
        self.assertEqual(request.name, self._account_resource_name(self.test_account_id))

    def test_list_service_accounts(self):
        """Test listing all service accounts in the project."""
        mock_accounts = [
            self._create_mock_service_account("account1"),
            self._create_mock_service_account("account2", disabled=True),
            self._create_mock_service_account("account3"),
        ]
        mock_response = mock.MagicMock()
        mock_response.accounts = mock_accounts
        # Make the mock response iterable so list(accounts) works
        mock_response.__iter__ = lambda _self: iter(mock_accounts)
        self.mock_iam_client.list_service_accounts.return_value = mock_response

        result = self.manager._get_service_accounts()

        self.assertEqual(result, mock_accounts)
        # Verify the request structure
        request = self._single_request(self.mock_iam_client.list_service_accounts)
        self.assertEqual(request.name, f"projects/{self.project_id}")

    # ------------------------------------------------------------------
    # Service account keys: create / delete / list
    # ------------------------------------------------------------------

    def test_create_service_account_key_enabled_account(self):
        """Test creating a key for an enabled service account."""
        enabled_account = self._create_mock_service_account(self.test_account_id, disabled=False)
        mock_key = self._create_mock_service_account_key(self.test_account_id)
        self.mock_iam_client.get_service_account.return_value = enabled_account
        self.mock_iam_client.create_service_account_key.return_value = mock_key

        with mock.patch.object(self.manager, '_service_account_key_exists', return_value=True):
            result = self.manager.create_service_account_key(self.test_account_id)

        self.assertEqual(result, mock_key)
        self.mock_iam_client.get_service_account.assert_called_once()
        self.mock_iam_client.create_service_account_key.assert_called_once()

    def test_create_service_account_key_disabled_account(self):
        """Test creating a key for a disabled service account."""
        disabled_account = self._create_mock_service_account(self.test_account_id, disabled=True)
        mock_key = self._create_mock_service_account_key(self.test_account_id)
        # The lookup reports a disabled account; the enable flow is mocked below.
        self.mock_iam_client.get_service_account.return_value = disabled_account
        self.mock_iam_client.create_service_account_key.return_value = mock_key

        with mock.patch.object(self.manager, '_service_account_is_enabled', return_value=True), \
                mock.patch.object(self.manager, '_service_account_key_exists', return_value=True):
            result = self.manager.create_service_account_key(self.test_account_id)

        self.assertEqual(result, mock_key)
        # Should call get_service_account once to check if it's disabled
        self.mock_iam_client.get_service_account.assert_called_once()
        self.mock_iam_client.enable_service_account.assert_called_once()
        self.mock_iam_client.create_service_account_key.assert_called_once()

    def test_create_service_account_key_not_found(self):
        """Test creating a key for a non-existent service account."""
        self.mock_iam_client.get_service_account.side_effect = exceptions.NotFound("Account not found")

        with self.assertRaises(exceptions.NotFound):
            self.manager.create_service_account_key(self.test_account_id)

    def test_delete_service_account_key(self):
        """Test deleting a service account key."""
        key_id = "test-key-id"

        with mock.patch.object(self.manager, '_service_account_key_exists', return_value=False):
            self.manager.delete_service_account_key(self.test_account_id, key_id)

        self.mock_iam_client.delete_service_account_key.assert_called_once()

    def test_list_service_account_keys(self):
        """Test listing service account keys."""
        mock_keys = [
            self._create_mock_service_account_key(self.test_account_id, "key1"),
            self._create_mock_service_account_key(self.test_account_id, "key2"),
        ]
        mock_response = mock.MagicMock()
        mock_response.keys = mock_keys
        self.mock_iam_client.list_service_account_keys.return_value = mock_response

        result = self.manager._get_service_account_keys(self.test_account_id)

        self.assertEqual(result, mock_keys)
        self.mock_iam_client.list_service_account_keys.assert_called_once()

    # ------------------------------------------------------------------
    # Key validation (test_service_account_key)
    # ------------------------------------------------------------------

    @mock.patch('service_account.service_account.Credentials.from_service_account_info')
    @mock.patch('service_account.Request')
    def test_test_service_account_key_valid(self, mock_request_class, mock_credentials_class):
        """Test testing a valid service account key."""
        mock_credentials = mock.MagicMock()
        mock_credentials_class.return_value = mock_credentials

        result = self.manager.test_service_account_key(self.KEY_DATA)

        self.assertTrue(result)
        mock_credentials_class.assert_called_once()
        mock_credentials.refresh.assert_called_once()

    @mock.patch('service_account.service_account.Credentials.from_service_account_info')
    def test_test_service_account_key_invalid_json(self, mock_credentials_class):
        """Test testing an invalid JSON service account key."""
        result = self.manager.test_service_account_key(b'invalid json')

        self.assertFalse(result)
        mock_credentials_class.assert_not_called()

    @mock.patch('service_account.service_account.Credentials.from_service_account_info')
    def test_test_service_account_key_auth_error(self, mock_credentials_class):
        """Test testing a service account key with authentication error."""
        mock_credentials = mock.MagicMock()
        mock_credentials.refresh.side_effect = Exception("Authentication failed")
        mock_credentials_class.return_value = mock_credentials

        result = self.manager.test_service_account_key(self.KEY_DATA)

        self.assertFalse(result)

    # ------------------------------------------------------------------
    # E-mail normalisation
    # ------------------------------------------------------------------

    def test_normalize_account_email_with_email(self):
        """Test normalizing account email when input is already a full email."""
        full_email = self._account_email(self.test_account_id)

        self.assertEqual(self.manager._normalize_account_email(full_email), full_email)

    def test_normalize_account_email_with_id(self):
        """Test normalizing account email when input is just the account ID."""
        result = self.manager._normalize_account_email(self.test_account_id)

        self.assertEqual(result, self._account_email(self.test_account_id))

    # ------------------------------------------------------------------
    # Existence / enabled-state helpers
    # ------------------------------------------------------------------

    def test_service_account_exists_true(self):
        """Test _service_account_exists when service account exists."""
        self.mock_iam_client.get_service_account.return_value = \
            self._create_mock_service_account(self.test_account_id)

        self.assertTrue(self.manager._service_account_exists(self.test_account_id))
        self.mock_iam_client.get_service_account.assert_called_once()

    def test_service_account_exists_false(self):
        """Test _service_account_exists when service account does not exist."""
        self.mock_iam_client.get_service_account.side_effect = exceptions.NotFound("Not found")

        self.assertFalse(self.manager._service_account_exists(self.test_account_id))
        self.mock_iam_client.get_service_account.assert_called_once()

    def test_service_account_is_enabled_true(self):
        """Test _service_account_is_enabled when service account is enabled."""
        self.mock_iam_client.get_service_account.return_value = \
            self._create_mock_service_account(self.test_account_id, disabled=False)

        self.assertTrue(self.manager._service_account_is_enabled(self.test_account_id))
        self.mock_iam_client.get_service_account.assert_called_once()

    def test_service_account_is_enabled_false(self):
        """Test _service_account_is_enabled when service account is disabled."""
        self.mock_iam_client.get_service_account.return_value = \
            self._create_mock_service_account(self.test_account_id, disabled=True)

        self.assertFalse(self.manager._service_account_is_enabled(self.test_account_id))
        self.mock_iam_client.get_service_account.assert_called_once()

    def test_service_account_is_enabled_not_found(self):
        """Test _service_account_is_enabled when service account does not exist."""
        self.mock_iam_client.get_service_account.side_effect = exceptions.NotFound("Not found")

        self.assertFalse(self.manager._service_account_is_enabled(self.test_account_id))
        self.mock_iam_client.get_service_account.assert_called_once()

    def test_get_service_account_success(self):
        """Test successful retrieval of a service account."""
        mock_account = self._create_mock_service_account(self.test_account_id)
        self.mock_iam_client.get_service_account.return_value = mock_account

        self.assertEqual(self.manager.get_service_account(self.test_account_id), mock_account)
        self.mock_iam_client.get_service_account.assert_called_once()

    def test_get_service_account_not_found(self):
        """Test retrieval of a non-existent service account."""
        self.mock_iam_client.get_service_account.side_effect = exceptions.NotFound("Not found")

        with self.assertRaises(exceptions.NotFound):
            self.manager.get_service_account(self.test_account_id)
        self.mock_iam_client.get_service_account.assert_called_once()

    def test_service_account_key_exists_true(self):
        """Test _service_account_key_exists when key exists."""
        key_id = "test-key-id"
        mock_response = mock.MagicMock()
        mock_response.keys = [self._create_mock_service_account_key(self.test_account_id, key_id)]
        self.mock_iam_client.list_service_account_keys.return_value = mock_response

        self.assertTrue(self.manager._service_account_key_exists(self.test_account_id, key_id))
        self.mock_iam_client.list_service_account_keys.assert_called_once()

    def test_service_account_key_exists_false(self):
        """Test _service_account_key_exists when key does not exist."""
        key_id = "test-key-id"
        mock_response = mock.MagicMock()
        mock_response.keys = [self._create_mock_service_account_key(self.test_account_id, "other-key-id")]
        self.mock_iam_client.list_service_account_keys.return_value = mock_response

        self.assertFalse(self.manager._service_account_key_exists(self.test_account_id, key_id))
        self.mock_iam_client.list_service_account_keys.assert_called_once()

    # ------------------------------------------------------------------
    # Key deletion error paths
    # ------------------------------------------------------------------

    def test_delete_service_account_key_not_found(self):
        """Test deleting a non-existent service account key."""
        self.mock_iam_client.delete_service_account_key.side_effect = exceptions.NotFound("Key not found")

        with self.assertRaises(exceptions.NotFound):
            self.manager.delete_service_account_key(self.test_account_id, "non-existent-key")
        self.mock_iam_client.delete_service_account_key.assert_called_once()

    def test_delete_service_account_key_failed_precondition(self):
        """Test deleting a service account key with failed precondition."""
        self.mock_iam_client.delete_service_account_key.side_effect = exceptions.FailedPrecondition("Cannot delete")

        with self.assertRaises(exceptions.FailedPrecondition):
            self.manager.delete_service_account_key(self.test_account_id, "test-key-id")
        self.mock_iam_client.delete_service_account_key.assert_called_once()

    def test_delete_service_account_key_unexpected_error(self):
        """Test deleting a service account key with unexpected error."""
        self.mock_iam_client.delete_service_account_key.side_effect = Exception("Unexpected error")

        with self.assertRaises(Exception):
            self.manager.delete_service_account_key(self.test_account_id, "test-key-id")
        self.mock_iam_client.delete_service_account_key.assert_called_once()

    # ------------------------------------------------------------------
    # Key validation retry behaviour (time.sleep is patched out)
    # ------------------------------------------------------------------

    @mock.patch('service_account.time.sleep')
    def test_test_service_account_key_retry_success(self, mock_sleep):
        """Test service account key testing with retry logic success."""
        mock_credentials = mock.MagicMock()
        # First attempt fails, second succeeds
        mock_credentials.refresh.side_effect = [Exception("Auth failed"), None]

        with mock.patch(self.CREDENTIALS_FACTORY, return_value=mock_credentials):
            result = self.manager.test_service_account_key(self.KEY_DATA)

        self.assertTrue(result)
        self.assertEqual(mock_credentials.refresh.call_count, 2)
        mock_sleep.assert_called_once_with(2)  # delay is doubled before sleep (1 * 2 = 2)

    @mock.patch('service_account.time.sleep')
    def test_test_service_account_key_retry_exhausted(self, mock_sleep):
        """Test service account key testing when all retries are exhausted."""
        mock_credentials = mock.MagicMock()
        mock_credentials.refresh.side_effect = Exception("Auth failed")

        with mock.patch(self.CREDENTIALS_FACTORY, return_value=mock_credentials):
            result = self.manager.test_service_account_key(self.KEY_DATA)

        self.assertFalse(result)
        self.assertEqual(mock_credentials.refresh.call_count, 3)  # max_retries
        # Sleep is called with 2, then 4 (delay is doubled each time)
        self.assertEqual(mock_sleep.call_count, 2)  # 2 retry delays
        mock_sleep.assert_any_call(2)  # First retry delay (1 * 2)
        mock_sleep.assert_any_call(4)  # Second retry delay (2 * 2)

    # ------------------------------------------------------------------
    # Timeout scenarios: the polled helper never reports the desired state
    # ------------------------------------------------------------------

    def test_create_service_account_timeout(self):
        """Test service account creation timeout scenario."""
        self.mock_iam_client.create_service_account.return_value = \
            self._create_mock_service_account(self.test_account_id)

        # The helper always returns False (service account never exists).
        with mock.patch.object(self.manager, '_service_account_exists', return_value=False), \
                self.assertRaises(exceptions.DeadlineExceeded):
            self.manager.create_service_account(self.test_account_id, self.test_display_name)

    def test_enable_service_account_timeout(self):
        """Test service account enabling timeout scenario."""
        # The helper always returns False (service account never gets enabled).
        with mock.patch.object(self.manager, '_service_account_is_enabled', return_value=False), \
                self.assertRaises(exceptions.DeadlineExceeded):
            self.manager.enable_service_account(self.test_account_id)

    def test_disable_service_account_timeout(self):
        """Test service account disabling timeout scenario."""
        # The helper always returns True (service account never gets disabled).
        with mock.patch.object(self.manager, '_service_account_is_enabled', return_value=True), \
                self.assertRaises(exceptions.DeadlineExceeded):
            self.manager.disable_service_account(self.test_account_id)

    def test_delete_service_account_timeout(self):
        """Test service account deletion timeout scenario."""
        # The helper always returns True (service account never gets deleted).
        with mock.patch.object(self.manager, '_service_account_exists', return_value=True), \
                self.assertRaises(exceptions.DeadlineExceeded):
            self.manager.delete_service_account(self.test_account_id)

    def test_create_service_account_key_timeout(self):
        """Test service account key creation timeout scenario."""
        self.mock_iam_client.get_service_account.return_value = \
            self._create_mock_service_account(self.test_account_id, disabled=False)
        self.mock_iam_client.create_service_account_key.return_value = \
            self._create_mock_service_account_key(self.test_account_id)

        # The helper always returns False (key never gets created).
        with mock.patch.object(self.manager, '_service_account_key_exists', return_value=False), \
                self.assertRaises(exceptions.DeadlineExceeded):
            self.manager.create_service_account_key(self.test_account_id)

    def test_delete_service_account_key_timeout(self):
        """Test service account key deletion timeout scenario."""
        # The helper always returns True (key never gets deleted).
        with mock.patch.object(self.manager, '_service_account_key_exists', return_value=True), \
                self.assertRaises(exceptions.DeadlineExceeded):
            self.manager.delete_service_account_key(self.test_account_id, "test-key-id")
# Run these integration tests only when the required environment variable is set, e.g.:
#   export GOOGLE_CLOUD_PROJECT="your-project-id"
# The skipUnless decorator below checks for the variable before any test runs.
@unittest.skipUnless(
    'GOOGLE_CLOUD_PROJECT' in os.environ,
    "Skipping tests because environment variables are not set for Google Cloud project."
)
class TestServiceAccountManagerIntegration(unittest.TestCase):
    """Integration tests for ServiceAccountManager with real Google Cloud IAM client.

    The whole class is skipped unless ``GOOGLE_CLOUD_PROJECT`` is present in
    the environment; its value is used as the project for every call.
    """

    # Prefix shared by the accounts this class creates and the ones tearDown removes.
    ACCOUNT_PREFIX = "test-account-"

    def setUp(self):
        """Set up test fixtures."""
        self.project_id = os.environ['GOOGLE_CLOUD_PROJECT']
        self.logger = logging.getLogger(__name__)
        # NOTE(review): the meaning of the third positional argument (5) is
        # defined by ServiceAccountManager and is not visible from this file.
        self.manager = ServiceAccountManager(self.project_id, self.logger, 5)

    def tearDown(self):
        """Best-effort removal of every service account whose e-mail starts with ACCOUNT_PREFIX.

        Failures are logged as warnings rather than raised so that cleanup
        problems do not mask the outcome of the test itself.
        """
        try:
            accounts = self.manager._get_service_accounts()
        except Exception as e:
            self.logger.warning(f"Failed to list service accounts during tearDown: {e}")
            return

        for account in accounts:
            if not account.email.startswith(self.ACCOUNT_PREFIX):
                continue
            try:
                self.manager.delete_service_account(account.email)
            except Exception as e:
                self.logger.warning(f"Failed to delete service account {account.email}: {e}")

    def _listed_emails(self):
        """Return the e-mails of all service accounts currently listed by the manager."""
        return [account.email for account in self.manager._get_service_accounts()]

    def _wait_until_listed(self, email, attempts=5):
        """Poll the account listing until ``email`` appears.

        After each miss the method sleeps 1, 2, 4, ... seconds (``2 ** attempt``),
        i.e. a genuine exponential backoff.

        Returns:
            True as soon as ``email`` is listed, False if all ``attempts`` missed.
        """
        for attempt in range(attempts):
            if email in self._listed_emails():
                return True
            time.sleep(2 ** attempt)
        return False

    def test_full_service_account_lifecycle(self):
        """Test creating, keying, disabling, enabling and deleting a service account."""
        # The PID keeps the account id distinct between concurrently running processes.
        account_id = self.ACCOUNT_PREFIX + str(os.getpid())
        display_name = "Test Account"

        # Create service account
        account = self.manager.create_service_account(account_id, display_name)
        service_account_email = account.email
        self.assertEqual(account.display_name, display_name)

        # Wait until the service account shows up in the listing, then verify it.
        self._wait_until_listed(service_account_email)
        self.assertIn(service_account_email, self._listed_emails())

        # Create a key for the service account
        key = self.manager.create_service_account_key(service_account_email)
        self.assertIsNotNone(key.private_key_data)

        # Test the key (now includes retry logic for propagation delays)
        self.assertTrue(self.manager.test_service_account_key(key.private_key_data))

        # The new key must appear in the key listing for the service account.
        listed_key_names = [k.name for k in self.manager._get_service_account_keys(service_account_email)]
        self.assertIn(key.name, listed_key_names)

        # Delete the service account key (the key id is the last path segment of its name).
        self.manager.delete_service_account_key(service_account_email, key.name.split('/')[-1])

        # Create a replacement key and make sure it is usable.
        new_key = self.manager.create_service_account_key(service_account_email)
        self.assertTrue(self.manager.test_service_account_key(new_key.private_key_data))

        # Verify that two keys are listed now.
        # NOTE(review): the first user-created key was deleted above, so the
        # second entry is presumably the Google system-managed key - confirm.
        all_keys = self.manager._get_service_account_keys(service_account_email)
        self.assertEqual(len(all_keys), 2)

        # Disable the service account and verify the flag.
        self.manager.disable_service_account(service_account_email)
        account = self.manager.get_service_account(service_account_email)
        self.assertTrue(account.disabled)

        # Enable the service account and verify the flag.
        self.manager.enable_service_account(service_account_email)
        account = self.manager.get_service_account(service_account_email)
        self.assertFalse(account.disabled)

        # Test again the key after enabling the service account
        self.assertTrue(self.manager.test_service_account_key(new_key.private_key_data))

        # Delete the service account
        self.manager.delete_service_account(service_account_email)

        # Verify service account is deleted: the lookup must now raise NotFound.
        with self.assertRaises(exceptions.NotFound):
            self.manager.get_service_account(service_account_email)
if __name__ == '__main__':
    # Configure logging to reduce noise during testing: raise the Google client
    # loggers to WARNING. ``logging`` is already imported at module level, so
    # no local re-import is needed here.
    for noisy_logger in ('google.cloud', 'google.auth'):
        logging.getLogger(noisy_logger).setLevel(logging.WARNING)

    # Run the tests
    unittest.main()