blob: 31460d4f5e6500361ac09603503b532019b824ac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask;
import org.apache.kafka.streams.processor.internals.Task.State;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
import static org.apache.kafka.test.StreamsTestUtils.TopologyMetadataBuilder.unnamedTopology;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class DefaultStateUpdaterTest {
private final static int COMMIT_INTERVAL = 100;
private final static long CALL_TIMEOUT = 1000;
private final static long VERIFICATION_TIMEOUT = 30000;
private final static TopicPartition TOPIC_PARTITION_A_0 = new TopicPartition("topicA", 0);
private final static TopicPartition TOPIC_PARTITION_A_1 = new TopicPartition("topicA", 1);
private final static TopicPartition TOPIC_PARTITION_B_0 = new TopicPartition("topicB", 0);
private final static TopicPartition TOPIC_PARTITION_B_1 = new TopicPartition("topicB", 1);
private final static TopicPartition TOPIC_PARTITION_C_0 = new TopicPartition("topicC", 0);
private final static TopicPartition TOPIC_PARTITION_D_0 = new TopicPartition("topicD", 0);
private final static TaskId TASK_0_0 = new TaskId(0, 0);
private final static TaskId TASK_0_1 = new TaskId(0, 1);
private final static TaskId TASK_0_2 = new TaskId(0, 2);
private final static TaskId TASK_1_0 = new TaskId(1, 0);
private final static TaskId TASK_1_1 = new TaskId(1, 1);
private final static TaskId TASK_A_0_0 = new TaskId(0, 0, "A");
private final static TaskId TASK_A_0_1 = new TaskId(0, 1, "A");
private final static TaskId TASK_B_0_0 = new TaskId(0, 0, "B");
private final static TaskId TASK_B_0_1 = new TaskId(0, 1, "B");
// need an auto-tick timer to work for draining with timeout
private final Time time = new MockTime(1L);
private final Metrics metrics = new Metrics(time);
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
private final TopologyMetadata topologyMetadata = unnamedTopology().build();
private DefaultStateUpdater stateUpdater =
new DefaultStateUpdater("test-state-updater", metrics, config, null, changelogReader, topologyMetadata, time);
@AfterEach
public void tearDown() {
stateUpdater.shutdown(Duration.ofMinutes(1));
}
private Properties configProps(final int commitInterval) {
return mkObjectProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval),
mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), commitInterval)
));
}
@Test
public void shouldShutdownStateUpdater() {
stateUpdater.start();
stateUpdater.shutdown(Duration.ofMinutes(1));
verify(changelogReader).clear();
}
@Test
public void shouldShutdownStateUpdaterAndRestart() {
stateUpdater.start();
stateUpdater.shutdown(Duration.ofMinutes(1));
stateUpdater.start();
stateUpdater.shutdown(Duration.ofMinutes(1));
verify(changelogReader, times(2)).clear();
}
@Test
public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws Exception {
final StreamTask statelessTask = statelessTask(TASK_0_0).inState(State.RESTORING).build();
final StreamTask statefulTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
stateUpdater.add(statelessTask);
stateUpdater.add(statefulTask);
stateUpdater.remove(TASK_1_1);
stateUpdater.add(standbyTask);
verifyRemovedTasks();
stateUpdater.shutdown(Duration.ofMinutes(1));
verifyRemovedTasks(statelessTask, statefulTask, standbyTask);
}
@Test
public void shouldRemoveUpdatingTasksOnShutdown() throws Exception {
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, new StreamsConfig(configProps(Integer.MAX_VALUE)), null, changelogReader, topologyMetadata, time);
final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(activeTask);
stateUpdater.add(standbyTask);
verifyUpdatingTasks(activeTask, standbyTask);
verifyRemovedTasks();
stateUpdater.shutdown(Duration.ofMinutes(1));
verifyRemovedTasks(activeTask, standbyTask);
verify(activeTask).maybeCheckpoint(true);
verify(standbyTask).maybeCheckpoint(true);
}
@Test
public void shouldRemovePausedTasksOnShutdown() throws Exception {
final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
stateUpdater.start();
stateUpdater.add(activeTask);
stateUpdater.add(standbyTask);
verifyUpdatingTasks(activeTask, standbyTask);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks(activeTask, standbyTask);
verifyRemovedTasks();
stateUpdater.shutdown(Duration.ofMinutes(1));
verifyRemovedTasks(activeTask, standbyTask);
}
@Test
public void shouldRemovePausedAndUpdatingTasksOnShutdown() throws Exception {
final StreamTask activeTask = statefulTask(TASK_A_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_B_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
when(topologyMetadata.isPaused(standbyTask.id().topologyName())).thenReturn(false).thenReturn(true);
stateUpdater.start();
stateUpdater.add(activeTask);
stateUpdater.add(standbyTask);
verifyPausedTasks(standbyTask);
verifyUpdatingTasks(activeTask);
verifyRemovedTasks();
stateUpdater.shutdown(Duration.ofMinutes(1));
verifyRemovedTasks(activeTask, standbyTask);
}
@Test
public void shouldThrowIfStatelessTaskNotInStateRestoring() {
shouldThrowIfActiveTaskNotInStateRestoring(statelessTask(TASK_0_0).build());
}
@Test
public void shouldThrowIfStatefulTaskNotInStateRestoring() {
shouldThrowIfActiveTaskNotInStateRestoring(statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build());
}
private void shouldThrowIfActiveTaskNotInStateRestoring(final StreamTask task) {
shouldThrowIfTaskNotInGivenState(task, State.RESTORING);
}
@Test
public void shouldThrowIfStandbyTaskNotInStateRunning() {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).build();
shouldThrowIfTaskNotInGivenState(task, State.RUNNING);
}
private void shouldThrowIfTaskNotInGivenState(final Task task, final State correctState) {
for (final State state : State.values()) {
if (state != correctState) {
when(task.state()).thenReturn(state);
assertThrows(IllegalStateException.class, () -> stateUpdater.add(task));
}
}
}
@Test
public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldThrowIfAddingTasksWithSameId(task1, task2);
}
@Test
public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
shouldThrowIfAddingTasksWithSameId(task1, task2);
}
@Test
public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
shouldThrowIfAddingTasksWithSameId(task1, task2);
}
@Test
public void shouldThrowIfAddingStandbyAndActiveTaskWithSameId() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
shouldThrowIfAddingTasksWithSameId(task2, task1);
}
private void shouldThrowIfAddingTasksWithSameId(final Task task1, final Task task2) throws Exception {
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
verifyFailedTasks(IllegalStateException.class, task1);
}
@Test
public void shouldImmediatelyAddSingleStatelessTaskToRestoredTasks() throws Exception {
final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build();
shouldImmediatelyAddStatelessTasksToRestoredTasks(task1);
}
@Test
public void shouldImmediatelyAddMultipleStatelessTasksToRestoredTasks() throws Exception {
final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build();
final StreamTask task2 = statelessTask(TASK_0_2).inState(State.RESTORING).build();
final StreamTask task3 = statelessTask(TASK_1_0).inState(State.RESTORING).build();
shouldImmediatelyAddStatelessTasksToRestoredTasks(task1, task2, task3);
}
private void shouldImmediatelyAddStatelessTasksToRestoredTasks(final StreamTask... tasks) throws Exception {
stateUpdater.start();
for (final StreamTask task : tasks) {
stateUpdater.add(task);
}
verifyRestoredActiveTasks(tasks);
verifyNeverCheckpointTasks(tasks);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
}
@Test
public void shouldRestoreSingleActiveStatefulTask() throws Exception {
final StreamTask task =
statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_A_0))
.thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted())
.thenReturn(false)
.thenReturn(false)
.thenReturn(true);
stateUpdater.start();
stateUpdater.add(task);
verifyRestoredActiveTasks(task);
verifyCheckpointTasks(true, task);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
verify(changelogReader).register(task.changelogPartitions(), task.stateManager());
verify(changelogReader).unregister(task.changelogPartitions());
verify(changelogReader).enforceRestoreActive();
verify(changelogReader, atLeast(3)).restore(anyMap());
verify(changelogReader, never()).transitToUpdateStandby();
}
@Test
public void shouldRestoreMultipleActiveStatefulTasks() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_C_0))
.thenReturn(mkSet(TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0))
.thenReturn(mkSet(TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted())
.thenReturn(false)
.thenReturn(false)
.thenReturn(false)
.thenReturn(true);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
stateUpdater.add(task3);
verifyRestoredActiveTasks(task3, task1, task2);
verifyCheckpointTasks(true, task3, task1, task2);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
verify(changelogReader).register(task1.changelogPartitions(), task1.stateManager());
verify(changelogReader).register(task2.changelogPartitions(), task2.stateManager());
verify(changelogReader).register(task3.changelogPartitions(), task3.stateManager());
verify(changelogReader).unregister(task1.changelogPartitions());
verify(changelogReader).unregister(task2.changelogPartitions());
verify(changelogReader).unregister(task3.changelogPartitions());
verify(changelogReader, times(3)).enforceRestoreActive();
verify(changelogReader, atLeast(4)).restore(anyMap());
verify(changelogReader, never()).transitToUpdateStandby();
}
@Test
public void shouldReturnTrueForRestoreActiveTasksIfTaskAdded() {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
.inState(State.RESTORING).build();
stateUpdater.add(task);
assertTrue(stateUpdater.restoresActiveTasks());
}
@Test
public void shouldReturnTrueForRestoreActiveTasksIfTaskUpdating() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
.inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted())
.thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
verifyRestoredActiveTasks();
verifyUpdatingTasks(task);
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
assertTrue(stateUpdater.restoresActiveTasks());
}
@Test
public void shouldReturnTrueForRestoreActiveTasksIfTaskRestored() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
.inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted())
.thenReturn(true);
stateUpdater.start();
stateUpdater.add(task);
verifyRestoredActiveTasks(task);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
assertTrue(stateUpdater.restoresActiveTasks());
}
@Test
public void shouldReturnTrueForRestoreActiveTasksIfTaskRemoved() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
.inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted())
.thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.remove(task.id());
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks(task);
verifyPausedTasks();
assertTrue(stateUpdater.restoresActiveTasks());
}
@Test
public void shouldReturnTrueForRestoreActiveTasksIfTaskFailed() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
.inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted())
.thenReturn(false);
final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(task.id()));
doThrow(taskCorruptedException).when(changelogReader).restore(mkMap(mkEntry(TASK_0_0, task)));
stateUpdater.start();
stateUpdater.add(task);
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks(new ExceptionAndTask(taskCorruptedException, task));
verifyRemovedTasks();
verifyPausedTasks();
assertTrue(stateUpdater.restoresActiveTasks());
}
@Test
public void shouldReturnTrueForRestoreActiveTasksIfTaskPaused() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
.inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted())
.thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
verifyUpdatingTasks(task);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks(task);
assertTrue(stateUpdater.restoresActiveTasks());
}
@Test
public void shouldReturnFalseForRestoreActiveTasksIfTaskRemovedFromStateUpdater() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
.inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted())
.thenReturn(true);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.drainRestoredActiveTasks(Duration.ofMillis(VERIFICATION_TIMEOUT));
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
assertFalse(stateUpdater.restoresActiveTasks());
}
@Test
public void shouldReturnTrueForRestoreActiveTasksIfStandbyTask() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
.inState(State.RUNNING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted())
.thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
verifyRestoredActiveTasks();
verifyUpdatingTasks(task);
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
assertFalse(stateUpdater.restoresActiveTasks());
}
@Test
public void shouldDrainRestoredActiveTasks() throws Exception {
assertTrue(stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());
final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build();
stateUpdater.start();
stateUpdater.add(task1);
verifyDrainingRestoredActiveTasks(task1);
final StreamTask task2 = statelessTask(TASK_1_1).inState(State.RESTORING).build();
final StreamTask task3 = statelessTask(TASK_1_0).inState(State.RESTORING).build();
final StreamTask task4 = statelessTask(TASK_0_2).inState(State.RESTORING).build();
stateUpdater.add(task2);
stateUpdater.add(task3);
stateUpdater.add(task4);
verifyDrainingRestoredActiveTasks(task2, task3, task4);
}
@Test
public void shouldUpdateSingleStandbyTask() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
.inState(State.RUNNING).build();
shouldUpdateStandbyTasks(task);
}
@Test
public void shouldUpdateMultipleStandbyTasks() throws Exception {
final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
shouldUpdateStandbyTasks(task1, task2, task3);
}
private void shouldUpdateStandbyTasks(final StandbyTask... tasks) throws Exception {
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
for (final StandbyTask task : tasks) {
stateUpdater.add(task);
}
verifyUpdatingStandbyTasks(tasks);
verifyRestoredActiveTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
for (final StandbyTask task : tasks) {
verify(changelogReader).register(task.changelogPartitions(), task.stateManager());
}
verify(changelogReader).transitToUpdateStandby();
verify(changelogReader, timeout(VERIFICATION_TIMEOUT).atLeast(1)).restore(anyMap());
verify(changelogReader, never()).enforceRestoreActive();
}
@Test
public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_A_0))
.thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
stateUpdater.add(task3);
stateUpdater.add(task4);
verifyRestoredActiveTasks(task2, task1);
verifyCheckpointTasks(true, task2, task1);
verifyUpdatingStandbyTasks(task4, task3);
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
verify(changelogReader).register(task1.changelogPartitions(), task1.stateManager());
verify(changelogReader).register(task2.changelogPartitions(), task2.stateManager());
verify(changelogReader).register(task3.changelogPartitions(), task3.stateManager());
verify(changelogReader).register(task4.changelogPartitions(), task4.stateManager());
verify(changelogReader, atLeast(3)).restore(anyMap());
final InOrder orderVerifier = inOrder(changelogReader, task1, task2);
orderVerifier.verify(changelogReader, times(2)).enforceRestoreActive();
orderVerifier.verify(changelogReader).transitToUpdateStandby();
}
@Test
public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreActiveStatefulTask() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StreamTask task3 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_A_0))
.thenReturn(mkSet(TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
verifyRestoredActiveTasks(task1);
verifyCheckpointTasks(true, task1);
verifyUpdatingStandbyTasks(task2);
final InOrder orderVerifier = inOrder(changelogReader);
orderVerifier.verify(changelogReader, times(1)).enforceRestoreActive();
orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
stateUpdater.add(task3);
verifyRestoredActiveTasks(task1, task3);
verifyCheckpointTasks(true, task3);
orderVerifier.verify(changelogReader, times(1)).enforceRestoreActive();
orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
}
@Test
public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception {
final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask activeTask2 = statefulTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final TaskCorruptedException taskCorruptedException =
new TaskCorruptedException(mkSet(activeTask1.id(), activeTask2.id()));
final Map<TaskId, Task> updatingTasks1 = mkMap(
mkEntry(activeTask1.id(), activeTask1),
mkEntry(activeTask2.id(), activeTask2),
mkEntry(standbyTask.id(), standbyTask)
);
doThrow(taskCorruptedException).doReturn(0L).when(changelogReader).restore(updatingTasks1);
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(activeTask1);
stateUpdater.add(activeTask2);
stateUpdater.add(standbyTask);
final ExceptionAndTask expectedExceptionAndTask1 = new ExceptionAndTask(taskCorruptedException, activeTask1);
final ExceptionAndTask expectedExceptionAndTask2 = new ExceptionAndTask(taskCorruptedException, activeTask2);
verifyExceptionsAndFailedTasks(expectedExceptionAndTask1, expectedExceptionAndTask2);
final InOrder orderVerifier = inOrder(changelogReader);
orderVerifier.verify(changelogReader, atLeast(1)).enforceRestoreActive();
orderVerifier.verify(changelogReader).transitToUpdateStandby();
}
@Test
public void shouldNotTransitToStandbyAgainAfterStandbyTaskFailed() throws Exception {
final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
final StandbyTask task2 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task1.id(), task1),
mkEntry(task2.id(), task2)
);
final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(task1.id()));
final ExceptionAndTask expectedExceptionAndTasks = new ExceptionAndTask(taskCorruptedException, task1);
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
doThrow(taskCorruptedException).doReturn(0L).when(changelogReader).restore(updatingTasks);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
verify(changelogReader, times(1)).transitToUpdateStandby();
}
@Test
public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception {
final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask activeTask2 = statefulTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(activeTask1);
stateUpdater.add(activeTask2);
stateUpdater.add(standbyTask);
verifyUpdatingTasks(activeTask1, activeTask2, standbyTask);
stateUpdater.remove(activeTask1.id());
stateUpdater.remove(activeTask2.id());
verifyRemovedTasks(activeTask1, activeTask2);
final InOrder orderVerifier = inOrder(changelogReader);
orderVerifier.verify(changelogReader, atLeast(1)).enforceRestoreActive();
orderVerifier.verify(changelogReader).transitToUpdateStandby();
}
@Test
public void shouldNotSwitchTwiceToUpdatingStandbyTaskIfStandbyTaskIsRemoved() throws Exception {
final StandbyTask standbyTask1 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask2 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(standbyTask1);
stateUpdater.add(standbyTask2);
verifyUpdatingTasks(standbyTask1, standbyTask2);
stateUpdater.remove(standbyTask2.id());
verifyRemovedTasks(standbyTask2);
verify(changelogReader).transitToUpdateStandby();
}
@Test
public void shouldRemoveActiveStatefulTask() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldRemoveStatefulTask(task);
}
@Test
public void shouldRemoveStandbyTask() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldRemoveStatefulTask(task);
}
private void shouldRemoveStatefulTask(final Task task) throws Exception {
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.remove(task.id());
verifyRemovedTasks(task);
verifyCheckpointTasks(true, task);
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyPausedTasks();
verifyExceptionsAndFailedTasks();
verify(changelogReader).unregister(task.changelogPartitions());
}
@Test
public void shouldRemovePausedTask() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
verifyUpdatingTasks(task1, task2);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks(task1, task2);
verifyRemovedTasks();
verifyUpdatingTasks();
stateUpdater.remove(task1.id());
stateUpdater.remove(task2.id());
verifyRemovedTasks(task1, task2);
verifyPausedTasks();
verifyCheckpointTasks(true, task1, task2);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verify(changelogReader).unregister(task1.changelogPartitions());
verify(changelogReader).unregister(task2.changelogPartitions());
}
@Test
public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotRemoveTaskFromRestoredActiveTasks(task);
}
@Test
public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
final StreamTask task = statelessTask(TASK_0_0).inState(State.RESTORING).build();
shouldNotRemoveTaskFromRestoredActiveTasks(task);
}
private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception {
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.add(controlTask);
verifyRestoredActiveTasks(task);
stateUpdater.remove(task.id());
stateUpdater.remove(controlTask.id());
verifyRemovedTasks(controlTask);
verifyRestoredActiveTasks(task);
verifyUpdatingTasks();
verifyPausedTasks();
verifyExceptionsAndFailedTasks();
}
@Test
public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotRemoveTaskFromFailedTasks(task);
}
@Test
public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotRemoveTaskFromFailedTasks(task);
}
private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exception {
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task.id(), task),
mkEntry(controlTask.id(), controlTask)
);
doThrow(streamsException)
.doReturn(0L)
.when(changelogReader).restore(updatingTasks);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.add(controlTask);
final ExceptionAndTask expectedExceptionAndTasks = new ExceptionAndTask(streamsException, task);
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
stateUpdater.remove(task.id());
stateUpdater.remove(controlTask.id());
verifyRemovedTasks(controlTask);
verifyPausedTasks();
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
verifyUpdatingTasks();
verifyRestoredActiveTasks();
}
@Test
public void shouldPauseActiveStatefulTask() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldPauseStatefulTask(task);
verify(changelogReader, never()).transitToUpdateStandby();
}
@Test
public void shouldPauseStandbyTask() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldPauseStatefulTask(task);
verify(changelogReader, times(1)).transitToUpdateStandby();
}
@Test
public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
final StreamTask task1 = statefulTask(TASK_A_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_B_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
when(topologyMetadata.isPaused(task1.id().topologyName())).thenReturn(false).thenReturn(true);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
verifyPausedTasks(task1);
verifyCheckpointTasks(true, task1);
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyUpdatingTasks(task2);
verifyExceptionsAndFailedTasks();
verify(changelogReader, times(1)).enforceRestoreActive();
verify(changelogReader, times(1)).transitToUpdateStandby();
}
@Test
public void shouldPauseStandbyTaskAndNotTransitToRestoreActive() throws Exception {
final StandbyTask task1 = standbyTask(TASK_A_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
final StandbyTask task2 = standbyTask(TASK_B_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
when(topologyMetadata.isPaused(task1.id().topologyName())).thenReturn(false).thenReturn(true);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
verifyPausedTasks(task1);
verifyUpdatingTasks(task2);
verifyCheckpointTasks(true, task1);
verify(changelogReader, never()).enforceRestoreActive();
}
private void shouldPauseStatefulTask(final Task task) throws Exception {
stateUpdater.start();
stateUpdater.add(task);
verifyUpdatingTasks(task);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks(task);
verifyCheckpointTasks(true, task);
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
}
@Test
public void shouldNotPausingNonExistTasks() throws Exception {
stateUpdater.start();
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks();
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
}
@Test
public void shouldNotPauseActiveStatefulTaskInRestoredActiveTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.add(controlTask);
verifyRestoredActiveTasks(task);
verifyUpdatingTasks(controlTask);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks(controlTask);
verifyRestoredActiveTasks(task);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
}
@Test
public void shouldNotPauseActiveStatefulTaskInFailedTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInFailedTasks(task);
}
@Test
public void shouldNotPauseStandbyTaskInFailedTasks() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotPauseTaskInFailedTasks(task);
}
private void shouldNotPauseTaskInFailedTasks(final Task task) throws Exception {
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task.id(), task),
mkEntry(controlTask.id(), controlTask)
);
doThrow(streamsException)
.doReturn(0L)
.when(changelogReader).restore(updatingTasks);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.add(controlTask);
final ExceptionAndTask expectedExceptionAndTasks = new ExceptionAndTask(streamsException, task);
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
verifyUpdatingTasks(controlTask);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks(controlTask);
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
verifyUpdatingTasks();
verifyRestoredActiveTasks();
}
@Test
public void shouldNotPauseActiveStatefulTaskInRemovedTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInRemovedTasks(task);
}
@Test
public void shouldNotPauseStandbyTaskInRemovedTasks() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotPauseTaskInRemovedTasks(task);
}
private void shouldNotPauseTaskInRemovedTasks(final Task task) throws Exception {
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.remove(task.id());
verifyRemovedTasks(task);
verifyCheckpointTasks(true, task);
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyPausedTasks();
verifyExceptionsAndFailedTasks();
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyRemovedTasks(task);
verifyUpdatingTasks();
verifyPausedTasks();
}
@Test
public void shouldResumeActiveStatefulTask() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldResumeStatefulTask(task);
verify(changelogReader, times(2)).enforceRestoreActive();
}
@Test
public void shouldIdleWhenAllTasksPaused() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
stateUpdater.start();
stateUpdater.add(task);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks(task);
verifyIdle();
when(topologyMetadata.isPaused(null)).thenReturn(false);
stateUpdater.signalResume();
verifyPausedTasks();
verifyUpdatingTasks(task);
}
@Test
public void shouldResumeStandbyTask() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldResumeStatefulTask(task);
verify(changelogReader, times(2)).transitToUpdateStandby();
}
private void shouldResumeStatefulTask(final Task task) throws Exception {
stateUpdater.start();
stateUpdater.add(task);
verifyUpdatingTasks(task);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks(task);
verifyUpdatingTasks();
when(topologyMetadata.isPaused(null)).thenReturn(false);
stateUpdater.signalResume();
verifyPausedTasks();
verifyUpdatingTasks(task);
}
@Test
public void shouldNotResumeNonExistingTasks() throws Exception {
stateUpdater.start();
verifyPausedTasks();
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
}
@Test
public void shouldNotResumeActiveStatefulTaskInRestoredActiveTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.add(controlTask);
verifyRestoredActiveTasks(task);
verifyPausedTasks();
verifyRestoredActiveTasks(task);
verifyUpdatingTasks(controlTask);
verifyExceptionsAndFailedTasks();
}
@Test
public void shouldNotResumeActiveStatefulTaskInRemovedTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInRemovedTasks(task);
}
@Test
public void shouldNotResumeStandbyTaskInRemovedTasks() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotResumeTaskInRemovedTasks(task);
}
private void shouldNotResumeTaskInRemovedTasks(final Task task) throws Exception {
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
verifyUpdatingTasks(task);
verifyExceptionsAndFailedTasks();
stateUpdater.remove(task.id());
verifyRemovedTasks(task);
verifyUpdatingTasks();
verifyUpdatingTasks();
}
@Test
public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInFailedTasks(task);
}
@Test
public void shouldNotResumeStandbyTaskInFailedTasks() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotResumeTaskInFailedTasks(task);
}
private void shouldNotResumeTaskInFailedTasks(final Task task) throws Exception {
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task.id(), task),
mkEntry(controlTask.id(), controlTask)
);
doThrow(streamsException)
.doReturn(0L)
.when(changelogReader).restore(updatingTasks);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.add(controlTask);
final ExceptionAndTask expectedExceptionAndTasks = new ExceptionAndTask(streamsException, task);
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
verifyUpdatingTasks(controlTask);
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
verifyUpdatingTasks(controlTask);
}
@Test
public void shouldDrainRemovedTasks() throws Exception {
assertFalse(stateUpdater.hasRemovedTasks());
assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
stateUpdater.add(task1);
stateUpdater.remove(task1.id());
verifyDrainingRemovedTasks(task1);
final StreamTask task2 = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task4 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)).inState(State.RESTORING).build();
stateUpdater.add(task2);
stateUpdater.remove(task2.id());
stateUpdater.add(task3);
stateUpdater.remove(task3.id());
stateUpdater.add(task4);
stateUpdater.remove(task4.id());
verifyDrainingRemovedTasks(task2, task3, task4);
}
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final String exceptionMessage = "The Streams were crossed!";
final StreamsException streamsException = new StreamsException(exceptionMessage);
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task1.id(), task1),
mkEntry(task2.id(), task2)
);
doReturn(0L).doThrow(streamsException).when(changelogReader).restore(updatingTasks);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
final ExceptionAndTask expectedExceptionAndTask1 = new ExceptionAndTask(streamsException, task1);
final ExceptionAndTask expectedExceptionAndTask2 = new ExceptionAndTask(streamsException, task2);
verifyExceptionsAndFailedTasks(expectedExceptionAndTask1, expectedExceptionAndTask2);
verifyRemovedTasks();
verifyPausedTasks();
verifyUpdatingTasks();
verifyRestoredActiveTasks();
}
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final String exceptionMessage = "The Streams were crossed!";
final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id());
final StreamsException streamsException2 = new StreamsException(exceptionMessage, task3.id());
final Map<TaskId, Task> updatingTasksBeforeFirstThrow = mkMap(
mkEntry(task1.id(), task1),
mkEntry(task2.id(), task2),
mkEntry(task3.id(), task3)
);
final Map<TaskId, Task> updatingTasksBeforeSecondThrow = mkMap(
mkEntry(task2.id(), task2),
mkEntry(task3.id(), task3)
);
doReturn(0L)
.doThrow(streamsException1)
.when(changelogReader).restore(updatingTasksBeforeFirstThrow);
doReturn(0L)
.doThrow(streamsException2)
.when(changelogReader).restore(updatingTasksBeforeSecondThrow);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
stateUpdater.add(task3);
final ExceptionAndTask expectedExceptionAndTasks1 = new ExceptionAndTask(streamsException1, task1);
final ExceptionAndTask expectedExceptionAndTasks2 = new ExceptionAndTask(streamsException2, task3);
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks1, expectedExceptionAndTasks2);
verifyUpdatingTasks(task2);
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyPausedTasks();
}
@Test
public void shouldHandleTaskCorruptedExceptionAndAddFailedTasksToQueue() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
final Set<TaskId> expectedTaskIds = mkSet(task1.id(), task2.id());
final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(expectedTaskIds);
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task1.id(), task1),
mkEntry(task2.id(), task2),
mkEntry(task3.id(), task3)
);
doReturn(0L).doThrow(taskCorruptedException).doReturn(0L).when(changelogReader).restore(updatingTasks);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
stateUpdater.add(task3);
final ExceptionAndTask expectedExceptionAndTask1 = new ExceptionAndTask(taskCorruptedException, task1);
final ExceptionAndTask expectedExceptionAndTask2 = new ExceptionAndTask(taskCorruptedException, task2);
verifyExceptionsAndFailedTasks(expectedExceptionAndTask1, expectedExceptionAndTask2);
verifyUpdatingTasks(task3);
verifyRestoredActiveTasks();
verifyRemovedTasks();
verify(changelogReader).unregister(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
verify(task1).markChangelogAsCorrupted(mkSet(TOPIC_PARTITION_A_0));
verify(task2).markChangelogAsCorrupted(mkSet(TOPIC_PARTITION_B_0));
}
@Test
public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final IllegalStateException illegalStateException = new IllegalStateException("Nobody expects the Spanish inquisition!");
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task1.id(), task1),
mkEntry(task2.id(), task2)
);
doThrow(illegalStateException).when(changelogReader).restore(updatingTasks);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
final ExceptionAndTask expectedExceptionAndTask1 = new ExceptionAndTask(illegalStateException, task1);
final ExceptionAndTask expectedExceptionAndTask2 = new ExceptionAndTask(illegalStateException, task2);
verifyExceptionsAndFailedTasks(expectedExceptionAndTask1, expectedExceptionAndTask2);
verifyUpdatingTasks();
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyPausedTasks();
}
@Test
public void shouldDrainFailedTasksAndExceptions() throws Exception {
assertFalse(stateUpdater.hasExceptionsAndFailedTasks());
assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task4 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)).inState(State.RESTORING).build();
final String exceptionMessage = "The Streams were crossed!";
final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id());
final Map<TaskId, Task> updatingTasks1 = mkMap(
mkEntry(task1.id(), task1)
);
doThrow(streamsException1)
.when(changelogReader).restore(updatingTasks1);
final StreamsException streamsException2 = new StreamsException(exceptionMessage, task2.id());
final StreamsException streamsException3 = new StreamsException(exceptionMessage, task3.id());
final StreamsException streamsException4 = new StreamsException(exceptionMessage, task4.id());
final Map<TaskId, Task> updatingTasks2 = mkMap(
mkEntry(task2.id(), task2),
mkEntry(task3.id(), task3),
mkEntry(task4.id(), task4)
);
doThrow(streamsException2).when(changelogReader).restore(updatingTasks2);
final Map<TaskId, Task> updatingTasks3 = mkMap(
mkEntry(task3.id(), task3),
mkEntry(task4.id(), task4)
);
doThrow(streamsException3).when(changelogReader).restore(updatingTasks3);
final Map<TaskId, Task> updatingTasks4 = mkMap(
mkEntry(task4.id(), task4)
);
doThrow(streamsException4).when(changelogReader).restore(updatingTasks4);
stateUpdater.start();
stateUpdater.add(task1);
final ExceptionAndTask expectedExceptionAndTasks1 = new ExceptionAndTask(streamsException1, task1);
verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks1);
stateUpdater.add(task2);
stateUpdater.add(task3);
stateUpdater.add(task4);
final ExceptionAndTask expectedExceptionAndTasks2 = new ExceptionAndTask(streamsException2, task2);
final ExceptionAndTask expectedExceptionAndTasks3 = new ExceptionAndTask(streamsException3, task3);
final ExceptionAndTask expectedExceptionAndTasks4 = new ExceptionAndTask(streamsException4, task4);
verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4);
}
@Test
public void shouldAutoCheckpointTasksOnInterval() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
stateUpdater.add(task3);
stateUpdater.add(task4);
// wait for all tasks added to the thread before advance timer
verifyUpdatingTasks(task1, task2, task3, task4);
time.sleep(COMMIT_INTERVAL + 1);
verifyExceptionsAndFailedTasks();
verifyCheckpointTasks(false, task1, task2, task3, task4);
}
@Test
public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() {
// we need to use a non auto-ticking timer here to control how much time elapsed exactly
final Time time = new MockTime();
final DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, config, null, changelogReader, topologyMetadata, time);
try {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
stateUpdater.add(task3);
stateUpdater.add(task4);
time.sleep(COMMIT_INTERVAL);
verifyNeverCheckpointTasks(task1, task2, task3, task4);
} finally {
stateUpdater.shutdown(Duration.ofMinutes(1));
}
}
private void verifyCheckpointTasks(final boolean enforceCheckpoint, final Task... tasks) {
for (final Task task : tasks) {
verify(task, timeout(VERIFICATION_TIMEOUT).atLeast(1)).maybeCheckpoint(enforceCheckpoint);
}
}
private void verifyNeverCheckpointTasks(final Task... tasks) {
for (final Task task : tasks) {
verify(task, never()).maybeCheckpoint(anyBoolean());
}
}
@Test
public void shouldGetTasksFromInputQueue() {
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask1 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask3 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
stateUpdater.add(activeTask1);
stateUpdater.add(standbyTask1);
stateUpdater.add(standbyTask2);
stateUpdater.remove(TASK_0_0);
stateUpdater.add(activeTask2);
stateUpdater.add(standbyTask3);
verifyGetTasks(mkSet(activeTask1, activeTask2), mkSet(standbyTask1, standbyTask2, standbyTask3));
}
@Test
public void shouldGetTasksFromUpdatingTasks() throws Exception {
final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask1 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask3 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(activeTask1);
stateUpdater.add(standbyTask1);
stateUpdater.add(standbyTask2);
stateUpdater.add(activeTask2);
stateUpdater.add(standbyTask3);
verifyUpdatingTasks(activeTask1, activeTask2, standbyTask1, standbyTask2, standbyTask3);
verifyGetTasks(mkSet(activeTask1, activeTask2), mkSet(standbyTask1, standbyTask2, standbyTask3));
}
@Test
public void shouldGetTasksFromRestoredActiveTasks() throws Exception {
final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(activeTask1);
stateUpdater.add(activeTask2);
verifyRestoredActiveTasks(activeTask1, activeTask2);
verifyGetTasks(mkSet(activeTask1, activeTask2), mkSet());
stateUpdater.drainRestoredActiveTasks(Duration.ofMinutes(1));
verifyGetTasks(mkSet(), mkSet());
}
@Test
public void shouldGetTasksFromExceptionsAndFailedTasks() throws Exception {
final StreamTask activeTask1 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask1 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
final TaskCorruptedException taskCorruptedException =
new TaskCorruptedException(mkSet(standbyTask1.id(), standbyTask2.id()));
final StreamsException streamsException = new StreamsException("The Streams were crossed!", activeTask1.id());
final Map<TaskId, Task> updatingTasks1 = mkMap(
mkEntry(activeTask1.id(), activeTask1),
mkEntry(standbyTask1.id(), standbyTask1),
mkEntry(standbyTask2.id(), standbyTask2)
);
doReturn(0L).doThrow(taskCorruptedException).doReturn(0L).when(changelogReader).restore(updatingTasks1);
final Map<TaskId, Task> updatingTasks2 = mkMap(
mkEntry(activeTask1.id(), activeTask1)
);
doReturn(0L).doThrow(streamsException).doReturn(0L).when(changelogReader).restore(updatingTasks2);
stateUpdater.start();
stateUpdater.add(standbyTask1);
stateUpdater.add(activeTask1);
stateUpdater.add(standbyTask2);
final ExceptionAndTask expectedExceptionAndTasks1 = new ExceptionAndTask(taskCorruptedException, standbyTask1);
final ExceptionAndTask expectedExceptionAndTasks2 = new ExceptionAndTask(taskCorruptedException, standbyTask2);
final ExceptionAndTask expectedExceptionAndTasks3 = new ExceptionAndTask(streamsException, activeTask1);
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks1, expectedExceptionAndTasks2, expectedExceptionAndTasks3);
verifyGetTasks(mkSet(activeTask1), mkSet(standbyTask1, standbyTask2));
stateUpdater.drainExceptionsAndFailedTasks();
verifyGetTasks(mkSet(), mkSet());
}
@Test
public void shouldGetTasksFromRemovedTasks() throws Exception {
final StreamTask activeTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask1 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(standbyTask1);
stateUpdater.add(activeTask);
stateUpdater.add(standbyTask2);
stateUpdater.remove(standbyTask1.id());
stateUpdater.remove(standbyTask2.id());
stateUpdater.remove(activeTask.id());
verifyRemovedTasks(activeTask, standbyTask1, standbyTask2);
verifyGetTasks(mkSet(activeTask), mkSet(standbyTask1, standbyTask2));
stateUpdater.drainRemovedTasks();
verifyGetTasks(mkSet(), mkSet());
}
@Test
public void shouldGetTasksFromPausedTasks() throws Exception {
final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
stateUpdater.start();
stateUpdater.add(activeTask);
stateUpdater.add(standbyTask);
verifyUpdatingTasks(activeTask, standbyTask);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks(activeTask, standbyTask);
verifyGetTasks(mkSet(activeTask), mkSet(standbyTask));
}
@Test
public void shouldRecordMetrics() throws Exception {
final StreamTask activeTask1 = statefulTask(TASK_A_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask activeTask2 = statefulTask(TASK_B_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask3 = standbyTask(TASK_A_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
final StandbyTask standbyTask4 = standbyTask(TASK_B_0_1, mkSet(TOPIC_PARTITION_B_1)).inState(State.RUNNING).build();
final Map<TaskId, Task> tasks1234 = mkMap(
mkEntry(activeTask1.id(), activeTask1),
mkEntry(activeTask2.id(), activeTask2),
mkEntry(standbyTask3.id(), standbyTask3),
mkEntry(standbyTask4.id(), standbyTask4)
);
final Map<TaskId, Task> tasks13 = mkMap(
mkEntry(activeTask1.id(), activeTask1),
mkEntry(standbyTask3.id(), standbyTask3)
);
when(topologyMetadata.isPaused(activeTask2.id().topologyName())).thenReturn(true);
when(topologyMetadata.isPaused(standbyTask4.id().topologyName())).thenReturn(true);
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
when(changelogReader.restore(tasks1234)).thenReturn(1L);
when(changelogReader.restore(tasks13)).thenReturn(1L);
when(changelogReader.isRestoringActive()).thenReturn(true);
stateUpdater.start();
stateUpdater.add(activeTask1);
stateUpdater.add(activeTask2);
stateUpdater.add(standbyTask3);
stateUpdater.add(standbyTask4);
verifyPausedTasks(activeTask2, standbyTask4);
assertThat(metrics.metrics().size(), is(11));
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put("thread-id", "test-state-updater");
MetricName metricName = new MetricName("active-restoring-tasks",
"stream-state-updater-metrics",
"The number of active tasks currently undergoing restoration",
tagMap);
verifyMetric(metrics, metricName, is(1.0));
metricName = new MetricName("standby-updating-tasks",
"stream-state-updater-metrics",
"The number of standby tasks currently undergoing state update",
tagMap);
verifyMetric(metrics, metricName, is(1.0));
metricName = new MetricName("active-paused-tasks",
"stream-state-updater-metrics",
"The number of active tasks paused restoring",
tagMap);
verifyMetric(metrics, metricName, is(1.0));
metricName = new MetricName("standby-paused-tasks",
"stream-state-updater-metrics",
"The number of standby tasks paused state update",
tagMap);
verifyMetric(metrics, metricName, is(1.0));
metricName = new MetricName("idle-ratio",
"stream-state-updater-metrics",
"The fraction of time the thread spent on being idle",
tagMap);
verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));
metricName = new MetricName("active-restore-ratio",
"stream-state-updater-metrics",
"The fraction of time the thread spent on restoring active tasks",
tagMap);
verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));
metricName = new MetricName("standby-update-ratio",
"stream-state-updater-metrics",
"The fraction of time the thread spent on updating standby tasks",
tagMap);
verifyMetric(metrics, metricName, is(0.0d));
metricName = new MetricName("checkpoint-ratio",
"stream-state-updater-metrics",
"The fraction of time the thread spent on checkpointing tasks restored progress",
tagMap);
verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));
metricName = new MetricName("restore-records-rate",
"stream-state-updater-metrics",
"The average per-second number of records restored",
tagMap);
verifyMetric(metrics, metricName, not(0.0d));
metricName = new MetricName("restore-call-rate",
"stream-state-updater-metrics",
"The average per-second number of restore calls triggered",
tagMap);
verifyMetric(metrics, metricName, not(0.0d));
stateUpdater.shutdown(Duration.ofMinutes(1));
assertThat(metrics.metrics().size(), is(1));
}
@SuppressWarnings("unchecked")
private static <T> void verifyMetric(final Metrics metrics,
final MetricName metricName,
final Matcher<T> matcher) {
assertThat(metrics.metrics().get(metricName).metricName().description(), is(metricName.description()));
assertThat(metrics.metrics().get(metricName).metricName().tags(), is(metricName.tags()));
assertThat((T) metrics.metrics().get(metricName).metricValue(), matcher);
}
private void verifyGetTasks(final Set<StreamTask> expectedActiveTasks,
final Set<StandbyTask> expectedStandbyTasks) {
final Set<Task> tasks = stateUpdater.getTasks();
assertEquals(expectedActiveTasks.size() + expectedStandbyTasks.size(), tasks.size());
tasks.forEach(task -> assertInstanceOf(ReadOnlyTask.class, task));
final Set<TaskId> actualTaskIds = tasks.stream().map(Task::id).collect(Collectors.toSet());
final Set<Task> expectedTasks = new HashSet<>(expectedActiveTasks);
expectedTasks.addAll(expectedStandbyTasks);
final Set<TaskId> expectedTaskIds = expectedTasks.stream().map(Task::id).collect(Collectors.toSet());
assertTrue(actualTaskIds.containsAll(expectedTaskIds));
final Set<StreamTask> activeTasks = stateUpdater.getActiveTasks();
assertEquals(expectedActiveTasks.size(), activeTasks.size());
assertTrue(activeTasks.containsAll(expectedActiveTasks));
final Set<StandbyTask> standbyTasks = stateUpdater.getStandbyTasks();
assertEquals(expectedStandbyTasks.size(), standbyTasks.size());
assertTrue(standbyTasks.containsAll(expectedStandbyTasks));
}
private void verifyRestoredActiveTasks(final StreamTask... tasks) throws Exception {
if (tasks.length == 0) {
waitForCondition(
() -> stateUpdater.getRestoredActiveTasks().isEmpty(),
VERIFICATION_TIMEOUT,
"Did not get empty restored active task within the given timeout!"
);
} else {
final Set<StreamTask> expectedRestoredTasks = mkSet(tasks);
final Set<StreamTask> restoredTasks = new HashSet<>();
waitForCondition(
() -> {
restoredTasks.addAll(stateUpdater.getRestoredActiveTasks());
return restoredTasks.containsAll(expectedRestoredTasks)
&& restoredTasks.size() == expectedRestoredTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not get all restored active task within the given timeout!"
);
}
}
private void verifyDrainingRestoredActiveTasks(final StreamTask... tasks) throws Exception {
final Set<StreamTask> expectedRestoredTasks = mkSet(tasks);
final Set<StreamTask> restoredTasks = new HashSet<>();
waitForCondition(
() -> {
restoredTasks.addAll(stateUpdater.drainRestoredActiveTasks(Duration.ofMillis(CALL_TIMEOUT)));
return restoredTasks.containsAll(expectedRestoredTasks)
&& restoredTasks.size() == expectedRestoredTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not get all restored active task within the given timeout!"
);
assertTrue(stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());
}
private void verifyUpdatingTasks(final Task... tasks) throws Exception {
if (tasks.length == 0) {
waitForCondition(
() -> stateUpdater.getUpdatingTasks().isEmpty(),
VERIFICATION_TIMEOUT,
"Did not get empty updating task within the given timeout!"
);
} else {
final Set<Task> expectedUpdatingTasks = mkSet(tasks);
final Set<Task> updatingTasks = new HashSet<>();
waitForCondition(
() -> {
updatingTasks.addAll(stateUpdater.getUpdatingTasks());
return updatingTasks.containsAll(expectedUpdatingTasks)
&& updatingTasks.size() == expectedUpdatingTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not get all updating task within the given timeout!"
);
}
}
private void verifyUpdatingStandbyTasks(final StandbyTask... tasks) throws Exception {
final Set<StandbyTask> expectedStandbyTasks = mkSet(tasks);
final Set<StandbyTask> standbyTasks = new HashSet<>();
waitForCondition(
() -> {
standbyTasks.addAll(stateUpdater.getUpdatingStandbyTasks());
return standbyTasks.containsAll(expectedStandbyTasks)
&& standbyTasks.size() == expectedStandbyTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not see all standby task within the given timeout!"
);
}
private void verifyRemovedTasks(final Task... tasks) throws Exception {
if (tasks.length == 0) {
waitForCondition(
() -> stateUpdater.getRemovedTasks().isEmpty(),
VERIFICATION_TIMEOUT,
"Did not get empty removed task within the given timeout!"
);
} else {
final Set<Task> expectedRemovedTasks = mkSet(tasks);
final Set<Task> removedTasks = new HashSet<>();
waitForCondition(
() -> {
removedTasks.addAll(stateUpdater.getRemovedTasks());
return removedTasks.containsAll(expectedRemovedTasks)
&& removedTasks.size() == expectedRemovedTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not get all removed task within the given timeout!"
);
}
}
private void verifyIdle() throws Exception {
waitForCondition(
() -> stateUpdater.isIdle(),
VERIFICATION_TIMEOUT,
"State updater did not enter an idling state!"
);
}
private void verifyPausedTasks(final Task... tasks) throws Exception {
if (tasks.length == 0) {
waitForCondition(
() -> stateUpdater.getPausedTasks().isEmpty(),
VERIFICATION_TIMEOUT,
"Did not get empty paused task within the given timeout!"
);
} else {
final Set<Task> expectedPausedTasks = mkSet(tasks);
final Set<Task> pausedTasks = new HashSet<>();
waitForCondition(
() -> {
pausedTasks.addAll(stateUpdater.getPausedTasks());
return pausedTasks.containsAll(expectedPausedTasks)
&& pausedTasks.size() == expectedPausedTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not get all paused task within the given timeout!"
);
}
}
private void verifyDrainingRemovedTasks(final Task... tasks) throws Exception {
final Set<Task> expectedRemovedTasks = mkSet(tasks);
final Set<Task> removedTasks = new HashSet<>();
waitForCondition(
() -> {
if (stateUpdater.hasRemovedTasks()) {
final Set<Task> drainedTasks = stateUpdater.drainRemovedTasks();
assertFalse(drainedTasks.isEmpty());
removedTasks.addAll(drainedTasks);
}
return removedTasks.containsAll(mkSet(tasks))
&& removedTasks.size() == expectedRemovedTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not get all restored active task within the given timeout!"
);
assertFalse(stateUpdater.hasRemovedTasks());
assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
}
private void verifyExceptionsAndFailedTasks(final ExceptionAndTask... exceptionsAndTasks) throws Exception {
final List<ExceptionAndTask> expectedExceptionAndTasks = Arrays.asList(exceptionsAndTasks);
final Set<ExceptionAndTask> failedTasks = new HashSet<>();
waitForCondition(
() -> {
failedTasks.addAll(stateUpdater.getExceptionsAndFailedTasks());
return failedTasks.containsAll(expectedExceptionAndTasks)
&& failedTasks.size() == expectedExceptionAndTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not get all exceptions and failed tasks within the given timeout!"
);
}
private void verifyFailedTasks(final Class<? extends RuntimeException> clazz, final Task... tasks) throws Exception {
final List<Task> expectedFailedTasks = Arrays.asList(tasks);
final Set<Task> failedTasks = new HashSet<>();
waitForCondition(
() -> {
for (final ExceptionAndTask exceptionAndTask : stateUpdater.getExceptionsAndFailedTasks()) {
if (clazz.isInstance(exceptionAndTask.exception())) {
failedTasks.add(exceptionAndTask.task());
}
}
return failedTasks.containsAll(expectedFailedTasks)
&& failedTasks.size() == expectedFailedTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not get all exceptions and failed tasks within the given timeout!"
);
}
private void verifyDrainingExceptionsAndFailedTasks(final ExceptionAndTask... exceptionsAndTasks) throws Exception {
final List<ExceptionAndTask> expectedExceptionAndTasks = Arrays.asList(exceptionsAndTasks);
final List<ExceptionAndTask> failedTasks = new ArrayList<>();
waitForCondition(
() -> {
if (stateUpdater.hasExceptionsAndFailedTasks()) {
final List<ExceptionAndTask> exceptionAndTasks = stateUpdater.drainExceptionsAndFailedTasks();
assertFalse(exceptionAndTasks.isEmpty());
failedTasks.addAll(exceptionAndTasks);
}
return failedTasks.containsAll(expectedExceptionAndTasks)
&& failedTasks.size() == expectedExceptionAndTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not get all exceptions and failed tasks within the given timeout!"
);
assertFalse(stateUpdater.hasExceptionsAndFailedTasks());
assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
}
}