KAFKA-14133: Migrate activeStateManager and standbyStateManager mocks in StoreChangelogReaderTest to Mockito (#15106)
This pull request takes a similar approach to how TaskManagerTest is being migrated to Mockito mock by mock for easier reviews.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 40de78f..7b8356b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -112,9 +112,9 @@
@org.mockito.Mock
private ProcessorStateManager stateManager;
- @Mock(type = MockType.NICE)
+ @org.mockito.Mock
private ProcessorStateManager activeStateManager;
- @Mock(type = MockType.NICE)
+ @org.mockito.Mock
private ProcessorStateManager standbyStateManager;
@Mock(type = MockType.NICE)
private StateStoreMetadata storeMetadata;
@@ -172,13 +172,18 @@
when(stateManager.taskType()).thenReturn(type);
}
+ private void setupActiveStateManager() {
+ when(activeStateManager.storeMetadata(tp)).thenReturn(storeMetadata);
+ when(activeStateManager.taskType()).thenReturn(ACTIVE);
+ }
+
+ private void setupStandbyStateManager() {
+ when(standbyStateManager.storeMetadata(tp)).thenReturn(storeMetadata);
+ when(standbyStateManager.taskType()).thenReturn(STANDBY);
+ }
+
@Before
public void setUp() {
- EasyMock.expect(activeStateManager.storeMetadata(tp)).andReturn(storeMetadata).anyTimes();
- EasyMock.expect(activeStateManager.taskType()).andReturn(ACTIVE).anyTimes();
- EasyMock.expect(standbyStateManager.storeMetadata(tp)).andReturn(storeMetadata).anyTimes();
- EasyMock.expect(standbyStateManager.taskType()).andReturn(STANDBY).anyTimes();
-
EasyMock.expect(storeMetadata.changelogPartition()).andReturn(tp).anyTimes();
EasyMock.expect(storeMetadata.store()).andReturn(store).anyTimes();
EasyMock.expect(store.name()).andReturn(storeName).anyTimes();
@@ -187,8 +192,6 @@
@After
public void tearDown() {
EasyMock.reset(
- activeStateManager,
- standbyStateManager,
storeMetadata,
storeMetadataOne,
storeMetadataTwo,
@@ -642,11 +645,12 @@
@Test
public void shouldCheckCompletionIfPositionLargerThanEndOffset() {
+ setupActiveStateManager();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
- EasyMock.replay(mockTasks, activeStateManager, storeMetadata, store);
+ EasyMock.replay(mockTasks, storeMetadata, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L));
@@ -670,6 +674,7 @@
@Test
public void shouldRequestPositionAndHandleTimeoutException() {
+ setupActiveStateManager();
final TaskId taskId = new TaskId(0, 0);
final Task mockTask = mock(Task.class);
@@ -679,9 +684,9 @@
mockTask.recordRestoration(anyObject(), anyLong(), anyBoolean());
EasyMock.expectLastCall();
EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
- EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L));
- EasyMock.expect(activeStateManager.taskId()).andReturn(taskId).anyTimes();
- EasyMock.replay(mockTask, activeStateManager, storeMetadata, store);
+ when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 10L));
+ when(activeStateManager.taskId()).thenReturn(taskId);
+ EasyMock.replay(mockTask, storeMetadata, store);
final AtomicBoolean clearException = new AtomicBoolean(false);
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@@ -724,10 +729,11 @@
@Test
public void shouldThrowIfPositionFail() {
+ setupActiveStateManager();
final TaskId taskId = new TaskId(0, 0);
- EasyMock.expect(activeStateManager.taskId()).andReturn(taskId);
+ when(activeStateManager.taskId()).thenReturn(taskId);
EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
- EasyMock.replay(activeStateManager, storeMetadata, store);
+ EasyMock.replay(storeMetadata, store);
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
@@ -752,6 +758,7 @@
@Test
public void shouldRequestEndOffsetsAndHandleTimeoutException() {
+ setupActiveStateManager();
final TaskId taskId = new TaskId(0, 0);
final Task mockTask = niceMock(Task.class);
@@ -759,9 +766,9 @@
EasyMock.expectLastCall();
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
- EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
- EasyMock.expect(activeStateManager.taskId()).andReturn(taskId).anyTimes();
- EasyMock.replay(mockTask, activeStateManager, storeMetadata, store);
+ when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L));
+ when(activeStateManager.taskId()).thenReturn(taskId);
+ EasyMock.replay(mockTask, storeMetadata, store);
final AtomicBoolean functionCalled = new AtomicBoolean(false);
@@ -813,11 +820,12 @@
@Test
public void shouldThrowIfEndOffsetsFail() {
+ setupActiveStateManager();
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
- EasyMock.expect(activeStateManager.taskId()).andReturn(taskId).anyTimes();
- EasyMock.replay(activeStateManager, storeMetadata, store);
+ when(activeStateManager.taskId()).thenReturn(taskId);
+ EasyMock.replay(storeMetadata, store);
final MockAdminClient adminClient = new MockAdminClient() {
@Override
@@ -963,12 +971,13 @@
@Test
public void shouldOnlyRestoreStandbyChangelogInUpdateStandbyState() {
+ setupStandbyStateManager();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes();
- EasyMock.replay(mockTasks, standbyStateManager, storeMetadata, store);
+ EasyMock.replay(mockTasks, storeMetadata, store);
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
changelogReader.register(tp, standbyStateManager);
@@ -1004,14 +1013,15 @@
@Test
public void shouldNotUpdateLimitForNonSourceStandbyChangelog() {
+ setupStandbyStateManager();
final Map<TaskId, Task> mockTasks = mock(Map.class);
//EasyMock.expect(storeMetadata.offset()).andReturn(0L).anyTimes();
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes();
- EasyMock.expect(standbyStateManager.changelogAsSource(tp)).andReturn(false).anyTimes();
- EasyMock.replay(mockTasks, standbyStateManager, storeMetadata, store);
+ when(standbyStateManager.changelogAsSource(tp)).thenReturn(false);
+ EasyMock.replay(mockTasks, storeMetadata, store);
final MockAdminClient adminClient = new MockAdminClient() {
@Override
@@ -1061,13 +1071,14 @@
@Test
public void shouldRestoreToLimitInStandbyState() {
+ setupStandbyStateManager();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
- EasyMock.expect(standbyStateManager.changelogAsSource(tp)).andReturn(true).anyTimes();
+ when(standbyStateManager.changelogAsSource(tp)).thenReturn(true);
EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes();
- EasyMock.replay(mockTasks, standbyStateManager, storeMetadata, store);
+ EasyMock.replay(mockTasks, storeMetadata, store);
final long now = time.milliseconds();
final Properties properties = new Properties();
@@ -1176,6 +1187,7 @@
@Test
public void shouldRestoreMultipleChangelogs() {
+ setupActiveStateManager();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
@@ -1186,14 +1198,14 @@
EasyMock.expect(storeMetadata.offset()).andReturn(0L).anyTimes();
EasyMock.expect(storeMetadataOne.offset()).andReturn(0L).anyTimes();
EasyMock.expect(storeMetadataTwo.offset()).andReturn(0L).anyTimes();
- EasyMock.expect(activeStateManager.storeMetadata(tp1)).andReturn(storeMetadataOne).anyTimes();
- EasyMock.expect(activeStateManager.storeMetadata(tp2)).andReturn(storeMetadataTwo).anyTimes();
- EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(mkMap(
+ when(activeStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
+ when(activeStateManager.storeMetadata(tp2)).thenReturn(storeMetadataTwo);
+ when(activeStateManager.changelogOffsets()).thenReturn(mkMap(
mkEntry(tp, 5L),
mkEntry(tp1, 5L),
mkEntry(tp2, 5L)
- )).anyTimes();
- EasyMock.replay(mockTasks, activeStateManager, storeMetadata, store, storeMetadataOne, storeMetadataTwo);
+ ));
+ EasyMock.replay(mockTasks, storeMetadata, store, storeMetadataOne, storeMetadataTwo);
setupConsumer(10, tp);
setupConsumer(5, tp1);
@@ -1225,6 +1237,8 @@
@Test
public void shouldTransitState() {
+ setupActiveStateManager();
+ when(standbyStateManager.taskType()).thenReturn(STANDBY);
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();
@@ -1233,12 +1247,12 @@
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
EasyMock.expect(storeMetadataOne.offset()).andReturn(5L).anyTimes();
EasyMock.expect(storeMetadataTwo.offset()).andReturn(5L).anyTimes();
- EasyMock.expect(standbyStateManager.storeMetadata(tp1)).andReturn(storeMetadataOne).anyTimes();
- EasyMock.expect(standbyStateManager.storeMetadata(tp2)).andReturn(storeMetadataTwo).anyTimes();
- EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
- EasyMock.expect(activeStateManager.taskId()).andReturn(taskId).anyTimes();
- EasyMock.expect(standbyStateManager.taskId()).andReturn(taskId).anyTimes();
- EasyMock.replay(activeStateManager, standbyStateManager, storeMetadata, store, storeMetadataOne, storeMetadataTwo);
+ when(standbyStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
+ when(standbyStateManager.storeMetadata(tp2)).thenReturn(storeMetadataTwo);
+ when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L));
+ when(activeStateManager.taskId()).thenReturn(taskId);
+ when(standbyStateManager.taskId()).thenReturn(taskId);
+ EasyMock.replay(storeMetadata, store, storeMetadataOne, storeMetadataTwo);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
adminClient.updateEndOffsets(Collections.singletonMap(tp1, 10L));
@@ -1304,11 +1318,12 @@
@Test
public void shouldTransitStateBackToActiveRestoringAfterRemovingLastTask() {
+ when(standbyStateManager.taskType()).thenReturn(STANDBY);
final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);
- EasyMock.expect(standbyStateManager.storeMetadata(tp1)).andReturn(storeMetadataOne).anyTimes();
+ when(standbyStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();
- EasyMock.replay(standbyStateManager, store, storeMetadataOne);
+ EasyMock.replay(store, storeMetadataOne);
changelogReader.register(tp1, standbyStateManager);
changelogReader.transitToUpdateStandby();
@@ -1319,11 +1334,12 @@
@Test
public void shouldThrowIfRestoreCallbackThrows() {
+ setupActiveStateManager();
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
- EasyMock.expect(activeStateManager.taskId()).andReturn(taskId).anyTimes();
- EasyMock.replay(activeStateManager, storeMetadata, store);
+ when(activeStateManager.taskId()).thenReturn(taskId);
+ EasyMock.replay(storeMetadata, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));