blob: 566e83826bbd48e1cddac1072f9e906d9140a545 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask;
import org.apache.kafka.streams.processor.internals.Task.State;
import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager;
import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.intersection;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.utils.Utils.union;
import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class TaskManagerTest {
// ---------------------------------------------------------------------------------------------
// Shared fixtures. For each task id 0_N the test keeps an input partition (topic1-N), a changelog
// partition ("changelog"-N) where declared, and the corresponding singleton sets / assignment maps.
// ---------------------------------------------------------------------------------------------
private final String topic1 = "topic1";
private final String topic2 = "topic2";
// Task 0_0: input partition topic1-0, changelog partition changelog-0.
private final TaskId taskId00 = new TaskId(0, 0);
private final TopicPartition t1p0 = new TopicPartition(topic1, 0);
private final TopicPartition t1p0changelog = new TopicPartition("changelog", 0);
private final Set<TopicPartition> taskId00Partitions = mkSet(t1p0);
private final Set<TopicPartition> taskId00ChangelogPartitions = mkSet(t1p0changelog);
private final Map<TaskId, Set<TopicPartition>> taskId00Assignment = singletonMap(taskId00, taskId00Partitions);
// Task 0_1: input partition topic1-1, changelog partition changelog-1.
private final TaskId taskId01 = new TaskId(0, 1);
private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
// NOTE(review): despite its name, t2p2 is partition 1 (not 2) of topic2 — confirm whether this is intentional.
private final TopicPartition t2p2 = new TopicPartition(topic2, 1);
private final TopicPartition t1p1changelog = new TopicPartition("changelog", 1);
private final Set<TopicPartition> taskId01Partitions = mkSet(t1p1);
private final Set<TopicPartition> taskId01ChangelogPartitions = mkSet(t1p1changelog);
private final Map<TaskId, Set<TopicPartition>> taskId01Assignment = singletonMap(taskId01, taskId01Partitions);
// Task 0_2: input partition topic1-2, changelog partition changelog-2.
private final TaskId taskId02 = new TaskId(0, 2);
private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
private final TopicPartition t1p2changelog = new TopicPartition("changelog", 2);
private final Set<TopicPartition> taskId02Partitions = mkSet(t1p2);
private final Set<TopicPartition> taskId02ChangelogPartitions = mkSet(t1p2changelog);
// Task 0_3: input partition topic1-3, changelog partition changelog-3.
private final TaskId taskId03 = new TaskId(0, 3);
private final TopicPartition t1p3 = new TopicPartition(topic1, 3);
private final TopicPartition t1p3changelog = new TopicPartition("changelog", 3);
private final Set<TopicPartition> taskId03Partitions = mkSet(t1p3);
private final Set<TopicPartition> taskId03ChangelogPartitions = mkSet(t1p3changelog);
// Task 0_4: input partition topic1-4, changelog partition changelog-4.
private final TaskId taskId04 = new TaskId(0, 4);
private final TopicPartition t1p4 = new TopicPartition(topic1, 4);
private final TopicPartition t1p4changelog = new TopicPartition("changelog", 4);
private final Set<TopicPartition> taskId04Partitions = mkSet(t1p4);
private final Set<TopicPartition> taskId04ChangelogPartitions = mkSet(t1p4changelog);
// Task 0_5: input partition topic1-5 (no changelog fixture declared here).
private final TaskId taskId05 = new TaskId(0, 5);
private final TopicPartition t1p5 = new TopicPartition(topic1, 5);
private final Set<TopicPartition> taskId05Partitions = mkSet(t1p5);
// Task 1_0: input partition topic2-0 (no changelog fixture declared here).
private final TaskId taskId10 = new TaskId(1, 0);
private final TopicPartition t2p0 = new TopicPartition(topic2, 0);
private final Set<TopicPartition> taskId10Partitions = mkSet(t2p0);
// A single-partition set on a dedicated "assignment" topic.
private final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
// Partition-set callback that does nothing; fully qualified because the Kafka Consumer type is also imported.
final java.util.function.Consumer<Set<TopicPartition>> noOpResetter = partitions -> { };
// Mocked collaborators; the ones passed to the TaskManager constructor are wired in setUpTaskManager(...).
@Mock
private InternalTopologyBuilder topologyBuilder;
@Mock
private StateDirectory stateDirectory;
@Mock
private ChangelogReader changeLogReader;
@Mock
private Consumer<byte[], byte[]> consumer;
@Mock
private ActiveTaskCreator activeTaskCreator;
@Mock
private StandbyTaskCreator standbyTaskCreator;
@Mock
private Admin adminClient;
@Mock
private ProcessorStateManager stateManager;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ProcessorStateManager.StateStoreMetadata stateStore;
// Created eagerly (not via @Mock); only handed to the TaskManager when the matching setUpTaskManager flag is true.
final StateUpdater stateUpdater = mock(StateUpdater.class);
final DefaultTaskManager schedulingTaskManager = mock(DefaultTaskManager.class);
// Default instance under test, (re)created in setUp(); several tests shadow it with a local instance.
private TaskManager taskManager;
// Assigned on every setUpTaskManager(...) call.
private TopologyMetadata topologyMetadata;
private final Time time = new MockTime();
@Rule
public final TemporaryFolder testFolder = new TemporaryFolder();
@Rule
public final MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
/**
 * Builds the default {@link TaskManager} used by tests that do not create their own:
 * at-least-once processing, no explicit task registry, state updater disabled.
 */
@Before
public void setUp() {
    final TasksRegistry noExplicitRegistry = null;
    final boolean stateUpdaterEnabled = false;
    taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, noExplicitRegistry, stateUpdaterEnabled);
}
/**
 * Convenience overload: no explicit task registry and processing threads disabled.
 *
 * @param processingMode      processing mode handed to the topology config
 * @param stateUpdaterEnabled whether the mocked state updater is passed to the task manager
 * @return the newly created task manager
 */
private TaskManager setUpTaskManager(final ProcessingMode processingMode, final boolean stateUpdaterEnabled) {
    final TasksRegistry noExplicitRegistry = null;
    final boolean processingThreadsEnabled = false;
    return setUpTaskManager(processingMode, noExplicitRegistry, stateUpdaterEnabled, processingThreadsEnabled);
}
/**
 * Convenience overload: uses the given task registry and keeps processing threads disabled.
 *
 * @param processingMode      processing mode handed to the topology config
 * @param tasks               task registry to use, or {@code null} to let the 4-arg overload pick one
 * @param stateUpdaterEnabled whether the mocked state updater is passed to the task manager
 * @return the newly created task manager
 */
private TaskManager setUpTaskManager(final ProcessingMode processingMode, final TasksRegistry tasks, final boolean stateUpdaterEnabled) {
    final boolean processingThreadsEnabled = false;
    return setUpTaskManager(processingMode, tasks, stateUpdaterEnabled, processingThreadsEnabled);
}
/**
 * Creates a {@link TaskManager} wired to this test's mocks and registers the mocked main consumer.
 * Also (re)assigns the {@code topologyMetadata} field as a side effect.
 *
 * @param processingMode           processing mode used to build the dummy streams config
 * @param tasks                    task registry to use; when {@code null} a new {@link Tasks} is created
 * @param stateUpdaterEnabled      when {@code true} the mocked state updater is passed, otherwise {@code null}
 * @param processingThreadsEnabled when {@code true} the mocked scheduling task manager is passed, otherwise {@code null}
 * @return the newly created task manager
 */
private TaskManager setUpTaskManager(final ProcessingMode processingMode,
                                     final TasksRegistry tasks,
                                     final boolean stateUpdaterEnabled,
                                     final boolean processingThreadsEnabled) {
    topologyMetadata = new TopologyMetadata(topologyBuilder, new DummyStreamsConfig(processingMode));

    final UUID processId = UUID.randomUUID();
    final TasksRegistry registry = tasks == null ? new Tasks(new LogContext()) : tasks;
    final StateUpdater updaterOrNull = stateUpdaterEnabled ? stateUpdater : null;
    final DefaultTaskManager schedulerOrNull = processingThreadsEnabled ? schedulingTaskManager : null;

    final TaskManager created = new TaskManager(
        time,
        changeLogReader,
        processId,
        "taskManagerTest",
        activeTaskCreator,
        standbyTaskCreator,
        registry,
        topologyMetadata,
        adminClient,
        stateDirectory,
        updaterOrNull,
        schedulerOrNull
    );
    created.setMainConsumer(consumer);
    return created;
}
/**
 * With the state updater disabled, handling an assignment of running active, restoring active
 * and standby tasks must never touch the state updater mock.
 */
@Test
public void shouldClassifyExistingTasksWithoutStateUpdater() {
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, false);
    final Map<TaskId, Set<TopicPartition>> running = mkMap(mkEntry(taskId01, mkSet(t1p1)));
    final Map<TaskId, Set<TopicPartition>> standbys = mkMap(mkEntry(taskId02, mkSet(t2p2)));
    final Map<TaskId, Set<TopicPartition>> restoring = mkMap(mkEntry(taskId03, mkSet(t1p3)));

    // All active tasks = running + restoring.
    final Map<TaskId, Set<TopicPartition>> allActive = new HashMap<>(running);
    allActive.putAll(restoring);

    // File-level helper (defined outside this excerpt) that prepares the three task groups.
    handleAssignment(running, standbys, restoring);

    manager.handleAssignment(allActive, standbys);

    verifyNoInteractions(stateUpdater);
}
/**
 * Re-assigning a running standby task with the same input partitions it already has
 * must not call {@code updateInputPartitions} with those partitions.
 */
@Test
public void shouldNotUpdateExistingStandbyTaskIfStandbyIsReassignedWithSameInputPartitionWithoutStateUpdater() {
    final Set<TopicPartition> unchangedPartitions = taskId03Partitions;
    final StandbyTask existingStandby = standbyTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(unchangedPartitions)
        .build();

    updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(existingStandby, unchangedPartitions);

    verify(existingStandby, never()).updateInputPartitions(eq(unchangedPartitions), any());
}
/**
 * Re-assigning a running standby task with a different set of input partitions
 * must call {@code updateInputPartitions} with the new partitions.
 */
@Test
public void shouldUpdateExistingStandbyTaskIfStandbyIsReassignedWithDifferentInputPartitionWithoutStateUpdater() {
    final Set<TopicPartition> changedPartitions = taskId04Partitions;
    final StandbyTask existingStandby = standbyTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId03Partitions)
        .build();

    updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(existingStandby, changedPartitions);

    verify(existingStandby).updateInputPartitions(eq(changedPartitions), any());
}
/**
 * Shared scenario: a task manager without state updater whose registry contains only
 * {@code standbyTask} receives an assignment that lists that task as standby with
 * {@code newInputPartition}. Asserts that the task is resumed; callers add their own checks.
 *
 * @param standbyTask       the pre-existing (mock) standby task
 * @param newInputPartition input partitions to assign to the task
 */
private void updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(final Task standbyTask,
                                                                               final Set<TopicPartition> newInputPartition) {
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.allTasks()).thenReturn(mkSet(standbyTask));
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, false);

    final Map<TaskId, Set<TopicPartition>> standbyAssignment = mkMap(mkEntry(standbyTask.id(), newInputPartition));
    manager.handleAssignment(Collections.emptyMap(), standbyAssignment);

    verify(standbyTask).resume();
}
/**
 * With processing threads enabled, handling corruption of task 0_0 must lock and unlock
 * all active task ids reported by the registry (0_0 and 0_1) and query the consumer assignment.
 */
@Test
public void shouldLockAllTasksOnCorruptionWithProcessingThreads() {
    final StreamTask corruptedTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true, true);
    final Set<TaskId> allActiveIds = mkSet(taskId00, taskId01);
    when(registry.activeTaskIds()).thenReturn(allActiveIds);
    when(registry.task(taskId00)).thenReturn(corruptedTask);
    final KafkaFuture<Void> lockCompleted = KafkaFuture.completedFuture(null);
    when(schedulingTaskManager.lockTasks(any())).thenReturn(lockCompleted);

    manager.handleCorruption(mkSet(taskId00));

    verify(consumer).assignment();
    verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
    verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
}
/**
 * With processing threads enabled, committing two running active tasks must lock and
 * unlock exactly those two task ids on the scheduling task manager.
 * Note: despite the method name, the action exercised here is {@code commit(...)}.
 */
@Test
public void shouldLockCommitableTasksOnCorruptionWithProcessingThreads() {
    final StreamTask firstTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StreamTask secondTask = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId01Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true, true);
    final KafkaFuture<Void> lockCompleted = KafkaFuture.completedFuture(null);
    when(schedulingTaskManager.lockTasks(any())).thenReturn(lockCompleted);

    manager.commit(mkSet(firstTask, secondTask));

    final Set<TaskId> committedIds = mkSet(taskId00, taskId01);
    verify(schedulingTaskManager).lockTasks(committedIds);
    verify(schedulingTaskManager).unlockTasks(committedIds);
}
/**
 * With processing threads enabled, handling an assignment must lock and unlock all task ids
 * known to the registry (0_0 and 0_1) on the scheduling task manager.
 */
@Test
public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true, true);
    when(registry.allTaskIds()).thenReturn(mkSet(taskId00, taskId01));
    final KafkaFuture<Void> lockCompleted = KafkaFuture.completedFuture(null);
    when(schedulingTaskManager.lockTasks(any())).thenReturn(lockCompleted);

    final Map<TaskId, Set<TopicPartition>> activeAssignment = mkMap(mkEntry(taskId00, taskId00Partitions));
    final Map<TaskId, Set<TopicPartition>> standbyAssignment = mkMap(mkEntry(taskId01, taskId01Partitions));
    manager.handleAssignment(activeAssignment, standbyAssignment);

    verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
    verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
}
/**
 * With processing threads enabled, revoking the partitions of task 0_1 while the registry
 * holds tasks 0_0 and 0_1 must lock and unlock both task ids on the scheduling task manager.
 */
@Test
public void shouldLockAffectedTasksOnHandleRevocation() {
    final StreamTask firstTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StreamTask secondTask = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId01Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true, true);
    when(registry.allTasks()).thenReturn(mkSet(firstTask, secondTask));
    final KafkaFuture<Void> lockCompleted = KafkaFuture.completedFuture(null);
    when(schedulingTaskManager.lockTasks(any())).thenReturn(lockCompleted);

    manager.handleRevocation(taskId01Partitions);

    final Set<TaskId> expectedIds = mkSet(taskId00, taskId01);
    verify(schedulingTaskManager).lockTasks(expectedIds);
    verify(schedulingTaskManager).unlockTasks(expectedIds);
}
/**
 * With processing threads enabled, closing and cleaning up only task 0_0 must lock and
 * unlock just that task id, even though the registry also contains task 0_1.
 */
@Test
public void shouldLockTasksOnClose() {
    final StreamTask taskToClose = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StreamTask untouchedTask = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId01Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true, true);
    when(registry.allTasks()).thenReturn(mkSet(taskToClose, untouchedTask));
    final KafkaFuture<Void> lockCompleted = KafkaFuture.completedFuture(null);
    when(schedulingTaskManager.lockTasks(any())).thenReturn(lockCompleted);

    manager.closeAndCleanUpTasks(mkSet(taskToClose), mkSet(), false);

    verify(schedulingTaskManager).lockTasks(mkSet(taskId00));
    verify(schedulingTaskManager).unlockTasks(mkSet(taskId00));
}
/**
 * {@code resumePollingForPartitionsWithAvailableSpace()} on the task manager must be
 * forwarded to every active task in the registry.
 */
@Test
public void shouldResumePollingForPartitionsWithAvailableSpaceForAllActiveTasks() {
    final StreamTask firstTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StreamTask secondTask = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId01Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(registry.activeTasks()).thenReturn(mkSet(firstTask, secondTask));

    manager.resumePollingForPartitionsWithAvailableSpace();

    verify(firstTask).resumePollingForPartitionsWithAvailableSpace();
    verify(secondTask).resumePollingForPartitionsWithAvailableSpace();
}
/**
 * {@code updateLags()} on the task manager must be forwarded to every active task in the registry.
 */
@Test
public void shouldUpdateLagForAllActiveTasks() {
    final StreamTask firstTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StreamTask secondTask = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId01Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(registry.activeTasks()).thenReturn(mkSet(firstTask, secondTask));

    manager.updateLags();

    verify(firstTask).updateLags();
    verify(secondTask).updateLags();
}
/**
 * A restoring active task owned by the state updater that is re-assigned as a standby task
 * must be removed from the state updater and registered as pending-to-recycle with its
 * input partitions; no new active or standby tasks are created.
 */
@Test
public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {
    final StreamTask activeTaskToRecycle = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId03Partitions).build();
    final TasksRegistry tasks = mock(TasksRegistry.class);
    final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle));

    // The task is assigned as standby (second argument), i.e. active -> standby recycling.
    taskManager.handleAssignment(
        Collections.emptyMap(),
        mkMap(mkEntry(activeTaskToRecycle.id(), activeTaskToRecycle.inputPartitions()))
    );

    verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
    // Recycling requires taking the task out of the state updater first; the sibling
    // standby-recycle test asserts the same, so this test must not skip it.
    verify(stateUpdater).remove(activeTaskToRecycle.id());
    verify(tasks).addPendingTaskToRecycle(activeTaskToRecycle.id(), activeTaskToRecycle.inputPartitions());
    verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
/**
 * A standby task owned by the state updater that is re-assigned as an active task must be
 * removed from the state updater and registered as pending-to-recycle with its input
 * partitions; both task creators are invoked with empty maps.
 */
@Test
public void shouldPrepareStandbyTaskInStateUpdaterToBeRecycled() {
    final StandbyTask recycled = standbyTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(recycled));
    final TaskId recycledId = recycled.id();
    final Set<TopicPartition> recycledPartitions = recycled.inputPartitions();

    // Assigned as active (first argument), i.e. standby -> active recycling.
    manager.handleAssignment(mkMap(mkEntry(recycledId, recycledPartitions)), emptyMap());

    verify(stateUpdater).remove(recycledId);
    verify(registry).addPendingTaskToRecycle(recycledId, recycledPartitions);
    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
/**
 * A restoring active task owned by the state updater that is absent from the new (empty)
 * assignment must be removed from the state updater and registered as pending-to-close-clean.
 */
@Test
public void shouldRemoveUnusedActiveTaskFromStateUpdater() {
    final StreamTask unassigned = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(unassigned));
    final TaskId unassignedId = unassigned.id();

    manager.handleAssignment(emptyMap(), emptyMap());

    verify(stateUpdater).remove(unassignedId);
    verify(registry).addPendingTaskToCloseClean(unassignedId);
    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
/**
 * A standby task owned by the state updater that is absent from the new (empty) assignment
 * must be removed from the state updater and registered as pending-to-close-clean.
 */
@Test
public void shouldRemoveUnusedStandbyTaskFromStateUpdater() {
    final StandbyTask unassigned = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId02Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(unassigned));
    final TaskId unassignedId = unassigned.id();

    manager.handleAssignment(emptyMap(), emptyMap());

    verify(stateUpdater).remove(unassignedId);
    verify(registry).addPendingTaskToCloseClean(unassignedId);
    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
/**
 * A restoring active task in the state updater that is re-assigned as active with different
 * input partitions must be removed from the state updater and registered as pending an
 * input-partition update with the new partitions.
 */
@Test
public void shouldUpdateInputPartitionOfActiveTaskInStateUpdater() {
    final StreamTask reassigned = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final Set<TopicPartition> updatedPartitions = taskId02Partitions;
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(reassigned));
    final TaskId reassignedId = reassigned.id();

    manager.handleAssignment(mkMap(mkEntry(reassignedId, updatedPartitions)), emptyMap());

    verify(stateUpdater).remove(reassignedId);
    verify(registry).addPendingTaskToUpdateInputPartitions(reassignedId, updatedPartitions);
    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
/**
 * When a restoring active task in the state updater was already pending close-clean
 * (registry reports {@code true} on removal) and is re-assigned with different input
 * partitions, it must not be removed from the state updater again; instead it is registered
 * as pending close-revive-and-update-input-partitions with the new partitions.
 */
@Test
public void shouldCloseReviveAndUpdateInputPartitionOfActiveTaskInStateUpdater() {
    final StreamTask reassigned = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final Set<TopicPartition> updatedPartitions = taskId02Partitions;
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(reassigned));
    final TaskId reassignedId = reassigned.id();
    when(registry.removePendingTaskToCloseClean(reassignedId)).thenReturn(true);

    manager.handleAssignment(mkMap(mkEntry(reassignedId, updatedPartitions)), emptyMap());

    verify(stateUpdater, never()).remove(reassignedId);
    verify(registry).removePendingTaskToCloseClean(reassignedId);
    verify(registry).addPendingTaskToCloseReviveAndUpdateInputPartitions(reassignedId, updatedPartitions);
    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
/**
 * Re-assigning a restoring active task in the state updater with unchanged input partitions
 * must not lead to any task creation: both creators are invoked with empty maps.
 */
@Test
public void shouldKeepReassignedActiveTaskInStateUpdater() {
    final StreamTask kept = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(kept));

    final Map<TaskId, Set<TopicPartition>> activeAssignment = mkMap(mkEntry(kept.id(), kept.inputPartitions()));
    manager.handleAssignment(activeAssignment, emptyMap());

    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
/**
 * A suspended active task held by the registry that is re-assigned as active must be removed
 * from the registry and added to the state updater; no new tasks are created.
 */
@Test
public void shouldMoveReassignedSuspendedActiveTaskToStateUpdater() {
    final StreamTask suspended = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.SUSPENDED)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(registry.allTasks()).thenReturn(mkSet(suspended));

    final Map<TaskId, Set<TopicPartition>> activeAssignment =
        mkMap(mkEntry(suspended.id(), suspended.inputPartitions()));
    manager.handleAssignment(activeAssignment, emptyMap());

    verify(registry).removeTask(suspended);
    verify(stateUpdater).add(suspended);
    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
/**
 * Re-assigning a restoring active task in the state updater with unchanged input partitions
 * must ask the registry to drop it from the pending-to-suspend set.
 */
@Test
public void shouldRemoveReassignedRevokedActiveTaskInStateUpdaterFromPendingTaskToSuspend() {
    final StreamTask revokedThenReassigned = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(revokedThenReassigned));
    final TaskId reassignedId = revokedThenReassigned.id();

    manager.handleAssignment(
        mkMap(mkEntry(reassignedId, revokedThenReassigned.inputPartitions())),
        emptyMap()
    );

    verify(registry).removePendingActiveTaskToSuspend(reassignedId);
    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
/**
 * When a restoring active task in the state updater was pending close-clean (registry reports
 * {@code true} on removal) and is re-assigned with unchanged input partitions, it must be
 * dropped from pending-close-clean and registered as pending-to-add-back.
 */
@Test
public void shouldRemoveReassignedLostTaskInStateUpdaterFromPendingTaskToCloseClean() {
    final StreamTask lostThenReassigned = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(lostThenReassigned));
    final TaskId reassignedId = lostThenReassigned.id();
    when(registry.removePendingTaskToCloseClean(reassignedId)).thenReturn(true);

    manager.handleAssignment(
        mkMap(mkEntry(reassignedId, lostThenReassigned.inputPartitions())),
        emptyMap()
    );

    verify(registry).removePendingTaskToCloseClean(reassignedId);
    verify(registry).addPendingTaskToAddBack(reassignedId);
    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
/**
 * When a restoring active task in the state updater was pending suspension (registry reports
 * {@code true} on removal) and is re-assigned with unchanged input partitions, it must be
 * dropped from pending-to-suspend and registered as pending-to-add-back.
 */
@Test
public void shouldRemoveReassignedTaskInStateUpdaterFromPendingSuspend() {
    final StreamTask pendingSuspend = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(pendingSuspend));
    final TaskId reassignedId = pendingSuspend.id();
    when(registry.removePendingActiveTaskToSuspend(reassignedId)).thenReturn(true);

    manager.handleAssignment(
        mkMap(mkEntry(reassignedId, pendingSuspend.inputPartitions())),
        emptyMap()
    );

    verify(registry).removePendingActiveTaskToSuspend(reassignedId);
    verify(registry).addPendingTaskToAddBack(reassignedId);
    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
/**
 * When a standby task in the state updater was pending recycle (registry returns its
 * partitions on removal) and is re-assigned as standby, it must be dropped from
 * pending-to-recycle and registered as pending-to-add-back.
 */
@Test
public void shouldReAddStandbyTaskFromPendingRecycle() {
    final StandbyTask pendingRecycle = standbyTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId01Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(pendingRecycle));
    final TaskId reassignedId = pendingRecycle.id();
    when(registry.removePendingTaskToRecycle(reassignedId)).thenReturn(taskId01Partitions);

    manager.handleAssignment(
        emptyMap(),
        mkMap(mkEntry(reassignedId, pendingRecycle.inputPartitions()))
    );

    verify(registry).removePendingTaskToRecycle(reassignedId);
    verify(registry).addPendingTaskToAddBack(reassignedId);
    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
/**
 * A standby task in the state updater that is re-assigned as standby with different input
 * partitions must neither be removed from the state updater nor be registered as pending an
 * input-partition update.
 */
@Test
public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() {
    final StandbyTask reassigned = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId02Partitions)
        .build();
    final Set<TopicPartition> differentPartitions = taskId03Partitions;
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(reassigned));
    final TaskId reassignedId = reassigned.id();

    manager.handleAssignment(emptyMap(), mkMap(mkEntry(reassignedId, differentPartitions)));

    verify(stateUpdater, never()).remove(reassignedId);
    verify(registry, never()).addPendingTaskToUpdateInputPartitions(reassignedId, differentPartitions);
    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
/**
 * A standby task in the state updater that is re-assigned as standby with different input
 * partitions must not be removed from the state updater, must not be looked up in the
 * pending-close-clean set, and must not be registered for close-revive-and-update.
 */
@Test
public void shouldNeverCloseReviveAndUpdateInputPartitionsOfStandbyTaskInStateUpdater() {
    final StandbyTask reassigned = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId02Partitions)
        .build();
    final Set<TopicPartition> differentPartitions = taskId03Partitions;
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(reassigned));
    final TaskId reassignedId = reassigned.id();

    manager.handleAssignment(emptyMap(), mkMap(mkEntry(reassignedId, differentPartitions)));

    verify(stateUpdater, never()).remove(reassignedId);
    verify(registry, never()).removePendingTaskToCloseClean(reassignedId);
    verify(registry, never())
        .addPendingTaskToCloseReviveAndUpdateInputPartitions(reassignedId, differentPartitions);
    verify(activeTaskCreator).createTasks(consumer, emptyMap());
    verify(standbyTaskCreator).createTasks(emptyMap());
}
// A standby task reported by the state updater is reassigned as standby with the very same
// input partitions. The test only checks that both task creators are invoked with empty maps.
@Test
public void shouldKeepReassignedStandbyTaskInStateUpdater() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StandbyTask standby = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId02Partitions)
        .build();
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(standby));

    manager.handleAssignment(
        Collections.emptyMap(),
        mkMap(mkEntry(standby.id(), standby.inputPartitions()))
    );

    verify(standbyTaskCreator).createTasks(Collections.emptyMap());
    verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
}
// The state updater reports one restoring active task and one running standby task. The new
// assignment lists only the standby task's id, as an active task. The test checks that both
// tasks are removed from the state updater, the active one is queued for a clean close and
// the standby one is queued for recycling.
@Test
public void shouldAssignMultipleTasksInStateUpdater() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StandbyTask standbyToRecycle = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId02Partitions)
        .build();
    final StreamTask activeToClose = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(activeToClose, standbyToRecycle));

    manager.handleAssignment(
        mkMap(mkEntry(standbyToRecycle.id(), standbyToRecycle.inputPartitions())),
        Collections.emptyMap()
    );

    // Neither creator is asked to create anything new.
    verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
    verify(standbyTaskCreator).createTasks(Collections.emptyMap());
    // The active task that is no longer assigned is removed and marked for clean close.
    verify(stateUpdater).remove(activeToClose.id());
    verify(registry).addPendingTaskToCloseClean(activeToClose.id());
    // The standby task that became active is removed and marked for recycling.
    verify(stateUpdater).remove(standbyToRecycle.id());
    verify(registry).addPendingTaskToRecycle(standbyToRecycle.id(), standbyToRecycle.inputPartitions());
}
// allTasks() is expected to contain both the task held by the tasks registry and the task
// reported by the state updater.
@Test
public void shouldReturnStateUpdaterTasksInAllTasks() {
    final StreamTask activeTask = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId03Partitions).build();
    final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId02Partitions).build();
    final TasksRegistry tasks = mock(TasksRegistry.class);
    final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask));
    when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask)));

    // Expected value goes first so that a failure message labels expected/actual correctly.
    assertEquals(mkMap(mkEntry(taskId03, activeTask), mkEntry(taskId02, standbyTask)), taskManager.allTasks());
}
// allOwnedTasks() is expected to contain exactly the task held by the tasks registry.
@Test
public void shouldNotReturnStateUpdaterTasksInOwnedTasks() {
    final StreamTask activeTask = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId03Partitions).build();
    // NOTE(review): this standby task is built but never handed to the state updater mock;
    // confirm whether a stateUpdater.getTasks() stubbing was intended here.
    final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId02Partitions).build();
    final TasksRegistry tasks = mock(TasksRegistry.class);
    final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
    when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask)));

    // Expected value goes first so that a failure message labels expected/actual correctly.
    assertEquals(mkMap(mkEntry(taskId03, activeTask)), taskManager.allOwnedTasks());
}
// Assigning an active task that is not known yet: the tasks returned by the active task
// creator are expected to be added to the pending tasks to initialize.
@Test
public void shouldCreateActiveTaskDuringAssignment() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StreamTask newActive = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.CREATED)
        .withInputPartitions(taskId03Partitions)
        .build();
    final Map<TaskId, Set<TopicPartition>> activeAssignment =
        mkMap(mkEntry(newActive.id(), newActive.inputPartitions()));
    final Set<Task> tasksFromCreator = mkSet(newActive);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(activeTaskCreator.createTasks(consumer, activeAssignment)).thenReturn(tasksFromCreator);

    manager.handleAssignment(activeAssignment, Collections.emptyMap());

    verify(standbyTaskCreator).createTasks(Collections.emptyMap());
    verify(registry).addPendingTasksToInit(tasksFromCreator);
}
// Assigning a standby task that is not known yet: the tasks returned by the standby task
// creator are expected to be added to the pending tasks to initialize.
@Test
public void shouldCreateStandbyTaskDuringAssignment() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StandbyTask newStandby = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.CREATED)
        .withInputPartitions(taskId02Partitions)
        .build();
    final Set<Task> tasksFromCreator = mkSet(newStandby);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(standbyTaskCreator.createTasks(mkMap(mkEntry(newStandby.id(), newStandby.inputPartitions()))))
        .thenReturn(tasksFromCreator);

    manager.handleAssignment(
        Collections.emptyMap(),
        mkMap(mkEntry(newStandby.id(), newStandby.inputPartitions()))
    );

    verify(registry).addPendingTasksToInit(tasksFromCreator);
    verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
}
// State updater enabled: a running active task held by the tasks registry is reassigned as
// standby. The standby task produced by the standby task creator is expected to be added to
// the pending tasks to initialize and the original active task removed from the registry.
@Test
public void shouldAddRecycledStandbyTaskfromActiveToPendingTasksToInitWithStateUpdaterEnabled() {
    final StreamTask activeToRecycle = statefulTask(taskId01, taskId01ChangelogPartitions)
        .withInputPartitions(taskId01Partitions)
        .inState(State.RUNNING)
        .build();
    final StandbyTask recycledStandby = standbyTask(taskId01, taskId01ChangelogPartitions)
        .withInputPartitions(taskId01Partitions)
        .inState(State.CREATED)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.allTasks()).thenReturn(mkSet(activeToRecycle));
    when(standbyTaskCreator.createStandbyTaskFromActive(activeToRecycle, taskId01Partitions))
        .thenReturn(recycledStandby);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    manager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));

    // The active task is committed and its producer released before recycling.
    verify(activeToRecycle).prepareCommit();
    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeToRecycle.id());
    // The registry swaps the active task for the pending standby task.
    verify(registry).removeTask(activeToRecycle);
    verify(registry).addPendingTasksToInit(mkSet(recycledStandby));
    // No brand-new tasks are created.
    verify(standbyTaskCreator).createTasks(Collections.emptyMap());
    verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
}
// State updater disabled: a running active task held by the tasks registry is reassigned as
// standby. The standby task produced by the standby task creator is expected to replace the
// active task directly in the registry.
@Test
public void shouldAddRecycledStandbyTaskfromActiveToTaskRegistryWithStateUpdaterDisabled() {
    final StreamTask activeToRecycle = statefulTask(taskId01, taskId01ChangelogPartitions)
        .withInputPartitions(taskId01Partitions)
        .inState(State.RUNNING)
        .build();
    final StandbyTask recycledStandby = standbyTask(taskId01, taskId01ChangelogPartitions)
        .withInputPartitions(taskId01Partitions)
        .inState(State.CREATED)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.allTasks()).thenReturn(mkSet(activeToRecycle));
    when(standbyTaskCreator.createStandbyTaskFromActive(activeToRecycle, taskId01Partitions))
        .thenReturn(recycledStandby);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, false);

    manager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));

    // The active task is committed and its producer released before recycling.
    verify(activeToRecycle).prepareCommit();
    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeToRecycle.id());
    verify(registry).replaceActiveWithStandby(recycledStandby);
    // No brand-new tasks are created.
    verify(standbyTaskCreator).createTasks(Collections.emptyMap());
    verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
}
// State updater enabled: a standby task found in the tasks registry that is reassigned as
// active is expected to make handleAssignment throw an IllegalStateException before the
// active task creator is touched.
@Test
public void shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistryWithStateUpdaterEnabled() {
    final StandbyTask standbyTaskToRecycle = standbyTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId03Partitions).build();
    final TasksRegistry tasks = mock(TasksRegistry.class);
    when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToRecycle));
    final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);

    final IllegalStateException illegalStateException = assertThrows(
        IllegalStateException.class,
        () -> taskManager.handleAssignment(
            mkMap(mkEntry(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions())),
            Collections.emptyMap()
        )
    );

    // Expected message goes first so that a failure message labels expected/actual correctly.
    assertEquals(
        "Standby tasks should only be managed by the state updater, " +
            "but standby task " + taskId03 + " is managed by the stream thread",
        illegalStateException.getMessage()
    );
    verifyNoInteractions(activeTaskCreator);
}
// State updater enabled: a running active task held by the tasks registry is absent from an
// empty assignment. It is expected to be committed, closed cleanly and removed from the registry.
@Test
public void shouldAssignActiveTaskInTasksRegistryToBeClosedCleanlyWithStateUpdaterEnabled() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StreamTask activeToClose = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(registry.allTasks()).thenReturn(mkSet(activeToClose));

    manager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());

    // Task-level lifecycle calls.
    verify(activeToClose).prepareCommit();
    verify(activeToClose).closeClean();
    // Bookkeeping on the registry and the creators.
    verify(registry).removeTask(activeToClose);
    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeToClose.id());
    verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
    verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
// State updater enabled: a standby task found in the tasks registry that is absent from an
// empty assignment is expected to make handleAssignment throw an IllegalStateException
// before the active task creator is touched.
@Test
public void shouldThrowDuringAssignmentIfStandbyTaskToCloseIsFoundInTasksRegistryWithStateUpdaterEnabled() {
    final StandbyTask standbyTaskToClose = standbyTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId03Partitions).build();
    final TasksRegistry tasks = mock(TasksRegistry.class);
    final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
    when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToClose));

    final IllegalStateException illegalStateException = assertThrows(
        IllegalStateException.class,
        () -> taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap())
    );

    // Expected message goes first so that a failure message labels expected/actual correctly.
    assertEquals(
        "Standby tasks should only be managed by the state updater, " +
            "but standby task " + taskId03 + " is managed by the stream thread",
        illegalStateException.getMessage()
    );
    verifyNoInteractions(activeTaskCreator);
}
// State updater enabled: a running active task held by the tasks registry is reassigned with
// different input partitions, and the registry reports the update as applied. The task is
// expected to receive updateInputPartitions with the new partitions.
@Test
public void shouldAssignActiveTaskInTasksRegistryToUpdateInputPartitionsWithStateUpdaterEnabled() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StreamTask activeToUpdate = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final Set<TopicPartition> reassignedPartitions = taskId02Partitions;
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(registry.allTasks()).thenReturn(mkSet(activeToUpdate));
    when(registry.updateActiveTaskInputPartitions(activeToUpdate, reassignedPartitions)).thenReturn(true);

    manager.handleAssignment(
        mkMap(mkEntry(activeToUpdate.id(), reassignedPartitions)),
        Collections.emptyMap()
    );

    verify(activeToUpdate).updateInputPartitions(eq(reassignedPartitions), any());
    verify(standbyTaskCreator).createTasks(Collections.emptyMap());
    verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
}
// State updater enabled: a running active task held by the tasks registry is reassigned with
// unchanged input partitions. The test only checks that both task creators are invoked with
// empty maps.
@Test
public void shouldResumeActiveRunningTaskInTasksRegistryWithStateUpdaterEnabled() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StreamTask runningActive = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(registry.allTasks()).thenReturn(mkSet(runningActive));

    manager.handleAssignment(
        mkMap(mkEntry(runningActive.id(), runningActive.inputPartitions())),
        Collections.emptyMap()
    );

    verify(standbyTaskCreator).createTasks(Collections.emptyMap());
    verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
}
// State updater enabled: a suspended active task held by the tasks registry is reassigned
// with unchanged input partitions. It is expected to be resumed, handed to the state updater
// and removed from the registry.
@Test
public void shouldResumeActiveSuspendedTaskInTasksRegistryAndAddToStateUpdater() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StreamTask suspendedActive = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.SUSPENDED)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(registry.allTasks()).thenReturn(mkSet(suspendedActive));

    manager.handleAssignment(
        mkMap(mkEntry(suspendedActive.id(), suspendedActive.inputPartitions())),
        Collections.emptyMap()
    );

    // The task moves from the registry to the state updater after being resumed.
    verify(suspendedActive).resume();
    verify(stateUpdater).add(suspendedActive);
    verify(registry).removeTask(suspendedActive);
    // No brand-new tasks are created.
    verify(standbyTaskCreator).createTasks(Collections.emptyMap());
    verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
}
// State updater enabled: a standby task found in the tasks registry that is reassigned as
// standby with different input partitions is expected to make handleAssignment throw an
// IllegalStateException before the active task creator is touched.
@Test
public void shouldThrowDuringAssignmentIfStandbyTaskToUpdateInputPartitionsIsFoundInTasksRegistryWithStateUpdaterEnabled() {
    final StandbyTask standbyTaskToUpdateInputPartitions = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId02Partitions).build();
    final Set<TopicPartition> newInputPartitions = taskId03Partitions;
    final TasksRegistry tasks = mock(TasksRegistry.class);
    final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
    when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));

    final IllegalStateException illegalStateException = assertThrows(
        IllegalStateException.class,
        () -> taskManager.handleAssignment(
            Collections.emptyMap(),
            mkMap(mkEntry(standbyTaskToUpdateInputPartitions.id(), newInputPartitions))
        )
    );

    // Expected message goes first so that a failure message labels expected/actual correctly.
    assertEquals(
        "Standby tasks should only be managed by the state updater, " +
            "but standby task " + taskId02 + " is managed by the stream thread",
        illegalStateException.getMessage()
    );
    verifyNoInteractions(activeTaskCreator);
}
// State updater enabled: the tasks registry holds one running active task that is no longer
// assigned, while the assignment names a different active task. The unassigned task is
// expected to be closed cleanly and the active task creator asked to create the new one.
@Test
public void shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StreamTask activeToCreate = statefulTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.CREATED)
        .withInputPartitions(taskId02Partitions)
        .build();
    final StreamTask activeToClose = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(registry.allTasks()).thenReturn(mkSet(activeToClose));

    manager.handleAssignment(
        mkMap(mkEntry(activeToCreate.id(), activeToCreate.inputPartitions())),
        Collections.emptyMap()
    );

    verify(activeToClose).closeClean();
    verify(activeTaskCreator)
        .createTasks(consumer, mkMap(mkEntry(activeToCreate.id(), activeToCreate.inputPartitions())));
    verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
// Tasks drained from the registry's pending-to-init set are expected to be initialized and
// then added to the state updater by checkStateUpdater.
@Test
public void shouldAddTasksToStateUpdater() {
    final StreamTask pendingActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .withInputPartitions(taskId00Partitions)
        .inState(State.RESTORING)
        .build();
    final StandbyTask pendingStandby = standbyTask(taskId01, taskId01ChangelogPartitions)
        .withInputPartitions(taskId01Partitions)
        .inState(State.RUNNING)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.drainPendingTasksToInit()).thenReturn(mkSet(pendingActive, pendingStandby));
    taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

    taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

    // Active task: initialized, then handed over.
    verify(pendingActive).initializeIfNeeded();
    verify(stateUpdater).add(pendingActive);
    // Standby task: initialized, then handed over.
    verify(pendingStandby).initializeIfNeeded();
    verify(stateUpdater).add(pendingStandby);
}
// When initializing one of the pending tasks throws a LockException, that task is expected
// to be put back into the pending-to-init set and not added to the state updater, while the
// other task is added as usual.
@Test
public void shouldRetryInitializationWhenLockExceptionInStateUpdater() {
    final StreamTask lockedActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .withInputPartitions(taskId00Partitions)
        .inState(State.RESTORING)
        .build();
    final StandbyTask healthyStandby = standbyTask(taskId01, taskId01ChangelogPartitions)
        .withInputPartitions(taskId01Partitions)
        .inState(State.RUNNING)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.drainPendingTasksToInit()).thenReturn(mkSet(lockedActive, healthyStandby));
    final LockException lockFailure = new LockException("Where are my keys??");
    doThrow(lockFailure).when(lockedActive).initializeIfNeeded();
    taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

    taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

    // Initialization is attempted on both tasks.
    verify(lockedActive).initializeIfNeeded();
    verify(healthyStandby).initializeIfNeeded();
    // Only the task that failed to acquire the lock is queued again.
    verify(registry).addPendingTasksToInit(
        argThat(requeued -> requeued.contains(lockedActive) && !requeued.contains(healthyStandby))
    );
    verify(stateUpdater).add(healthyStandby);
    verify(stateUpdater, never()).add(lockedActive);
}
// Two tasks drained as removed from the state updater are pending recycling (active -> standby
// and standby -> active). When initializing one of the converted tasks throws a LockException,
// that converted task is expected to be queued in the pending-to-init set and not added to the
// state updater, while the other converted task is added as usual.
@Test
public void shouldRetryInitializationWhenLockExceptionAfterRecyclingInStateUpdater() {
    final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
        .withInputPartitions(taskId00Partitions)
        .inState(State.RESTORING).build();
    final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
        .withInputPartitions(taskId01Partitions)
        .inState(State.RUNNING).build();
    final StandbyTask task00Converted = standbyTask(taskId00, taskId00Partitions)
        .withInputPartitions(taskId00Partitions).build();
    final StreamTask task01Converted = statefulTask(taskId01, taskId01Partitions)
        .withInputPartitions(taskId01Partitions).build();
    when(stateUpdater.hasRemovedTasks()).thenReturn(true);
    when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
    final TasksRegistry tasks = mock(TasksRegistry.class);
    when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
    when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
    when(activeTaskCreator.createActiveTaskFromStandby(task01, taskId01Partitions,
        consumer)).thenReturn(task01Converted);
    when(standbyTaskCreator.createStandbyTaskFromActive(task00, taskId00Partitions))
        .thenReturn(task00Converted);
    final LockException lockException = new LockException("Where are my keys??");
    doThrow(lockException).when(task00Converted).initializeIfNeeded();
    // The task manager is created exactly once, after all stubbing is in place.
    taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);

    taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(task00Converted).initializeIfNeeded();
    verify(task01Converted).initializeIfNeeded();
    // Only the converted task that failed to acquire the lock is queued for another attempt.
    verify(tasks).addPendingTasksToInit(
        argThat(tasksToInit -> tasksToInit.contains(task00Converted) && !tasksToInit.contains(task01Converted))
    );
    verify(stateUpdater, never()).add(task00Converted);
    verify(stateUpdater).add(task01Converted);
}
// Two tasks drained as removed from the state updater are pending recycling. Both originals
// are expected to be suspended, and both converted tasks initialized and added back to the
// state updater.
@Test
public void shouldRecycleTasksRemovedFromStateUpdater() {
    final StreamTask removedActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .withInputPartitions(taskId00Partitions)
        .inState(State.RESTORING)
        .build();
    final StandbyTask removedStandby = standbyTask(taskId01, taskId01ChangelogPartitions)
        .withInputPartitions(taskId01Partitions)
        .inState(State.RUNNING)
        .build();
    final StandbyTask standbyFromActive = standbyTask(taskId00, taskId00Partitions)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StreamTask activeFromStandby = statefulTask(taskId01, taskId01Partitions)
        .withInputPartitions(taskId01Partitions)
        .build();
    when(stateUpdater.hasRemovedTasks()).thenReturn(true);
    when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(removedActive, removedStandby));
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.removePendingTaskToRecycle(removedActive.id())).thenReturn(taskId00Partitions);
    when(registry.removePendingTaskToRecycle(removedStandby.id())).thenReturn(taskId01Partitions);
    taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(activeTaskCreator.createActiveTaskFromStandby(removedStandby, taskId01Partitions, consumer))
        .thenReturn(activeFromStandby);
    when(standbyTaskCreator.createStandbyTaskFromActive(removedActive, taskId00Partitions))
        .thenReturn(standbyFromActive);

    taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
    // Originals are suspended ...
    verify(removedActive).suspend();
    verify(removedStandby).suspend();
    // ... and their converted counterparts are initialized and handed to the state updater.
    verify(standbyFromActive).initializeIfNeeded();
    verify(stateUpdater).add(standbyFromActive);
    verify(activeFromStandby).initializeIfNeeded();
    verify(stateUpdater).add(activeFromStandby);
}
// Two tasks drained as removed from the state updater are pending a clean close (and not
// pending recycling). Both are expected to be suspended and closed cleanly.
@Test
public void shouldCloseTasksRemovedFromStateUpdater() {
    final StreamTask removedActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .withInputPartitions(taskId00Partitions)
        .inState(State.RESTORING)
        .build();
    final StandbyTask removedStandby = standbyTask(taskId01, taskId01ChangelogPartitions)
        .withInputPartitions(taskId01Partitions)
        .inState(State.RUNNING)
        .build();
    when(stateUpdater.hasRemovedTasks()).thenReturn(true);
    when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(removedActive, removedStandby));
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.removePendingTaskToRecycle(any())).thenReturn(null);
    when(registry.removePendingTaskToCloseClean(removedActive.id())).thenReturn(true);
    when(registry.removePendingTaskToCloseClean(removedStandby.id())).thenReturn(true);
    taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

    taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
    // Active task lifecycle.
    verify(removedActive).suspend();
    verify(removedActive).closeClean();
    // Standby task lifecycle.
    verify(removedStandby).suspend();
    verify(removedStandby).closeClean();
}
// Two tasks drained as removed from the state updater are pending an input-partition update.
// Each is expected to receive its new partitions and be re-added to the state updater
// without being closed (neither clean nor dirty).
@Test
public void shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
    final StreamTask removedActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .withInputPartitions(taskId00Partitions)
        .inState(State.RESTORING)
        .build();
    final StandbyTask removedStandby = standbyTask(taskId01, taskId01ChangelogPartitions)
        .withInputPartitions(taskId01Partitions)
        .inState(State.RUNNING)
        .build();
    when(stateUpdater.hasRemovedTasks()).thenReturn(true);
    when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(removedActive, removedStandby));
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.removePendingTaskToRecycle(any())).thenReturn(null);
    when(registry.removePendingTaskToCloseReviveAndUpdateInputPartitions(any())).thenReturn(null);
    when(registry.removePendingTaskToUpdateInputPartitions(removedActive.id())).thenReturn(taskId02Partitions);
    when(registry.removePendingTaskToUpdateInputPartitions(removedStandby.id())).thenReturn(taskId03Partitions);
    taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

    taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

    // Active task: partitions updated, re-added, never closed.
    verify(removedActive).updateInputPartitions(eq(taskId02Partitions), anyMap());
    verify(stateUpdater).add(removedActive);
    verify(removedActive, never()).closeClean();
    verify(removedActive, never()).closeDirty();
    // Standby task: partitions updated, re-added, never closed.
    verify(removedStandby).updateInputPartitions(eq(taskId03Partitions), anyMap());
    verify(stateUpdater).add(removedStandby);
    verify(removedStandby, never()).closeClean();
    verify(removedStandby, never()).closeDirty();
}
// An active task drained as removed from the state updater is pending
// close-revive-and-update-input-partitions. It is expected to be closed cleanly, revived,
// given its new partitions, initialized and re-added to the state updater.
@Test
public void shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
    final StreamTask removedActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .withInputPartitions(taskId00Partitions)
        .inState(State.RESTORING)
        .build();
    when(stateUpdater.hasRemovedTasks()).thenReturn(true);
    when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(removedActive));
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.removePendingTaskToRecycle(any())).thenReturn(null);
    when(registry.removePendingTaskToCloseReviveAndUpdateInputPartitions(removedActive.id()))
        .thenReturn(taskId02Partitions);
    taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

    taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

    // Clean close only; a dirty close must not happen.
    verify(removedActive).closeClean();
    verify(removedActive, never()).closeDirty();
    verify(removedActive).revive();
    verify(removedActive).updateInputPartitions(eq(taskId02Partitions), anyMap());
    verify(removedActive).initializeIfNeeded();
    verify(stateUpdater).add(removedActive);
}
// An active task drained as removed from the state updater is pending suspension only. It is
// expected to be suspended and added to the tasks registry, with no consumer interaction.
@Test
public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
    final StreamTask revokedActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    // Every other pending action reports "nothing pending" for this task.
    when(registry.removePendingTaskToRecycle(revokedActive.id())).thenReturn(null);
    when(registry.removePendingTaskToUpdateInputPartitions(revokedActive.id())).thenReturn(null);
    when(registry.removePendingTaskToCloseReviveAndUpdateInputPartitions(revokedActive.id())).thenReturn(null);
    when(registry.removePendingActiveTaskToSuspend(revokedActive.id())).thenReturn(true);
    when(stateUpdater.hasRemovedTasks()).thenReturn(true);
    when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(revokedActive));
    taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(revokedActive).suspend();
    verify(registry).addTask(revokedActive);
    verifyNoInteractions(consumer);
}
// Five tasks are drained as removed from the state updater in one go, each with a different
// pending action: two to recycle (active -> standby, standby -> active), one to close cleanly,
// one to update input partitions, and one to close, revive and update input partitions.
// The test checks that each task receives the handling matching its pending action and that
// the consumer is never touched.
@Test
public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
    final StreamTask taskToRecycle0 = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions).build();
    final StandbyTask taskToRecycle1 = standbyTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId01Partitions).build();
    final StandbyTask convertedTask0 = standbyTask(taskId00, taskId00ChangelogPartitions).build();
    final StreamTask convertedTask1 = statefulTask(taskId01, taskId01ChangelogPartitions).build();
    final StreamTask taskToClose = statefulTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId02Partitions).build();
    final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId03Partitions).build();
    final StreamTask taskToCloseReviveAndUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId04Partitions).build();
    when(stateUpdater.hasRemovedTasks()).thenReturn(true);
    when(stateUpdater.drainRemovedTasks())
        .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions, taskToCloseReviveAndUpdateInputPartitions));
    when(stateUpdater.restoresActiveTasks()).thenReturn(true);
    when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions, consumer))
        .thenReturn(convertedTask1);
    when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
        .thenReturn(convertedTask0);
    final TasksRegistry tasks = mock(TasksRegistry.class);
    // Each pending-action lookup answers positively only for the task it belongs to.
    when(tasks.removePendingTaskToCloseClean(taskToClose.id())).thenReturn(true);
    when(tasks.removePendingTaskToCloseClean(argThat(taskId -> !taskId.equals(taskToClose.id())))).thenReturn(false);
    when(tasks.removePendingTaskToRecycle(taskToRecycle0.id())).thenReturn(taskId00Partitions);
    when(tasks.removePendingTaskToRecycle(taskToRecycle1.id())).thenReturn(taskId01Partitions);
    when(tasks.removePendingTaskToRecycle(
        argThat(taskId -> !taskId.equals(taskToRecycle0.id()) && !taskId.equals(taskToRecycle1.id())))
    ).thenReturn(null);
    when(tasks.removePendingTaskToUpdateInputPartitions(taskToUpdateInputPartitions.id())).thenReturn(taskId04Partitions);
    when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(taskToCloseReviveAndUpdateInputPartitions.id())).thenReturn(taskId05Partitions);
    when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(
        argThat(taskId -> !taskId.equals(taskToCloseReviveAndUpdateInputPartitions.id()))
    )).thenReturn(null);
    final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);

    // Uses the shared no-op resetter, like the other checkStateUpdater tests in this class.
    taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
    // Recycled tasks: converted counterparts are initialized and added to the state updater.
    verify(convertedTask0).initializeIfNeeded();
    verify(convertedTask1).initializeIfNeeded();
    verify(stateUpdater).add(convertedTask0);
    verify(stateUpdater).add(convertedTask1);
    // Task pending clean close.
    verify(taskToClose).closeClean();
    // Task pending input-partition update.
    verify(taskToUpdateInputPartitions).updateInputPartitions(eq(taskId04Partitions), anyMap());
    verify(stateUpdater).add(taskToUpdateInputPartitions);
    // Task pending close, revive and input-partition update.
    verify(taskToCloseReviveAndUpdateInputPartitions).closeClean();
    verify(taskToCloseReviveAndUpdateInputPartitions).revive();
    verify(taskToCloseReviveAndUpdateInputPartitions).updateInputPartitions(eq(taskId05Partitions), anyMap());
    verify(taskToCloseReviveAndUpdateInputPartitions).initializeIfNeeded();
    verify(stateUpdater).add(taskToCloseReviveAndUpdateInputPartitions);
    verifyNoInteractions(consumer);
}
// checkStateUpdater is expected to return false while the state updater reports that it
// restores active tasks.
@Test
public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(stateUpdater.restoresActiveTasks()).thenReturn(true);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    final boolean allRunning = manager.checkStateUpdater(time.milliseconds(), noOpResetter);

    assertFalse(allRunning);
}
// checkStateUpdater is expected to return false while the tasks registry reports pending
// tasks to recycle.
@Test
public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToInitButPendingTasksToRecycle() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.hasPendingTasksToRecycle()).thenReturn(true);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    final boolean allRunning = manager.checkStateUpdater(time.milliseconds(), noOpResetter);

    assertFalse(allRunning);
}
// checkStateUpdater is expected to return false while the tasks registry reports pending
// tasks to initialize.
@Test
public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToRecycleButPendingTasksToInit() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.hasPendingTasksToInit()).thenReturn(true);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    final boolean allRunning = manager.checkStateUpdater(time.milliseconds(), noOpResetter);

    assertFalse(allRunning);
}
// With nothing stubbed on the state updater or the tasks registry, checkStateUpdater is
// expected to return true.
@Test
public void shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTasksToRecycleAndInit() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    final boolean allRunning = manager.checkStateUpdater(time.milliseconds(), noOpResetter);

    assertTrue(allRunning);
}
// Revoking the input partitions of an active task reported by the state updater is expected
// to mark the task as pending suspension without removing it from the state updater.
@Test
public void shouldAddActiveTaskWithRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StreamTask restoringActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TaskManager manager = setupForRevocationAndLost(mkSet(restoringActive), registry);
    when(stateUpdater.getTasks()).thenReturn(mkSet(restoringActive));

    manager.handleRevocation(restoringActive.inputPartitions());

    verify(stateUpdater, never()).remove(restoringActive.id());
    verify(registry).addPendingActiveTaskToSuspend(restoringActive.id());
}
// Revoking the union of the input partitions of two active tasks passed to
// setupForRevocationAndLost is expected to mark both tasks as pending suspension.
@Test
public void shouldAddMultipleActiveTasksWithRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend() {
    final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions).build();
    final StreamTask task2 = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId01Partitions).build();
    final TasksRegistry tasks = mock(TasksRegistry.class);
    final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2), tasks);

    taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, taskId01Partitions));

    verify(tasks).addPendingActiveTaskToSuspend(task1.id());
    verify(tasks).addPendingActiveTaskToSuspend(task2.id());
}
// Revoking partitions that do not belong to the active task passed to
// setupForRevocationAndLost must neither remove the task from the state updater nor mark it
// as pending suspension.
@Test
public void shouldNotAddActiveTaskWithoutRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StreamTask restoringActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TaskManager manager = setupForRevocationAndLost(mkSet(restoringActive), registry);

    manager.handleRevocation(taskId01Partitions);

    verify(registry, never()).addPendingActiveTaskToSuspend(restoringActive.id());
    verify(stateUpdater, never()).remove(restoringActive.id());
}
// A standby task in the state updater is not affected by a revocation, even when
// the revoked partitions equal its own input partitions.
@Test
public void shouldNotRevokeStandbyTaskInStateUpdaterOnRevocation() {
    final StandbyTask standby = standbyTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setupForRevocationAndLost(mkSet(standby), registry);

    manager.handleRevocation(taskId00Partitions);

    verify(stateUpdater, never()).remove(standby.id());
    verify(registry, never()).addPendingActiveTaskToSuspend(standby.id());
}
// When all partitions are lost, the two active tasks in the state updater must be
// removed and queued for a clean close, while the standby task stays untouched.
@Test
public void shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() {
    final StreamTask firstActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StandbyTask standby = standbyTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId01Partitions)
        .build();
    final StreamTask secondActive = statefulTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId02Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setupForRevocationAndLost(mkSet(firstActive, standby, secondActive), registry);

    manager.handleLostAll();

    // Removal from the state updater: active tasks only.
    verify(stateUpdater).remove(firstActive.id());
    verify(stateUpdater, never()).remove(standby.id());
    verify(stateUpdater).remove(secondActive.id());
    // Pending clean close: active tasks only.
    verify(registry).addPendingTaskToCloseClean(firstActive.id());
    verify(registry, never()).addPendingTaskToCloseClean(standby.id());
    verify(registry).addPendingTaskToCloseClean(secondActive.id());
}
/**
 * Builds an at-least-once task manager backed by the given registry (third
 * argument of {@code setUpTaskManager} set to {@code true}) and stubs the state
 * updater so that {@code getTasks()} returns the supplied tasks.
 *
 * @param tasksInStateUpdater tasks the state updater should report
 * @param tasks               task registry handed to the task manager
 * @return the task manager under test
 */
private TaskManager setupForRevocationAndLost(final Set<Task> tasksInStateUpdater,
                                              final TasksRegistry tasks) {
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
    when(stateUpdater.getTasks()).thenReturn(tasksInStateUpdater);
    return manager;
}
// A restored task with no pending action completes restoration, has its timeout
// cleared, is added to the registry, and gets its input partitions resumed.
@Test
public void shouldTransitRestoredTaskToRunning() {
    final StreamTask restoredTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTransitionToRunningOfRestoredTask(restoredTask, registry);

    manager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(restoredTask).completeRestoration(noOpResetter);
    verify(restoredTask).clearTaskTimeout();
    verify(registry).addTask(restoredTask);
    verify(consumer).resume(restoredTask.inputPartitions());
}
// If completing restoration times out, the exception is passed to the task's
// timeout handling; the task is not added to the registry, its timeout is not
// cleared, and the consumer is never touched.
@Test
public void shouldHandleTimeoutExceptionInTransitRestoredTaskToRunning() {
    final StreamTask restoredTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTransitionToRunningOfRestoredTask(restoredTask, registry);
    final TimeoutException timeout = new TimeoutException();
    doThrow(timeout).when(restoredTask).completeRestoration(noOpResetter);

    manager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(restoredTask).maybeInitTaskTimeoutOrThrow(anyLong(), eq(timeout));
    verify(registry, never()).addTask(restoredTask);
    verify(restoredTask, never()).clearTaskTimeout();
    verifyNoInteractions(consumer);
}
/**
 * Prepares the mocks for a restored task that has no pending recycle,
 * input-partition update, or close-revive-and-update action, and that the state
 * updater hands back as restored.
 *
 * @param statefulTask the task the state updater drains as restored
 * @param tasks        task registry handed to the task manager
 * @return an at-least-once task manager using the given registry
 */
private TaskManager setUpTransitionToRunningOfRestoredTask(final StreamTask statefulTask,
                                                           final TasksRegistry tasks) {
    final TaskId restoredId = statefulTask.id();
    // No pending action is registered for the task.
    when(tasks.removePendingTaskToRecycle(restoredId)).thenReturn(null);
    when(tasks.removePendingTaskToUpdateInputPartitions(restoredId)).thenReturn(null);
    when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(restoredId)).thenReturn(null);
    // The state updater reports active restoration and drains the task.
    when(stateUpdater.restoresActiveTasks()).thenReturn(true);
    when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(statefulTask));
    return setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
}
// The result of checkStateUpdater follows the state updater: true while it
// restores no active tasks, false once it reports that it does.
@Test
public void shouldReturnCorrectBooleanWhenTryingToCompleteRestorationWithStateUpdater() {
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);

    when(stateUpdater.restoresActiveTasks()).thenReturn(false);
    final boolean whenNotRestoring = manager.checkStateUpdater(time.milliseconds(), noOpResetter);
    assertTrue(whenNotRestoring);

    when(stateUpdater.restoresActiveTasks()).thenReturn(true);
    final boolean whenRestoring = manager.checkStateUpdater(time.milliseconds(), noOpResetter);
    assertFalse(whenRestoring);
}
// A restored active task that is pending recycling is suspended, its producer is
// closed, and the standby task created from it is initialized and handed to the
// state updater.
@Test
public void shouldRecycleRestoredTask() {
    final StreamTask activeToRecycle = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StandbyTask recycledStandby = standbyTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.CREATED)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TaskManager manager = setUpRecycleRestoredTask(activeToRecycle);
    when(standbyTaskCreator.createStandbyTaskFromActive(activeToRecycle, activeToRecycle.inputPartitions()))
        .thenReturn(recycledStandby);

    manager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeToRecycle.id());
    verify(activeToRecycle).suspend();
    verify(recycledStandby).initializeIfNeeded();
    verify(stateUpdater).add(recycledStandby);
}
// If converting the active task into a standby task fails, a StreamsException is
// expected, nothing is added to the state updater, and the active task is closed
// dirty.
@Test
public void shouldHandleExceptionThrownDuringConversionInRecycleRestoredTask() {
    final StreamTask activeToRecycle = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TaskManager manager = setUpRecycleRestoredTask(activeToRecycle);
    when(standbyTaskCreator.createStandbyTaskFromActive(activeToRecycle, activeToRecycle.inputPartitions()))
        .thenThrow(new RuntimeException());

    assertThrows(
        StreamsException.class,
        () -> manager.checkStateUpdater(time.milliseconds(), noOpResetter)
    );

    verify(stateUpdater, never()).add(any());
    verify(activeToRecycle).closeDirty();
}
// If initializing the freshly created standby task fails, a StreamsException is
// expected, nothing is added to the state updater, and the standby task is closed
// dirty.
@Test
public void shouldHandleExceptionThrownDuringTaskInitInRecycleRestoredTask() {
    final StreamTask activeToRecycle = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.CLOSED)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StandbyTask recycledStandby = standbyTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.CREATED)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TaskManager manager = setUpRecycleRestoredTask(activeToRecycle);
    when(standbyTaskCreator.createStandbyTaskFromActive(activeToRecycle, activeToRecycle.inputPartitions()))
        .thenReturn(recycledStandby);
    doThrow(StreamsException.class).when(recycledStandby).initializeIfNeeded();

    assertThrows(
        StreamsException.class,
        () -> manager.checkStateUpdater(time.milliseconds(), noOpResetter)
    );

    verify(stateUpdater, never()).add(any());
    verify(recycledStandby).closeDirty();
}
/**
 * Prepares a mocked registry in which the given task is pending recycling (the
 * registry returns {@code taskId00Partitions} for it) and a state updater that
 * drains the task as restored.
 *
 * @param statefulTask the restored task that should be recycled
 * @return an at-least-once task manager using the mocked registry
 */
private TaskManager setUpRecycleRestoredTask(final StreamTask statefulTask) {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskId recycledId = statefulTask.id();
    when(registry.removePendingTaskToRecycle(recycledId)).thenReturn(taskId00Partitions);
    when(stateUpdater.restoresActiveTasks()).thenReturn(true);
    when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(statefulTask));
    return setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
}
// A restored task that is pending a clean close has its producer closed, is
// suspended and closed clean; it is neither closed dirty nor removed from the
// registry.
@Test
public void shouldCloseCleanRestoredTask() {
    final StreamTask taskToClose = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpCloseCleanRestoredTask(taskToClose, registry);

    manager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskToClose.id());
    verify(taskToClose).suspend();
    verify(taskToClose).closeClean();
    verify(taskToClose, never()).closeDirty();
    verify(registry, never()).removeTask(taskToClose);
}
// If the clean close itself throws, the exception surfaces from checkStateUpdater,
// the producer is still closed, and the task falls back to a dirty close without
// being removed from the registry.
@Test
public void shouldHandleExceptionThrownDuringCloseInCloseCleanRestoredTask() {
    final StreamTask taskToClose = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpCloseCleanRestoredTask(taskToClose, registry);
    doThrow(RuntimeException.class).when(taskToClose).closeClean();

    assertThrows(
        RuntimeException.class,
        () -> manager.checkStateUpdater(time.milliseconds(), noOpResetter)
    );

    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskToClose.id());
    verify(taskToClose).closeDirty();
    verify(registry, never()).removeTask(taskToClose);
}
// If closing the task producer throws, the exception surfaces from
// checkStateUpdater; the task is neither closed dirty nor removed from the
// registry.
@Test
public void shouldHandleExceptionThrownDuringClosingTaskProducerInCloseCleanRestoredTask() {
    final StreamTask taskToClose = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.CLOSED)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpCloseCleanRestoredTask(taskToClose, registry);
    final TaskId idOfTaskToClose = taskToClose.id();
    doThrow(new RuntimeException("Something happened"))
        .when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(idOfTaskToClose);

    assertThrows(
        RuntimeException.class,
        () -> manager.checkStateUpdater(time.milliseconds(), noOpResetter)
    );

    verify(taskToClose, never()).closeDirty();
    verify(registry, never()).removeTask(taskToClose);
}
/**
 * Prepares the mocks for a restored task that is not pending recycling but is
 * pending a clean close, and that the state updater drains as restored.
 *
 * @param statefulTask the restored task that should be closed clean
 * @param tasks        task registry handed to the task manager
 * @return an at-least-once task manager using the given registry
 */
private TaskManager setUpCloseCleanRestoredTask(final StreamTask statefulTask,
                                                final TasksRegistry tasks) {
    final TaskId closingId = statefulTask.id();
    when(tasks.removePendingTaskToRecycle(closingId)).thenReturn(null);
    when(tasks.removePendingTaskToCloseClean(closingId)).thenReturn(true);
    when(stateUpdater.restoresActiveTasks()).thenReturn(true);
    when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(statefulTask));
    return setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
}
// A restored task that is pending "add back" is handed to the state updater again
// and is not removed from the registry.
@Test
public void shouldAddBackRestoredTask() {
    final StreamTask taskToAddBack = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.removePendingTaskToRecycle(taskToAddBack.id())).thenReturn(null);
    when(registry.removePendingTaskToAddBack(taskToAddBack.id())).thenReturn(true);
    when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(taskToAddBack));
    when(stateUpdater.restoresActiveTasks()).thenReturn(true);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    manager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(stateUpdater).add(taskToAddBack);
    verify(registry, never()).removeTask(taskToAddBack);
}
// A restored task with a pending input-partition update receives the new
// partitions and then goes through the usual transition: restoration completed,
// timeout cleared, added to the registry, consumer resumed.
@Test
public void shouldUpdateInputPartitionsOfRestoredTask() {
    final StreamTask restoredTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.removePendingTaskToRecycle(restoredTask.id())).thenReturn(null);
    when(registry.removePendingTaskToCloseReviveAndUpdateInputPartitions(restoredTask.id())).thenReturn(null);
    when(registry.removePendingTaskToUpdateInputPartitions(restoredTask.id())).thenReturn(taskId01Partitions);
    when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(restoredTask));
    when(stateUpdater.restoresActiveTasks()).thenReturn(true);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    manager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(consumer).resume(restoredTask.inputPartitions());
    verify(restoredTask).updateInputPartitions(eq(taskId01Partitions), anyMap());
    verify(restoredTask).completeRestoration(noOpResetter);
    verify(restoredTask).clearTaskTimeout();
    verify(registry).addTask(restoredTask);
}
// A restored task pending close-revive-and-update gets the new input partitions,
// is closed clean, revived, re-initialized, and handed back to the state updater.
@Test
public void shouldCloseReviveAndUpdateInputPartitionsOfRestoredTask() {
    final StreamTask restoredTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.removePendingTaskToRecycle(restoredTask.id())).thenReturn(null);
    when(registry.removePendingTaskToCloseReviveAndUpdateInputPartitions(restoredTask.id()))
        .thenReturn(taskId01Partitions);
    when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(restoredTask));
    when(stateUpdater.restoresActiveTasks()).thenReturn(true);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    manager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(restoredTask).updateInputPartitions(eq(taskId01Partitions), anyMap());
    verify(restoredTask).closeClean();
    verify(restoredTask).revive();
    verify(restoredTask).initializeIfNeeded();
    verify(stateUpdater).add(restoredTask);
}
// A restored task that is pending suspension is suspended and added to the
// registry; the consumer must not be touched.
@Test
public void shouldSuspendRestoredTaskIfRevoked() {
    final StreamTask revokedTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    when(registry.removePendingTaskToRecycle(revokedTask.id())).thenReturn(null);
    when(registry.removePendingTaskToUpdateInputPartitions(revokedTask.id())).thenReturn(null);
    when(registry.removePendingTaskToCloseReviveAndUpdateInputPartitions(revokedTask.id())).thenReturn(null);
    when(registry.removePendingActiveTaskToSuspend(revokedTask.id())).thenReturn(true);
    when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(revokedTask));
    when(stateUpdater.restoresActiveTasks()).thenReturn(true);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    manager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(revokedTask).suspend();
    verify(registry).addTask(revokedTask);
    verifyNoInteractions(consumer);
}
// Drains six restored tasks at once, each with a different pending action
// (none, recycle, close clean, add back, update input partitions,
// close-revive-and-update), and verifies that each one is handled accordingly.
@Test
public void shouldHandleMultipleRestoredTasks() {
    final StreamTask taskToTransitToRunning = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions).build();
    final StreamTask taskToRecycle = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId01Partitions).build();
    final StandbyTask recycledStandbyTask = standbyTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId01Partitions).build();
    final StreamTask taskToCloseClean = statefulTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId02Partitions).build();
    final StreamTask taskToAddBack = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId03Partitions).build();
    final StreamTask taskToUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId04Partitions).build();
    final StreamTask taskToCloseReviveAndUpdateInputPartitions = statelessTask(taskId05)
        .inState(State.RESTORING)
        .withInputPartitions(taskId05Partitions).build();
    final TasksRegistry tasks = mock(TasksRegistry.class);
    final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
    when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle, taskToRecycle.inputPartitions()))
        .thenReturn(recycledStandbyTask);

    // For each pending-action lookup, only the designated task yields a hit;
    // every other task id yields the "nothing pending" value (null / false).
    when(tasks.removePendingTaskToRecycle(taskToRecycle.id())).thenReturn(taskId01Partitions);
    when(tasks.removePendingTaskToRecycle(
        argThat(taskId -> !taskId.equals(taskToRecycle.id())))
    ).thenReturn(null);
    when(tasks.removePendingTaskToCloseClean(taskToCloseClean.id())).thenReturn(true);
    when(tasks.removePendingTaskToCloseClean(
        argThat(taskId -> !taskId.equals(taskToCloseClean.id())))
    ).thenReturn(false);
    when(tasks.removePendingTaskToAddBack(taskToAddBack.id())).thenReturn(true);
    when(tasks.removePendingTaskToAddBack(
        argThat(taskId -> !taskId.equals(taskToAddBack.id())))
    ).thenReturn(false);
    when(tasks.removePendingTaskToUpdateInputPartitions(taskToUpdateInputPartitions.id())).thenReturn(taskId05Partitions);
    when(tasks.removePendingTaskToUpdateInputPartitions(
        argThat(taskId -> !taskId.equals(taskToUpdateInputPartitions.id())))
    ).thenReturn(null);
    when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(taskToCloseReviveAndUpdateInputPartitions.id())).thenReturn(taskId04Partitions);
    when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(
        argThat(taskId -> !taskId.equals(taskToCloseReviveAndUpdateInputPartitions.id())))
    ).thenReturn(null);

    when(stateUpdater.restoresActiveTasks()).thenReturn(true);
    when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(
        taskToTransitToRunning,
        taskToRecycle,
        taskToCloseClean,
        taskToAddBack,
        taskToUpdateInputPartitions,
        taskToCloseReviveAndUpdateInputPartitions
    ));

    taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

    verify(tasks).addTask(taskToTransitToRunning);
    // The recycled standby task is verified once; the original repeated this exact
    // verification on two consecutive lines, which added no coverage.
    verify(stateUpdater).add(recycledStandbyTask);
    verify(taskToCloseClean).closeClean();
    verify(stateUpdater).add(taskToAddBack);
    verify(taskToUpdateInputPartitions).updateInputPartitions(eq(taskId05Partitions), anyMap());
    verify(taskToCloseReviveAndUpdateInputPartitions).closeClean();
    verify(taskToCloseReviveAndUpdateInputPartitions).revive();
    verify(taskToCloseReviveAndUpdateInputPartitions).initializeIfNeeded();
    verify(stateUpdater).add(taskToCloseReviveAndUpdateInputPartitions);
}
// A StreamsException reported by the state updater is rethrown as-is, carrying the
// id of the failed task.
@Test
public void shouldRethrowStreamsExceptionFromStateUpdater() {
    final StreamTask failedTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StreamsException original = new StreamsException("boom!");
    final ExceptionAndTask failure = new ExceptionAndTask(original, failedTask);
    when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(true);
    when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(failure));
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    final StreamsException rethrown = assertThrows(
        StreamsException.class,
        () -> manager.checkStateUpdater(time.milliseconds(), noOpResetter)
    );

    assertEquals(original, rethrown);
    assertEquals(failedTask.id(), rethrown.taskId().get());
}
// A plain RuntimeException reported by the state updater surfaces wrapped in a
// StreamsException that names the failed task and keeps the original as its cause.
@Test
public void shouldRethrowRuntimeExceptionFromStateUpdater() {
    final StreamTask failedTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final RuntimeException original = new RuntimeException("boom!");
    final ExceptionAndTask failure = new ExceptionAndTask(original, failedTask);
    when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(true);
    when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(failure));
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    final StreamsException wrapped = assertThrows(
        StreamsException.class,
        () -> manager.checkStateUpdater(time.milliseconds(), noOpResetter)
    );

    assertEquals(original, wrapped.getCause());
    assertEquals(failedTask.id(), wrapped.taskId().get());
    assertEquals("Encounter unexpected fatal error for task 0_0", wrapped.getMessage());
}
// Two TaskCorruptedExceptions reported by the state updater are expected to surface
// as a single TaskCorruptedException covering both task ids.
@Test
public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() {
    final StreamTask corruptedTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StreamTask corruptedTask1 = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId01Partitions)
        .build();
    final ExceptionAndTask failure0 = new ExceptionAndTask(
        new TaskCorruptedException(Collections.singleton(taskId00)),
        corruptedTask0
    );
    final ExceptionAndTask failure1 = new ExceptionAndTask(
        new TaskCorruptedException(Collections.singleton(taskId01)),
        corruptedTask1
    );
    when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(true);
    when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(failure0, failure1));
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    final TaskCorruptedException merged = assertThrows(
        TaskCorruptedException.class,
        () -> manager.checkStateUpdater(time.milliseconds(), noOpResetter)
    );

    assertEquals(mkSet(taskId00, taskId01), merged.corruptedTasks());
    assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", merged.getMessage());
}
// Of three tasks pending initialization, two fail with TaskCorruptedException.
// The failing ones are added to the registry, the healthy one goes to the state
// updater, and a single TaskCorruptedException naming both failed tasks surfaces.
@Test
public void shouldRethrowTaskCorruptedExceptionFromInitialization() {
    final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.CREATED)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StreamTask statefulTask1 = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.CREATED)
        .withInputPartitions(taskId01Partitions)
        .build();
    final StreamTask healthyTask = statefulTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.CREATED)
        .withInputPartitions(taskId02Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, registry, true);
    when(registry.drainPendingTasksToInit()).thenReturn(mkSet(statefulTask0, statefulTask1, healthyTask));
    doThrow(new TaskCorruptedException(Collections.singleton(statefulTask0.id))).when(statefulTask0).initializeIfNeeded();
    doThrow(new TaskCorruptedException(Collections.singleton(statefulTask1.id))).when(statefulTask1).initializeIfNeeded();

    final TaskCorruptedException merged = assertThrows(
        TaskCorruptedException.class,
        () -> manager.checkStateUpdater(time.milliseconds(), noOpResetter)
    );

    verify(registry).addTask(statefulTask0);
    verify(registry).addTask(statefulTask1);
    verify(stateUpdater).add(healthyTask);
    assertEquals(mkSet(taskId00, taskId01), merged.corruptedTasks());
    assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", merged.getMessage());
}
// Only the partitions of the active assignment are registered with the topology
// builder as subscribed topics; the standby partitions are not.
@Test
public void shouldAddSubscribedTopicsFromAssignmentToTopologyMetadata() {
    final Map<TaskId, Set<TopicPartition>> activeAssignment = mkMap(
        mkEntry(taskId01, mkSet(t1p1)),
        mkEntry(taskId02, mkSet(t1p2, t2p2))
    );
    final Map<TaskId, Set<TopicPartition>> standbyAssignment = mkMap(
        mkEntry(taskId03, mkSet(t1p3)),
        mkEntry(taskId04, mkSet(t1p4))
    );
    when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(Collections.emptySet());

    taskManager.handleAssignment(activeAssignment, standbyAssignment);

    verify(topologyBuilder).addSubscribedTopicsFromAssignment(eq(mkSet(t1p1, t1p2, t2p2)), anyString());
    verify(topologyBuilder, never()).addSubscribedTopicsFromAssignment(eq(mkSet(t1p3, t1p4)), anyString());
    verify(activeTaskCreator).createTasks(any(), eq(activeAssignment));
}
// With no non-empty task directories on disk, starting a rebalance must not lock
// any task directory.
@Test
public void shouldNotLockAnythingIfStateDirIsEmpty() {
    when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList<>());

    taskManager.handleRebalanceStart(singleton("topic"));

    final boolean nothingLocked = taskManager.lockedTaskDirectories().isEmpty();
    assertTrue(nothingLocked);
}
// Among the folders on disk (two task directories plus a non-task "dummy" entry),
// only the task directory whose lock could be obtained ends up locked.
@Test
public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
    expectLockObtainedFor(taskId01);
    expectLockFailedFor(taskId10);
    expectDirectoryNotEmpty(taskId01);
    makeTaskFolders(taskId01.toString(), taskId10.toString(), "dummy");

    taskManager.handleRebalanceStart(singleton("topic"));

    assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
}
// A task directory that is lockable but empty is unlocked again at rebalance
// start; only the non-empty one stays locked.
@Test
public void shouldUnlockEmptyDirsAtRebalanceStart() throws Exception {
    expectLockObtainedFor(taskId01, taskId10);
    expectDirectoryNotEmpty(taskId01);
    when(stateDirectory.directoryForTaskIsEmpty(taskId10)).thenReturn(true);
    makeTaskFolders(taskId01.toString(), taskId10.toString());

    taskManager.handleRebalanceStart(singleton("topic"));

    verify(stateDirectory).unlock(taskId10);
    assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
}
// Using the shared taskManager fixture, completing a rebalance pauses the whole
// consumer assignment.
@Test
public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
    final Set<TopicPartition> wholeAssignment = mkSet(t1p0, t1p1);
    when(consumer.assignment()).thenReturn(wholeAssignment);

    taskManager.handleRebalanceComplete();

    verify(consumer).pause(wholeAssignment);
}
// With a running task known to the registry, completing a rebalance is expected to
// pause only t1p1 out of the assigned {t1p0, t1p1}.
@Test
public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
    final StreamTask runningTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(registry.allTasks()).thenReturn(mkSet(runningTask));
    final Set<TopicPartition> wholeAssignment = mkSet(t1p0, t1p1);
    when(consumer.assignment()).thenReturn(wholeAssignment);

    manager.handleRebalanceComplete();

    verify(consumer).pause(mkSet(t1p1));
}
// Three task directories are locked at rebalance start. After the assignment and
// rebalance completion, the lock of the unassigned task is released and the
// consumer assignment is paused.
@Test
public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
    expectLockObtainedFor(taskId00, taskId01, taskId02);
    expectDirectoryNotEmpty(taskId00, taskId01, taskId02);
    // taskId00: active task, taskId01: standby task, taskId02: unassigned but able to lock
    makeTaskFolders(taskId00.toString(), taskId01.toString(), taskId02.toString());

    taskManager.handleRebalanceStart(singleton("topic"));
    assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));

    handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());
    taskManager.handleRebalanceComplete();

    assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01)));
    verify(stateDirectory).unlock(taskId02);
    verify(consumer).pause(assignment);
}
// One running task lives in the registry and two tasks (restoring active, standby)
// live in the state updater. A fourth standby task is built but handed to neither,
// so its directory lock is expected to be released after the rebalance.
@Test
public void shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater() throws Exception {
    final StreamTask runningActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StreamTask restoringActive = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId01Partitions)
        .build();
    final StandbyTask assignedStandby = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId02Partitions)
        .build();
    final StandbyTask unassignedStandbyTask = standbyTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.CREATED)
        .withInputPartitions(taskId03Partitions)
        .build();
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(registry.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningActive)));
    when(stateUpdater.getTasks()).thenReturn(mkSet(assignedStandby, restoringActive));
    when(registry.allTasks()).thenReturn(mkSet(runningActive));
    expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
    expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
    makeTaskFolders(taskId00.toString(), taskId01.toString(), taskId02.toString(), taskId03.toString());
    final Set<TopicPartition> wholeAssignment = mkSet(t1p0, t1p1, t1p2);
    when(consumer.assignment()).thenReturn(wholeAssignment);

    manager.handleRebalanceStart(singleton("topic"));
    manager.handleRebalanceComplete();

    verify(consumer).pause(mkSet(t1p1, t1p2));
    verify(stateDirectory).unlock(taskId03);
    assertThat(manager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
}
// When every changelog offset of the task is Task.LATEST_OFFSET, the reported
// offset sum is Task.LATEST_OFFSET as well.
@Test
public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception {
    final TopicPartition changelog0 = new TopicPartition("changelog", 0);
    final TopicPartition changelog1 = new TopicPartition("changelog", 1);
    final Map<TopicPartition, Long> offsets = mkMap(
        mkEntry(changelog0, Task.LATEST_OFFSET),
        mkEntry(changelog1, Task.LATEST_OFFSET)
    );
    final Map<TaskId, Long> expected = mkMap(mkEntry(taskId00, Task.LATEST_OFFSET));

    computeOffsetSumAndVerify(offsets, expected);
}
// Changelog offsets 5 and 10 of the task add up to an offset sum of 15.
@Test
public void shouldComputeOffsetSumForNonRunningActiveTask() throws Exception {
    final TopicPartition changelog0 = new TopicPartition("changelog", 0);
    final TopicPartition changelog1 = new TopicPartition("changelog", 1);
    final Map<TopicPartition, Long> offsets = mkMap(
        mkEntry(changelog0, 5L),
        mkEntry(changelog1, 10L)
    );
    final Map<TaskId, Long> expected = mkMap(mkEntry(taskId00, 15L));

    computeOffsetSumAndVerify(offsets, expected);
}
// For a restoring active task held by the state updater, the reported offset sum
// is expected to be the offset the task itself reports (42), not the one written
// to the checkpoint file (24).
@Test
public void shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater() throws Exception {
    final StreamTask restoringActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .build();
    final long offsetFromTask = 42L;
    when(restoringActive.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog, offsetFromTask)));
    expectLockObtainedFor(taskId00);
    makeTaskFolders(taskId00.toString());
    final Map<TopicPartition, Long> checkpointedOffsets = mkMap(mkEntry(t1p0changelog, 24L));
    writeCheckpointFile(taskId00, checkpointedOffsets);
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(restoringActive));

    manager.handleRebalanceStart(singleton("topic"));

    assertThat(manager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, offsetFromTask))));
}
// For a standby task held by the state updater, the reported offset sum is
// expected to be the offset the task itself reports (42), not the one written to
// the checkpoint file (24).
@Test
public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() throws Exception {
    final StandbyTask standby = standbyTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RUNNING)
        .build();
    final long offsetFromTask = 42L;
    when(standby.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog, offsetFromTask)));
    expectLockObtainedFor(taskId00);
    makeTaskFolders(taskId00.toString());
    final Map<TopicPartition, Long> checkpointedOffsets = mkMap(mkEntry(t1p0changelog, 24L));
    writeCheckpointFile(taskId00, checkpointedOffsets);
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(stateUpdater.getTasks()).thenReturn(mkSet(standby));

    manager.handleRebalanceStart(singleton("topic"));

    assertThat(manager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, offsetFromTask))));
}
// Offset sums must cover both the running task known to the registry and the two
// tasks (active and standby) held by the state updater.
@Test
public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() {
    final StreamTask runningActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RUNNING)
        .build();
    final StreamTask restoringActive = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RESTORING)
        .build();
    final StandbyTask restoringStandby = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .build();
    final long runningOffset = 42L;
    final long restoringActiveOffset = 24L;
    final long restoringStandbyOffset = 84L;
    when(runningActive.changelogOffsets())
        .thenReturn(mkMap(mkEntry(t1p0changelog, runningOffset)));
    when(restoringActive.changelogOffsets())
        .thenReturn(mkMap(mkEntry(t1p1changelog, restoringActiveOffset)));
    when(restoringStandby.changelogOffsets())
        .thenReturn(mkMap(mkEntry(t1p2changelog, restoringStandbyOffset)));
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);
    when(registry.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningActive)));
    when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandby, restoringActive));

    assertThat(
        manager.getTaskOffsetSums(),
        is(mkMap(
            mkEntry(taskId00, runningOffset),
            mkEntry(taskId01, restoringActiveOffset),
            mkEntry(taskId02, restoringStandbyOffset)
        ))
    );
}
// One partition carries OffsetCheckpoint.OFFSET_UNKNOWN and the other offset 10;
// the expected sum for the task is 10, i.e. the unknown entry adds nothing.
@Test
public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception {
    final TopicPartition unknownOffsetPartition = new TopicPartition("changelog", 0);
    final TopicPartition knownOffsetPartition = new TopicPartition("changelog", 1);
    final Map<TopicPartition, Long> changelogOffsets = mkMap(
        mkEntry(unknownOffsetPartition, OffsetCheckpoint.OFFSET_UNKNOWN),
        mkEntry(knownOffsetPartition, 10L)
    );
    final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 10L));

    computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
}
// Shared driver: prepares a lockable, non-empty directory for taskId00, starts a rebalance,
// assigns taskId00 through the third handleAssignment slot, installs the given changelog offsets
// on the resulting task and asserts the offset sums the task manager reports.
//
// changelogOffsets   offsets to set on the assigned task
// expectedOffsetSums the exact map getTaskOffsetSums() must return
private void computeOffsetSumAndVerify(final Map<TopicPartition, Long> changelogOffsets,
                                       final Map<TaskId, Long> expectedOffsetSums) throws Exception {
    // State-directory fixture for task 0_0.
    expectLockObtainedFor(taskId00);
    expectDirectoryNotEmpty(taskId00);
    makeTaskFolders(taskId00.toString());

    taskManager.handleRebalanceStart(singleton("topic"));

    final StateMachineTask assignedTask =
        handleAssignment(emptyMap(), emptyMap(), taskId00Assignment).get(taskId00);
    assignedTask.setChangelogOffsets(changelogOffsets);

    assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
}
// taskId00 is handed over through the second handleAssignment slot (the standby one, going by
// the test name); its two changelog offsets 5 and 10 must be reported as the sum 15.
@Test
public void shouldComputeOffsetSumForStandbyTask() throws Exception {
    final TopicPartition changelogPartition0 = new TopicPartition("changelog", 0);
    final TopicPartition changelogPartition1 = new TopicPartition("changelog", 1);
    final Map<TopicPartition, Long> standbyOffsets = mkMap(
        mkEntry(changelogPartition0, 5L),
        mkEntry(changelogPartition1, 10L)
    );
    final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));

    // State-directory fixture for task 0_0.
    expectLockObtainedFor(taskId00);
    expectDirectoryNotEmpty(taskId00);
    makeTaskFolders(taskId00.toString());

    taskManager.handleRebalanceStart(singleton("topic"));

    final StateMachineTask standby =
        handleAssignment(emptyMap(), taskId00Assignment, emptyMap()).get(taskId00);
    standby.setChangelogOffsets(standbyOffsets);

    assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
}
// No task is assigned here: a checkpoint file with offsets 5 and 10 is written for taskId00,
// the lock is obtainable, and the expected reported sum is 15.
@Test
public void shouldComputeOffsetSumForUnassignedTaskWeCanLock() throws Exception {
    final TopicPartition changelogPartition0 = new TopicPartition("changelog", 0);
    final TopicPartition changelogPartition1 = new TopicPartition("changelog", 1);
    final Map<TopicPartition, Long> checkpointedOffsets = mkMap(
        mkEntry(changelogPartition0, 5L),
        mkEntry(changelogPartition1, 10L)
    );
    final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));

    expectLockObtainedFor(taskId00);
    makeTaskFolders(taskId00.toString());
    writeCheckpointFile(taskId00, checkpointedOffsets);

    taskManager.handleRebalanceStart(singleton("topic"));

    assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
}
// The task is assigned but still in CREATED state; the expected sum (15) matches the
// offsets written to the checkpoint file (5 + 10).
@Test
public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() throws Exception {
    final TopicPartition changelogPartition0 = new TopicPartition("changelog", 0);
    final TopicPartition changelogPartition1 = new TopicPartition("changelog", 1);
    final Map<TopicPartition, Long> checkpointedOffsets = mkMap(
        mkEntry(changelogPartition0, 5L),
        mkEntry(changelogPartition1, 10L)
    );
    final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));

    expectLockObtainedFor(taskId00);
    makeTaskFolders(taskId00.toString());
    writeCheckpointFile(taskId00, checkpointedOffsets);

    taskManager.handleRebalanceStart(singleton("topic"));

    // Assign the task but never drive it through restoration.
    final StateMachineTask createdTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singleton(createdTask));
    taskManager.handleAssignment(taskId00Assignment, emptyMap());

    assertThat(createdTask.state(), is(State.CREATED));
    assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
}
// The task is assigned, then suspended and cleanly closed; the expected sum (15) matches the
// offsets written to the checkpoint file (5 + 10).
@Test
public void shouldComputeOffsetSumFromCheckpointFileForClosedTask() throws Exception {
    final TopicPartition changelogPartition0 = new TopicPartition("changelog", 0);
    final TopicPartition changelogPartition1 = new TopicPartition("changelog", 1);
    final Map<TopicPartition, Long> checkpointedOffsets = mkMap(
        mkEntry(changelogPartition0, 5L),
        mkEntry(changelogPartition1, 10L)
    );
    final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));

    expectLockObtainedFor(taskId00);
    makeTaskFolders(taskId00.toString());
    writeCheckpointFile(taskId00, checkpointedOffsets);

    final StateMachineTask taskToClose = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    taskManager.handleRebalanceStart(singleton("topic"));
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singleton(taskToClose));
    taskManager.handleAssignment(taskId00Assignment, emptyMap());

    // Close the task directly on the task object, not through the task manager.
    taskToClose.suspend();
    taskToClose.closeClean();

    assertThat(taskToClose.state(), is(State.CLOSED));
    assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
}
// When the lock for taskId00 cannot be obtained, neither a locked directory nor an
// offset sum is expected after the rebalance starts.
@Test
public void shouldNotReportOffsetSumsForTaskWeCantLock() throws Exception {
    expectLockFailedFor(taskId00);
    makeTaskFolders(taskId00.toString());

    taskManager.handleRebalanceStart(singleton("topic"));

    final boolean noLockedDirectories = taskManager.lockedTaskDirectories().isEmpty();
    assertTrue(noLockedDirectories);
    final boolean noOffsetSums = taskManager.getTaskOffsetSums().isEmpty();
    assertTrue(noOffsetSums);
}
// The task directory is lockable and non-empty, but no checkpoint file is written before the
// rebalance starts; no offset sum is expected.
// NOTE(review): the method name also promises that the lock is released, but nothing in this
// test asserts an unlock on stateDirectory - confirm whether that check was dropped on purpose.
@Test
public void shouldNotReportOffsetSumsAndReleaseLockForUnassignedTaskWithoutCheckpoint() throws Exception {
    expectLockObtainedFor(taskId00);
    makeTaskFolders(taskId00.toString());
    expectDirectoryNotEmpty(taskId00);
    when(stateDirectory.checkpointFileFor(taskId00)).thenReturn(getCheckpointFile(taskId00));

    taskManager.handleRebalanceStart(singleton("topic"));

    final boolean noOffsetSums = taskManager.getTaskOffsetSums().isEmpty();
    assertTrue(noOffsetSums);
}
// Three checkpointed offsets of Long.MAX_VALUE / 2 each would overflow a long when added;
// the reported sum is expected to be exactly Long.MAX_VALUE.
@Test
public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws Exception {
    final long halfOfMax = Long.MAX_VALUE / 2;
    final Map<TopicPartition, Long> overflowingOffsets = mkMap(
        mkEntry(new TopicPartition("changelog", 1), halfOfMax),
        mkEntry(new TopicPartition("changelog", 2), halfOfMax),
        mkEntry(new TopicPartition("changelog", 3), halfOfMax)
    );
    final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, Long.MAX_VALUE));

    expectLockObtainedFor(taskId00);
    makeTaskFolders(taskId00.toString());
    writeCheckpointFile(taskId00, overflowingOffsets);

    taskManager.handleRebalanceStart(singleton("topic"));

    assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
}
// Lifecycle under test: assign -> RUNNING, revoke -> SUSPENDED, assign nothing -> CLOSED,
// after which the task manager holds no tasks and the task producer close hook was invoked.
@Test
public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final OffsetAndMetadata committableOffset = new OffsetAndMetadata(0L, null);
    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = singletonMap(t1p0, committableOffset);
    activeTask.setCommittableOffsetsAndMetadata(committableOffsets);

    // First `handleAssignment`: the task gets created and restored.
    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));

    // Revocation only suspends the task.
    taskManager.handleRevocation(taskId00Partitions);
    assertThat(activeTask.state(), is(Task.State.SUSPENDED));

    // Second `handleAssignment` with nothing assigned closes it.
    taskManager.handleAssignment(emptyMap(), emptyMap());
    assertThat(activeTask.state(), is(Task.State.CLOSED));
    assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
// The task's closeClean() always throws. After revocation, an empty assignment must surface a
// RuntimeException wrapping that failure, leave the task CLOSED and still invoke the producer
// close hook.
@Test
public void shouldCloseDirtyActiveUnassignedTasksWhenErrorCleanClosingTask() {
    final StateMachineTask failingTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public void closeClean() {
            throw new RuntimeException("KABOOM!");
        }
    };
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(failingTask));

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    taskManager.handleRevocation(taskId00Partitions);

    final RuntimeException fatal =
        assertThrows(RuntimeException.class, () -> taskManager.handleAssignment(emptyMap(), emptyMap()));

    assertThat(failingTask.state(), is(Task.State.CLOSED));
    assertThat(fatal.getMessage(), is("Encounter unexpected fatal error for task 0_0"));
    assertThat(fatal.getCause().getMessage(), is("KABOOM!"));
    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
// handleLostAll() is expected to close the active task (after preparing its commit) while the
// standby task keeps running; locked task directories are only dropped on the next rebalance start.
@Test
public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final StateMachineTask standbyTask = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);

    // Stubs needed by `handleAssignment`.
    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
    when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(standbyTask));

    // The first listing returns both task directories, every later listing returns none.
    final ArrayList<TaskDirectory> nonEmptyTaskDirs = new ArrayList<>(2);
    nonEmptyTaskDirs.add(new TaskDirectory(testFolder.newFolder(taskId00.toString()), null));
    nonEmptyTaskDirs.add(new TaskDirectory(testFolder.newFolder(taskId01.toString()), null));
    when(stateDirectory.listNonEmptyTaskDirectories())
        .thenReturn(nonEmptyTaskDirs)
        .thenReturn(new ArrayList<>());
    expectLockObtainedFor(taskId00, taskId01);
    expectDirectoryNotEmpty(taskId00, taskId01);

    taskManager.handleRebalanceStart(emptySet());
    assertThat(taskManager.lockedTaskDirectories(), Matchers.is(mkSet(taskId00, taskId01)));

    taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));
    assertThat(standbyTask.state(), is(Task.State.RUNNING));

    // `handleLostAll`
    taskManager.handleLostAll();
    assertThat(activeTask.commitPrepared, is(true));
    assertThat(activeTask.state(), is(Task.State.CLOSED));
    assertThat(standbyTask.state(), is(Task.State.RUNNING));
    assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
    assertThat(taskManager.standbyTaskMap(), is(singletonMap(taskId01, standbyTask)));

    // The locked task directories are not cleared by handleLostAll itself ...
    assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01)));

    // ... only the next rebalance start (which now lists no directories) empties them.
    taskManager.handleRebalanceStart(emptySet());
    assertThat(taskManager.lockedTaskDirectories(), is(emptySet()));
    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
// With a task manager set up for EXACTLY_ONCE_V2, handleLostAll() must ask the active task
// creator to re-initialize the thread producer.
@Test
public void shouldReInitializeThreadProducerOnHandleLostAllIfEosV2Enabled() {
    final TaskManager eosV2TaskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);

    eosV2TaskManager.handleLostAll();

    verify(activeTaskCreator).reInitializeThreadProducer();
}
// closeAndRemoveTaskProducerIfNeeded(taskId00) is stubbed to throw. Closing the revoked task
// through an empty assignment must surface a RuntimeException whose cause is that failure.
@Test
public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final OffsetAndMetadata committableOffset = new OffsetAndMetadata(0L, null);
    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = singletonMap(t1p0, committableOffset);
    activeTask.setCommittableOffsetsAndMetadata(committableOffsets);

    // Stubs for the first `handleAssignment`.
    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));

    // Producer close failure, expected to surface from the second (empty) `handleAssignment`.
    doThrow(new RuntimeException("KABOOM!")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));

    taskManager.handleRevocation(taskId00Partitions);

    final RuntimeException fatal =
        assertThrows(RuntimeException.class, () -> taskManager.handleAssignment(emptyMap(), emptyMap()));

    assertThat(fatal.getMessage(), is("Encounter unexpected fatal error for task 0_0"));
    assertThat(fatal.getCause(), instanceOf(RuntimeException.class));
    assertThat(fatal.getCause().getMessage(), is("KABOOM!"));
}
// For a corrupted active task and a corrupted standby task, handleCorruption must close each
// dirty, then revive it, remove it from the task registry and queue it as pending to init.
@Test
public void shouldReAddRevivedTasksToStateUpdater() {
    final StreamTask corruptedActive = statefulTask(taskId03, taskId03ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId03Partitions)
        .build();
    final StandbyTask corruptedStandby = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .withInputPartitions(taskId02Partitions)
        .build();

    final TasksRegistry tasks = mock(TasksRegistry.class);
    final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
    when(tasks.task(taskId03)).thenReturn(corruptedActive);
    when(tasks.task(taskId02)).thenReturn(corruptedStandby);

    taskManager.handleCorruption(mkSet(corruptedActive.id(), corruptedStandby.id()));

    // Per task: closeDirty() has to happen before revive().
    final InOrder activeCalls = inOrder(corruptedActive);
    activeCalls.verify(corruptedActive).closeDirty();
    activeCalls.verify(corruptedActive).revive();
    final InOrder standbyCalls = inOrder(corruptedStandby);
    standbyCalls.verify(corruptedStandby).closeDirty();
    standbyCalls.verify(corruptedStandby).revive();

    // Registry bookkeeping for both tasks.
    verify(tasks).removeTask(corruptedActive);
    verify(tasks).removeTask(corruptedStandby);
    verify(tasks).addPendingTasksToInit(mkSet(corruptedActive));
    verify(tasks).addPendingTasksToInit(mkSet(corruptedStandby));
    verify(consumer).assignment();
}
// A running active task reported as corrupted must end up back in CREATED state, with its commit
// prepared, a post-commit that enforced the checkpoint, its input partitions recorded for offset
// reset and its changelog marked as corrupted on the state manager.
@Test
public void shouldReviveCorruptTasks() {
    final ProcessorStateManager mockedStateManager = mock(ProcessorStateManager.class);
    final AtomicBoolean checkpointEnforced = new AtomicBoolean(false);
    final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, mockedStateManager) {
        @Override
        public void postCommit(final boolean enforceCheckpoint) {
            // Remember whether any post-commit was asked to enforce the checkpoint.
            if (enforceCheckpoint) {
                checkpointEnforced.set(true);
            }
            super.postCommit(enforceCheckpoint);
        }
    };

    // `handleAssignment`: the first assignment() call returns `assignment`, later ones the task's partitions.
    when(consumer.assignment())
        .thenReturn(assignment)
        .thenReturn(taskId00Partitions);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(corruptedTask));
    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
    assertThat(corruptedTask.state(), is(Task.State.RUNNING));

    corruptedTask.setChangelogOffsets(singletonMap(t1p0, 0L));
    taskManager.handleCorruption(singleton(taskId00));

    assertThat(corruptedTask.commitPrepared, is(true));
    assertThat(corruptedTask.state(), is(Task.State.CREATED));
    assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
    assertThat(checkpointEnforced.get(), is(true));
    assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, corruptedTask)));
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    verify(mockedStateManager).markChangelogAsCorrupted(taskId00Partitions);
}
// Same scenario as a plain corrupted-task revival, but suspend() throws after delegating to the
// parent; the task is still expected to be revived into CREATED state.
@Test
public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() {
    final ProcessorStateManager mockedStateManager = mock(ProcessorStateManager.class);
    final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, mockedStateManager) {
        @Override
        public void suspend() {
            // Perform the real suspension first, then fail.
            super.suspend();
            throw new RuntimeException("oops");
        }
    };

    // The first assignment() call returns `assignment`, later ones the task's partitions.
    when(consumer.assignment())
        .thenReturn(assignment)
        .thenReturn(taskId00Partitions);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(corruptedTask));
    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
    assertThat(corruptedTask.state(), is(Task.State.RUNNING));

    corruptedTask.setChangelogOffsets(singletonMap(t1p0, 0L));
    taskManager.handleCorruption(singleton(taskId00));

    assertThat(corruptedTask.commitPrepared, is(true));
    assertThat(corruptedTask.state(), is(Task.State.CREATED));
    assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
    assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, corruptedTask)));
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    verify(mockedStateManager).markChangelogAsCorrupted(taskId00Partitions);
}
// While taskId00 is handled as corrupted, the running sibling task that needs a commit must have
// its commit prepared, must not get partitions for offset reset, and no empty offset map may be
// committed through the consumer.
@Test
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
    final ProcessorStateManager mockedStateManager = mock(ProcessorStateManager.class);
    final StateMachineTask corrupted = new StateMachineTask(taskId00, taskId00Partitions, true, mockedStateManager);
    final StateMachineTask healthy = new StateMachineTask(taskId01, taskId01Partitions, true, mockedStateManager);

    final Map<TaskId, Set<TopicPartition>> bothTasksAssignment = new HashMap<>();
    bothTasksAssignment.putAll(taskId00Assignment);
    bothTasksAssignment.putAll(taskId01Assignment);

    // Stubs for `handleAssignment`.
    when(activeTaskCreator.createTasks(any(), eq(bothTasksAssignment)))
        .thenReturn(asList(corrupted, healthy));
    when(consumer.assignment())
        .thenReturn(assignment)
        .thenReturn(taskId00Partitions);

    taskManager.handleAssignment(bothTasksAssignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
    assertThat(healthy.state(), is(Task.State.RUNNING));

    healthy.setCommitNeeded();
    corrupted.setChangelogOffsets(singletonMap(t1p0, 0L));
    taskManager.handleCorruption(singleton(taskId00));

    assertTrue(healthy.commitPrepared);
    assertThat(healthy.partitionsForOffsetReset, equalTo(Collections.emptySet()));
    assertThat(corrupted.partitionsForOffsetReset, equalTo(taskId00Partitions));

    // An empty offset map must never be committed either.
    verify(consumer, never()).commitSync(emptyMap());
    verify(mockedStateManager).markChangelogAsCorrupted(taskId00Partitions);
}
// Restoration is never completed here, so the sibling task stays in CREATED state; even though it
// flags commitNeeded, handling the corruption of taskId00 must not prepare a commit for it.
@Test
public void shouldNotCommitNonRunningNonCorruptedTasks() {
    final ProcessorStateManager mockedStateManager = mock(ProcessorStateManager.class);
    final StateMachineTask corrupted = new StateMachineTask(taskId00, taskId00Partitions, true, mockedStateManager);
    final StateMachineTask createdSibling = new StateMachineTask(taskId01, taskId01Partitions, true, mockedStateManager);
    createdSibling.setCommitNeeded();

    final Map<TaskId, Set<TopicPartition>> bothTasksAssignment = new HashMap<>();
    bothTasksAssignment.putAll(taskId00Assignment);
    bothTasksAssignment.putAll(taskId01Assignment);

    // Stubs for `handleAssignment`.
    when(activeTaskCreator.createTasks(any(), eq(bothTasksAssignment)))
        .thenReturn(asList(corrupted, createdSibling));
    when(consumer.assignment()).thenReturn(taskId00Partitions);

    taskManager.handleAssignment(bothTasksAssignment, emptyMap());

    corrupted.setChangelogOffsets(singletonMap(t1p0, 0L));
    taskManager.handleCorruption(singleton(taskId00));

    assertThat(createdSibling.state(), is(Task.State.CREATED));
    assertThat(createdSibling.partitionsForOffsetReset, equalTo(Collections.emptySet()));
    assertThat(corrupted.partitionsForOffsetReset, equalTo(taskId00Partitions));
    assertFalse(createdSibling.commitPrepared);
    verify(mockedStateManager).markChangelogAsCorrupted(taskId00Partitions);
}
// With the task manager set up via setUpTaskManager(..., tasks, true), handling the corruption of
// taskId02 must not trigger any commit-related call on the restoring active task or on the
// standby task.
// NOTE(review): within this method neither activeRestoringTask nor standbyTask is registered with
// `tasks` or `stateUpdater`, so the never() verifications may hold trivially - confirm whether the
// fixtures wire them in elsewhere.
@Test
public void shouldNotCommitNonCorruptedRestoringActiveTasksAndNotCommitRunningStandbyTasksWithStateUpdaterEnabled() {
final StreamTask activeRestoringTask = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
final StreamTask corruptedTask = statefulTask(taskId02, taskId02ChangelogPartitions)
.withInputPartitions(taskId02Partitions)
.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
// Only the corrupted task is exposed through the task registry.
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, corruptedTask)));
when(tasks.task(taskId02)).thenReturn(corruptedTask);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
// NOTE(review): this stubs the consumer assignment with the *intersection* of the three tasks'
// partition sets, whereas other tests in this file use union(...) for the combined assignment.
// If those sets are disjoint the stubbed assignment is empty - confirm whether union was intended.
when(consumer.assignment()).thenReturn(intersection(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
taskManager.handleCorruption(mkSet(taskId02));
// No commit-related interaction with the restoring active task ...
verify(activeRestoringTask, never()).commitNeeded();
verify(activeRestoringTask, never()).prepareCommit();
verify(activeRestoringTask, never()).postCommit(anyBoolean());
// ... nor with the standby task.
verify(standbyTask, never()).commitNeeded();
verify(standbyTask, never()).prepareCommit();
verify(standbyTask, never()).postCommit(anyBoolean());
}
// With the task manager set up via setUpTaskManager(..., tasks, false), handling the corruption of
// taskId02 must leave the restoring active task uncommitted but must prepare and post-commit the
// running standby task, which reports commitNeeded() == true.
@Test
public void shouldNotCommitNonCorruptedRestoringActiveTasksAndCommitRunningStandbyTasksWithStateUpdaterDisabled() {
final StreamTask activeRestoringTask = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
when(standbyTask.commitNeeded()).thenReturn(true);
final StreamTask corruptedTask = statefulTask(taskId02, taskId02ChangelogPartitions)
.withInputPartitions(taskId02Partitions)
.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
// All three tasks are exposed through the task registry.
when(tasks.allTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, activeRestoringTask),
mkEntry(taskId01, standbyTask),
mkEntry(taskId02, corruptedTask)
));
when(tasks.task(taskId02)).thenReturn(corruptedTask);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
// NOTE(review): this stubs the consumer assignment with the *intersection* of the three tasks'
// partition sets, whereas other tests in this file use union(...) for the combined assignment.
// If those sets are disjoint the stubbed assignment is empty - confirm whether union was intended.
when(consumer.assignment()).thenReturn(intersection(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
taskManager.handleCorruption(mkSet(taskId02));
// The restoring active task must not see any commit-related call.
verify(activeRestoringTask, never()).commitNeeded();
verify(activeRestoringTask, never()).prepareCommit();
verify(activeRestoringTask, never()).postCommit(anyBoolean());
// The running standby task must be committed.
verify(standbyTask).prepareCommit();
verify(standbyTask).postCommit(anyBoolean());
}
// Committing the non-corrupted active task throws TaskMigratedException. By the time that
// exception escapes handleCorruption, the corrupted standby must already be cleaned up and
// revived (commit prepared, state CREATED, changelog marked as corrupted).
@Test
public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
    final ProcessorStateManager mockedStateManager = mock(ProcessorStateManager.class);
    final StateMachineTask standbyToRevive = new StateMachineTask(taskId00, taskId00Partitions, false, mockedStateManager);
    final StateMachineTask migratedActive = new StateMachineTask(taskId01, taskId01Partitions, true, mockedStateManager) {
        @Override
        public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
            throw new TaskMigratedException("You dropped out of the group!", new RuntimeException());
        }
    };

    // Stubs for `handleAssignment`.
    when(activeTaskCreator.createTasks(any(), eq(taskId01Assignment)))
        .thenReturn(singleton(migratedActive));
    when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singleton(standbyToRevive));
    when(consumer.assignment()).thenReturn(assignment);

    taskManager.handleAssignment(taskId01Assignment, taskId00Assignment);
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));

    // Both tasks are running; flag the active one so that a commit is attempted and throws.
    assertThat(migratedActive.state(), is(Task.State.RUNNING));
    assertThat(standbyToRevive.state(), is(Task.State.RUNNING));
    migratedActive.setCommitNeeded();

    standbyToRevive.setChangelogOffsets(singletonMap(t1p0, 0L));
    assertThrows(TaskMigratedException.class, () -> taskManager.handleCorruption(singleton(taskId00)));

    assertThat(standbyToRevive.commitPrepared, is(true));
    assertThat(standbyToRevive.state(), is(Task.State.CREATED));
    verify(mockedStateManager).markChangelogAsCorrupted(taskId00Partitions);
}
// While a rebalance is in progress, handling a corrupted task must leave the commit flags and
// the RUNNING state of the uncorrupted task (which needs a commit) completely untouched.
@Test
public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
    final ProcessorStateManager mockedStateManager = mock(ProcessorStateManager.class);
    when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList<>());

    final StateMachineTask corrupted = new StateMachineTask(taskId00, taskId00Partitions, true, mockedStateManager);

    // This task needs a commit, so it would be committed if a commit were attempted.
    final StateMachineTask healthy = new StateMachineTask(taskId01, taskId01Partitions, true, mockedStateManager);
    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
    healthy.setCommitNeeded();

    // Stubs for `handleAssignment`.
    final Map<TaskId, Set<TopicPartition>> firstAssignment = new HashMap<>(taskId00Assignment);
    firstAssignment.putAll(taskId01Assignment);
    when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
        .thenReturn(asList(corrupted, healthy));
    when(consumer.assignment())
        .thenReturn(assignment)
        .thenReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
    healthy.setCommittableOffsetsAndMetadata(committableOffsets);

    taskManager.handleAssignment(firstAssignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));

    // Baseline before the rebalance starts.
    assertThat(healthy.state(), is(Task.State.RUNNING));
    assertThat(healthy.commitPrepared, is(false));
    assertThat(healthy.commitNeeded, is(true));
    assertThat(healthy.commitCompleted, is(false));

    taskManager.handleRebalanceStart(singleton(topic1));
    assertThat(taskManager.rebalanceInProgress(), is(true));

    taskManager.handleCorruption(singleton(taskId00));

    // Nothing about the uncorrupted task may have changed.
    assertThat(healthy.commitPrepared, is(false));
    assertThat(healthy.commitNeeded, is(true));
    assertThat(healthy.commitCompleted, is(false));
    assertThat(healthy.state(), is(State.RUNNING));
}
// consumer.commitSync(offsets) is stubbed to throw TimeoutException. When taskId00 is handled as
// corrupted, the uncorrupted task's commit is prepared but never completed, and both tasks are
// expected to end up revived in CREATED state. Only the corrupted task's changelog may be marked
// as corrupted.
@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
// Fails the test if anyone tries to mark the uncorrupted task's changelog as corrupted.
final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@Override
public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
fail("Should not try to mark changelogs as corrupted for uncorrupted task");
}
};
final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
// handleAssignment
final Map<TaskId, Set<TopicPartition>> firstAssignment = new HashMap<>();
firstAssignment.putAll(taskId00Assignment);
firstAssignment.putAll(taskId01Assignment);
when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
.thenReturn(asList(corruptedActive, uncorruptedActive));
// The first assignment() call returns `assignment`, later calls the union of both tasks' partitions.
when(consumer.assignment())
.thenReturn(assignment)
.thenReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
// Committing exactly the uncorrupted task's offsets times out.
doThrow(new TimeoutException()).when(consumer).commitSync(offsets);
taskManager.handleAssignment(firstAssignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(uncorruptedActive.state(), is(Task.State.RUNNING));
assertThat(corruptedActive.state(), is(Task.State.RUNNING));
// make sure this will be committed and throw
uncorruptedActive.setCommitNeeded();
corruptedActive.setChangelogOffsets(singletonMap(t1p0, 0L));
// Baseline commit flags before the corruption is handled.
assertThat(uncorruptedActive.commitPrepared, is(false));
assertThat(uncorruptedActive.commitNeeded, is(true));
assertThat(uncorruptedActive.commitCompleted, is(false));
assertThat(corruptedActive.commitPrepared, is(false));
assertThat(corruptedActive.commitNeeded, is(false));
assertThat(corruptedActive.commitCompleted, is(false));
taskManager.handleCorruption(singleton(taskId00));
assertThat(uncorruptedActive.commitPrepared, is(true));
assertThat(uncorruptedActive.commitNeeded, is(false));
assertThat(uncorruptedActive.commitCompleted, is(false)); //if not corrupted, we should close dirty without committing
assertThat(corruptedActive.commitPrepared, is(true));
assertThat(corruptedActive.commitNeeded, is(false));
assertThat(corruptedActive.commitCompleted, is(true)); //if corrupted, should enforce checkpoint with corrupted tasks removed
// Both tasks end up back in CREATED state.
assertThat(corruptedActive.state(), is(Task.State.CREATED));
assertThat(uncorruptedActive.state(), is(Task.State.CREATED));
verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
// EXACTLY_ONCE_V2 variant: producer.commitTransaction(offsets, groupMetadata) is stubbed to throw
// TimeoutException. After taskId00 is handled as corrupted, the test expects BOTH tasks to have
// their changelogs marked as corrupted, their commits completed and their state back at CREATED.
@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
// Each task records in a flag whether markChangelogAsCorrupted was called on it.
final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
final StateMachineTask corruptedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@Override
public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
super.markChangelogAsCorrupted(partitions);
corruptedTaskChangelogMarkedAsCorrupted.set(true);
}
};
final AtomicBoolean uncorruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
final StateMachineTask uncorruptedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@Override
public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
super.markChangelogAsCorrupted(partitions);
uncorruptedTaskChangelogMarkedAsCorrupted.set(true);
}
};
final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
uncorruptedActiveTask.setCommittableOffsetsAndMetadata(offsets);
// handleAssignment
final Map<TaskId, Set<TopicPartition>> firstAssignment = new HashMap<>();
firstAssignment.putAll(taskId00Assignment);
firstAssignment.putAll(taskId01Assignment);
when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
.thenReturn(asList(corruptedActiveTask, uncorruptedActiveTask));
// The first assignment() call returns `assignment`, later calls the union of both tasks' partitions.
when(consumer.assignment())
.thenReturn(assignment)
.thenReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
when(consumer.groupMetadata()).thenReturn(groupMetadata);
// Committing the transaction with exactly the uncorrupted task's offsets times out.
doThrow(new TimeoutException()).when(producer).commitTransaction(offsets, groupMetadata);
taskManager.handleAssignment(firstAssignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(uncorruptedActiveTask.state(), is(Task.State.RUNNING));
assertThat(corruptedActiveTask.state(), is(Task.State.RUNNING));
// make sure this will be committed and throw
uncorruptedActiveTask.setCommitNeeded();
final Map<TopicPartition, Long> corruptedActiveTaskChangelogOffsets = singletonMap(t1p0changelog, 0L);
corruptedActiveTask.setChangelogOffsets(corruptedActiveTaskChangelogOffsets);
final Map<TopicPartition, Long> uncorruptedActiveTaskChangelogOffsets = singletonMap(t1p1changelog, 0L);
uncorruptedActiveTask.setChangelogOffsets(uncorruptedActiveTaskChangelogOffsets);
// Baseline commit flags before the corruption is handled.
assertThat(uncorruptedActiveTask.commitPrepared, is(false));
assertThat(uncorruptedActiveTask.commitNeeded, is(true));
assertThat(uncorruptedActiveTask.commitCompleted, is(false));
assertThat(corruptedActiveTask.commitPrepared, is(false));
assertThat(corruptedActiveTask.commitNeeded, is(false));
assertThat(corruptedActiveTask.commitCompleted, is(false));
taskManager.handleCorruption(singleton(taskId00));
assertThat(uncorruptedActiveTask.commitPrepared, is(true));
assertThat(uncorruptedActiveTask.commitNeeded, is(false));
assertThat(uncorruptedActiveTask.commitCompleted, is(true)); //if corrupted due to timeout on commit, should enforce checkpoint with corrupted tasks removed
assertThat(corruptedActiveTask.commitPrepared, is(true));
assertThat(corruptedActiveTask.commitNeeded, is(false));
assertThat(corruptedActiveTask.commitCompleted, is(true)); //if corrupted, should enforce checkpoint with corrupted tasks removed
// Both tasks end up back in CREATED state, and both had their changelogs marked as corrupted.
assertThat(corruptedActiveTask.state(), is(Task.State.CREATED));
assertThat(uncorruptedActiveTask.state(), is(Task.State.CREATED));
assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
}
// Revocation variant: consumer.commitSync is stubbed to throw TimeoutException for the combined
// offsets of the revoked task and the unrevoked task that needs a commit. After revoking taskId00
// the test expects the revoked task SUSPENDED, the unrevoked task with commit needed back in
// CREATED, and the unrevoked task without commit needed still RUNNING.
@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithALOS() {
final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final Map<TopicPartition, OffsetAndMetadata> offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
revokedActiveTask.setCommittableOffsetsAndMetadata(offsets00);
revokedActiveTask.setCommitNeeded();
// Fails the test if anyone tries to mark this task's changelog as corrupted.
final StateMachineTask unrevokedActiveTaskWithCommitNeeded = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@Override
public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
fail("Should not try to mark changelogs as corrupted for uncorrupted task");
}
};
final Map<TopicPartition, OffsetAndMetadata> offsets01 = singletonMap(t1p1, new OffsetAndMetadata(1L, null));
unrevokedActiveTaskWithCommitNeeded.setCommittableOffsetsAndMetadata(offsets01);
unrevokedActiveTaskWithCommitNeeded.setCommitNeeded();
final StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
// The commit stubbed to time out carries the offsets of the two tasks that need a commit.
final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<>();
expectedCommittedOffsets.putAll(offsets00);
expectedCommittedOffsets.putAll(offsets01);
final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
mkEntry(taskId00, taskId00Partitions),
mkEntry(taskId01, taskId01Partitions),
mkEntry(taskId02, taskId02Partitions)
);
// The first assignment() call returns `assignment`, later calls the union of all three tasks' partitions.
when(consumer.assignment())
.thenReturn(assignment)
.thenReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
.thenReturn(asList(revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
doThrow(new TimeoutException()).when(consumer).commitSync(expectedCommittedOffsets);
taskManager.handleAssignment(assignmentActive, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(revokedActiveTask.state(), is(Task.State.RUNNING));
assertThat(unrevokedActiveTaskWithCommitNeeded.state(), is(State.RUNNING));
assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(Task.State.RUNNING));
// Revoking taskId00's partitions triggers the commit that times out.
taskManager.handleRevocation(taskId00Partitions);
assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
assertThat(unrevokedActiveTaskWithCommitNeeded.state(), is(State.CREATED));
assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
}
// Scenario (EXACTLY_ONCE_V2): producer.commitTransaction is stubbed to throw a TimeoutException for the
// combined offsets of the two tasks that need a commit, and handleRevocation is then called for
// taskId00's partitions only.
@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS() {
// Local task manager, producer mock and state-manager mock; the locals named `taskManager` and
// `stateManager` hide the identically named fixtures used by the other tests.
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
// Task whose partitions are revoked below; it has committable offsets and is flagged as needing a commit.
final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final Map<TopicPartition, OffsetAndMetadata> revokedActiveTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
revokedActiveTask.setCommittableOffsetsAndMetadata(revokedActiveTaskOffsets);
revokedActiveTask.setCommitNeeded();
// Task that keeps its partitions but needs a commit; the override records whether its changelogs
// were marked as corrupted (after delegating to the real implementation).
final AtomicBoolean unrevokedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
final StateMachineTask unrevokedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@Override
public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
super.markChangelogAsCorrupted(partitions);
unrevokedTaskChangelogMarkedAsCorrupted.set(true);
}
};
final Map<TopicPartition, OffsetAndMetadata> unrevokedTaskOffsets = singletonMap(t1p1, new OffsetAndMetadata(1L, null));
unrevokedActiveTask.setCommittableOffsetsAndMetadata(unrevokedTaskOffsets);
unrevokedActiveTask.setCommitNeeded();
// Third task: no offsets and no commit flag are set on it.
final StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
// Only the offsets of the two tasks flagged with setCommitNeeded() form the transaction that is made to fail.
final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<>();
expectedCommittedOffsets.putAll(revokedActiveTaskOffsets);
expectedCommittedOffsets.putAll(unrevokedTaskOffsets);
final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
mkEntry(taskId00, taskId00Partitions),
mkEntry(taskId01, taskId01Partitions),
mkEntry(taskId02, taskId02Partitions)
);
// Consecutive stubbing: the first consumer.assignment() call returns the shared `assignment` fixture,
// every later call returns the union of the three tasks' partitions.
when(consumer.assignment())
.thenReturn(assignment)
.thenReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
.thenReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded));
final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
when(consumer.groupMetadata()).thenReturn(groupMetadata);
// Make the transactional commit of exactly the expected offsets time out.
doThrow(new TimeoutException()).when(producer).commitTransaction(expectedCommittedOffsets, groupMetadata);
// Bring all three tasks to RUNNING before triggering the revocation.
taskManager.handleAssignment(assignmentActive, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(revokedActiveTask.state(), is(Task.State.RUNNING));
assertThat(unrevokedActiveTask.state(), is(Task.State.RUNNING));
assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
// Give both committing tasks changelog offsets before the revocation.
final Map<TopicPartition, Long> revokedActiveTaskChangelogOffsets = singletonMap(t1p0changelog, 0L);
revokedActiveTask.setChangelogOffsets(revokedActiveTaskChangelogOffsets);
final Map<TopicPartition, Long> unrevokedActiveTaskChangelogOffsets = singletonMap(t1p1changelog, 0L);
unrevokedActiveTask.setChangelogOffsets(unrevokedActiveTaskChangelogOffsets);
// Revoke only taskId00's partitions; the stubbed commit timeout is expected to surface here.
taskManager.handleRevocation(taskId00Partitions);
// Expected outcome: the unrevoked committing task had its changelogs marked as corrupted and is back
// in CREATED, the revoked task is SUSPENDED, and the task without a pending commit is still RUNNING.
assertThat(unrevokedTaskChangelogMarkedAsCorrupted.get(), is(true));
assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
assertThat(unrevokedActiveTask.state(), is(State.CREATED));
assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
// The local state-manager mock must have been asked to mark the changelogs of both committing tasks.
verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
}
@Test
public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
    // Given: one standby task that the standby creator returns for the taskId00 assignment.
    final Task standbyTask = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
    when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTask));
    when(consumer.assignment()).thenReturn(assignment);

    // When: the standby is assigned and restoration completes, it must be RUNNING.
    taskManager.handleAssignment(emptyMap(), taskId00Assignment);
    final boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(restorationCompleted, is(true));
    assertThat(standbyTask.state(), is(Task.State.RUNNING));

    // Then: an empty follow-up assignment closes the task and leaves no task registered.
    taskManager.handleAssignment(emptyMap(), emptyMap());
    assertThat(standbyTask.state(), is(Task.State.CLOSED));
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
}
@Test
public void shouldAddNonResumedSuspendedTasks() {
    // Given: one active and one standby task handed out by their respective creators.
    final Task activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final Task standbyTask = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
    when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(standbyTask));
    when(consumer.assignment()).thenReturn(assignment);

    // First rebalance: both tasks reach RUNNING.
    taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
    final boolean firstRestoration = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(firstRestoration, is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));
    assertThat(standbyTask.state(), is(Task.State.RUNNING));

    // Second rebalance with the very same assignment: both tasks are still RUNNING afterwards.
    taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
    final boolean secondRestoration = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(secondRestoration, is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));
    assertThat(standbyTask.state(), is(Task.State.RUNNING));

    // The consumer calls are expected twice because tryToCompleteRestoration ran twice.
    verify(consumer, times(2)).assignment();
    verify(consumer, times(2)).resume(assignment);
    verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
}
@Test
public void shouldUpdateInputPartitionsAfterRebalance() {
    // Given: a single active task created for the taskId00 assignment.
    final Task activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
    when(consumer.assignment()).thenReturn(assignment);

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    final boolean firstRestoration = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(firstRestoration, is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));

    // When: the same task id is re-assigned with a different partition set.
    final Set<TopicPartition> updatedPartitions = mkSet(t1p1);
    final Map<TaskId, Set<TopicPartition>> updatedAssignment = singletonMap(taskId00, updatedPartitions);
    taskManager.handleAssignment(updatedAssignment, emptyMap());
    final boolean secondRestoration = taskManager.tryToCompleteRestoration(time.milliseconds(), null);

    // Then: the task is still RUNNING and reports the new input partitions.
    assertThat(secondRestoration, is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));
    assertEquals(updatedPartitions, activeTask.inputPartitions());

    // The consumer calls are expected twice because tryToCompleteRestoration ran twice.
    verify(consumer, times(2)).assignment();
    verify(consumer, times(2)).resume(assignment);
    verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
}
@Test
public void shouldAddNewActiveTasks() {
    // Given: the active creator returns one task for the taskId00 assignment.
    final Task activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));

    // When: the assignment is handled, the task exists but is only CREATED.
    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(activeTask.state(), is(Task.State.CREATED));

    // After completing restoration (with a no-op offset resetter) the task is RUNNING.
    taskManager.tryToCompleteRestoration(time.milliseconds(), ignoredPartitions -> { });
    assertThat(activeTask.state(), is(Task.State.RUNNING));

    // Then: the task is the only registered task, and it is registered as active.
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    assertThat(taskManager.activeTaskMap(), Matchers.equalTo(singletonMap(taskId00, activeTask)));

    verify(changeLogReader).enforceRestoreActive();
    verify(consumer).assignment();
    verify(consumer).resume(eq(emptySet()));
}
@Test
public void shouldNotCompleteRestorationIfTasksCannotInitialize() {
    // Given: two active tasks whose initialization always fails, one with a LockException
    // and one with a TimeoutException.
    final Task lockedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public void initializeIfNeeded() {
            throw new LockException("can't lock");
        }
    };
    final Task timedOutTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
        @Override
        public void initializeIfNeeded() {
            throw new TimeoutException("timed out");
        }
    };
    final Map<TaskId, Set<TopicPartition>> activeAssignment = mkMap(
        mkEntry(taskId00, taskId00Partitions),
        mkEntry(taskId01, taskId01Partitions)
    );
    when(activeTaskCreator.createTasks(any(), eq(activeAssignment))).thenReturn(asList(lockedTask, timedOutTask));

    taskManager.handleAssignment(activeAssignment, emptyMap());
    assertThat(lockedTask.state(), is(Task.State.CREATED));
    assertThat(timedOutTask.state(), is(Task.State.CREATED));

    // When: restoration is attempted, it reports "not complete" and neither task leaves CREATED.
    final boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(restorationCompleted, is(false));
    assertThat(lockedTask.state(), is(Task.State.CREATED));
    assertThat(timedOutTask.state(), is(Task.State.CREATED));

    // Then: both tasks remain registered as active and the consumer was never touched.
    assertThat(
        taskManager.activeTaskMap(),
        Matchers.equalTo(mkMap(mkEntry(taskId00, lockedTask), mkEntry(taskId01, timedOutTask)))
    );
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    verify(changeLogReader).enforceRestoreActive();
    verifyNoInteractions(consumer);
}
@Test
public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() {
    // Given: an active task whose completeRestoration always times out.
    final Task stuckTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
            throw new TimeoutException("timeout!");
        }
    };
    final Map<TaskId, Set<TopicPartition>> activeAssignment = mkMap(
        mkEntry(taskId00, taskId00Partitions)
    );
    when(activeTaskCreator.createTasks(any(), eq(activeAssignment))).thenReturn(singletonList(stuckTask));

    taskManager.handleAssignment(activeAssignment, emptyMap());
    assertThat(stuckTask.state(), is(Task.State.CREATED));

    // When: restoration is attempted, it reports "not complete" and the task is left in RESTORING.
    final boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(restorationCompleted, is(false));
    assertThat(stuckTask.state(), is(Task.State.RESTORING));

    // Then: the task remains registered as active and the consumer was never touched.
    assertThat(
        taskManager.activeTaskMap(),
        Matchers.equalTo(mkMap(mkEntry(taskId00, stuckTask)))
    );
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    verify(changeLogReader).enforceRestoreActive();
    verifyNoInteractions(consumer);
}
@Test
public void shouldSuspendActiveTasksDuringRevocation() {
    // Given: a running active task that has committable offsets.
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    activeTask.setCommittableOffsetsAndMetadata(singletonMap(t1p0, new OffsetAndMetadata(0L, null)));
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
    when(consumer.assignment()).thenReturn(assignment);

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    final boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(restorationCompleted, is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));

    // When: its partitions are revoked.
    taskManager.handleRevocation(taskId00Partitions);

    // Then: the task is SUSPENDED.
    assertThat(activeTask.state(), is(Task.State.SUSPENDED));
}
@Test
public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() {
    final StreamsProducer threadProducer = mock(StreamsProducer.class);
    // Local EOS-v2 task manager; it hides the shared fixture of the same name.
    final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);

    // Active tasks 0_0 and 0_1: committable offsets and flagged as needing a commit.
    final StateMachineTask revokedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> revokedTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
    revokedTask.setCommittableOffsetsAndMetadata(revokedTaskOffsets);
    revokedTask.setCommitNeeded();

    final StateMachineTask dirtyTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> dirtyTaskOffsets = singletonMap(t1p1, new OffsetAndMetadata(1L, null));
    dirtyTask.setCommittableOffsetsAndMetadata(dirtyTaskOffsets);
    dirtyTask.setCommitNeeded();

    // Active task 0_2: has offsets but is NOT flagged as needing a commit.
    final StateMachineTask cleanTask = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> cleanTaskOffsets = singletonMap(t1p2, new OffsetAndMetadata(2L, null));
    cleanTask.setCommittableOffsetsAndMetadata(cleanTaskOffsets);

    // Standby task 1_0.
    final StateMachineTask standbyTask = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager);

    // Only the offsets of the two flagged tasks are expected in the transaction.
    final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<>(revokedTaskOffsets);
    expectedCommittedOffsets.putAll(dirtyTaskOffsets);

    final Map<TaskId, Set<TopicPartition>> activeAssignment = mkMap(
        mkEntry(taskId00, taskId00Partitions),
        mkEntry(taskId01, taskId01Partitions),
        mkEntry(taskId02, taskId02Partitions)
    );
    final Map<TaskId, Set<TopicPartition>> standbyAssignment = mkMap(
        mkEntry(taskId10, taskId10Partitions)
    );

    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(activeAssignment)))
        .thenReturn(asList(revokedTask, dirtyTask, cleanTask));
    when(activeTaskCreator.threadProducer()).thenReturn(threadProducer);
    when(standbyTaskCreator.createTasks(standbyAssignment))
        .thenReturn(singletonList(standbyTask));
    final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
    when(consumer.groupMetadata()).thenReturn(groupMetadata);

    // NOTE(review): the results of these calls are discarded; they look like leftovers from an
    // earlier mocking setup. Kept as-is - confirm they have no effect before removing.
    revokedTask.committedOffsets();
    dirtyTask.committedOffsets();
    cleanTask.committedOffsets();
    standbyTask.committedOffsets();

    // Bring every task to RUNNING.
    taskManager.handleAssignment(activeAssignment, standbyAssignment);
    final boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(restorationCompleted, is(true));
    assertThat(revokedTask.state(), is(Task.State.RUNNING));
    assertThat(dirtyTask.state(), is(Task.State.RUNNING));
    assertThat(cleanTask.state(), is(Task.State.RUNNING));
    assertThat(standbyTask.state(), is(Task.State.RUNNING));

    // When: only task 0_0's partitions are revoked.
    taskManager.handleRevocation(taskId00Partitions);

    // Then: both flagged tasks were committed, the others were not even prepared for commit.
    assertThat(revokedTask.commitNeeded, is(false));
    assertThat(dirtyTask.commitNeeded, is(false));
    assertThat(cleanTask.commitPrepared, is(false));
    assertThat(standbyTask.commitPrepared, is(false));
    verify(threadProducer).commitTransaction(expectedCommittedOffsets, groupMetadata);
}
// Verifies that revoking one active task's partitions commits every active task that is flagged as
// needing a commit (revoked or not) in a single consumer.commitSync call, while tasks without a
// pending commit and standby tasks are not prepared for commit.
@Test
public void shouldCommitAllNeededTasksOnHandleRevocation() {
    // Active tasks 0_0 and 0_1: committable offsets and flagged as needing a commit.
    final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
    task00.setCommittableOffsetsAndMetadata(offsets00);
    task00.setCommitNeeded();

    final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> offsets01 = singletonMap(t1p1, new OffsetAndMetadata(1L, null));
    task01.setCommittableOffsetsAndMetadata(offsets01);
    task01.setCommitNeeded();

    // Active task 0_2: has offsets but is NOT flagged as needing a commit.
    final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> offsets02 = singletonMap(t1p2, new OffsetAndMetadata(2L, null));
    task02.setCommittableOffsetsAndMetadata(offsets02);

    // Standby task 1_0.
    final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager);

    // Only the offsets of the two flagged tasks are expected in the commit.
    final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<>();
    expectedCommittedOffsets.putAll(offsets00);
    expectedCommittedOffsets.putAll(offsets01);

    final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
        mkEntry(taskId00, taskId00Partitions),
        mkEntry(taskId01, taskId01Partitions),
        mkEntry(taskId02, taskId02Partitions)
    );
    final Map<TaskId, Set<TopicPartition>> assignmentStandby = mkMap(
        mkEntry(taskId10, taskId10Partitions)
    );

    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
        .thenReturn(asList(task00, task01, task02));
    when(standbyTaskCreator.createTasks(assignmentStandby))
        .thenReturn(singletonList(task10));

    // Bring every task to RUNNING.
    taskManager.handleAssignment(assignmentActive, assignmentStandby);
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(task00.state(), is(Task.State.RUNNING));
    assertThat(task01.state(), is(Task.State.RUNNING));
    assertThat(task02.state(), is(Task.State.RUNNING));
    assertThat(task10.state(), is(Task.State.RUNNING));

    // Revoke only task 0_0's partitions.
    taskManager.handleRevocation(taskId00Partitions);

    // Both flagged tasks must have been prepared and committed. The original asserted
    // task00.commitNeeded twice and never checked task01.commitNeeded.
    assertThat(task00.commitNeeded, is(false));
    assertThat(task00.commitPrepared, is(true));
    assertThat(task01.commitNeeded, is(false));
    assertThat(task01.commitPrepared, is(true));
    // Tasks without a pending commit and standby tasks are left alone.
    assertThat(task02.commitPrepared, is(false));
    assertThat(task10.commitPrepared, is(false));
    verify(consumer).commitSync(expectedCommittedOffsets);
}
@Test
public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
    // Given: a running active task with a pending commit and a running standby task.
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> activeTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
    activeTask.setCommittableOffsetsAndMetadata(activeTaskOffsets);
    activeTask.setCommitNeeded();
    final StateMachineTask standbyTask = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager);

    final Map<TaskId, Set<TopicPartition>> activeAssignment = singletonMap(taskId00, taskId00Partitions);
    final Map<TaskId, Set<TopicPartition>> standbyAssignment = singletonMap(taskId10, taskId10Partitions);
    when(activeTaskCreator.createTasks(any(), eq(activeAssignment))).thenReturn(singleton(activeTask));
    when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(singletonList(standbyTask));
    when(consumer.assignment()).thenReturn(assignment);

    taskManager.handleAssignment(activeAssignment, standbyAssignment);
    final boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(restorationCompleted, is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));
    assertThat(standbyTask.state(), is(Task.State.RUNNING));

    // When: the identical assignment is handled again, so no task is closed.
    taskManager.handleAssignment(activeAssignment, standbyAssignment);

    // Then: nothing was committed - the active task still needs its commit and the standby
    // task was never prepared for one.
    assertThat(activeTask.commitNeeded, is(true));
    assertThat(standbyTask.commitPrepared, is(false));
}
@Test
public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() {
    // Given: a running active task with a pending commit and a running standby task.
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> activeTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
    activeTask.setCommittableOffsetsAndMetadata(activeTaskOffsets);
    activeTask.setCommitNeeded();
    final StateMachineTask standbyTask = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager);

    final Map<TaskId, Set<TopicPartition>> activeAssignment = singletonMap(taskId00, taskId00Partitions);
    final Map<TaskId, Set<TopicPartition>> standbyAssignment = singletonMap(taskId10, taskId10Partitions);
    when(activeTaskCreator.createTasks(any(), eq(activeAssignment))).thenReturn(singleton(activeTask));
    when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(singletonList(standbyTask));
    when(consumer.assignment()).thenReturn(assignment);

    taskManager.handleAssignment(activeAssignment, standbyAssignment);
    final boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(restorationCompleted, is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));
    assertThat(standbyTask.state(), is(Task.State.RUNNING));

    // When: the new assignment keeps the active task but drops the standby task.
    taskManager.handleAssignment(activeAssignment, Collections.emptyMap());

    // Then: the active task was not committed as a side effect.
    assertThat(activeTask.commitNeeded, is(true));
}
@Test
public void shouldNotCommitCreatedTasksOnRevocationOrClosure() {
    // Given: an active task that is assigned but never restored, so it stays in CREATED.
    final StateMachineTask createdTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(createdTask));

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(createdTask.state(), is(Task.State.CREATED));

    // Revocation suspends the task ...
    taskManager.handleRevocation(taskId00Partitions);
    assertThat(createdTask.state(), is(Task.State.SUSPENDED));

    // ... and an empty assignment afterwards closes it and releases its task producer.
    taskManager.handleAssignment(emptyMap(), emptyMap());
    assertThat(createdTask.state(), is(Task.State.CLOSED));
    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
public void shouldPassUpIfExceptionDuringSuspend() {
    // Given: a running active task that suspends itself and then blows up.
    final Task explodingTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public void suspend() {
            super.suspend();
            throw new RuntimeException("KABOOM!");
        }
    };
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(explodingTask));
    when(consumer.assignment()).thenReturn(assignment);

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    final boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(restorationCompleted, is(true));
    assertThat(explodingTask.state(), is(Task.State.RUNNING));

    // When/Then: the exception escapes handleRevocation, and the task is nevertheless SUSPENDED.
    assertThrows(RuntimeException.class, () -> taskManager.handleRevocation(taskId00Partitions));
    assertThat(explodingTask.state(), is(Task.State.SUSPENDED));
}
// Scenario: four active tasks, three of which throw from suspend(); a clean shutdown
// (shutdown(true)) must close all four, close the failing ones dirty, and rethrow.
@Test
public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown() {
final TopicPartition changelog = new TopicPartition("changelog", 0);
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
mkEntry(taskId00, taskId00Partitions),
mkEntry(taskId01, taskId01Partitions),
mkEntry(taskId02, taskId02Partitions),
mkEntry(taskId03, taskId03Partitions)
);
// task00 is the only task that reports a changelog partition; the assertions below expect it to
// stay in RESTORING while the others reach RUNNING (presumably because of that changelog - confirm).
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@Override
public Set<TopicPartition> changelogPartitions() {
return singleton(changelog);
}
};
// Flags recording whether closeDirty() was invoked on each of the failing tasks.
final AtomicBoolean closedDirtyTask01 = new AtomicBoolean(false);
final AtomicBoolean closedDirtyTask02 = new AtomicBoolean(false);
final AtomicBoolean closedDirtyTask03 = new AtomicBoolean(false);
// task01 suspends and then throws a TaskMigratedException.
final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@Override
public void suspend() {
super.suspend();
throw new TaskMigratedException("migrated", new RuntimeException("cause"));
}
@Override
public void closeDirty() {
super.closeDirty();
closedDirtyTask01.set(true);
}
};
// task02 and task03 suspend and then throw a RuntimeException("oops").
final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager) {
@Override
public void suspend() {
super.suspend();
throw new RuntimeException("oops");
}
@Override
public void closeDirty() {
super.closeDirty();
closedDirtyTask02.set(true);
}
};
final Task task03 = new StateMachineTask(taskId03, taskId03Partitions, true, stateManager) {
@Override
public void suspend() {
super.suspend();
throw new RuntimeException("oops");
}
@Override
public void closeDirty() {
super.closeDirty();
closedDirtyTask03.set(true);
}
};
when(activeTaskCreator.createTasks(any(), eq(assignment)))
.thenReturn(asList(task00, task01, task02, task03));
// After the assignment every task exists in CREATED.
taskManager.handleAssignment(assignment, emptyMap());
assertThat(task00.state(), is(Task.State.CREATED));
assertThat(task01.state(), is(Task.State.CREATED));
assertThat(task02.state(), is(Task.State.CREATED));
assertThat(task03.state(), is(Task.State.CREATED));
// The return value is intentionally not asserted here; only the resulting task states are.
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
assertThat(task00.state(), is(Task.State.RESTORING));
assertThat(task01.state(), is(Task.State.RUNNING));
assertThat(task02.state(), is(Task.State.RUNNING));
assertThat(task03.state(), is(Task.State.RUNNING));
assertThat(
taskManager.activeTaskMap(),
Matchers.equalTo(
mkMap(
mkEntry(taskId00, task00),
mkEntry(taskId01, task01),
mkEntry(taskId02, task02),
mkEntry(taskId03, task03)
)
)
);
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(changeLogReader).enforceRestoreActive();
verify(changeLogReader).completedChangelogs();
// Clean shutdown must surface a RuntimeException whose cause carries the "oops" message.
final RuntimeException exception = assertThrows(
RuntimeException.class,
() -> taskManager.shutdown(true)
);
assertThat(exception.getCause().getMessage(), is("oops"));
// All three failing tasks were closed dirty, and every task ended up CLOSED.
assertThat(closedDirtyTask01.get(), is(true));
assertThat(closedDirtyTask02.get(), is(true));
assertThat(closedDirtyTask03.get(), is(true));
assertThat(task00.state(), is(Task.State.CLOSED));
assertThat(task01.state(), is(Task.State.CLOSED));
assertThat(task02.state(), is(Task.State.CLOSED));
assertThat(task03.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
// One task-producer cleanup call per task.
verify(activeTaskCreator, times(4)).closeAndRemoveTaskProducerIfNeeded(any());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
verify(activeTaskCreator).closeThreadProducerIfNeeded();
}
@Test
public void shouldCloseActiveTasksAndPropagateTaskProducerExceptionsOnCleanShutdown() {
    // Given: one active task that reports a changelog partition and has committable offsets.
    final TopicPartition changelogPartition = new TopicPartition("changelog", 0);
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public Set<TopicPartition> changelogPartitions() {
            return singleton(changelogPartition);
        }
    };
    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
    activeTask.setCommittableOffsetsAndMetadata(committableOffsets);

    final Map<TaskId, Set<TopicPartition>> activeAssignment = mkMap(
        mkEntry(taskId00, taskId00Partitions)
    );
    when(activeTaskCreator.createTasks(any(), eq(activeAssignment))).thenReturn(singletonList(activeTask));
    // Closing the task's producer fails.
    doThrow(new RuntimeException("whatever"))
        .when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);

    taskManager.handleAssignment(activeAssignment, emptyMap());
    assertThat(activeTask.state(), is(Task.State.CREATED));

    taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(activeTask.state(), is(Task.State.RESTORING));
    assertThat(
        taskManager.activeTaskMap(),
        Matchers.equalTo(
            mkMap(
                mkEntry(taskId00, activeTask)
            )
        )
    );
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    verify(changeLogReader).enforceRestoreActive();
    verify(changeLogReader).completedChangelogs();

    // When: a clean shutdown is requested, the producer failure is propagated as the cause.
    final RuntimeException shutdownFailure =
        assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));

    // Then: the task is closed and deregistered despite the failure.
    assertThat(activeTask.state(), is(Task.State.CLOSED));
    assertThat(shutdownFailure.getCause().getMessage(), is("whatever"));
    assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
    // the active task creator should also get closed (so that it closes the thread producer if applicable)
    verify(activeTaskCreator).closeThreadProducerIfNeeded();
}
// Verifies that when closing the thread producer fails during a clean shutdown, the task is still
// closed and deregistered, its task producer is still cleaned up, and the failure is rethrown.
@Test
public void shouldCloseActiveTasksAndPropagateThreadProducerExceptionsOnCleanShutdown() {
    final TopicPartition changelog = new TopicPartition("changelog", 0);
    final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
        mkEntry(taskId00, taskId00Partitions)
    );
    // The task reports a changelog partition; the assertions below expect it to stay in RESTORING.
    final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public Set<TopicPartition> changelogPartitions() {
            return singleton(changelog);
        }
    };
    when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00));
    // Closing the thread producer fails.
    doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeThreadProducerIfNeeded();

    taskManager.handleAssignment(assignment, emptyMap());
    assertThat(task00.state(), is(Task.State.CREATED));

    taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(task00.state(), is(Task.State.RESTORING));
    assertThat(
        taskManager.activeTaskMap(),
        Matchers.equalTo(
            mkMap(
                mkEntry(taskId00, task00)
            )
        )
    );
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    verify(changeLogReader).enforceRestoreActive();
    verify(changeLogReader).completedChangelogs();

    // The thread-producer failure is rethrown directly (message on the exception itself, not its cause).
    final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
    assertThat(task00.state(), is(Task.State.CLOSED));
    assertThat(exception.getMessage(), is("whatever"));
    assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    // the task's own producer must still have been cleaned up
    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
    // the active task creator should also get closed (so that it closes the thread producer if applicable);
    // the invocation is recorded even though the stub throws
    verify(activeTaskCreator).closeThreadProducerIfNeeded();
}
@Test
public void shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException() {
    // NOTE(review): the result of setUpTaskManager is discarded here, whereas other tests in this
    // file assign it to a local TaskManager. If the helper does not also replace the shared
    // `taskManager`, the rest of this test runs against the default one - confirm the intent.
    setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, false);

    // Given: two standby tasks; the second one needs a commit but fails while preparing it.
    final Task retainedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
    final StateMachineTask failingStandby = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) {
        @Override
        public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
            throw new RuntimeException("task 0_1 prepare commit boom!");
        }
    };
    failingStandby.setCommitNeeded();
    taskManager.addTask(retainedStandby);
    taskManager.addTask(failingStandby);

    // When: the new assignment keeps only standby 0_0, the prepare-commit failure is propagated.
    final RuntimeException failure = assertThrows(
        RuntimeException.class,
        () -> taskManager.handleAssignment(
            Collections.emptyMap(),
            singletonMap(taskId00, taskId00Partitions)
        )
    );
    assertThat(failure.getCause().getMessage(), is("task 0_1 prepare commit boom!"));

    // Then: the retained task is untouched and the failing one is closed.
    assertThat(retainedStandby.state(), is(Task.State.CREATED));
    assertThat(failingStandby.state(), is(Task.State.CLOSED));
    // All the tasks involving in the commit should already be removed.
    assertThat(taskManager.allTasks(), is(Collections.singletonMap(taskId00, retainedStandby)));
}
@Test
public void shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() {
    // Given: three active tasks; the second one suspends and then throws.
    final Task untouchedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final StateMachineTask explodingTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
        @Override
        public void suspend() {
            super.suspend();
            throw new RuntimeException("task 0_1 suspend boom!");
        }
    };
    final StateMachineTask wellBehavedTask = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
    taskManager.addTask(untouchedTask);
    taskManager.addTask(explodingTask);
    taskManager.addTask(wellBehavedTask);

    // When: the partitions of tasks 0_1 and 0_2 are revoked, the suspend failure is propagated.
    final RuntimeException failure = assertThrows(
        RuntimeException.class,
        () -> taskManager.handleRevocation(union(HashSet::new, taskId01Partitions, taskId02Partitions))
    );
    assertThat(failure.getCause().getMessage(), is("task 0_1 suspend boom!"));

    // Then: both revoked tasks are SUSPENDED, the unrevoked one is still CREATED,
    // and the active task creator was never involved.
    assertThat(untouchedTask.state(), is(Task.State.CREATED));
    assertThat(explodingTask.state(), is(Task.State.SUSPENDED));
    assertThat(wellBehavedTask.state(), is(Task.State.SUSPENDED));
    verifyNoInteractions(activeTaskCreator);
}
// Scenario: three active tasks, two of which throw from suspend(), plus producer cleanup that
// always throws; an unclean shutdown (shutdown(false)) must still close everything without rethrowing.
@Test
public void shouldCloseActiveTasksAndIgnoreExceptionsOnUncleanShutdown() {
final TopicPartition changelog = new TopicPartition("changelog", 0);
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
mkEntry(taskId00, taskId00Partitions),
mkEntry(taskId01, taskId01Partitions),
mkEntry(taskId02, taskId02Partitions)
);
// task00 is the only task that reports a changelog partition; the assertions below expect it to
// stay in RESTORING while the others reach RUNNING (presumably because of that changelog - confirm).
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@Override
public Set<TopicPartition> changelogPartitions() {
return singleton(changelog);
}
};
// task01 suspends and then throws a TaskMigratedException.
final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@Override
public void suspend() {
super.suspend();
throw new TaskMigratedException("migrated", new RuntimeException("cause"));
}
};
// task02 suspends and then throws a RuntimeException("oops").
final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager) {
@Override
public void suspend() {
super.suspend();
throw new RuntimeException("oops");
}
};
when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01, task02));
// Both the per-task and the thread producer cleanup are stubbed to fail.
doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
doThrow(new RuntimeException("whatever all")).when(activeTaskCreator).closeThreadProducerIfNeeded();
// After the assignment every task exists in CREATED.
taskManager.handleAssignment(assignment, emptyMap());
assertThat(task00.state(), is(Task.State.CREATED));
assertThat(task01.state(), is(Task.State.CREATED));
assertThat(task02.state(), is(Task.State.CREATED));
// The return value is intentionally not asserted here; only the resulting task states are.
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
assertThat(task00.state(), is(Task.State.RESTORING));
assertThat(task01.state(), is(Task.State.RUNNING));
assertThat(task02.state(), is(Task.State.RUNNING));
assertThat(
taskManager.activeTaskMap(),
Matchers.equalTo(
mkMap(
mkEntry(taskId00, task00),
mkEntry(taskId01, task01),
mkEntry(taskId02, task02)
)
)
);
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(changeLogReader).enforceRestoreActive();
verify(changeLogReader).completedChangelogs();
// Unclean shutdown: no exception is expected to escape despite all the stubbed failures.
taskManager.shutdown(false);
assertThat(task00.state(), is(Task.State.CLOSED));
assertThat(task01.state(), is(Task.State.CLOSED));
assertThat(task02.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
// One task-producer cleanup call per task, even though each of them throws.
verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
verify(activeTaskCreator).closeThreadProducerIfNeeded();
}
/**
 * Assigns a single standby task, moves it to RUNNING, and checks that {@code shutdown(true)} leaves it in
 * state CLOSED and empties both task maps.
 */
@Test
public void shouldCloseStandbyTasksOnShutdown() {
    final Map<TaskId, Set<TopicPartition>> standbyAssignment = singletonMap(taskId00, taskId00Partitions);
    final Task standby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);

    // `handleAssignment`
    when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(singletonList(standby));

    taskManager.handleAssignment(emptyMap(), standbyAssignment);
    assertThat(standby.state(), is(Task.State.CREATED));

    taskManager.tryToCompleteRestoration(time.milliseconds(), null);
    assertThat(standby.state(), is(Task.State.RUNNING));
    assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
    assertThat(taskManager.standbyTaskMap(), Matchers.equalTo(singletonMap(taskId00, standby)));

    taskManager.shutdown(true);

    assertThat(standby.state(), is(Task.State.CLOSED));
    assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    // the active task creator should also get closed (so that it closes the thread producer if applicable)
    verify(activeTaskCreator).closeThreadProducerIfNeeded();
    // `tryToCompleteRestoration`
    verify(consumer).assignment();
    verify(consumer).resume(eq(emptySet()));
}
/**
 * Lets the state updater report one failed stateful task and one failed standby task, shuts the task manager
 * down, and verifies the state updater shutdown plus the commit-prepare / suspend / dirty-close calls on the
 * failed stateful task.
 */
@Test
public void shouldShutDownStateUpdaterAndCloseFailedTasksDirty() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StreamTask failedActive = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RESTORING)
        .build();
    final StandbyTask failedStandby = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .build();
    final List<ExceptionAndTask> drainedFailures = Arrays.asList(
        new ExceptionAndTask(new RuntimeException(), failedActive),
        new ExceptionAndTask(new RuntimeException(), failedStandby)
    );
    when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(drainedFailures);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    manager.shutdown(true);

    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(failedActive.id());
    verify(activeTaskCreator).closeThreadProducerIfNeeded();
    verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
    verify(failedActive).prepareCommit();
    verify(failedActive).suspend();
    verify(failedActive).closeDirty();
}
/**
 * Verifies that shutting down the task manager also shuts down the scheduling task manager, passing
 * {@code Long.MAX_VALUE} milliseconds as the duration.
 */
@Test
public void shouldShutdownSchedulingTaskManager() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true, true);

    manager.shutdown(true);

    final Duration expectedTimeout = Duration.ofMillis(Long.MAX_VALUE);
    verify(schedulingTaskManager).shutdown(expectedTimeout);
}
/**
 * Lets the state updater hand back two restored active tasks on shutdown and verifies that they are added to
 * the task registry as active tasks and closed clean.
 */
@Test
public void shouldShutDownStateUpdaterAndAddRestoredTasksToTaskRegistry() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StreamTask firstRestored = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RESTORING)
        .build();
    final StreamTask secondRestored = statefulTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RESTORING)
        .build();
    final Set<StreamTask> restoredStreamTasks = mkSet(firstRestored, secondRestored);
    // same elements, viewed as plain tasks
    final Set<Task> restoredAsTasks = restoredStreamTasks.stream()
        .map(Task.class::cast)
        .collect(Collectors.toSet());
    when(stateUpdater.drainRestoredActiveTasks(Duration.ZERO)).thenReturn(restoredStreamTasks);
    when(registry.activeTasks()).thenReturn(restoredAsTasks);
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    manager.shutdown(true);

    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(firstRestored.id());
    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(secondRestored.id());
    verify(activeTaskCreator).closeThreadProducerIfNeeded();
    verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
    verify(registry).addActiveTasks(restoredAsTasks);
    verify(firstRestored).closeClean();
    verify(secondRestored).closeClean();
}
/**
 * Lets the state updater hand back one removed stateful task and one removed standby task on shutdown and
 * verifies that each is added to the matching registry collection and closed clean.
 */
@Test
public void shouldShutDownStateUpdaterAndAddRemovedTasksToTaskRegistry() {
    final TasksRegistry registry = mock(TasksRegistry.class);
    final StreamTask removedActive = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RESTORING)
        .build();
    final StandbyTask removedStandby = standbyTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RUNNING)
        .build();
    when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(removedStandby, removedActive));
    when(registry.activeTasks()).thenReturn(mkSet(removedActive));
    when(registry.allTasks()).thenReturn(mkSet(removedActive, removedStandby));
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, true);

    manager.shutdown(true);

    // producer and state updater clean-up
    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(removedActive.id());
    verify(activeTaskCreator).closeThreadProducerIfNeeded();
    verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
    // removed tasks are handed to the registry
    verify(registry).addActiveTasks(mkSet(removedActive));
    verify(registry).addStandbyTasks(mkSet(removedStandby));
    // and closed clean
    verify(removedActive).closeClean();
    verify(removedStandby).closeClean();
}
/**
 * Assigns one active task and checks that completing restoration moves it to RUNNING, registers it in the
 * active task map, and resumes the consumer's assignment.
 */
@Test
public void shouldInitializeNewActiveTasks() {
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    final boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);

    assertThat(restorationCompleted, is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));
    assertThat(taskManager.activeTaskMap(), Matchers.equalTo(singletonMap(taskId00, activeTask)));
    assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    // verifies that we actually resume the assignment at the end of restoration.
    verify(consumer).resume(assignment);
}
/**
 * Assigns one standby task and checks that completing restoration moves it to RUNNING and registers it in
 * the standby task map only.
 */
@Test
public void shouldInitialiseNewStandbyTasks() {
    final StateMachineTask standbyTask = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);
    when(consumer.assignment()).thenReturn(assignment);
    when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(standbyTask));

    taskManager.handleAssignment(emptyMap(), taskId01Assignment);
    final boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);

    assertThat(restorationCompleted, is(true));
    assertThat(standbyTask.state(), is(Task.State.RUNNING));
    assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
    assertThat(taskManager.standbyTaskMap(), Matchers.equalTo(singletonMap(taskId01, standbyTask)));
}
/**
 * Checks that the rebalance-in-progress flag is set by {@code handleRebalanceStart} and cleared by
 * {@code handleRebalanceComplete}, and that the consumer's assignment is paused along the way.
 */
@Test
public void shouldHandleRebalanceEvents() {
    when(consumer.assignment()).thenReturn(assignment);
    when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList<>());

    // before any rebalance
    assertThat(taskManager.rebalanceInProgress(), is(false));

    // rebalance started
    taskManager.handleRebalanceStart(emptySet());
    assertThat(taskManager.rebalanceInProgress(), is(true));

    // rebalance finished
    taskManager.handleRebalanceComplete();
    assertThat(taskManager.rebalanceInProgress(), is(false));

    verify(consumer).pause(assignment);
}
/**
 * Runs one active and one standby task, flags both as needing a commit, and checks that {@code commitAll()}
 * reports two committed tasks, clears both flags, and commits the active task's offsets via the consumer.
 */
@Test
public void shouldCommitActiveAndStandbyTasks() {
    final Map<TopicPartition, OffsetAndMetadata> activeOffsets =
        singletonMap(t1p0, new OffsetAndMetadata(0L, null));
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    activeTask.setCommittableOffsetsAndMetadata(activeOffsets);
    final StateMachineTask standbyTask = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);

    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
    when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(standbyTask));

    taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));
    assertThat(standbyTask.state(), is(Task.State.RUNNING));

    activeTask.setCommitNeeded();
    standbyTask.setCommitNeeded();

    final int committedTasks = taskManager.commitAll();

    assertThat(committedTasks, equalTo(2));
    assertThat(activeTask.commitNeeded, is(false));
    assertThat(standbyTask.commitNeeded, is(false));
    verify(consumer).commitSync(activeOffsets);
}
/**
 * Runs three active and three standby tasks, flags a subset as needing a commit, and checks that
 * {@code commit(...)} only commits tasks that are both in the provided set and flagged.
 */
@Test
public void shouldCommitProvidedTasksIfNeeded() {
    // active tasks
    final StateMachineTask active00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final StateMachineTask active01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
    final StateMachineTask active02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
    // standby tasks
    final StateMachineTask standby03 = new StateMachineTask(taskId03, taskId03Partitions, false, stateManager);
    final StateMachineTask standby04 = new StateMachineTask(taskId04, taskId04Partitions, false, stateManager);
    final StateMachineTask standby05 = new StateMachineTask(taskId05, taskId05Partitions, false, stateManager);

    final Map<TaskId, Set<TopicPartition>> activeAssignment = mkMap(
        mkEntry(taskId00, taskId00Partitions),
        mkEntry(taskId01, taskId01Partitions),
        mkEntry(taskId02, taskId02Partitions)
    );
    final Map<TaskId, Set<TopicPartition>> standbyAssignment = mkMap(
        mkEntry(taskId03, taskId03Partitions),
        mkEntry(taskId04, taskId04Partitions),
        mkEntry(taskId05, taskId05Partitions)
    );

    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(activeAssignment)))
        .thenReturn(Arrays.asList(active00, active01, active02));
    when(standbyTaskCreator.createTasks(standbyAssignment))
        .thenReturn(Arrays.asList(standby03, standby04, standby05));

    taskManager.handleAssignment(activeAssignment, standbyAssignment);
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(active00.state(), is(Task.State.RUNNING));
    assertThat(active01.state(), is(Task.State.RUNNING));

    // flagged: 0_0, 0_1, 0_3, 0_4 -- provided below: 0_0, 0_2, 0_3, 0_5
    active00.setCommitNeeded();
    active01.setCommitNeeded();
    standby03.setCommitNeeded();
    standby04.setCommitNeeded();

    final int committedTasks = taskManager.commit(mkSet(active00, active02, standby03, standby05));

    assertThat(committedTasks, equalTo(2));
    assertThat(active00.commitNeeded, is(false));
    assertThat(active01.commitNeeded, is(true));
    assertThat(active02.commitNeeded, is(false));
    assertThat(standby03.commitNeeded, is(false));
    assertThat(standby04.commitNeeded, is(true));
    assertThat(standby05.commitNeeded, is(false));
}
/**
 * Runs a single standby task that needs a commit and checks that {@code commitAll()} reports one committed
 * task and clears the task's commit-needed flag.
 */
@Test
public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() {
    final StateMachineTask standbyTask = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
    when(consumer.assignment()).thenReturn(assignment);
    when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTask));

    taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(standbyTask.state(), is(Task.State.RUNNING));

    standbyTask.setCommitNeeded();
    final int committedTasks = taskManager.commitAll();

    assertThat(committedTasks, equalTo(1));
    assertThat(standbyTask.commitNeeded, is(false));
}
/**
 * Starts a rebalance while an active and a standby task both need a commit and checks that both commit entry
 * points return the {@code -1} sentinel instead of committing.
 */
@Test
public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throws Exception {
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final StateMachineTask standbyTask = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);

    // task directory fixtures
    makeTaskFolders(taskId00.toString(), taskId01.toString());
    expectDirectoryNotEmpty(taskId00, taskId01);
    expectLockObtainedFor(taskId00, taskId01);

    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
    when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(standbyTask));

    taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));
    assertThat(standbyTask.state(), is(Task.State.RUNNING));

    activeTask.setCommitNeeded();
    standbyTask.setCommitNeeded();

    taskManager.handleRebalanceStart(emptySet());

    // -1 is the sentinel indicating that nothing was done because a rebalance is in progress
    final int commitAllResult = taskManager.commitAll();
    assertThat(commitAllResult, equalTo(-1));

    final int userRequestedCommitResult = taskManager.maybeCommitActiveTasksPerUserRequested();
    assertThat(userRequestedCommitResult, equalTo(-1));
}
/**
 * With the default task manager fixture, checks that committing an active task's offsets goes through
 * {@code consumer.commitSync}.
 */
@Test
public void shouldCommitViaConsumerIfEosDisabled() {
    final Map<TopicPartition, OffsetAndMetadata> committableOffsets =
        singletonMap(t1p1, new OffsetAndMetadata(0L, null));
    final StateMachineTask activeTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
    activeTask.setCommittableOffsetsAndMetadata(committableOffsets);
    activeTask.setCommitNeeded();
    taskManager.addTask(activeTask);

    taskManager.commitAll();

    verify(consumer).commitSync(committableOffsets);
}
/**
 * In EXACTLY_ONCE_ALPHA mode, checks that each task's offsets are committed in a separate
 * {@code commitTransaction} call on the producer returned for the task.
 */
@Test
public void shouldCommitViaProducerIfEosAlphaEnabled() {
    final StreamsProducer taskProducer = mock(StreamsProducer.class);
    when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))).thenReturn(taskProducer);

    final Map<TopicPartition, OffsetAndMetadata> firstTaskOffsets =
        singletonMap(t1p1, new OffsetAndMetadata(0L, null));
    final Map<TopicPartition, OffsetAndMetadata> secondTaskOffsets =
        singletonMap(t1p2, new OffsetAndMetadata(1L, null));

    shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, firstTaskOffsets, secondTaskOffsets);

    // one transaction commit per task
    verify(taskProducer).commitTransaction(firstTaskOffsets, new ConsumerGroupMetadata("appId"));
    verify(taskProducer).commitTransaction(secondTaskOffsets, new ConsumerGroupMetadata("appId"));
    verifyNoMoreInteractions(taskProducer);
}
/**
 * In EXACTLY_ONCE_V2 mode, checks that the offsets of both tasks are committed together in a single
 * {@code commitTransaction} call on the thread producer.
 */
@Test
public void shouldCommitViaProducerIfEosV2Enabled() {
    final StreamsProducer threadProducer = mock(StreamsProducer.class);
    when(activeTaskCreator.threadProducer()).thenReturn(threadProducer);

    final Map<TopicPartition, OffsetAndMetadata> firstTaskOffsets =
        singletonMap(t1p1, new OffsetAndMetadata(0L, null));
    final Map<TopicPartition, OffsetAndMetadata> secondTaskOffsets =
        singletonMap(t1p2, new OffsetAndMetadata(1L, null));
    // union of both tasks' offsets
    final Map<TopicPartition, OffsetAndMetadata> combinedOffsets = new HashMap<>();
    combinedOffsets.putAll(firstTaskOffsets);
    combinedOffsets.putAll(secondTaskOffsets);

    shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, firstTaskOffsets, secondTaskOffsets);

    verify(threadProducer).commitTransaction(combinedOffsets, new ConsumerGroupMetadata("appId"));
    verifyNoMoreInteractions(threadProducer);
}
/**
 * Shared driver for the EOS commit tests: builds a task manager for the given processing mode, registers
 * tasks 0_1 and 0_2 with the given committable offsets and a pending commit, and calls {@code commitAll()}.
 *
 * @param processingMode the processing mode the task manager is set up with
 * @param offsetsT01     committable offsets assigned to task 0_1
 * @param offsetsT02     committable offsets assigned to task 0_2
 */
private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode processingMode,
                                                 final Map<TopicPartition, OffsetAndMetadata> offsetsT01,
                                                 final Map<TopicPartition, OffsetAndMetadata> offsetsT02) {
    final TaskManager eosTaskManager = setUpTaskManager(processingMode, false);

    final StateMachineTask firstTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
    firstTask.setCommittableOffsetsAndMetadata(offsetsT01);
    firstTask.setCommitNeeded();
    eosTaskManager.addTask(firstTask);

    final StateMachineTask secondTask = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
    secondTask.setCommittableOffsetsAndMetadata(offsetsT02);
    secondTask.setCommitNeeded();
    eosTaskManager.addTask(secondTask);

    when(consumer.groupMetadata()).thenReturn(new ConsumerGroupMetadata("appId"));

    eosTaskManager.commitAll();
}
/**
 * Checks that a RuntimeException thrown from an active task's {@code prepareCommit()} is thrown unchanged
 * from {@code commitAll()}.
 */
@Test
public void shouldPropagateExceptionFromActiveCommit() {
    final StateMachineTask failingTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
            throw new RuntimeException("opsh.");
        }
    };
    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(failingTask));

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(failingTask.state(), is(Task.State.RUNNING));

    failingTask.setCommitNeeded();

    final RuntimeException error = assertThrows(RuntimeException.class, taskManager::commitAll);
    assertThat(error.getMessage(), equalTo("opsh."));
}
/**
 * Checks that a RuntimeException thrown from a standby task's {@code prepareCommit()} is thrown unchanged
 * from {@code commitAll()}.
 */
@Test
public void shouldPropagateExceptionFromStandbyCommit() {
    final StateMachineTask failingStandby = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) {
        @Override
        public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
            throw new RuntimeException("opsh.");
        }
    };
    when(consumer.assignment()).thenReturn(assignment);
    when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(failingStandby));

    taskManager.handleAssignment(emptyMap(), taskId01Assignment);
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(failingStandby.state(), is(Task.State.RUNNING));

    failingStandby.setCommitNeeded();

    final RuntimeException error = assertThrows(RuntimeException.class, taskManager::commitAll);
    assertThat(error.getMessage(), equalTo("opsh."));
}
/**
 * Purges twice with increasing purgeable offsets, each request being answered by an already completed
 * future, and verifies that the admin client receives both delete-records requests in order.
 */
@Test
public void shouldSendPurgeData() {
    final Map<TopicPartition, RecordsToDelete> firstRequest = singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
    final Map<TopicPartition, RecordsToDelete> secondRequest = singletonMap(t1p1, RecordsToDelete.beforeOffset(17L));
    when(adminClient.deleteRecords(firstRequest))
        .thenReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture())));
    when(adminClient.deleteRecords(secondRequest))
        .thenReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture())));
    final InOrder adminCalls = inOrder(adminClient);

    // mutable map handed out by the task, so the test can change what is purgeable between calls
    final Map<TopicPartition, Long> offsetsToPurge = new HashMap<>();
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public Map<TopicPartition, Long> purgeableOffsets() {
            return offsetsToPurge;
        }
    };
    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));

    offsetsToPurge.put(t1p1, 5L);
    taskManager.maybePurgeCommittedRecords();

    offsetsToPurge.put(t1p1, 17L);
    taskManager.maybePurgeCommittedRecords();

    adminCalls.verify(adminClient).deleteRecords(firstRequest);
    adminCalls.verify(adminClient).deleteRecords(secondRequest);
    adminCalls.verifyNoMoreInteractions();
}
/**
 * Verifies that no second delete-records request is sent while the previous one is still pending.
 *
 * <p>The first purge is answered with a future that this test never completes, so the second call to
 * {@code maybePurgeCommittedRecords()} is expected not to reach the admin client.
 */
@Test
public void shouldNotSendPurgeDataIfPreviousNotDone() {
    // never completed by this test, i.e. the first request stays pending
    final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
    when(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L))))
        .thenReturn(new DeleteRecordsResult(singletonMap(t1p1, futureDeletedRecords)));

    // mutable map handed out by the task, so the test can change what is purgeable between calls
    final Map<TopicPartition, Long> purgeableOffsetsByPartition = new HashMap<>();
    final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public Map<TopicPartition, Long> purgeableOffsets() {
            return purgeableOffsetsByPartition;
        }
    };
    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(task00.state(), is(Task.State.RUNNING));

    purgeableOffsetsByPartition.put(t1p1, 5L);
    taskManager.maybePurgeCommittedRecords();

    // this call should be a no-op.
    purgeableOffsetsByPartition.put(t1p1, 17L);
    taskManager.maybePurgeCommittedRecords();

    // assert the no-op explicitly instead of relying on how the mock reacts to an unstubbed call:
    // exactly one delete-records request was sent overall, and it was the one for offset 5
    verify(adminClient, times(1)).deleteRecords(any());
    verify(adminClient).deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L)));
}
/**
 * Answers every delete-records request with an exceptionally completed future and checks that two
 * consecutive purge attempts complete without throwing.
 */
@Test
public void shouldIgnorePurgeDataErrors() {
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    when(consumer.assignment()).thenReturn(assignment);

    final KafkaFutureImpl<DeletedRecords> failedDeletion = new KafkaFutureImpl<>();
    final DeleteRecordsResult failedResult = new DeleteRecordsResult(singletonMap(t1p1, failedDeletion));
    failedDeletion.completeExceptionally(new Exception("KABOOM!"));
    when(adminClient.deleteRecords(any())).thenReturn(failedResult);

    taskManager.addTask(activeTask);
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(activeTask.state(), is(Task.State.RUNNING));

    activeTask.setPurgeableOffsets(singletonMap(t1p1, 5L));

    // neither attempt may throw, even though the result handed out by the admin client has failed
    taskManager.maybePurgeCommittedRecords();
    taskManager.maybePurgeCommittedRecords();
}
/**
 * Runs four active tasks and one standby task with different combinations of "commit needed" and "commit
 * requested", and checks the count returned by {@code maybeCommitActiveTasksPerUserRequested()} together
 * with the offsets passed to {@code consumer.commitSync}.
 */
@Test
public void shouldMaybeCommitAllActiveTasksThatNeedCommit() {
    final StateMachineTask active00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> offsetsOf00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
    active00.setCommittableOffsetsAndMetadata(offsetsOf00);

    final StateMachineTask active01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> offsetsOf01 = singletonMap(t1p1, new OffsetAndMetadata(1L, null));
    active01.setCommittableOffsetsAndMetadata(offsetsOf01);

    final StateMachineTask active02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> offsetsOf02 = singletonMap(t1p2, new OffsetAndMetadata(2L, null));
    active02.setCommittableOffsetsAndMetadata(offsetsOf02);

    // no committable offsets are set on the remaining two tasks
    final StateMachineTask active03 = new StateMachineTask(taskId03, taskId03Partitions, true, stateManager);
    final StateMachineTask standby10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager);

    final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<>();
    expectedCommittedOffsets.putAll(offsetsOf00);
    expectedCommittedOffsets.putAll(offsetsOf01);

    final Map<TaskId, Set<TopicPartition>> activeAssignment = mkMap(
        mkEntry(taskId00, taskId00Partitions),
        mkEntry(taskId01, taskId01Partitions),
        mkEntry(taskId02, taskId02Partitions),
        mkEntry(taskId03, taskId03Partitions)
    );
    final Map<TaskId, Set<TopicPartition>> standbyAssignment = mkMap(
        mkEntry(taskId10, taskId10Partitions)
    );

    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(activeAssignment)))
        .thenReturn(asList(active00, active01, active02, active03));
    when(standbyTaskCreator.createTasks(standbyAssignment))
        .thenReturn(singletonList(standby10));

    taskManager.handleAssignment(activeAssignment, standbyAssignment);
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(active00.state(), is(Task.State.RUNNING));
    assertThat(active01.state(), is(Task.State.RUNNING));
    assertThat(active02.state(), is(Task.State.RUNNING));
    assertThat(active03.state(), is(Task.State.RUNNING));
    assertThat(standby10.state(), is(Task.State.RUNNING));

    // 0_0: needed + requested
    active00.setCommitNeeded();
    active00.setCommitRequested();
    // 0_1: needed only
    active01.setCommitNeeded();
    // 0_2: requested only
    active02.setCommitRequested();
    // 0_3: needed + requested
    active03.setCommitNeeded();
    active03.setCommitRequested();
    // standby 1_0: needed + requested
    standby10.setCommitNeeded();
    standby10.setCommitRequested();

    final int committedTasks = taskManager.maybeCommitActiveTasksPerUserRequested();

    assertThat(committedTasks, equalTo(3));
    verify(consumer).commitSync(expectedCommittedOffsets);
}
/**
 * Queues six records on task 0_0 and five on task 0_1 and checks the number of records reported as processed
 * by three consecutive {@code process(3, time)} calls.
 */
@Test
public void shouldProcessActiveTasks() {
    final StateMachineTask firstTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final StateMachineTask secondTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);

    final Map<TaskId, Set<TopicPartition>> activeAssignment = new HashMap<>();
    activeAssignment.put(taskId00, taskId00Partitions);
    activeAssignment.put(taskId01, taskId01Partitions);

    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(activeAssignment)))
        .thenReturn(Arrays.asList(firstTask, secondTask));

    taskManager.handleAssignment(activeAssignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(firstTask.state(), is(Task.State.RUNNING));
    assertThat(secondTask.state(), is(Task.State.RUNNING));

    // six records for task 0_0
    firstTask.addRecords(
        t1p0,
        Arrays.asList(
            getConsumerRecord(t1p0, 0L),
            getConsumerRecord(t1p0, 1L),
            getConsumerRecord(t1p0, 2L),
            getConsumerRecord(t1p0, 3L),
            getConsumerRecord(t1p0, 4L),
            getConsumerRecord(t1p0, 5L)
        )
    );
    // five records for task 0_1
    secondTask.addRecords(
        t1p1,
        Arrays.asList(
            getConsumerRecord(t1p1, 0L),
            getConsumerRecord(t1p1, 1L),
            getConsumerRecord(t1p1, 2L),
            getConsumerRecord(t1p1, 3L),
            getConsumerRecord(t1p1, 4L)
        )
    );

    // check that we should be processing at most max num records
    final int firstRound = taskManager.process(3, time);
    assertThat(firstRound, is(6));

    // check that if there's no records processable, we would stop early
    final int secondRound = taskManager.process(3, time);
    assertThat(secondRound, is(5));

    final int thirdRound = taskManager.process(3, time);
    assertThat(thirdRound, is(0));
}
/**
 * Processes three running tasks where task 0_1 throws a TimeoutException from {@code process} until the test
 * clears it, and checks the processed-record counts and the task's {@code timeout} field before and after.
 */
@Test
public void shouldNotFailOnTimeoutException() {
    // while non-null, task 0_1 throws this exception from process()
    final AtomicReference<TimeoutException> pendingTimeout = new AtomicReference<>();
    pendingTimeout.set(new TimeoutException("Skip me!"));

    final StateMachineTask firstTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    firstTask.transitionTo(State.RESTORING);
    firstTask.transitionTo(State.RUNNING);

    final StateMachineTask timingOutTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
        @Override
        public boolean process(final long wallClockTime) {
            final TimeoutException toThrow = pendingTimeout.get();
            if (toThrow == null) {
                return true;
            }
            throw toThrow;
        }
    };
    timingOutTask.transitionTo(State.RESTORING);
    timingOutTask.transitionTo(State.RUNNING);

    final StateMachineTask thirdTask = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
    thirdTask.transitionTo(State.RESTORING);
    thirdTask.transitionTo(State.RUNNING);

    taskManager.addTask(firstTask);
    taskManager.addTask(timingOutTask);
    taskManager.addTask(thirdTask);

    // two records per task
    firstTask.addRecords(
        t1p0,
        Arrays.asList(
            getConsumerRecord(t1p0, 0L),
            getConsumerRecord(t1p0, 1L)
        )
    );
    timingOutTask.addRecords(
        t1p1,
        Arrays.asList(
            getConsumerRecord(t1p1, 0L),
            getConsumerRecord(t1p1, 1L)
        )
    );
    thirdTask.addRecords(
        t1p2,
        Arrays.asList(
            getConsumerRecord(t1p2, 0L),
            getConsumerRecord(t1p2, 1L)
        )
    );

    // should only process 2 records, because task01 throws TimeoutException
    final int processedWhileTimingOut = taskManager.process(1, time);
    assertThat(processedWhileTimingOut, is(2));
    assertThat(timingOutTask.timeout, equalTo(time.milliseconds()));

    // retry without error
    pendingTimeout.set(null);
    final int processedOnRetry = taskManager.process(1, time);
    assertThat(processedOnRetry, is(3));
    assertThat(timingOutTask.timeout, equalTo(null));

    // there should still be one record for task01 to be processed
    final int processedRemainder = taskManager.process(1, time);
    assertThat(processedRemainder, is(1));
}
/**
 * Checks that a TaskMigratedException thrown from a task's {@code process} is thrown from
 * {@code taskManager.process}.
 */
@Test
public void shouldPropagateTaskMigratedExceptionsInProcessActiveTasks() {
    final StateMachineTask migratedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public boolean process(final long wallClockTime) {
            throw new TaskMigratedException("migrated", new RuntimeException("cause"));
        }
    };
    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(migratedTask));

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(migratedTask.state(), is(Task.State.RUNNING));

    // give the task a single record to work on
    final TopicPartition inputPartition = taskId00Partitions.iterator().next();
    migratedTask.addRecords(inputPartition, singletonList(getConsumerRecord(inputPartition, 0L)));

    assertThrows(TaskMigratedException.class, () -> taskManager.process(1, time));
}
/**
 * Checks that a RuntimeException thrown from a task's {@code process} reaches the caller as a
 * StreamsException that carries the task id and has the original exception as its cause.
 */
@Test
public void shouldWrapRuntimeExceptionsInProcessActiveTasksAndSetTaskId() {
    final StateMachineTask brokenTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public boolean process(final long wallClockTime) {
            throw new RuntimeException("oops");
        }
    };
    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(brokenTask));

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(brokenTask.state(), is(Task.State.RUNNING));

    // give the task a single record to work on
    final TopicPartition inputPartition = taskId00Partitions.iterator().next();
    brokenTask.addRecords(inputPartition, singletonList(getConsumerRecord(inputPartition, 0L)));

    final StreamsException wrapped = assertThrows(StreamsException.class, () -> taskManager.process(1, time));

    assertThat(wrapped.taskId().isPresent(), is(true));
    assertThat(wrapped.taskId().get(), is(taskId00));
    assertThat(wrapped.getCause().getMessage(), is("oops"));
}
/**
 * Checks that a TaskMigratedException thrown from a task's stream-time punctuation is thrown from
 * {@code taskManager.punctuate()}.
 */
@Test
public void shouldPropagateTaskMigratedExceptionsInPunctuateActiveTasks() {
    final StateMachineTask migratedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public boolean maybePunctuateStreamTime() {
            throw new TaskMigratedException("migrated", new RuntimeException("cause"));
        }
    };
    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(migratedTask));

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(migratedTask.state(), is(Task.State.RUNNING));

    assertThrows(TaskMigratedException.class, taskManager::punctuate);
}
/**
 * Checks that a KafkaException thrown from a task's stream-time punctuation is thrown from
 * {@code taskManager.punctuate()}.
 */
@Test
public void shouldPropagateKafkaExceptionsInPunctuateActiveTasks() {
    final StateMachineTask brokenTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public boolean maybePunctuateStreamTime() {
            throw new KafkaException("oops");
        }
    };
    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(brokenTask));

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(brokenTask.state(), is(Task.State.RUNNING));

    assertThrows(KafkaException.class, taskManager::punctuate);
}
/**
 * Runs one active task that reports a punctuation for both stream time and system time and checks that
 * {@code punctuate()} returns two.
 */
@Test
public void shouldPunctuateActiveTasks() {
    final StateMachineTask punctuatingTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public boolean maybePunctuateStreamTime() {
            return true;
        }

        @Override
        public boolean maybePunctuateSystemTime() {
            return true;
        }
    };
    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(punctuatingTask));

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
    assertThat(punctuatingTask.state(), is(Task.State.RUNNING));

    // one for stream and one for system time
    final int punctuations = taskManager.punctuate();
    assertThat(punctuations, equalTo(2));
}
/**
 * Assigns an active task that reports a changelog partition and checks that
 * {@code tryToCompleteRestoration} returns false, the task is in state RESTORING, and the consumer is not
 * touched.
 */
@Test
public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
    final StateMachineTask restoringTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public Set<TopicPartition> changelogPartitions() {
            return singleton(new TopicPartition("fake", 0));
        }
    };
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(restoringTask));

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    final boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);

    assertThat(restorationCompleted, is(false));
    assertThat(restoringTask.state(), is(Task.State.RESTORING));
    verifyNoInteractions(consumer);
}
@Test
public void shouldHaveRemainingPartitionsUncleared() {
    // Revoking a partition that no current task owns must be logged, while the owned
    // partition's task is still suspended.
    final StateMachineTask activeTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> committable = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
    activeTask.setCommittableOffsetsAndMetadata(committable);

    when(consumer.assignment()).thenReturn(assignment);
    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));

    final String expectedLogLine =
        "taskManagerTestThe following revoked partitions [unknown-0] are missing " +
        "from the current task partitions. It could potentially be due to race " +
        "condition of consumer detecting the heartbeat failure, or the " +
        "tasks have been cleaned up by the handleAssignment callback.";

    try (final LogCaptureAppender logCapture = LogCaptureAppender.createAndRegister(TaskManager.class)) {
        logCapture.setClassLoggerToDebug(TaskManager.class);

        taskManager.handleAssignment(taskId00Assignment, emptyMap());
        final boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
        assertThat(restorationCompleted, is(true));
        assertThat(activeTask.state(), is(Task.State.RUNNING));

        // t1p0 belongs to the task, "unknown-0" belongs to nobody
        final TopicPartition unownedPartition = new TopicPartition("unknown", 0);
        taskManager.handleRevocation(mkSet(t1p0, unownedPartition));
        assertThat(activeTask.state(), is(Task.State.SUSPENDED));

        final List<String> capturedLines = logCapture.getMessages();
        assertThat(capturedLines, hasItem(expectedLogLine));
    }
}
// Registers two non-active tasks whose suspend() first performs the normal state transition and
// then throws a TaskMigratedException. Handing the task manager an empty assignment is expected
// to surface a single TaskMigratedException to the caller.
@Test
public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() {
final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) {
@Override
public void suspend() {
super.suspend();
throw new TaskMigratedException("t1 close exception", new RuntimeException());
}
};
final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false, stateManager) {
@Override
public void suspend() {
super.suspend();
throw new TaskMigratedException("t2 close exception", new RuntimeException());
}
};
taskManager.addTask(migratedTask01);
taskManager.addTask(migratedTask02);
// both tasks are dropped from the assignment, so both have to be released
final TaskMigratedException thrown = assertThrows(
TaskMigratedException.class,
() -> taskManager.handleAssignment(emptyMap(), emptyMap())
);
// NOTE(review): the expected message below carries the text thrown by migratedTask02
// ("t2 close exception"). An earlier version of this comment claimed that the task map
// ordering makes t1 "always the first"; which task's exception is finally reported depends
// on TaskManager internals not visible here -- confirm and keep comment and assertion in sync.
assertThat(
thrown.getMessage(),
equalTo("t2 close exception; it means all tasks belonging to this thread should be migrated.")
);
}
@Test
public void shouldThrowRuntimeExceptionWhenEncounteredUnknownExceptionDuringTaskClose() {
    // One task fails with a TaskMigratedException while suspending, the other one with an
    // IllegalStateException; the test expects the latter to be the one that is reported.
    final StateMachineTask migratingTask = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) {
        @Override
        public void suspend() {
            super.suspend();
            throw new TaskMigratedException("t1 close exception", new RuntimeException());
        }
    };
    final StateMachineTask fatallyFailingTask = new StateMachineTask(taskId02, taskId02Partitions, false, stateManager) {
        @Override
        public void suspend() {
            super.suspend();
            throw new IllegalStateException("t2 illegal state exception", new RuntimeException());
        }
    };
    taskManager.addTask(migratingTask);
    taskManager.addTask(fatallyFailingTask);

    final RuntimeException failure = assertThrows(
        RuntimeException.class,
        () -> taskManager.handleAssignment(emptyMap(), emptyMap())
    );

    // Fatal exception thrown first.
    final String reportedMessage = failure.getMessage();
    assertThat(reportedMessage, equalTo("Encounter unexpected fatal error for task 0_2"));
    final String causeMessage = failure.getCause().getMessage();
    assertThat(causeMessage, equalTo("t2 illegal state exception"));
}
@Test
public void shouldThrowSameKafkaExceptionWhenEncounteredDuringTaskClose() {
    // One task fails with a TaskMigratedException while suspending, the other one with a
    // KafkaException; the test expects a StreamsException tagged with the second task's id.
    final StateMachineTask migratingTask = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) {
        @Override
        public void suspend() {
            super.suspend();
            throw new TaskMigratedException("t1 close exception", new RuntimeException());
        }
    };
    final StateMachineTask kafkaFailingTask = new StateMachineTask(taskId02, taskId02Partitions, false, stateManager) {
        @Override
        public void suspend() {
            super.suspend();
            throw new KafkaException("Kaboom for t2!", new RuntimeException());
        }
    };
    taskManager.addTask(migratingTask);
    taskManager.addTask(kafkaFailingTask);

    final StreamsException failure = assertThrows(
        StreamsException.class,
        () -> taskManager.handleAssignment(emptyMap(), emptyMap())
    );

    assertThat(failure.taskId().isPresent(), is(true));
    assertThat(failure.taskId().get(), is(taskId02));

    // Expecting the original Kafka exception wrapped in the StreamsException.
    final String causeMessage = failure.getCause().getMessage();
    assertThat(causeMessage, equalTo("Kaboom for t2!"));
}
@Test
public void shouldTransmitProducerMetrics() {
    // The task manager must hand out exactly the metrics map supplied by the active task creator.
    final MetricName metricName = new MetricName("test_metric", "", "", new HashMap<>());
    final Measurable constantZero = (config, now) -> 0;
    final Metric metric = new KafkaMetric(new Object(), metricName, constantZero, null, new MockTime());
    final Map<MetricName, Metric> producerMetrics = singletonMap(metricName, metric);

    when(activeTaskCreator.producerMetrics()).thenReturn(producerMetrics);

    assertThat(taskManager.producerMetrics(), is(producerMetrics));
}
// Test helper: builds StateMachineTasks for the three given assignments, lets the task manager
// handle them, attempts to complete restoration once, verifies the resulting task states and
// returns every created task keyed by its id.
private Map<TaskId, StateMachineTask> handleAssignment(final Map<TaskId, Set<TopicPartition>> runningActiveAssignment,
                                                       final Map<TaskId, Set<TopicPartition>> standbyAssignment,
                                                       final Map<TaskId, Set<TopicPartition>> restoringActiveAssignment) {
    final Set<Task> runningTasks = runningActiveAssignment.entrySet().stream()
        .map(entry -> new StateMachineTask(entry.getKey(), entry.getValue(), true, stateManager))
        .collect(Collectors.toSet());
    final Set<Task> standbyTasks = standbyAssignment.entrySet().stream()
        .map(entry -> new StateMachineTask(entry.getKey(), entry.getValue(), false, stateManager))
        .collect(Collectors.toSet());
    final Set<Task> restoringTasks = restoringActiveAssignment.entrySet().stream()
        .map(entry -> new StateMachineTask(entry.getKey(), entry.getValue(), true, stateManager))
        .collect(Collectors.toSet());

    // give the restoring tasks some uncompleted changelog partitions so they'll stay in restoring
    for (final Task restoring : restoringTasks) {
        ((StateMachineTask) restoring).setChangelogOffsets(singletonMap(new TopicPartition("changelog", 0), 0L));
    }

    // running and restoring actives are handed to the task manager as one combined active assignment
    final Map<TaskId, Set<TopicPartition>> allActiveTasksAssignment = new HashMap<>(runningActiveAssignment);
    allActiveTasksAssignment.putAll(restoringActiveAssignment);
    final Set<Task> allActiveTasks = new HashSet<>(runningTasks);
    allActiveTasks.addAll(restoringTasks);

    when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(standbyTasks);
    when(activeTaskCreator.createTasks(any(), eq(allActiveTasksAssignment))).thenReturn(allActiveTasks);
    lenient().when(consumer.assignment()).thenReturn(assignment);

    taskManager.handleAssignment(allActiveTasksAssignment, standbyAssignment);
    taskManager.tryToCompleteRestoration(time.milliseconds(), null);

    // Just make sure all tasks ended up in the expected state
    final Map<TaskId, StateMachineTask> tasksById = new HashMap<>();
    for (final Task running : runningTasks) {
        assertThat(running.state(), is(Task.State.RUNNING));
        tasksById.put(running.id(), (StateMachineTask) running);
    }
    for (final Task restoring : restoringTasks) {
        assertThat(restoring.state(), is(Task.State.RESTORING));
        tasksById.put(restoring.id(), (StateMachineTask) restoring);
    }
    for (final Task standby : standbyTasks) {
        assertThat(standby.state(), is(Task.State.RUNNING));
        tasksById.put(standby.id(), (StateMachineTask) standby);
    }
    return tasksById;
}
// Stubs the state directory so that locking each of the given tasks succeeds.
private void expectLockObtainedFor(final TaskId... tasks) {
    asList(tasks).forEach(taskId -> when(stateDirectory.lock(taskId)).thenReturn(true));
}
// Stubs the state directory so that locking each of the given tasks fails.
private void expectLockFailedFor(final TaskId... tasks) {
    asList(tasks).forEach(taskId -> when(stateDirectory.lock(taskId)).thenReturn(false));
}
// Stubs the state directory so that each given task's directory is reported as non-empty.
private void expectDirectoryNotEmpty(final TaskId... tasks) {
    for (int i = 0; i < tasks.length; i++) {
        final TaskId current = tasks[i];
        when(stateDirectory.directoryForTaskIsEmpty(current)).thenReturn(false);
    }
}
@Test
public void shouldThrowTaskMigratedExceptionOnCommitFailed() {
    // A CommitFailedException raised by the consumer must come out as a TaskMigratedException.
    final StateMachineTask committingTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> committable = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
    committingTask.setCommittableOffsetsAndMetadata(committable);
    committingTask.setCommitNeeded();
    taskManager.addTask(committingTask);

    doThrow(new CommitFailedException()).when(consumer).commitSync(committable);

    final TaskMigratedException migrated = assertThrows(TaskMigratedException.class, taskManager::commitAll);

    final String expectedMessage =
        "Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group;" +
        " it means all tasks belonging to this thread should be migrated.";
    assertThat(migrated.getCause(), instanceOf(CommitFailedException.class));
    assertThat(migrated.getMessage(), equalTo(expectedMessage));
    assertThat(committingTask.state(), is(Task.State.CREATED));
}
@SuppressWarnings("unchecked")
@Test
public void shouldNotFailForTimeoutExceptionOnConsumerCommit() {
    // A TimeoutException from consumer.commitSync() must not fail the commit: the first attempt
    // commits nothing and starts the task timeout, the retry succeeds and clears it again.
    final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);

    // each task gets committable offsets for its OWN input partitions (task01 previously reused
    // taskId00Partitions by copy-paste; task01 never needs a commit here, so the outcome is unchanged)
    task00.setCommittableOffsetsAndMetadata(taskId00Partitions.stream().collect(Collectors.toMap(p -> p, p -> new OffsetAndMetadata(0))));
    task01.setCommittableOffsetsAndMetadata(taskId01Partitions.stream().collect(Collectors.toMap(p -> p, p -> new OffsetAndMetadata(0))));

    // first commitSync() times out, every later one succeeds
    doThrow(new TimeoutException("KABOOM!")).doNothing().when(consumer).commitSync(any(Map.class));

    // only task00 has something to commit
    task00.setCommitNeeded();

    // first attempt: nothing committed, timeout recorded on the task that tried to commit
    assertThat(taskManager.commit(mkSet(task00, task01)), equalTo(0));
    assertThat(task00.timeout, equalTo(time.milliseconds()));
    assertNull(task01.timeout);

    // second attempt: one task committed, timeout cleared
    assertThat(taskManager.commit(mkSet(task00, task01)), equalTo(1));
    assertNull(task00.timeout);
    assertNull(task01.timeout);

    verify(consumer, times(2)).commitSync(any(Map.class));
}
// NOTE(review): despite the "shouldNotFail" name, this test asserts that commit() throws a
// TaskCorruptedException whose corrupted-task set contains only taskId00, i.e. the task whose
// offsets make the stubbed producer time out on the first commitTransaction() call.
// Consider renaming the test -- confirm with the owners first.
@Test
public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
// task manager in EXACTLY_ONCE_ALPHA mode, backed by a mocked Tasks instance
final Tasks tasks = mock(Tasks.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false);
// the same mocked producer is returned for every task id
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))).thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(1L, null));
// committing task00's offsets: first call times out, subsequent calls succeed
doThrow(new TimeoutException("KABOOM!"))
.doNothing()
.doNothing()
.doNothing()
.when(producer).commitTransaction(offsetsT00, null);
// committing task01's offsets always succeeds
doNothing()
.doNothing()
.when(producer).commitTransaction(offsetsT01, null);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
task00.setCommittableOffsetsAndMetadata(offsetsT00);
final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
task01.setCommittableOffsetsAndMetadata(offsetsT01);
// task02 has no committable offsets and is never flagged as needing a commit
final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
when(tasks.allTasks()).thenReturn(mkSet(task00, task01, task02));
task00.setCommitNeeded();
task01.setCommitNeeded();
final TaskCorruptedException exception = assertThrows(
TaskCorruptedException.class,
() -> taskManager.commit(mkSet(task00, task01, task02))
);
// only the task whose commit timed out is reported as corrupted
assertThat(
exception.corruptedTasks(),
equalTo(Collections.singleton(taskId00))
);
// group metadata is expected to be fetched twice
verify(consumer, times(2)).groupMetadata();
}
@Test
public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV2() {
    // With EXACTLY_ONCE_V2 the offsets of all committing tasks go through the single thread
    // producer; the test expects a timeout there to mark both committing tasks as corrupted.
    final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
    final StreamsProducer threadProducer = mock(StreamsProducer.class);
    when(activeTaskCreator.threadProducer()).thenReturn(threadProducer);

    final Map<TopicPartition, OffsetAndMetadata> firstTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
    final Map<TopicPartition, OffsetAndMetadata> secondTaskOffsets = singletonMap(t1p1, new OffsetAndMetadata(1L, null));
    final Map<TopicPartition, OffsetAndMetadata> combinedOffsets = new HashMap<>();
    combinedOffsets.putAll(firstTaskOffsets);
    combinedOffsets.putAll(secondTaskOffsets);

    // first commit of the combined offsets times out, a second one would succeed
    doThrow(new TimeoutException("KABOOM!")).doNothing().when(threadProducer).commitTransaction(combinedOffsets, null);

    final StateMachineTask firstTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
    firstTask.setCommittableOffsetsAndMetadata(firstTaskOffsets);
    final StateMachineTask secondTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
    secondTask.setCommittableOffsetsAndMetadata(secondTaskOffsets);
    final StateMachineTask idleTask = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);

    firstTask.setCommitNeeded();
    secondTask.setCommitNeeded();

    final TaskCorruptedException corrupted = assertThrows(
        TaskCorruptedException.class,
        () -> taskManager.commit(mkSet(firstTask, secondTask, idleTask))
    );

    assertThat(corrupted.corruptedTasks(), equalTo(mkSet(taskId00, taskId01)));
    verify(consumer).groupMetadata();
}
@Test
public void shouldStreamsExceptionOnCommitError() {
    // A KafkaException raised by consumer.commitSync() must be wrapped in a StreamsException.
    final StateMachineTask committingTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> committable = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
    committingTask.setCommittableOffsetsAndMetadata(committable);
    committingTask.setCommitNeeded();
    taskManager.addTask(committingTask);

    doThrow(new KafkaException()).when(consumer).commitSync(committable);

    final StreamsException wrapped = assertThrows(StreamsException.class, taskManager::commitAll);

    assertThat(wrapped.getCause(), instanceOf(KafkaException.class));
    assertThat(wrapped.getMessage(), equalTo("Error encountered committing offsets via consumer"));
    assertThat(committingTask.state(), is(Task.State.CREATED));
}
@Test
public void shouldFailOnCommitFatal() {
    // A plain RuntimeException raised by consumer.commitSync() must reach the caller of commitAll().
    final StateMachineTask committingTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
    final Map<TopicPartition, OffsetAndMetadata> committable = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
    committingTask.setCommittableOffsetsAndMetadata(committable);
    committingTask.setCommitNeeded();
    taskManager.addTask(committingTask);

    doThrow(new RuntimeException("KABOOM")).when(consumer).commitSync(committable);

    final RuntimeException fatal = assertThrows(RuntimeException.class, taskManager::commitAll);

    assertThat(fatal.getMessage(), equalTo("KABOOM"));
    assertThat(committingTask.state(), is(Task.State.CREATED));
}
@Test
public void shouldSuspendAllTasksButSkipCommitIfSuspendingFailsDuringRevocation() {
    // The first task blows up right after it has been suspended; the second one behaves normally.
    final StateMachineTask failingTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
        @Override
        public void suspend() {
            super.suspend();
            throw new RuntimeException("KABOOM!");
        }
    };
    final StateMachineTask healthyTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);

    final Map<TaskId, Set<TopicPartition>> bothTasksAssignment = new HashMap<>(taskId00Assignment);
    bothTasksAssignment.putAll(taskId01Assignment);
    when(activeTaskCreator.createTasks(any(), eq(bothTasksAssignment))).thenReturn(asList(failingTask, healthyTask));
    taskManager.handleAssignment(bothTasksAssignment, emptyMap());

    final RuntimeException failure = assertThrows(
        RuntimeException.class,
        () -> taskManager.handleRevocation(asList(t1p0, t1p1))
    );

    // both tasks end up suspended, and the consumer (hence any commit) is never touched
    assertThat(failure.getCause().getMessage(), is("KABOOM!"));
    assertThat(failingTask.state(), is(Task.State.SUSPENDED));
    assertThat(healthyTask.state(), is(Task.State.SUSPENDED));
    verifyNoInteractions(consumer);
}
@Test
public void shouldConvertActiveTaskToStandbyTask() {
    // Task 0_0 is first assigned as active and then re-assigned as standby.
    final StreamTask mockedActive = mock(StreamTask.class);
    when(mockedActive.id()).thenReturn(taskId00);
    when(mockedActive.inputPartitions()).thenReturn(taskId00Partitions);
    when(mockedActive.isActive()).thenReturn(true);

    final StandbyTask mockedStandby = mock(StandbyTask.class);
    when(mockedStandby.id()).thenReturn(taskId00);

    when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(mockedActive));
    when(standbyTaskCreator.createStandbyTaskFromActive(any(), eq(taskId00Partitions))).thenReturn(mockedStandby);

    taskManager.handleAssignment(taskId00Assignment, emptyMap());
    taskManager.handleAssignment(emptyMap(), taskId00Assignment);

    verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
    verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
    verify(standbyTaskCreator, times(2)).createTasks(emptyMap());
    verifyNoInteractions(consumer);
}
@Test
public void shouldConvertStandbyTaskToActiveTask() {
    // Task 0_0 is first assigned as standby and then re-assigned as active.
    final StandbyTask mockedStandby = mock(StandbyTask.class);
    when(mockedStandby.id()).thenReturn(taskId00);
    when(mockedStandby.isActive()).thenReturn(false);
    when(mockedStandby.prepareCommit()).thenReturn(emptyMap());

    final StreamTask mockedActive = mock(StreamTask.class);
    when(mockedActive.id()).thenReturn(taskId00);
    when(mockedActive.inputPartitions()).thenReturn(taskId00Partitions);

    when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(mockedStandby));
    when(activeTaskCreator.createActiveTaskFromStandby(eq(mockedStandby), eq(taskId00Partitions), any()))
        .thenReturn(mockedActive);

    taskManager.handleAssignment(emptyMap(), taskId00Assignment);
    taskManager.handleAssignment(taskId00Assignment, emptyMap());

    verify(activeTaskCreator, times(2)).createTasks(any(), eq(emptyMap()));
    verify(standbyTaskCreator).createTasks(emptyMap());
    verifyNoInteractions(consumer);
}
@Test
public void shouldListNotPausedTasks() {
    // one running active task (taskId00) plus one standby task (taskId01), nothing restoring
    handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());

    // assertEquals takes (expected, actual); keeping that order makes a failure message read correctly
    assertEquals(2, taskManager.notPausedTasks().size());

    topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);

    // once the topology is paused, none of its tasks may be listed any more
    assertEquals(0, taskManager.notPausedTasks().size());
}
// Returns a future that has already been completed with a null DeletedRecords value.
private static KafkaFutureImpl<DeletedRecords> completedFuture() {
    final KafkaFutureImpl<DeletedRecords> alreadyDone = new KafkaFutureImpl<>();
    alreadyDone.complete(null);
    return alreadyDone;
}
// Creates one folder per name under the test folder and makes the mocked state directory
// report them (with a null second TaskDirectory argument) as its non-empty task directories.
private void makeTaskFolders(final String... names) throws Exception {
    final ArrayList<TaskDirectory> createdDirectories = new ArrayList<>(names.length);
    for (final String folderName : names) {
        final File folder = testFolder.newFolder(folderName);
        createdDirectories.add(new TaskDirectory(folder, null));
    }
    when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(createdDirectories);
}
// Writes the given offsets into a freshly created checkpoint file for the task and stubs the
// state directory to return that file and to report the task directory as non-empty.
private void writeCheckpointFile(final TaskId task, final Map<TopicPartition, Long> offsets) throws Exception {
    final File checkpoint = getCheckpointFile(task);
    Files.createFile(checkpoint.toPath());

    final OffsetCheckpoint checkpointWriter = new OffsetCheckpoint(checkpoint);
    checkpointWriter.write(offsets);

    lenient().when(stateDirectory.checkpointFileFor(task)).thenReturn(checkpoint);
    expectDirectoryNotEmpty(task);
}
// Resolves <test folder root>/<task id>/<checkpoint file name>; nothing is created on disk.
private File getCheckpointFile(final TaskId task) {
    final File taskDirectory = new File(testFolder.getRoot(), task.toString());
    return new File(taskDirectory, StateManagerUtil.CHECKPOINT_FILE_NAME);
}
// Builds a record for the given partition and offset with a null key and a null value.
private static ConsumerRecord<byte[], byte[]> getConsumerRecord(final TopicPartition topicPartition, final long offset) {
    final String topic = topicPartition.topic();
    final int partition = topicPartition.partition();
    return new ConsumerRecord<>(topic, partition, offset, null, null);
}
private static class StateMachineTask extends AbstractTask implements Task {
private final boolean active;
// TODO: KAFKA-12569 clean up usage of these flags and use the new commitCompleted flag where appropriate
private boolean commitNeeded = false;
private boolean commitRequested = false;
private boolean commitPrepared = false;
private boolean commitCompleted = false;
private Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.emptyMap();
private Map<TopicPartition, Long> purgeableOffsets;
private Map<TopicPartition, Long> changelogOffsets = Collections.emptyMap();
private Set<TopicPartition> partitionsForOffsetReset = Collections.emptySet();
private Long timeout = null;
private final Map<TopicPartition, LinkedList<ConsumerRecord<byte[], byte[]>>> queue = new HashMap<>();
StateMachineTask(final TaskId id,
final Set<TopicPartition> partitions,
final boolean active,
final ProcessorStateManager processorStateManager) {
super(id, null, null, processorStateManager, partitions, (new TopologyConfig(new DummyStreamsConfig())).getTaskConfig(), "test-task", StateMachineTask.class);
this.active = active;
}
@Override
public void initializeIfNeeded() {
if (state() == State.CREATED) {
transitionTo(State.RESTORING);
if (!active) {
transitionTo(State.RUNNING);
}
}
}
@Override
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
this.partitionsForOffsetReset = partitionsForOffsetReset;
}
@Override
public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
if (state() == State.RUNNING) {
return;
}
transitionTo(State.RUNNING);
}
public void setCommitNeeded() {
commitNeeded = true;
}
@Override
public boolean commitNeeded() {
return commitNeeded;
}
public void setCommitRequested() {
commitRequested = true;
}
@Override
public boolean commitRequested() {
return commitRequested;
}
@Override
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
commitPrepared = true;
if (commitNeeded) {
return committableOffsets;
} else {
return Collections.emptyMap();
}
}
@Override
public void postCommit(final boolean enforceCheckpoint) {
commitNeeded = false;
commitCompleted = true;
}
@Override
public void suspend() {
if (state() == State.CLOSED) {
throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
} else if (state() == State.SUSPENDED) {
// do nothing
} else {
transitionTo(State.SUSPENDED);
}
}
@Override
public void resume() {
if (state() == State.SUSPENDED) {
transitionTo(State.RUNNING);
}
}
@Override
public void revive() {
//TODO: KAFKA-12569 move clearing of commit-required statuses to closeDirty/Clean/AndRecycle methods
commitNeeded = false;
commitRequested = false;
super.revive();
}
@Override
public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
final Exception cause) {
timeout = currentWallClockMs;
}
@Override
public void clearTaskTimeout() {
timeout = null;
}
@Override
public void recordRestoration(final Time time, final long numRecords, final boolean initRemaining) {
// do nothing
}
@Override
public void closeClean() {
transitionTo(State.CLOSED);
}
@Override
public void closeDirty() {
transitionTo(State.CLOSED);
}
@Override
public void prepareRecycle() {
transitionTo(State.CLOSED);
}
@Override
public void resumePollingForPartitionsWithAvailableSpace() {
// noop
}
@Override
public void updateLags() {
// noop
}
@Override
public void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
inputPartitions = topicPartitions;
}
void setCommittableOffsetsAndMetadata(final Map<TopicPartition, OffsetAndMetadata> committableOffsets) {
if (!active) {
throw new IllegalStateException("Cannot set CommittableOffsetsAndMetadate for StandbyTasks");
}
this.committableOffsets = committableOffsets;
}
@Override
public StateStore getStore(final String name) {
return null;
}
@Override
public Set<TopicPartition> changelogPartitions() {
return changelogOffsets.keySet();
}
public boolean isActive() {
return active;
}
void setPurgeableOffsets(final Map<TopicPartition, Long> purgeableOffsets) {
this.purgeableOffsets = purgeableOffsets;
}
@Override
public Map<TopicPartition, Long> purgeableOffsets() {
return purgeableOffsets;
}
void setChangelogOffsets(final Map<TopicPartition, Long> changelogOffsets) {
this.changelogOffsets = changelogOffsets;
}
@Override
public Map<TopicPartition, Long> changelogOffsets() {
return changelogOffsets;
}
@Override
public Map<TopicPartition, Long> committedOffsets() {
return Collections.emptyMap();
}
@Override
public Map<TopicPartition, Long> highWaterMark() {
return Collections.emptyMap();
}
@Override
public Optional<Long> timeCurrentIdlingStarted() {
return Optional.empty();
}
@Override
public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
if (isActive()) {
final Deque<ConsumerRecord<byte[], byte[]>> partitionQueue =
queue.computeIfAbsent(partition, k -> new LinkedList<>());
for (final ConsumerRecord<byte[], byte[]> record : records) {
partitionQueue.add(record);
}
} else {
throw new IllegalStateException("Can't add records to an inactive task.");
}
}
@Override
public boolean process(final long wallClockTime) {
if (isActive() && state() == State.RUNNING) {
for (final LinkedList<ConsumerRecord<byte[], byte[]>> records : queue.values()) {
final ConsumerRecord<byte[], byte[]> record = records.poll();
if (record != null) {
return true;
}
}
return false;
} else {
throw new IllegalStateException("Can't process an inactive or non-running task.");
}
}
}
}