KAFKA-14133: Migrate activeStateManager and standbyStateManager mocks in StoreChangelogReaderTest to Mockito (#15106)

This pull request takes a similar approach to how TaskManagerTest is being migrated to Mockito mock by mock for easier reviews.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 40de78f..7b8356b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -112,9 +112,9 @@
 
     @org.mockito.Mock
     private ProcessorStateManager stateManager;
-    @Mock(type = MockType.NICE)
+    @org.mockito.Mock
     private ProcessorStateManager activeStateManager;
-    @Mock(type = MockType.NICE)
+    @org.mockito.Mock
     private ProcessorStateManager standbyStateManager;
     @Mock(type = MockType.NICE)
     private StateStoreMetadata storeMetadata;
@@ -172,13 +172,18 @@
         when(stateManager.taskType()).thenReturn(type);
     }
 
+    private void setupActiveStateManager() {
+        when(activeStateManager.storeMetadata(tp)).thenReturn(storeMetadata);
+        when(activeStateManager.taskType()).thenReturn(ACTIVE);
+    }
+
+    private void setupStandbyStateManager() {
+        when(standbyStateManager.storeMetadata(tp)).thenReturn(storeMetadata);
+        when(standbyStateManager.taskType()).thenReturn(STANDBY);
+    }
+
     @Before
     public void setUp() {
-        EasyMock.expect(activeStateManager.storeMetadata(tp)).andReturn(storeMetadata).anyTimes();
-        EasyMock.expect(activeStateManager.taskType()).andReturn(ACTIVE).anyTimes();
-        EasyMock.expect(standbyStateManager.storeMetadata(tp)).andReturn(storeMetadata).anyTimes();
-        EasyMock.expect(standbyStateManager.taskType()).andReturn(STANDBY).anyTimes();
-
         EasyMock.expect(storeMetadata.changelogPartition()).andReturn(tp).anyTimes();
         EasyMock.expect(storeMetadata.store()).andReturn(store).anyTimes();
         EasyMock.expect(store.name()).andReturn(storeName).anyTimes();
@@ -187,8 +192,6 @@
     @After
     public void tearDown() {
         EasyMock.reset(
-            activeStateManager,
-            standbyStateManager,
             storeMetadata,
             storeMetadataOne,
             storeMetadataTwo,
@@ -642,11 +645,12 @@
 
     @Test
     public void shouldCheckCompletionIfPositionLargerThanEndOffset() {
+        setupActiveStateManager();
         final Map<TaskId, Task> mockTasks = mock(Map.class);
         EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
         EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
-        EasyMock.replay(mockTasks, activeStateManager, storeMetadata, store);
+        EasyMock.replay(mockTasks, storeMetadata, store);
 
         adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L));
 
@@ -670,6 +674,7 @@
 
     @Test
     public void shouldRequestPositionAndHandleTimeoutException() {
+        setupActiveStateManager();
         final TaskId taskId = new TaskId(0, 0);
 
         final Task mockTask = mock(Task.class);
@@ -679,9 +684,9 @@
         mockTask.recordRestoration(anyObject(), anyLong(), anyBoolean());
         EasyMock.expectLastCall();
         EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
-        EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L));
-        EasyMock.expect(activeStateManager.taskId()).andReturn(taskId).anyTimes();
-        EasyMock.replay(mockTask, activeStateManager, storeMetadata, store);
+        when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 10L));
+        when(activeStateManager.taskId()).thenReturn(taskId);
+        EasyMock.replay(mockTask, storeMetadata, store);
 
         final AtomicBoolean clearException = new AtomicBoolean(false);
         final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@@ -724,10 +729,11 @@
 
     @Test
     public void shouldThrowIfPositionFail() {
+        setupActiveStateManager();
         final TaskId taskId = new TaskId(0, 0);
-        EasyMock.expect(activeStateManager.taskId()).andReturn(taskId);
+        when(activeStateManager.taskId()).thenReturn(taskId);
         EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
-        EasyMock.replay(activeStateManager, storeMetadata, store);
+        EasyMock.replay(storeMetadata, store);
 
         final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
@@ -752,6 +758,7 @@
 
     @Test
     public void shouldRequestEndOffsetsAndHandleTimeoutException() {
+        setupActiveStateManager();
         final TaskId taskId = new TaskId(0, 0);
 
         final Task mockTask = niceMock(Task.class);
@@ -759,9 +766,9 @@
         EasyMock.expectLastCall();
 
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
-        EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
-        EasyMock.expect(activeStateManager.taskId()).andReturn(taskId).anyTimes();
-        EasyMock.replay(mockTask, activeStateManager, storeMetadata, store);
+        when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L));
+        when(activeStateManager.taskId()).thenReturn(taskId);
+        EasyMock.replay(mockTask, storeMetadata, store);
 
         final AtomicBoolean functionCalled = new AtomicBoolean(false);
 
@@ -813,11 +820,12 @@
 
     @Test
     public void shouldThrowIfEndOffsetsFail() {
+        setupActiveStateManager();
         final TaskId taskId = new TaskId(0, 0);
 
         EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
-        EasyMock.expect(activeStateManager.taskId()).andReturn(taskId).anyTimes();
-        EasyMock.replay(activeStateManager, storeMetadata, store);
+        when(activeStateManager.taskId()).thenReturn(taskId);
+        EasyMock.replay(storeMetadata, store);
 
         final MockAdminClient adminClient = new MockAdminClient() {
             @Override
@@ -963,12 +971,13 @@
 
     @Test
     public void shouldOnlyRestoreStandbyChangelogInUpdateStandbyState() {
+        setupStandbyStateManager();
         final Map<TaskId, Task> mockTasks = mock(Map.class);
         EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
         EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
         EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes();
         EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes();
-        EasyMock.replay(mockTasks, standbyStateManager, storeMetadata, store);
+        EasyMock.replay(mockTasks, storeMetadata, store);
 
         consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
         changelogReader.register(tp, standbyStateManager);
@@ -1004,14 +1013,15 @@
 
     @Test
     public void shouldNotUpdateLimitForNonSourceStandbyChangelog() {
+        setupStandbyStateManager();
         final Map<TaskId, Task> mockTasks = mock(Map.class);
         //EasyMock.expect(storeMetadata.offset()).andReturn(0L).anyTimes();
         EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
         EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
         EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes();
         EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes();
-        EasyMock.expect(standbyStateManager.changelogAsSource(tp)).andReturn(false).anyTimes();
-        EasyMock.replay(mockTasks, standbyStateManager, storeMetadata, store);
+        when(standbyStateManager.changelogAsSource(tp)).thenReturn(false);
+        EasyMock.replay(mockTasks, storeMetadata, store);
 
         final MockAdminClient adminClient = new MockAdminClient() {
             @Override
@@ -1061,13 +1071,14 @@
 
     @Test
     public void shouldRestoreToLimitInStandbyState() {
+        setupStandbyStateManager();
         final Map<TaskId, Task> mockTasks = mock(Map.class);
         EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
         EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
-        EasyMock.expect(standbyStateManager.changelogAsSource(tp)).andReturn(true).anyTimes();
+        when(standbyStateManager.changelogAsSource(tp)).thenReturn(true);
         EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes();
         EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes();
-        EasyMock.replay(mockTasks, standbyStateManager, storeMetadata, store);
+        EasyMock.replay(mockTasks, storeMetadata, store);
 
         final long now = time.milliseconds();
         final Properties properties = new Properties();
@@ -1176,6 +1187,7 @@
 
     @Test
     public void shouldRestoreMultipleChangelogs() {
+        setupActiveStateManager();
         final Map<TaskId, Task> mockTasks = mock(Map.class);
         EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
         EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
@@ -1186,14 +1198,14 @@
         EasyMock.expect(storeMetadata.offset()).andReturn(0L).anyTimes();
         EasyMock.expect(storeMetadataOne.offset()).andReturn(0L).anyTimes();
         EasyMock.expect(storeMetadataTwo.offset()).andReturn(0L).anyTimes();
-        EasyMock.expect(activeStateManager.storeMetadata(tp1)).andReturn(storeMetadataOne).anyTimes();
-        EasyMock.expect(activeStateManager.storeMetadata(tp2)).andReturn(storeMetadataTwo).anyTimes();
-        EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(mkMap(
+        when(activeStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
+        when(activeStateManager.storeMetadata(tp2)).thenReturn(storeMetadataTwo);
+        when(activeStateManager.changelogOffsets()).thenReturn(mkMap(
             mkEntry(tp, 5L),
             mkEntry(tp1, 5L),
             mkEntry(tp2, 5L)
-        )).anyTimes();
-        EasyMock.replay(mockTasks, activeStateManager, storeMetadata, store, storeMetadataOne, storeMetadataTwo);
+        ));
+        EasyMock.replay(mockTasks, storeMetadata, store, storeMetadataOne, storeMetadataTwo);
 
         setupConsumer(10, tp);
         setupConsumer(5, tp1);
@@ -1225,6 +1237,8 @@
 
     @Test
     public void shouldTransitState() {
+        setupActiveStateManager();
+        when(standbyStateManager.taskType()).thenReturn(STANDBY);
         final TaskId taskId = new TaskId(0, 0);
         EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
         EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();
@@ -1233,12 +1247,12 @@
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
         EasyMock.expect(storeMetadataOne.offset()).andReturn(5L).anyTimes();
         EasyMock.expect(storeMetadataTwo.offset()).andReturn(5L).anyTimes();
-        EasyMock.expect(standbyStateManager.storeMetadata(tp1)).andReturn(storeMetadataOne).anyTimes();
-        EasyMock.expect(standbyStateManager.storeMetadata(tp2)).andReturn(storeMetadataTwo).anyTimes();
-        EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
-        EasyMock.expect(activeStateManager.taskId()).andReturn(taskId).anyTimes();
-        EasyMock.expect(standbyStateManager.taskId()).andReturn(taskId).anyTimes();
-        EasyMock.replay(activeStateManager, standbyStateManager, storeMetadata, store, storeMetadataOne, storeMetadataTwo);
+        when(standbyStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
+        when(standbyStateManager.storeMetadata(tp2)).thenReturn(storeMetadataTwo);
+        when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L));
+        when(activeStateManager.taskId()).thenReturn(taskId);
+        when(standbyStateManager.taskId()).thenReturn(taskId);
+        EasyMock.replay(storeMetadata, store, storeMetadataOne, storeMetadataTwo);
 
         adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
         adminClient.updateEndOffsets(Collections.singletonMap(tp1, 10L));
@@ -1304,11 +1318,12 @@
 
     @Test
     public void shouldTransitStateBackToActiveRestoringAfterRemovingLastTask() {
+        when(standbyStateManager.taskType()).thenReturn(STANDBY);
         final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);
-        EasyMock.expect(standbyStateManager.storeMetadata(tp1)).andReturn(storeMetadataOne).anyTimes();
+        when(standbyStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
         EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
         EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();
-        EasyMock.replay(standbyStateManager, store, storeMetadataOne);
+        EasyMock.replay(store, storeMetadataOne);
         changelogReader.register(tp1, standbyStateManager);
         changelogReader.transitToUpdateStandby();
 
@@ -1319,11 +1334,12 @@
 
     @Test
     public void shouldThrowIfRestoreCallbackThrows() {
+        setupActiveStateManager();
         final TaskId taskId = new TaskId(0, 0);
 
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
-        EasyMock.expect(activeStateManager.taskId()).andReturn(taskId).anyTimes();
-        EasyMock.replay(activeStateManager, storeMetadata, store);
+        when(activeStateManager.taskId()).thenReturn(taskId);
+        EasyMock.replay(storeMetadata, store);
 
         adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));