blob: c53fbc1a44dbad24086b9956c33a3f7af8ae6ea3 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import datetime
import unittest
import io
import sys
from unittest import mock
from stale_cleaner import (
GoogleCloudResource,
StaleCleaner,
PubSubTopicCleaner,
PubSubSubscriptionCleaner,
FakeClock,
DEFAULT_TIME_THRESHOLD,
PUBSUB_TOPIC_RESOURCE,
STORAGE_PREFIX
)
class SilencedMock(mock.MagicMock):
    """MagicMock variant whose calls run with stdout and stderr swapped out."""

    def __call__(self, *args, **kwargs):
        # Both standard streams are replaced by throwaway in-memory buffers
        # for the duration of the call, then restored when the block exits.
        muted_stdout = mock.patch('sys.stdout', new=io.StringIO())
        muted_stderr = mock.patch('sys.stderr', new=io.StringIO())
        with muted_stdout, muted_stderr:
            return super().__call__(*args, **kwargs)
# Use this context manager to silence print statements
class SilencePrint:
    """Context manager that redirects stdout/stderr to in-memory buffers."""

    def __enter__(self):
        # Remember the live streams so __exit__ can put them back.
        self._original_stdout, self._original_stderr = sys.stdout, sys.stderr
        sys.stdout, sys.stderr = io.StringIO(), io.StringIO()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Implicitly returns None, so an exception raised inside the block
        # is not suppressed.
        sys.stdout, sys.stderr = self._original_stdout, self._original_stderr
class GoogleCloudResourceTest(unittest.TestCase):
    """Unit tests covering GoogleCloudResource."""

    def test_init_with_default_values(self):
        """Without explicit dates, both dates equal the injected clock's time."""
        clock = FakeClock("2025-05-10T12:00:00")

        resource = GoogleCloudResource(resource_name="test-resource", clock=clock)

        self.assertEqual(resource.resource_name, "test-resource")
        self.assertEqual(resource.creation_date, clock())
        self.assertEqual(resource.last_update_date, clock())

    def test_init_with_custom_values(self):
        """Explicit creation/update dates are stored as given."""
        created = datetime.datetime(2025, 1, 1, 12, 0, 0)
        updated = datetime.datetime(2025, 5, 1, 12, 0, 0)
        clock = FakeClock("2025-05-10T12:00:00")

        resource = GoogleCloudResource(
            resource_name="test-resource",
            creation_date=created,
            last_update_date=updated,
            clock=clock,
        )

        self.assertEqual(resource.resource_name, "test-resource")
        self.assertEqual(resource.creation_date, created)
        self.assertEqual(resource.last_update_date, updated)

    def test_str_representation(self):
        """str(resource) yields the resource name."""
        resource = GoogleCloudResource(resource_name="test-resource")

        self.assertEqual(str(resource), "test-resource")

    def test_to_dict(self):
        """to_dict() returns the name plus ISO-formatted date strings."""
        resource = GoogleCloudResource(
            resource_name="test-resource",
            creation_date=datetime.datetime(2025, 1, 1, 12, 0, 0),
            last_update_date=datetime.datetime(2025, 5, 1, 12, 0, 0),
        )

        self.assertEqual(
            resource.to_dict(),
            {
                "resource_name": "test-resource",
                "creation_date": "2025-01-01T12:00:00",
                "last_update_date": "2025-05-01T12:00:00",
            },
        )

    def test_update(self):
        """update() moves last_update_date, with or without an explicit clock."""
        created = datetime.datetime(2025, 1, 1, 12, 0, 0)
        first_update = datetime.datetime(2025, 5, 1, 12, 0, 0)
        construction_clock = FakeClock("2025-01-01T12:00:00")
        resource = GoogleCloudResource(
            resource_name="test-resource",
            creation_date=created,
            last_update_date=first_update,
            clock=construction_clock,
        )
        self.assertEqual(resource.last_update_date, first_update)

        # An explicitly supplied clock determines the new timestamp.
        later_clock = FakeClock("2025-05-10T12:00:00")
        resource.update(clock=later_clock)
        self.assertEqual(resource.last_update_date, later_clock())

        # Without a clock argument the timestamp must still change.
        previous_update = resource.last_update_date
        resource.update()
        self.assertNotEqual(resource.last_update_date, previous_update)

    def test_time_alive(self):
        """time_alive() reports the seconds between creation and the given clock."""
        creation_clock = FakeClock("2025-01-01T12:00:00")
        resource = GoogleCloudResource(
            resource_name="test-resource",
            clock=creation_clock,
        )
        # Sanity check: the constructor took creation_date from the clock.
        self.assertEqual(resource.creation_date, creation_clock())

        # Exactly one day after creation -> 86400 seconds.
        one_day_later_clock = FakeClock("2025-01-02T12:00:00")
        self.assertEqual(resource.time_alive(clock=one_day_later_clock), 86400)
class MockStaleCleaner(StaleCleaner):
    """StaleCleaner test double backed by in-memory dictionaries.

    The four private hooks are overridden so the tests in this module can
    exercise the base-class logic without any real storage or cloud calls;
    deletions are only recorded in ``deleted_resources``.
    """

    def __init__(self, project_id, resource_type, bucket_name, prefixes=None,
                 time_threshold=DEFAULT_TIME_THRESHOLD, active_resources=None,
                 stored_resources=None, clock=None):
        super().__init__(
            project_id, resource_type, bucket_name, prefixes, time_threshold, clock
        )
        # Names passed to _delete_resource, in call order.
        self.deleted_resources = []
        # Falsy inputs (None or empty) are replaced by a fresh dict.
        self._active_resources_mock_data = active_resources or {}
        self._stored_resources_mock_data = stored_resources or {}

    def _active_resources(self):
        """Return the mocked active resources, wrapping raw values as needed."""
        return {
            name: (
                entry
                if isinstance(entry, GoogleCloudResource)
                else GoogleCloudResource(resource_name=name, clock=self.clock)
            )
            for name, entry in self._active_resources_mock_data.items()
        }

    def _stored_resources(self):
        """Return the mocked stored resources as GoogleCloudResource objects."""
        serialized_keys = ("resource_name", "creation_date", "last_update_date")
        hydrated = {}
        for name, entry in self._stored_resources_mock_data.items():
            if isinstance(entry, GoogleCloudResource):
                hydrated[name] = entry
                continue
            if isinstance(entry, dict) and all(key in entry for key in serialized_keys):
                # Dict carrying all three serialized fields: rebuild the
                # resource with its ISO date strings parsed back to datetimes.
                hydrated[name] = GoogleCloudResource(
                    resource_name=entry["resource_name"],
                    creation_date=datetime.datetime.fromisoformat(entry["creation_date"]),
                    last_update_date=datetime.datetime.fromisoformat(entry["last_update_date"]),
                    clock=self.clock,
                )
                continue
            # Anything else becomes a resource named after its key.
            hydrated[name] = GoogleCloudResource(resource_name=name, clock=self.clock)
        return hydrated

    def _write_resources(self, resources):
        """Keep the written resources in memory as the new stored data."""
        self._stored_resources_mock_data = resources

    def _delete_resource(self, resource_name):
        """Record the deletion request instead of deleting anything."""
        self.deleted_resources.append(resource_name)
class StaleCleanerTest(unittest.TestCase):
    """Unit tests for the StaleCleaner logic, driven through MockStaleCleaner."""

    def setUp(self):
        """Build a cleaner preloaded with fresh, stale, new and deleted resources."""
        self.project_id = "test-project"
        self.resource_type = "test-resource"
        self.bucket_name = "test-bucket"
        self.prefixes = ["test-prefix"]
        self.time_threshold = 3600  # 1 hour

        # "Now" for every test in this class.
        self.fake_clock = FakeClock("2025-05-10T12:00:00")

        # 30 minutes before "now": younger than the one-hour threshold.
        fresh_time = FakeClock("2025-05-10T11:30:00")()
        self.fresh_resource = GoogleCloudResource(
            resource_name="fresh-resource",
            creation_date=fresh_time,
            last_update_date=fresh_time,
            clock=self.fake_clock,
        )

        # Two hours before "now": older than the one-hour threshold.
        stale_time = FakeClock("2025-05-10T10:00:00")()
        self.stale_resource = GoogleCloudResource(
            resource_name="stale-resource",
            creation_date=stale_time,
            last_update_date=stale_time,
            clock=self.fake_clock,
        )

        # Resources currently live; "new-resource" has no stored counterpart.
        self.active_resources_data = {
            "fresh-resource": self.fresh_resource,
            "stale-resource": self.stale_resource,
            "new-resource": GoogleCloudResource(
                resource_name="new-resource",
                clock=self.fake_clock,
            ),
        }

        # Previously recorded resources; "deleted-resource" is no longer active.
        self.stored_resources_data = {
            "fresh-resource": self.fresh_resource,
            "stale-resource": self.stale_resource,
            "deleted-resource": GoogleCloudResource(
                resource_name="deleted-resource",
                creation_date=stale_time,
                last_update_date=stale_time,
                clock=self.fake_clock,
            ),
        }

        self.cleaner = MockStaleCleaner(
            project_id=self.project_id,
            resource_type=self.resource_type,
            bucket_name=self.bucket_name,
            prefixes=self.prefixes,
            time_threshold=self.time_threshold,
            active_resources=self.active_resources_data,
            stored_resources=self.stored_resources_data,
            clock=self.fake_clock,
        )

    def test_init(self):
        """Constructor arguments are exposed as attributes."""
        cleaner = self.cleaner
        self.assertEqual(cleaner.project_id, self.project_id)
        self.assertEqual(cleaner.resource_type, self.resource_type)
        self.assertEqual(cleaner.bucket_name, self.bucket_name)
        self.assertEqual(cleaner.prefixes, self.prefixes)
        self.assertEqual(cleaner.time_threshold, self.time_threshold)

    def test_refresh(self):
        """refresh() drops vanished resources, adds new ones, updates the rest."""
        # The cleaner may print progress; keep test output clean.
        with SilencePrint():
            self.cleaner.refresh()
        stored = self.cleaner._stored_resources_mock_data

        # No longer active -> dropped from storage.
        self.assertNotIn("deleted-resource", stored)

        # Newly active -> recorded, stamped with the fake clock's time.
        self.assertIn("new-resource", stored)
        self.assertEqual(stored["new-resource"].creation_date, self.fake_clock())
        self.assertEqual(stored["new-resource"].last_update_date, self.fake_clock())

        # Still active -> last_update_date moved to the fake clock's time.
        self.assertEqual(
            stored["fresh-resource"].last_update_date,
            self.fake_clock(),
        )

        # refresh() never deletes: the stale entry stays and is updated too.
        self.assertIn("stale-resource", stored)
        self.assertEqual(
            stored["stale-resource"].last_update_date,
            self.fake_clock(),
        )

    def test_refresh_with_time_advance(self):
        """refresh() stamps resources with the clock's advanced time."""
        start_time_str = "2025-05-10T12:00:00"
        later_time_str = "2025-05-10T13:00:00"  # 1 hour later

        tmp_clock = FakeClock(start_time_str)
        tracked = GoogleCloudResource(
            resource_name="resource-to-update",
            clock=tmp_clock,
        )
        cleaner = MockStaleCleaner(
            project_id=self.project_id,
            resource_type=self.resource_type,
            bucket_name=self.bucket_name,
            active_resources={"resource-to-update": tracked},
            stored_resources={"resource-to-update": tracked},
            clock=tmp_clock,
        )

        # Before refreshing, the resource carries the initial clock time.
        self.assertEqual(
            cleaner._stored_resources_mock_data["resource-to-update"].last_update_date,
            tmp_clock(),
        )

        # Move the shared clock forward, then refresh quietly.
        tmp_clock.set(later_time_str)
        with SilencePrint():
            cleaner.refresh()

        stored = cleaner._stored_resources_mock_data
        self.assertIn("resource-to-update", stored)
        refreshed = stored["resource-to-update"]
        self.assertEqual(refreshed.last_update_date, cleaner.clock())
        self.assertEqual(
            refreshed.last_update_date,
            datetime.datetime.fromisoformat(later_time_str),
        )
        self.assertNotEqual(
            refreshed.last_update_date,
            datetime.datetime.fromisoformat(start_time_str),
        )

    def test_stale_resources(self):
        """stale_resources() returns only stored entries older than the threshold."""
        stale = self.cleaner.stale_resources()

        self.assertEqual(len(stale), 2)
        for expected_name in ("stale-resource", "deleted-resource"):
            self.assertIn(expected_name, stale)
        for unexpected_name in ("fresh-resource", "new-resource"):
            self.assertNotIn(unexpected_name, stale)

    def test_fresh_resources(self):
        """fresh_resources() returns only stored entries within the threshold."""
        fresh = self.cleaner.fresh_resources()

        self.assertEqual(len(fresh), 1)
        self.assertIn("fresh-resource", fresh)
        for unexpected_name in ("stale-resource", "deleted-resource"):
            self.assertNotIn(unexpected_name, fresh)

    def test_delete_stale_dry_run(self):
        """delete_stale(dry_run=True) must not delete anything."""
        with SilencePrint():
            self.cleaner.delete_stale(dry_run=True)

        # The mock records every deletion request; none are expected here.
        self.assertEqual(len(self.cleaner.deleted_resources), 0)

    def test_delete_stale(self):
        """delete_stale(dry_run=False) deletes resources that are stale AND active."""
        self.cleaner._active_resources_mock_data = {
            "fresh-resource": self.fresh_resource,
            "stale-resource": self.stale_resource,  # Stale and active
            "new-resource": GoogleCloudResource(
                resource_name="new-resource", clock=self.fake_clock
            ),
        }

        with SilencePrint():
            self.cleaner.delete_stale(dry_run=False)

        # "deleted-resource" is stale but not active, so it is not requested.
        deleted = self.cleaner.deleted_resources
        self.assertEqual(len(deleted), 1)
        self.assertIn("stale-resource", deleted)
        self.assertNotIn("deleted-resource", deleted)
class PubSubSubscriptionCleanerTest(unittest.TestCase):
    """Tests for the PubSubSubscriptionCleaner class."""

    def setUp(self):
        """Patch the Pub/Sub subscriber client and build the cleaner under test."""
        self.project_id = "test-project"
        self.bucket_name = "test-bucket"
        self.prefixes = ["test-prefix"]
        self.time_threshold = 86400  # 1 day
        self.fake_clock = FakeClock("2025-05-28T10:00:00")

        # Mock PubSub client
        self.mock_client_patcher = mock.patch('google.cloud.pubsub_v1.SubscriberClient')
        self.MockSubscriberClientClass = self.mock_client_patcher.start()
        # Register the undo immediately: unittest skips tearDown() when
        # setUp() raises, but still runs cleanups, so the patch cannot leak
        # into other tests if building the cleaner below fails.
        self.addCleanup(self.mock_client_patcher.stop)
        self.mock_subscriber_client = self.MockSubscriberClientClass.return_value

        # Create a test cleaner
        self.cleaner = PubSubSubscriptionCleaner(
            project_id=self.project_id,
            bucket_name=self.bucket_name,
            prefixes=self.prefixes,
            time_threshold=self.time_threshold,
            clock=self.fake_clock
        )
        # Replace the storage hooks with mocks so these tests never read or
        # write the real backing store.
        self.cleaner._write_resources = SilencedMock()
        self.cleaner._stored_resources = SilencedMock(return_value={})

    def test_init(self):
        """Test initialization."""
        self.assertEqual(self.cleaner.project_id, self.project_id)
        self.assertEqual(self.cleaner.bucket_name, self.bucket_name)
        self.assertEqual(self.cleaner.prefixes, self.prefixes)
        self.assertEqual(self.cleaner.time_threshold, self.time_threshold)
        self.assertIsInstance(self.cleaner.clock, FakeClock)

    def test_active_resources(self):
        """Test that _active_resources keeps only subscriptions matching the prefix."""
        # Matching prefix, attached to a regular topic.
        sub1 = mock.Mock()
        sub1.name = "projects/test-project/subscriptions/test-prefix-sub1"
        sub1.topic = "projects/test-project/topics/some-topic"
        # Matching prefix, topic reported as "_deleted-topic_".
        sub2 = mock.Mock()
        sub2.name = "projects/test-project/subscriptions/test-prefix-sub2-detached"
        sub2.topic = "_deleted-topic_"
        # Non-matching prefix: expected to be filtered out.
        sub3 = mock.Mock()
        sub3.name = "projects/test-project/subscriptions/other-prefix-sub3"
        sub3.topic = "projects/test-project/topics/another-topic"
        self.mock_subscriber_client.list_subscriptions.return_value = [sub1, sub2, sub3]

        with SilencePrint():
            active = self.cleaner._active_resources()

        self.assertIn("projects/test-project/subscriptions/test-prefix-sub1", active)
        self.assertIn("projects/test-project/subscriptions/test-prefix-sub2-detached", active)
        self.assertNotIn("projects/test-project/subscriptions/other-prefix-sub3", active)
        self.assertEqual(len(active), 2)

    def test_delete_resource(self):
        """Test that _delete_resource resolves the path and deletes the subscription."""
        sub_name = "test-sub-to-delete"
        subscription_path = f"projects/{self.project_id}/subscriptions/{sub_name}"
        self.mock_subscriber_client.subscription_path.return_value = subscription_path

        with SilencePrint():
            self.cleaner._delete_resource(sub_name)

        self.mock_subscriber_client.subscription_path.assert_called_once_with(
            self.project_id, sub_name
        )
        self.mock_subscriber_client.delete_subscription.assert_called_once_with(
            request={'subscription': subscription_path}
        )
class PubSubTopicCleanerTest(unittest.TestCase):
    """Tests for the PubSubTopicCleaner class."""

    def setUp(self):
        """Patch the Pub/Sub publisher client and build the cleaner under test."""
        self.project_id = "test-project"
        self.bucket_name = "test-bucket"
        self.prefixes = ["test-prefix"]
        self.time_threshold = 86400  # 1 day
        self.fake_clock = FakeClock("2025-05-28T10:00:00")

        # Mock PubSub client
        self.mock_client_patcher = mock.patch('google.cloud.pubsub_v1.PublisherClient')
        self.MockPublisherClientClass = self.mock_client_patcher.start()
        # Register the undo immediately: unittest skips tearDown() when
        # setUp() raises, but still runs cleanups, so the patch cannot leak
        # into other tests if building the cleaner below fails.
        self.addCleanup(self.mock_client_patcher.stop)

        # Create a test cleaner
        self.cleaner = PubSubTopicCleaner(
            project_id=self.project_id,
            bucket_name=self.bucket_name,
            prefixes=self.prefixes,
            time_threshold=self.time_threshold,
            clock=self.fake_clock
        )
        # Replace the storage hooks with mocks so these tests never read or
        # write the real backing store.
        self.cleaner._write_resources = SilencedMock()
        self.cleaner._stored_resources = SilencedMock(return_value={})

    def test_init(self):
        """Test initialization."""
        self.assertEqual(self.cleaner.project_id, self.project_id)
        self.assertEqual(self.cleaner.resource_type, PUBSUB_TOPIC_RESOURCE)
        self.assertEqual(self.cleaner.bucket_name, self.bucket_name)
        self.assertEqual(self.cleaner.prefixes, self.prefixes)
        self.assertEqual(self.cleaner.time_threshold, self.time_threshold)
        self.assertEqual(self.cleaner.clock, self.fake_clock)
        # The cleaner must have instantiated the (patched) client exactly once.
        self.MockPublisherClientClass.assert_called_once()

    def test_active_resources_with_matching_prefix(self):
        """Test _active_resources method with matching prefix."""
        # Configure mock client to return topics
        mock_topic1 = mock.MagicMock()
        mock_topic1.name = "projects/test-project/topics/test-prefix-topic1"
        mock_topic2 = mock.MagicMock()
        mock_topic2.name = "projects/test-project/topics/other-topic"
        self.cleaner.client.list_topics.return_value = [mock_topic1, mock_topic2]

        # Single leading underscore: the method is called directly, no name
        # mangling is involved.
        resources = self.cleaner._active_resources()

        # Should only return the topic with matching prefix
        self.assertEqual(len(resources), 1)
        self.assertIn("projects/test-project/topics/test-prefix-topic1", resources)
        self.assertNotIn("projects/test-project/topics/other-topic", resources)

    def test_active_resources_without_prefix_filtering(self):
        """Test _active_resources method without prefix filtering."""
        # Configure cleaner with no prefixes
        self.cleaner.prefixes = []

        # Configure mock client to return topics
        mock_topic1 = mock.MagicMock()
        mock_topic1.name = "projects/test-project/topics/test-prefix-topic1"
        mock_topic2 = mock.MagicMock()
        mock_topic2.name = "projects/test-project/topics/other-topic"
        self.cleaner.client.list_topics.return_value = [mock_topic1, mock_topic2]

        # Single leading underscore: the method is called directly, no name
        # mangling is involved.
        resources = self.cleaner._active_resources()

        # Should return all topics
        self.assertEqual(len(resources), 2)
        self.assertIn("projects/test-project/topics/test-prefix-topic1", resources)
        self.assertIn("projects/test-project/topics/other-topic", resources)

    def test_delete_resource(self):
        """Test _delete_resource method."""
        # Create a resource name
        resource_name = "projects/test-project/topics/test-topic"

        # Set the clock to a specific time
        self.cleaner.clock.set("2025-05-28T10:00:00")

        # Capture print() so the emitted message can be asserted on.
        with mock.patch('builtins.print') as mock_print:
            self.cleaner._delete_resource(resource_name)

        # Check that delete_topic was called
        self.cleaner.client.delete_topic.assert_called_once_with(
            request={'topic': resource_name}
        )

        # Check that correct message was printed
        mock_print.assert_called_once_with(
            f"{self.cleaner.clock()} - Deleting PubSub topic {resource_name}"
        )
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()