| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.streams.processor.internals; |
| |
| import org.apache.kafka.clients.admin.Admin; |
| import org.apache.kafka.clients.admin.AdminClient; |
| import org.apache.kafka.clients.admin.ListOffsetsResult; |
| import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; |
| import org.apache.kafka.clients.admin.OffsetSpec; |
| import org.apache.kafka.clients.consumer.Consumer; |
| import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; |
| import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; |
| import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol; |
| import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; |
| import org.apache.kafka.clients.consumer.OffsetAndMetadata; |
| import org.apache.kafka.common.Cluster; |
| import org.apache.kafka.common.KafkaException; |
| import org.apache.kafka.common.Node; |
| import org.apache.kafka.common.PartitionInfo; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.config.ConfigException; |
| import org.apache.kafka.common.errors.TimeoutException; |
| import org.apache.kafka.common.internals.KafkaFutureImpl; |
| import org.apache.kafka.common.serialization.Serdes; |
| import org.apache.kafka.common.utils.MockTime; |
| import org.apache.kafka.streams.KeyValue; |
| import org.apache.kafka.streams.StreamsBuilder; |
| import org.apache.kafka.streams.StreamsConfig; |
| import org.apache.kafka.streams.StreamsConfig.InternalConfig; |
| import org.apache.kafka.streams.TopologyWrapper; |
| import org.apache.kafka.streams.kstream.Grouped; |
| import org.apache.kafka.streams.kstream.JoinWindows; |
| import org.apache.kafka.streams.kstream.KStream; |
| import org.apache.kafka.streams.kstream.KTable; |
| import org.apache.kafka.streams.kstream.KeyValueMapper; |
| import org.apache.kafka.streams.kstream.Materialized; |
| import org.apache.kafka.streams.kstream.TimeWindows; |
| import org.apache.kafka.streams.kstream.ValueJoiner; |
| import org.apache.kafka.streams.kstream.internals.ConsumedInternal; |
| import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder; |
| import org.apache.kafka.streams.kstream.internals.MaterializedInternal; |
| import org.apache.kafka.streams.processor.TaskId; |
| import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; |
| import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; |
| import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration; |
| import org.apache.kafka.streams.processor.internals.assignment.AssignorError; |
| import org.apache.kafka.streams.processor.internals.assignment.ClientState; |
| import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor; |
| import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor; |
| import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; |
| import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor; |
| import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; |
| import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; |
| import org.apache.kafka.streams.state.HostInfo; |
| import org.apache.kafka.test.MockApiProcessorSupplier; |
| import org.apache.kafka.test.MockClientSupplier; |
| import org.apache.kafka.test.MockInternalTopicManager; |
| import org.apache.kafka.test.MockKeyValueStoreBuilder; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.Captor; |
| import org.mockito.Mock; |
| import org.mockito.junit.MockitoJUnit; |
| import org.mockito.junit.MockitoRule; |
| import org.mockito.quality.Strictness; |
| |
| import java.nio.ByteBuffer; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.UUID; |
| import java.util.stream.Collectors; |
| |
| import static java.time.Duration.ofMillis; |
| import static java.util.Arrays.asList; |
| import static java.util.Collections.emptyList; |
| import static java.util.Collections.emptyMap; |
| import static java.util.Collections.emptySet; |
| import static java.util.Collections.singleton; |
| import static java.util.Collections.singletonList; |
| import static java.util.Collections.singletonMap; |
| import static org.apache.kafka.common.utils.Utils.mkEntry; |
| import static org.apache.kafka.common.utils.Utils.mkMap; |
| import static org.apache.kafka.common.utils.Utils.mkSet; |
| import static org.apache.kafka.common.utils.Utils.mkSortedSet; |
| import static org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_3; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_3; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_0; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_1; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo; |
| import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; |
| import static org.hamcrest.CoreMatchers.equalTo; |
| import static org.hamcrest.CoreMatchers.not; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.anEmptyMap; |
| import static org.hamcrest.Matchers.empty; |
| import static org.hamcrest.Matchers.is; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertThrows; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.eq; |
| import static org.mockito.Mockito.lenient; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| @RunWith(Parameterized.class) |
| @SuppressWarnings("deprecation") |
| public class StreamsPartitionAssignorTest { |
| // We need this rule because we would like to combine Parameterised tests with strict Mockito stubs. |
| @Rule |
| public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); |
| |
| private static final String CONSUMER_1 = "consumer1"; |
| private static final String CONSUMER_2 = "consumer2"; |
| private static final String CONSUMER_3 = "consumer3"; |
| private static final String CONSUMER_4 = "consumer4"; |
| |
| private final Set<String> allTopics = mkSet("topic1", "topic2"); |
| |
| private final TopicPartition t1p0 = new TopicPartition("topic1", 0); |
| private final TopicPartition t1p1 = new TopicPartition("topic1", 1); |
| private final TopicPartition t1p2 = new TopicPartition("topic1", 2); |
| private final TopicPartition t1p3 = new TopicPartition("topic1", 3); |
| private final TopicPartition t2p0 = new TopicPartition("topic2", 0); |
| private final TopicPartition t2p1 = new TopicPartition("topic2", 1); |
| private final TopicPartition t2p2 = new TopicPartition("topic2", 2); |
| private final TopicPartition t2p3 = new TopicPartition("topic2", 3); |
| private final TopicPartition t3p0 = new TopicPartition("topic3", 0); |
| private final TopicPartition t3p1 = new TopicPartition("topic3", 1); |
| private final TopicPartition t3p2 = new TopicPartition("topic3", 2); |
| private final TopicPartition t3p3 = new TopicPartition("topic3", 3); |
| |
| private final List<PartitionInfo> infos = asList( |
| new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]) |
| ); |
| |
| private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS); |
| |
| private final Cluster metadata = new Cluster( |
| "cluster", |
| Collections.singletonList(Node.noNode()), |
| infos, |
| emptySet(), |
| emptySet() |
| ); |
| |
| private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor(); |
| private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); |
| private static final String USER_END_POINT = "localhost:8080"; |
| private static final String OTHER_END_POINT = "other:9090"; |
| private static final String APPLICATION_ID = "stream-partition-assignor-test"; |
| |
| private TaskManager taskManager; |
| private Admin adminClient; |
| private InternalTopologyBuilder builder = new InternalTopologyBuilder(); |
| private TopologyMetadata topologyMetadata; |
| @Mock |
| private StreamsMetadataState streamsMetadataState; |
| @Captor |
| private ArgumentCaptor<Map<TopicPartition, PartitionInfo>> topicPartitionInfoCaptor; |
| private final Map<String, Subscription> subscriptions = new HashMap<>(); |
| private final Class<? extends TaskAssignor> taskAssignor; |
| private Map<String, String> clientTags; |
| |
| private final ReferenceContainer referenceContainer = new ReferenceContainer(); |
| private final MockTime time = new MockTime(); |
| private final byte uniqueField = 1; |
| |
| @SuppressWarnings("unchecked") |
| private Map<String, Object> configProps() { |
| final Map<String, Object> configurationMap = new HashMap<>(); |
| configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); |
| configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT); |
| referenceContainer.mainConsumer = mock(Consumer.class); |
| referenceContainer.adminClient = adminClient != null ? adminClient : mock(Admin.class); |
| referenceContainer.taskManager = taskManager; |
| referenceContainer.streamsMetadataState = streamsMetadataState; |
| referenceContainer.time = time; |
| referenceContainer.clientTags = clientTags != null ? clientTags : EMPTY_CLIENT_TAGS; |
| configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer); |
| configurationMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName()); |
| return configurationMap; |
| } |
| |
| private MockInternalTopicManager configureDefault() { |
| createDefaultMockTaskManager(); |
| return configureDefaultPartitionAssignor(); |
| } |
| |
| // Make sure to complete setting up any mocks (such as TaskManager or AdminClient) before configuring the assignor |
| private MockInternalTopicManager configureDefaultPartitionAssignor() { |
| return configurePartitionAssignorWith(emptyMap()); |
| } |
| |
| // Make sure to complete setting up any mocks (such as TaskManager or AdminClient) before configuring the assignor |
| private MockInternalTopicManager configurePartitionAssignorWith(final Map<String, Object> props) { |
| final Map<String, Object> configMap = configProps(); |
| configMap.putAll(props); |
| |
| partitionAssignor.configure(configMap); |
| |
| topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(configProps())); |
| return overwriteInternalTopicManagerWithMock(false); |
| } |
| |
| private void createDefaultMockTaskManager() { |
| createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); |
| } |
| |
| private void createMockTaskManager(final Set<TaskId> activeTasks, |
| final Set<TaskId> standbyTasks) { |
| taskManager = mock(TaskManager.class); |
| lenient().when(taskManager.topologyMetadata()).thenReturn(topologyMetadata); |
| lenient().when(taskManager.getTaskOffsetSums()).thenReturn(getTaskOffsetSums(activeTasks, standbyTasks)); |
| lenient().when(taskManager.processId()).thenReturn(UUID_1); |
| builder.setApplicationId(APPLICATION_ID); |
| topologyMetadata.buildAndRewriteTopology(); |
| } |
| |
| // If mockCreateInternalTopics is true the internal topic manager will report that it had to create all internal |
| // topics and we will skip the listOffsets request for these changelogs |
| private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics) { |
| final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( |
| time, |
| new StreamsConfig(configProps()), |
| mockClientSupplier.restoreConsumer, |
| mockCreateInternalTopics |
| ); |
| partitionAssignor.setInternalTopicManager(mockInternalTopicManager); |
| return mockInternalTopicManager; |
| } |
| |
| @Parameterized.Parameters(name = "task assignor = {0}") |
| public static Collection<Object[]> parameters() { |
| return asList( |
| new Object[]{HighAvailabilityTaskAssignor.class}, |
| new Object[]{StickyTaskAssignor.class}, |
| new Object[]{FallbackPriorTaskAssignor.class} |
| ); |
| } |
| |
| public StreamsPartitionAssignorTest(final Class<? extends TaskAssignor> taskAssignor) { |
| this.taskAssignor = taskAssignor; |
| adminClient = createMockAdminClientForAssignor(EMPTY_CHANGELOG_END_OFFSETS); |
| topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(configProps())); |
| } |
| |
| @Test |
| public void shouldUseEagerRebalancingProtocol() { |
| createDefaultMockTaskManager(); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23)); |
| |
| assertEquals(1, partitionAssignor.supportedProtocols().size()); |
| assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.EAGER)); |
| assertFalse(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE)); |
| } |
| |
| @Test |
| public void shouldUseCooperativeRebalancingProtocol() { |
| configureDefault(); |
| |
| assertEquals(2, partitionAssignor.supportedProtocols().size()); |
| assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE)); |
| } |
| |
| @Test |
| public void shouldProduceStickyAndBalancedAssignmentWhenNothingChanges() { |
| final List<TaskId> allTasks = |
| asList(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2, TASK_1_3); |
| |
| final Map<String, List<TaskId>> previousAssignment = mkMap( |
| mkEntry(CONSUMER_1, asList(TASK_0_0, TASK_1_1, TASK_1_3)), |
| mkEntry(CONSUMER_2, asList(TASK_0_3, TASK_1_0)), |
| mkEntry(CONSUMER_3, asList(TASK_0_1, TASK_0_2, TASK_1_2)) |
| ); |
| |
| final ClientState state = new ClientState(); |
| final SortedSet<String> consumers = mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_1, getTaskOffsetSums(asList(TASK_0_0, TASK_1_1, TASK_1_3), EMPTY_TASKS)); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_2, getTaskOffsetSums(asList(TASK_0_3, TASK_1_0), EMPTY_TASKS)); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_3, getTaskOffsetSums(asList(TASK_0_1, TASK_0_2, TASK_1_2), EMPTY_TASKS)); |
| state.initializePrevTasks(emptyMap(), false); |
| state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks)); |
| |
| assertEquivalentAssignment( |
| previousAssignment, |
| assignTasksToThreads( |
| allTasks, |
| true, |
| consumers, |
| state, |
| new HashMap<>() |
| ) |
| ); |
| } |
| |
| @Test |
| public void shouldProduceStickyAndBalancedAssignmentWhenNewTasksAreAdded() { |
| final List<TaskId> allTasks = |
| new ArrayList<>(asList(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2, TASK_1_3)); |
| |
| final Map<String, List<TaskId>> previousAssignment = mkMap( |
| mkEntry(CONSUMER_1, new ArrayList<>(asList(TASK_0_0, TASK_1_1, TASK_1_3))), |
| mkEntry(CONSUMER_2, new ArrayList<>(asList(TASK_0_3, TASK_1_0))), |
| mkEntry(CONSUMER_3, new ArrayList<>(asList(TASK_0_1, TASK_0_2, TASK_1_2))) |
| ); |
| |
| final ClientState state = new ClientState(); |
| final SortedSet<String> consumers = mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_1, getTaskOffsetSums(asList(TASK_0_0, TASK_1_1, TASK_1_3), EMPTY_TASKS)); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_2, getTaskOffsetSums(asList(TASK_0_3, TASK_1_0), EMPTY_TASKS)); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_3, getTaskOffsetSums(asList(TASK_0_1, TASK_0_2, TASK_1_2), EMPTY_TASKS)); |
| state.initializePrevTasks(emptyMap(), false); |
| state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks)); |
| |
| // We should be able to add a new task without sacrificing stickiness |
| final TaskId newTask = TASK_2_0; |
| allTasks.add(newTask); |
| state.assignActiveTasks(allTasks); |
| |
| final Map<String, List<TaskId>> newAssignment = |
| assignTasksToThreads( |
| allTasks, |
| true, |
| consumers, |
| state, |
| new HashMap<>() |
| ); |
| |
| previousAssignment.get(CONSUMER_2).add(newTask); |
| assertEquivalentAssignment(previousAssignment, newAssignment); |
| } |
| |
| @Test |
| public void shouldProduceMaximallyStickyAssignmentWhenMemberLeaves() { |
| final List<TaskId> allTasks = |
| asList(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2, TASK_1_3); |
| |
| final Map<String, List<TaskId>> previousAssignment = mkMap( |
| mkEntry(CONSUMER_1, asList(TASK_0_0, TASK_1_1, TASK_1_3)), |
| mkEntry(CONSUMER_2, asList(TASK_0_3, TASK_1_0)), |
| mkEntry(CONSUMER_3, asList(TASK_0_1, TASK_0_2, TASK_1_2)) |
| ); |
| |
| final ClientState state = new ClientState(); |
| final SortedSet<String> consumers = mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_1, getTaskOffsetSums(asList(TASK_0_0, TASK_1_1, TASK_1_3), EMPTY_TASKS)); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_2, getTaskOffsetSums(asList(TASK_0_3, TASK_1_0), EMPTY_TASKS)); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_3, getTaskOffsetSums(asList(TASK_0_1, TASK_0_2, TASK_1_2), EMPTY_TASKS)); |
| state.initializePrevTasks(emptyMap(), false); |
| state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks)); |
| |
| // Consumer 3 leaves the group |
| consumers.remove(CONSUMER_3); |
| |
| final Map<String, List<TaskId>> assignment = assignTasksToThreads( |
| allTasks, |
| true, |
| consumers, |
| state, |
| new HashMap<>() |
| ); |
| |
| // Each member should have all of its previous tasks reassigned plus some of consumer 3's tasks |
| // We should give one of its tasks to consumer 1, and two of its tasks to consumer 2 |
| assertTrue(assignment.get(CONSUMER_1).containsAll(previousAssignment.get(CONSUMER_1))); |
| assertTrue(assignment.get(CONSUMER_2).containsAll(previousAssignment.get(CONSUMER_2))); |
| |
| assertThat(assignment.get(CONSUMER_1).size(), equalTo(4)); |
| assertThat(assignment.get(CONSUMER_2).size(), equalTo(4)); |
| } |
| |
| @Test |
| public void shouldProduceStickyEnoughAssignmentWhenNewMemberJoins() { |
| final List<TaskId> allTasks = |
| asList(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2, TASK_1_3); |
| |
| final Map<String, List<TaskId>> previousAssignment = mkMap( |
| mkEntry(CONSUMER_1, asList(TASK_0_0, TASK_1_1, TASK_1_3)), |
| mkEntry(CONSUMER_2, asList(TASK_0_3, TASK_1_0)), |
| mkEntry(CONSUMER_3, asList(TASK_0_1, TASK_0_2, TASK_1_2)) |
| ); |
| |
| final ClientState state = new ClientState(); |
| final SortedSet<String> consumers = mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_1, getTaskOffsetSums(asList(TASK_0_0, TASK_1_1, TASK_1_3), EMPTY_TASKS)); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_2, getTaskOffsetSums(asList(TASK_0_3, TASK_1_0), EMPTY_TASKS)); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_3, getTaskOffsetSums(asList(TASK_0_1, TASK_0_2, TASK_1_2), EMPTY_TASKS)); |
| |
| // Consumer 4 joins the group |
| consumers.add(CONSUMER_4); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_4, getTaskOffsetSums(EMPTY_TASKS, EMPTY_TASKS)); |
| |
| state.initializePrevTasks(emptyMap(), false); |
| state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks)); |
| |
| final Map<String, List<TaskId>> assignment = assignTasksToThreads( |
| allTasks, |
| true, |
| consumers, |
| state, |
| new HashMap<>() |
| ); |
| |
| // we should move one task each from consumer 1 and consumer 3 to the new member, and none from consumer 2 |
| assertTrue(previousAssignment.get(CONSUMER_1).containsAll(assignment.get(CONSUMER_1))); |
| assertTrue(previousAssignment.get(CONSUMER_3).containsAll(assignment.get(CONSUMER_3))); |
| |
| assertTrue(assignment.get(CONSUMER_2).containsAll(previousAssignment.get(CONSUMER_2))); |
| |
| |
| assertThat(assignment.get(CONSUMER_1).size(), equalTo(2)); |
| assertThat(assignment.get(CONSUMER_2).size(), equalTo(2)); |
| assertThat(assignment.get(CONSUMER_3).size(), equalTo(2)); |
| assertThat(assignment.get(CONSUMER_4).size(), equalTo(2)); |
| } |
| |
| @Test |
| public void shouldInterleaveTasksByGroupIdDuringNewAssignment() { |
| final List<TaskId> allTasks = |
| asList(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1); |
| |
| final Map<String, List<TaskId>> assignment = mkMap( |
| mkEntry(CONSUMER_1, new ArrayList<>(asList(TASK_0_0, TASK_0_3, TASK_1_2))), |
| mkEntry(CONSUMER_2, new ArrayList<>(asList(TASK_0_1, TASK_1_0, TASK_2_0))), |
| mkEntry(CONSUMER_3, new ArrayList<>(asList(TASK_0_2, TASK_1_1, TASK_2_1))) |
| ); |
| |
| final ClientState state = new ClientState(); |
| final SortedSet<String> consumers = mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_1, emptyMap()); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_2, emptyMap()); |
| state.addPreviousTasksAndOffsetSums(CONSUMER_3, emptyMap()); |
| |
| Collections.shuffle(allTasks); |
| |
| final Map<String, List<TaskId>> interleavedTaskIds = |
| assignTasksToThreads( |
| allTasks, |
| true, |
| consumers, |
| state, |
| new HashMap<>() |
| ); |
| |
| assertThat(interleavedTaskIds, equalTo(assignment)); |
| } |
| |
| @Test |
| public void testEagerSubscription() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1", "source2"); |
| |
| final Set<TaskId> prevTasks = mkSet( |
| new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1) |
| ); |
| final Set<TaskId> standbyTasks = mkSet( |
| new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2) |
| ); |
| |
| createMockTaskManager(prevTasks, standbyTasks); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23)); |
| assertThat(partitionAssignor.rebalanceProtocol(), equalTo(RebalanceProtocol.EAGER)); |
| |
| final Set<String> topics = mkSet("topic1", "topic2"); |
| final Subscription subscription = new Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); |
| |
| Collections.sort(subscription.topics()); |
| assertEquals(asList("topic1", "topic2"), subscription.topics()); |
| |
| final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks, uniqueField); |
| assertEquals(info, SubscriptionInfo.decode(subscription.userData())); |
| } |
| |
| @Test |
| public void testCooperativeSubscription() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1", "source2"); |
| |
| final Set<TaskId> prevTasks = mkSet( |
| new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)); |
| final Set<TaskId> standbyTasks = mkSet( |
| new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1), |
| new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)); |
| |
| createMockTaskManager(prevTasks, standbyTasks); |
| configureDefaultPartitionAssignor(); |
| |
| final Set<String> topics = mkSet("topic1", "topic2"); |
| final Subscription subscription = new Subscription( |
| new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); |
| |
| Collections.sort(subscription.topics()); |
| assertEquals(asList("topic1", "topic2"), subscription.topics()); |
| |
| final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks, uniqueField); |
| assertEquals(info, SubscriptionInfo.decode(subscription.userData())); |
| } |
| |
| @Test |
| public void testAssignBasic() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1", "source2"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor"); |
| final List<String> topics = asList("topic1", "topic2"); |
| final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); |
| |
| final Set<TaskId> prevTasks10 = mkSet(TASK_0_0); |
| final Set<TaskId> prevTasks11 = mkSet(TASK_0_1); |
| final Set<TaskId> prevTasks20 = mkSet(TASK_0_2); |
| final Set<TaskId> standbyTasks10 = EMPTY_TASKS; |
| final Set<TaskId> standbyTasks11 = mkSet(TASK_0_2); |
| final Set<TaskId> standbyTasks20 = mkSet(TASK_0_0); |
| |
| createMockTaskManager(prevTasks10, standbyTasks10); |
| adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( |
| singletonList(APPLICATION_ID + "-store-changelog"), |
| singletonList(3)) |
| ); |
| configureDefaultPartitionAssignor(); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, prevTasks10, standbyTasks10).encode() |
| )); |
| subscriptions.put("consumer11", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, prevTasks11, standbyTasks11).encode() |
| )); |
| subscriptions.put("consumer20", |
| new Subscription( |
| topics, |
| getInfo(UUID_2, prevTasks20, standbyTasks20).encode() |
| )); |
| |
| final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| |
| // check the assignment |
| assertEquals(mkSet(mkSet(t1p0, t2p0), mkSet(t1p1, t2p1)), |
| mkSet(new HashSet<>(assignments.get("consumer10").partitions()), |
| new HashSet<>(assignments.get("consumer11").partitions()))); |
| assertEquals(mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions())); |
| |
| // check assignment info |
| |
| // the first consumer |
| final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); |
| final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks()); |
| |
| // the second consumer |
| final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); |
| allActiveTasks.addAll(info11.activeTasks()); |
| |
| assertEquals(mkSet(TASK_0_0, TASK_0_1), allActiveTasks); |
| |
| // the third consumer |
| final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); |
| allActiveTasks.addAll(info20.activeTasks()); |
| |
| assertEquals(3, allActiveTasks.size()); |
| assertEquals(allTasks, new HashSet<>(allActiveTasks)); |
| |
| assertEquals(3, allActiveTasks.size()); |
| assertEquals(allTasks, allActiveTasks); |
| } |
| |
| @Test |
| public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); |
| builder.addProcessor("processorII", new MockApiProcessorSupplier<>(), "source2"); |
| |
| final List<PartitionInfo> localInfos = asList( |
| new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic1", 3, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 3, Node.noNode(), new Node[0], new Node[0]) |
| ); |
| |
| final Cluster localMetadata = new Cluster( |
| "cluster", |
| Collections.singletonList(Node.noNode()), |
| localInfos, |
| emptySet(), |
| emptySet()); |
| |
| final List<String> topics = asList("topic1", "topic2"); |
| |
| configureDefault(); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| topics, |
| defaultSubscriptionInfo.encode() |
| )); |
| subscriptions.put("consumer11", |
| new Subscription( |
| topics, |
| defaultSubscriptionInfo.encode() |
| )); |
| |
| final Map<String, Assignment> assignments = partitionAssignor.assign(localMetadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| // check assigned partitions |
| assertEquals(mkSet(mkSet(t2p2, t1p0, t1p2, t2p0), mkSet(t1p1, t2p1, t1p3, t2p3)), |
| mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions()))); |
| |
| // the first consumer |
| final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); |
| |
| final List<TaskId> expectedInfo10TaskIds = asList(TASK_0_0, TASK_0_2, TASK_1_0, TASK_1_2); |
| assertEquals(expectedInfo10TaskIds, info10.activeTasks()); |
| |
| // the second consumer |
| final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); |
| final List<TaskId> expectedInfo11TaskIds = asList(TASK_0_1, TASK_0_3, TASK_1_1, TASK_1_3); |
| |
| assertEquals(expectedInfo11TaskIds, info11.activeTasks()); |
| } |
| |
| @Test |
| public void testAssignEmptyMetadata() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1", "source2"); |
| final List<String> topics = asList("topic1", "topic2"); |
| final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); |
| |
| final Set<TaskId> prevTasks10 = mkSet(TASK_0_0); |
| final Set<TaskId> standbyTasks10 = mkSet(TASK_0_1); |
| final Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), |
| emptySet(), |
| emptySet(), |
| emptySet()); |
| |
| createMockTaskManager(prevTasks10, standbyTasks10); |
| configureDefaultPartitionAssignor(); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, prevTasks10, standbyTasks10).encode() |
| )); |
| |
| // initially metadata is empty |
| Map<String, Assignment> assignments = |
| partitionAssignor.assign(emptyMetadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| // check assigned partitions |
| assertEquals(emptySet(), |
| new HashSet<>(assignments.get("consumer10").partitions())); |
| |
| // check assignment info |
| AssignmentInfo info10 = checkAssignment(emptySet(), assignments.get("consumer10")); |
| final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks()); |
| |
| assertEquals(0, allActiveTasks.size()); |
| |
| // then metadata gets populated |
| assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| // check assigned partitions |
| assertEquals(mkSet(mkSet(t1p0, t2p0, t1p0, t2p0, t1p1, t2p1, t1p2, t2p2)), |
| mkSet(new HashSet<>(assignments.get("consumer10").partitions()))); |
| |
| // the first consumer |
| info10 = checkAssignment(allTopics, assignments.get("consumer10")); |
| allActiveTasks.addAll(info10.activeTasks()); |
| |
| assertEquals(3, allActiveTasks.size()); |
| assertEquals(allTasks, new HashSet<>(allActiveTasks)); |
| |
| assertEquals(3, allActiveTasks.size()); |
| assertEquals(allTasks, allActiveTasks); |
| } |
| |
| @Test |
| public void testAssignWithNewTasks() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addSource(null, "source3", null, null, null, "topic3"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1", "source2", "source3"); |
| final List<String> topics = asList("topic1", "topic2", "topic3"); |
| final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3); |
| |
| // assuming that previous tasks do not have topic3 |
| final Set<TaskId> prevTasks10 = mkSet(TASK_0_0); |
| final Set<TaskId> prevTasks11 = mkSet(TASK_0_1); |
| final Set<TaskId> prevTasks20 = mkSet(TASK_0_2); |
| |
| createMockTaskManager(prevTasks10, EMPTY_TASKS); |
| configureDefaultPartitionAssignor(); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, prevTasks10, EMPTY_TASKS).encode())); |
| subscriptions.put("consumer11", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, prevTasks11, EMPTY_TASKS).encode())); |
| subscriptions.put("consumer20", |
| new Subscription( |
| topics, |
| getInfo(UUID_2, prevTasks20, EMPTY_TASKS).encode())); |
| |
| final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match |
| // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and |
| // then later ones will be re-assigned to other hosts due to load balancing |
| AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer10").userData()); |
| final Set<TaskId> allActiveTasks = new HashSet<>(info.activeTasks()); |
| final Set<TopicPartition> allPartitions = new HashSet<>(assignments.get("consumer10").partitions()); |
| |
| info = AssignmentInfo.decode(assignments.get("consumer11").userData()); |
| allActiveTasks.addAll(info.activeTasks()); |
| allPartitions.addAll(assignments.get("consumer11").partitions()); |
| |
| info = AssignmentInfo.decode(assignments.get("consumer20").userData()); |
| allActiveTasks.addAll(info.activeTasks()); |
| allPartitions.addAll(assignments.get("consumer20").partitions()); |
| |
| assertEquals(allTasks, allActiveTasks); |
| assertEquals(mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions); |
| } |
| |
| @Test |
| public void testAssignWithStates() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| |
| builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source1"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor-1"); |
| |
| builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source2"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor-2"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store3", false), "processor-2"); |
| |
| final List<String> topics = asList("topic1", "topic2"); |
| |
| final List<TaskId> tasks = asList(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2); |
| |
| adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( |
| asList(APPLICATION_ID + "-store1-changelog", |
| APPLICATION_ID + "-store2-changelog", |
| APPLICATION_ID + "-store3-changelog"), |
| asList(3, 3, 3)) |
| ); |
| configureDefault(); |
| |
| subscriptions.put("consumer10", |
| new Subscription(topics, defaultSubscriptionInfo.encode())); |
| subscriptions.put("consumer11", |
| new Subscription(topics, defaultSubscriptionInfo.encode())); |
| subscriptions.put("consumer20", |
| new Subscription(topics, getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS).encode())); |
| |
| final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match |
| assertEquals(2, assignments.get("consumer10").partitions().size()); |
| assertEquals(2, assignments.get("consumer11").partitions().size()); |
| assertEquals(2, assignments.get("consumer20").partitions().size()); |
| |
| final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); |
| final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); |
| final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); |
| |
| assertEquals(2, info10.activeTasks().size()); |
| assertEquals(2, info11.activeTasks().size()); |
| assertEquals(2, info20.activeTasks().size()); |
| |
| final Set<TaskId> allTasks = new HashSet<>(); |
| allTasks.addAll(info10.activeTasks()); |
| allTasks.addAll(info11.activeTasks()); |
| allTasks.addAll(info20.activeTasks()); |
| assertEquals(new HashSet<>(tasks), allTasks); |
| |
| // check tasks for state topics |
| final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo(); |
| |
| assertEquals(mkSet(TASK_0_0, TASK_0_1, TASK_0_2), tasksForState("store1", tasks, topicGroups)); |
| assertEquals(mkSet(TASK_1_0, TASK_1_1, TASK_1_2), tasksForState("store2", tasks, topicGroups)); |
| assertEquals(mkSet(TASK_1_0, TASK_1_1, TASK_1_2), tasksForState("store3", tasks, topicGroups)); |
| } |
| |
| private static Set<TaskId> tasksForState(final String storeName, |
| final List<TaskId> tasks, |
| final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups) { |
| final String changelogTopic = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, storeName, null); |
| |
| final Set<TaskId> ids = new HashSet<>(); |
| for (final Map.Entry<Subtopology, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) { |
| final Set<String> stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet(); |
| |
| if (stateChangelogTopics.contains(changelogTopic)) { |
| for (final TaskId id : tasks) { |
| if (id.subtopology() == entry.getKey().nodeGroupId) { |
| ids.add(id); |
| } |
| } |
| } |
| } |
| return ids; |
| } |
| |
| @Test |
| public void testAssignWithStandbyReplicasAndStatelessTasks() { |
| builder.addSource(null, "source1", null, null, null, "topic1", "topic2"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); |
| |
| final List<String> topics = asList("topic1", "topic2"); |
| |
| createMockTaskManager(mkSet(TASK_0_0), emptySet()); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, mkSet(TASK_0_0), emptySet()).encode())); |
| subscriptions.put("consumer20", |
| new Subscription( |
| topics, |
| getInfo(UUID_2, mkSet(TASK_0_2), emptySet()).encode())); |
| |
| final Map<String, Assignment> assignments = |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); |
| assertTrue(info10.standbyTasks().isEmpty()); |
| |
| final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); |
| assertTrue(info20.standbyTasks().isEmpty()); |
| } |
| |
| @Test |
| public void testAssignWithStandbyReplicasAndLoggingDisabled() { |
| builder.addSource(null, "source1", null, null, null, "topic1", "topic2"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store1", false).withLoggingDisabled(), "processor"); |
| |
| final List<String> topics = asList("topic1", "topic2"); |
| |
| createMockTaskManager(mkSet(TASK_0_0), emptySet()); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, mkSet(TASK_0_0), emptySet()).encode())); |
| subscriptions.put("consumer20", |
| new Subscription( |
| topics, |
| getInfo(UUID_2, mkSet(TASK_0_2), emptySet()).encode())); |
| |
| final Map<String, Assignment> assignments = |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); |
| assertTrue(info10.standbyTasks().isEmpty()); |
| |
| final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); |
| assertTrue(info20.standbyTasks().isEmpty()); |
| } |
| |
| @Test |
| public void testAssignWithStandbyReplicas() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1", "source2"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); |
| |
| final List<String> topics = asList("topic1", "topic2"); |
| final Set<TopicPartition> allTopicPartitions = topics.stream() |
| .map(topic -> asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2))) |
| .flatMap(Collection::stream) |
| .collect(Collectors.toSet()); |
| |
| final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); |
| |
| final Set<TaskId> prevTasks00 = mkSet(TASK_0_0); |
| final Set<TaskId> prevTasks01 = mkSet(TASK_0_1); |
| final Set<TaskId> prevTasks02 = mkSet(TASK_0_2); |
| final Set<TaskId> standbyTasks00 = mkSet(TASK_0_0); |
| final Set<TaskId> standbyTasks01 = mkSet(TASK_0_1); |
| final Set<TaskId> standbyTasks02 = mkSet(TASK_0_2); |
| |
| createMockTaskManager(prevTasks00, standbyTasks01); |
| adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( |
| singletonList(APPLICATION_ID + "-store1-changelog"), |
| singletonList(3)) |
| ); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, prevTasks00, EMPTY_TASKS, USER_END_POINT).encode())); |
| subscriptions.put("consumer11", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, prevTasks01, standbyTasks02, USER_END_POINT).encode())); |
| subscriptions.put("consumer20", |
| new Subscription( |
| topics, |
| getInfo(UUID_2, prevTasks02, standbyTasks00, OTHER_END_POINT).encode())); |
| |
| final Map<String, Assignment> assignments = |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| // the first consumer |
| final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); |
| final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks()); |
| final Set<TaskId> allStandbyTasks = new HashSet<>(info10.standbyTasks().keySet()); |
| |
| // the second consumer |
| final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); |
| allActiveTasks.addAll(info11.activeTasks()); |
| allStandbyTasks.addAll(info11.standbyTasks().keySet()); |
| |
| assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks().keySet(), info10.standbyTasks().keySet()); |
| |
| // check active tasks assigned to the first client |
| assertEquals(mkSet(TASK_0_0, TASK_0_1), new HashSet<>(allActiveTasks)); |
| assertEquals(mkSet(TASK_0_2), new HashSet<>(allStandbyTasks)); |
| |
| // the third consumer |
| final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); |
| allActiveTasks.addAll(info20.activeTasks()); |
| allStandbyTasks.addAll(info20.standbyTasks().keySet()); |
| |
| // all task ids are in the active tasks and also in the standby tasks |
| assertEquals(3, allActiveTasks.size()); |
| assertEquals(allTasks, allActiveTasks); |
| |
| assertEquals(3, allStandbyTasks.size()); |
| assertEquals(allTasks, allStandbyTasks); |
| |
| // Check host partition assignments |
| final Map<HostInfo, Set<TopicPartition>> partitionsByHost = info10.partitionsByHost(); |
| assertEquals(2, partitionsByHost.size()); |
| assertEquals(allTopicPartitions, partitionsByHost.values().stream() |
| .flatMap(Collection::stream).collect(Collectors.toSet())); |
| |
| final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = info10.standbyPartitionByHost(); |
| assertEquals(2, standbyPartitionsByHost.size()); |
| assertEquals(allTopicPartitions, standbyPartitionsByHost.values().stream() |
| .flatMap(Collection::stream).collect(Collectors.toSet())); |
| |
| for (final HostInfo hostInfo : partitionsByHost.keySet()) { |
| assertTrue(Collections.disjoint(partitionsByHost.get(hostInfo), standbyPartitionsByHost.get(hostInfo))); |
| } |
| |
| // All consumers got the same host info |
| assertEquals(partitionsByHost, info11.partitionsByHost()); |
| assertEquals(partitionsByHost, info20.partitionsByHost()); |
| assertEquals(standbyPartitionsByHost, info11.standbyPartitionByHost()); |
| assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost()); |
| } |
| |
| @Test |
| public void testAssignWithStandbyReplicasBalanceSparse() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); |
| |
| final List<String> topics = asList("topic1"); |
| |
| createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); |
| adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( |
| singletonList(APPLICATION_ID + "-store1-changelog"), |
| singletonList(3)) |
| ); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); |
| |
| final List<String> client1Consumers = asList("consumer10", "consumer11", "consumer12", "consumer13"); |
| final List<String> client2Consumers = asList("consumer20", "consumer21", "consumer22"); |
| |
| for (final String consumerId : client1Consumers) { |
| subscriptions.put(consumerId, |
| new Subscription( |
| topics, |
| getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); |
| } |
| for (final String consumerId : client2Consumers) { |
| subscriptions.put(consumerId, |
| new Subscription( |
| topics, |
| getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); |
| } |
| |
| final Map<String, Assignment> assignments = |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| // Consumers |
| final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); |
| final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); |
| final AssignmentInfo info12 = AssignmentInfo.decode(assignments.get("consumer12").userData()); |
| final AssignmentInfo info13 = AssignmentInfo.decode(assignments.get("consumer13").userData()); |
| final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); |
| final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData()); |
| final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData()); |
| |
| // Check each consumer has no more than 1 task |
| assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 1); |
| assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 1); |
| assertTrue(info12.activeTasks().size() + info12.standbyTasks().size() <= 1); |
| assertTrue(info13.activeTasks().size() + info13.standbyTasks().size() <= 1); |
| assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 1); |
| assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 1); |
| assertTrue(info22.activeTasks().size() + info22.standbyTasks().size() <= 1); |
| } |
| |
| @Test |
| public void testAssignWithStandbyReplicasBalanceDense() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); |
| |
| final List<String> topics = asList("topic1"); |
| |
| createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); |
| adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( |
| singletonList(APPLICATION_ID + "-store1-changelog"), |
| singletonList(3)) |
| ); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); |
| subscriptions.put("consumer20", |
| new Subscription( |
| topics, |
| getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); |
| |
| final Map<String, Assignment> assignments = |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| // Consumers |
| final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); |
| final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); |
| |
| // Check each consumer has 3 tasks |
| assertEquals(3, info10.activeTasks().size() + info10.standbyTasks().size()); |
| assertEquals(3, info20.activeTasks().size() + info20.standbyTasks().size()); |
| // Check that not all the actives are on one node |
| assertTrue(info10.activeTasks().size() < 3); |
| assertTrue(info20.activeTasks().size() < 3); |
| } |
| |
| @Test |
| public void testAssignWithStandbyReplicasBalanceWithStatelessTasks() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addProcessor("processor_with_state", new MockApiProcessorSupplier<>(), "source1"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor_with_state"); |
| |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source2"); |
| |
| final List<String> topics = asList("topic1", "topic2"); |
| |
| createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); |
| adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( |
| singletonList(APPLICATION_ID + "-store1-changelog"), |
| singletonList(3)) |
| ); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); |
| subscriptions.put("consumer11", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); |
| subscriptions.put("consumer20", |
| new Subscription( |
| topics, |
| getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); |
| subscriptions.put("consumer21", |
| new Subscription( |
| topics, |
| getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); |
| |
| final Map<String, Assignment> assignments = |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| // Consumers |
| final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); |
| final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); |
| final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); |
| final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData()); |
| |
| // 9 tasks spread over 4 consumers, so we should have no more than 3 tasks per consumer |
| assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 3); |
| assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 3); |
| assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 3); |
| assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 3); |
| // No more than 1 standby per node. |
| assertTrue(info10.standbyTasks().size() <= 1); |
| assertTrue(info11.standbyTasks().size() <= 1); |
| assertTrue(info20.standbyTasks().size() <= 1); |
| assertTrue(info21.standbyTasks().size() <= 1); |
| } |
| |
| @Test |
| public void testOnAssignment() { |
| taskManager = mock(TaskManager.class); |
| |
| final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap( |
| new HostInfo("localhost", 9090), |
| mkSet(t3p0, t3p3)); |
| |
| final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); |
| activeTasks.put(TASK_0_0, mkSet(t3p0)); |
| activeTasks.put(TASK_0_3, mkSet(t3p3)); |
| final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); |
| standbyTasks.put(TASK_0_1, mkSet(t3p1)); |
| standbyTasks.put(TASK_0_2, mkSet(t3p2)); |
| |
| streamsMetadataState = mock(StreamsMetadataState.class); |
| |
| configureDefaultPartitionAssignor(); |
| |
| final List<TaskId> activeTaskList = asList(TASK_0_0, TASK_0_3); |
| final AssignmentInfo info = new AssignmentInfo(LATEST_SUPPORTED_VERSION, activeTaskList, standbyTasks, hostState, emptyMap(), 0); |
| final Assignment assignment = new Assignment(asList(t3p0, t3p3), info.encode()); |
| |
| partitionAssignor.onAssignment(assignment, null); |
| |
| verify(streamsMetadataState).onChange(eq(hostState), any(), topicPartitionInfoCaptor.capture()); |
| verify(taskManager).handleAssignment(activeTasks, standbyTasks); |
| |
| assertTrue(topicPartitionInfoCaptor.getValue().containsKey(t3p0)); |
| assertTrue(topicPartitionInfoCaptor.getValue().containsKey(t3p3)); |
| assertEquals(2, topicPartitionInfoCaptor.getValue().size()); |
| } |
| |
| @Test |
| public void testAssignWithInternalTopics() { |
| builder.addInternalTopic("topicX", InternalTopicProperties.empty()); |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addProcessor("processor1", new MockApiProcessorSupplier<>(), "source1"); |
| builder.addSink("sink1", "topicX", null, null, null, "processor1"); |
| builder.addSource(null, "source2", null, null, null, "topicX"); |
| builder.addProcessor("processor2", new MockApiProcessorSupplier<>(), "source2"); |
| final List<String> topics = asList("topic1", APPLICATION_ID + "-topicX"); |
| final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); |
| |
| final MockInternalTopicManager internalTopicManager = configureDefault(); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| topics, |
| defaultSubscriptionInfo.encode()) |
| ); |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)); |
| |
| // check prepared internal topics |
| assertEquals(1, internalTopicManager.readyTopics.size()); |
| assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get(APPLICATION_ID + "-topicX")); |
| } |
| |
| @Test |
| public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() { |
| builder.addInternalTopic("topicX", InternalTopicProperties.empty()); |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addProcessor("processor1", new MockApiProcessorSupplier<>(), "source1"); |
| builder.addSink("sink1", "topicX", null, null, null, "processor1"); |
| builder.addSource(null, "source2", null, null, null, "topicX"); |
| builder.addInternalTopic("topicZ", InternalTopicProperties.empty()); |
| builder.addProcessor("processor2", new MockApiProcessorSupplier<>(), "source2"); |
| builder.addSink("sink2", "topicZ", null, null, null, "processor2"); |
| builder.addSource(null, "source3", null, null, null, "topicZ"); |
| final List<String> topics = asList("topic1", APPLICATION_ID + "-topicX", APPLICATION_ID + "-topicZ"); |
| final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); |
| |
| final MockInternalTopicManager internalTopicManager = configureDefault(); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| topics, |
| defaultSubscriptionInfo.encode()) |
| ); |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)); |
| |
| // check prepared internal topics |
| assertEquals(2, internalTopicManager.readyTopics.size()); |
| assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get(APPLICATION_ID + "-topicZ")); |
| } |
| |
| @Test |
| public void shouldGenerateTasksForAllCreatedPartitions() { |
| final StreamsBuilder streamsBuilder = new StreamsBuilder(); |
| |
| // KStream with 3 partitions |
| final KStream<Object, Object> stream1 = streamsBuilder |
| .stream("topic1") |
| // force creation of internal repartition topic |
| .map((KeyValueMapper<Object, Object, KeyValue<Object, Object>>) KeyValue::new); |
| |
| // KTable with 4 partitions |
| final KTable<Object, Long> table1 = streamsBuilder |
| .table("topic3") |
| // force creation of internal repartition topic |
| .groupBy(KeyValue::new) |
| .count(); |
| |
| // joining the stream and the table |
| // this triggers the enforceCopartitioning() routine in the StreamsPartitionAssignor, |
| // forcing the stream.map to get repartitioned to a topic with four partitions. |
| stream1.join( |
| table1, |
| (ValueJoiner<Object, Object, Void>) (value1, value2) -> null); |
| |
| final String client = "client1"; |
| builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()); |
| topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(configProps())); |
| |
| adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( |
| asList(APPLICATION_ID + "-topic3-STATE-STORE-0000000002-changelog", |
| APPLICATION_ID + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog"), |
| asList(4, 4)) |
| ); |
| |
| final MockInternalTopicManager mockInternalTopicManager = configureDefault(); |
| |
| subscriptions.put(client, |
| new Subscription( |
| asList("topic1", "topic3"), |
| defaultSubscriptionInfo.encode()) |
| ); |
| final Map<String, Assignment> assignment = |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| final Map<String, Integer> expectedCreatedInternalTopics = new HashMap<>(); |
| expectedCreatedInternalTopics.put(APPLICATION_ID + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4); |
| expectedCreatedInternalTopics.put(APPLICATION_ID + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4); |
| expectedCreatedInternalTopics.put(APPLICATION_ID + "-topic3-STATE-STORE-0000000002-changelog", 4); |
| expectedCreatedInternalTopics.put(APPLICATION_ID + "-KSTREAM-MAP-0000000001-repartition", 4); |
| |
| // check if all internal topics were created as expected |
| assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics)); |
| |
| final List<TopicPartition> expectedAssignment = asList( |
| new TopicPartition("topic1", 0), |
| new TopicPartition("topic1", 1), |
| new TopicPartition("topic1", 2), |
| new TopicPartition("topic3", 0), |
| new TopicPartition("topic3", 1), |
| new TopicPartition("topic3", 2), |
| new TopicPartition("topic3", 3), |
| new TopicPartition(APPLICATION_ID + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 0), |
| new TopicPartition(APPLICATION_ID + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 1), |
| new TopicPartition(APPLICATION_ID + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 2), |
| new TopicPartition(APPLICATION_ID + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 3), |
| new TopicPartition(APPLICATION_ID + "-KSTREAM-MAP-0000000001-repartition", 0), |
| new TopicPartition(APPLICATION_ID + "-KSTREAM-MAP-0000000001-repartition", 1), |
| new TopicPartition(APPLICATION_ID + "-KSTREAM-MAP-0000000001-repartition", 2), |
| new TopicPartition(APPLICATION_ID + "-KSTREAM-MAP-0000000001-repartition", 3) |
| ); |
| |
| // check if we created a task for all expected topicPartitions. |
| assertThat(new HashSet<>(assignment.get(client).partitions()), equalTo(new HashSet<>(expectedAssignment))); |
| } |
| |
| @Test |
| public void shouldThrowTimeoutExceptionWhenCreatingRepartitionTopicsTimesOut() { |
| final StreamsBuilder streamsBuilder = new StreamsBuilder(); |
| streamsBuilder.stream("topic1").repartition(); |
| |
| final String client = "client1"; |
| builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()); |
| |
| createDefaultMockTaskManager(); |
| partitionAssignor.configure(configProps()); |
| final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( |
| time, |
| new StreamsConfig(configProps()), |
| mockClientSupplier.restoreConsumer, |
| false |
| ) { |
| @Override |
| public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) { |
| throw new TimeoutException("KABOOM!"); |
| } |
| }; |
| partitionAssignor.setInternalTopicManager(mockInternalTopicManager); |
| |
| subscriptions.put(client, |
| new Subscription( |
| singletonList("topic1"), |
| defaultSubscriptionInfo.encode() |
| ) |
| ); |
| assertThrows(TimeoutException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions))); |
| } |
| |
| @Test |
| public void shouldThrowTimeoutExceptionWhenCreatingChangelogTopicsTimesOut() { |
| final StreamsConfig config = new StreamsConfig(configProps()); |
| final StreamsBuilder streamsBuilder = new StreamsBuilder(); |
| streamsBuilder.table("topic1", Materialized.as("store")); |
| |
| final String client = "client1"; |
| builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()); |
| topologyMetadata = new TopologyMetadata(builder, config); |
| |
| createDefaultMockTaskManager(); |
| partitionAssignor.configure(configProps()); |
| final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( |
| time, |
| config, |
| mockClientSupplier.restoreConsumer, |
| false |
| ) { |
| @Override |
| public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) { |
| if (topics.isEmpty()) { |
| return emptySet(); |
| } |
| throw new TimeoutException("KABOOM!"); |
| } |
| }; |
| partitionAssignor.setInternalTopicManager(mockInternalTopicManager); |
| |
| subscriptions.put(client, |
| new Subscription( |
| singletonList("topic1"), |
| defaultSubscriptionInfo.encode() |
| ) |
| ); |
| |
| assertThrows(TimeoutException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions))); |
| } |
| |
| @Test |
| public void shouldAddUserDefinedEndPointToSubscription() { |
| builder.addSource(null, "source", null, null, null, "input"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source"); |
| builder.addSink("sink", "output", null, null, null, "processor"); |
| |
| createDefaultMockTaskManager(); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, USER_END_POINT)); |
| |
| final Set<String> topics = mkSet("input"); |
| final ByteBuffer userData = partitionAssignor.subscriptionUserData(topics); |
| final Subscription subscription = |
| new Subscription(new ArrayList<>(topics), userData); |
| final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData()); |
| assertEquals("localhost:8080", subscriptionInfo.userEndPoint()); |
| } |
| |
| @Test |
| public void shouldMapUserEndPointToTopicPartitions() { |
| builder.addSource(null, "source", null, null, null, "topic1"); |
| builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source"); |
| builder.addSink("sink", "output", null, null, null, "processor"); |
| |
| final List<String> topics = Collections.singletonList("topic1"); |
| |
| createDefaultMockTaskManager(); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, USER_END_POINT)); |
| |
| subscriptions.put("consumer1", |
| new Subscription( |
| topics, |
| getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()) |
| ); |
| final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| final Assignment consumerAssignment = assignments.get("consumer1"); |
| final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData()); |
| final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); |
| assertEquals( |
| mkSet( |
| new TopicPartition("topic1", 0), |
| new TopicPartition("topic1", 1), |
| new TopicPartition("topic1", 2)), |
| topicPartitions); |
| } |
| |
| @Test |
| public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() { |
| createDefaultMockTaskManager(); |
| try { |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost")); |
| fail("expected to an exception due to invalid config"); |
| } catch (final ConfigException e) { |
| // pass |
| } |
| } |
| |
| @Test |
| public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() { |
| createDefaultMockTaskManager(); |
| assertThrows(ConfigException.class, () -> configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:j87yhk"))); |
| } |
| |
| @Test |
| public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() { |
| final StreamsBuilder streamsBuilder = new StreamsBuilder(); |
| |
| final KStream<Object, Object> stream1 = streamsBuilder |
| |
| // Task 1 (should get created): |
| .stream("topic1") |
| // force repartitioning for aggregation |
| .selectKey((key, value) -> null) |
| .groupByKey() |
| |
| // Task 2 (should get created): |
| // create repartitioning and changelog topic as task 1 exists |
| .count(Materialized.as("count")) |
| |
| // force repartitioning for join, but second join input topic unknown |
| // -> internal repartitioning topic should not get created |
| .toStream() |
| .map((KeyValueMapper<Object, Long, KeyValue<Object, Object>>) (key, value) -> null); |
| |
| streamsBuilder |
| // Task 3 (should not get created because input topic unknown) |
| .stream("unknownTopic") |
| |
| // force repartitioning for join, but input topic unknown |
| // -> thus should not create internal repartitioning topic |
| .selectKey((key, value) -> null) |
| |
| // Task 4 (should not get created because input topics unknown) |
| // should not create any of both input repartition topics or any of both changelog topics |
| .join( |
| stream1, |
| (ValueJoiner<Object, Object, Void>) (value1, value2) -> null, |
| JoinWindows.of(ofMillis(0)) |
| ); |
| |
| final String client = "client1"; |
| |
| builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()); |
| |
| final MockInternalTopicManager mockInternalTopicManager = configureDefault(); |
| |
| subscriptions.put(client, |
| new Subscription( |
| Collections.singletonList("unknownTopic"), |
| defaultSubscriptionInfo.encode()) |
| ); |
| final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| assertThat(mockInternalTopicManager.readyTopics.isEmpty(), equalTo(true)); |
| |
| assertThat(assignment.get(client).partitions().isEmpty(), equalTo(true)); |
| } |
| |
| @Test |
| public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() { |
| final Map<HostInfo, Set<TopicPartition>> initialHostState = mkMap( |
| mkEntry(new HostInfo("localhost", 9090), mkSet(t1p0, t1p1)), |
| mkEntry(new HostInfo("otherhost", 9090), mkSet(t2p0, t2p1)) |
| ); |
| |
| final Map<HostInfo, Set<TopicPartition>> newHostState = mkMap( |
| mkEntry(new HostInfo("localhost", 9090), mkSet(t1p0, t1p1)), |
| mkEntry(new HostInfo("newotherhost", 9090), mkSet(t2p0, t2p1)) |
| ); |
| |
| streamsMetadataState = mock(StreamsMetadataState.class); |
| |
| createDefaultMockTaskManager(); |
| configureDefaultPartitionAssignor(); |
| |
| partitionAssignor.onAssignment(createAssignment(initialHostState), null); |
| partitionAssignor.onAssignment(createAssignment(newHostState), null); |
| |
| verify(streamsMetadataState).onChange(eq(initialHostState), any(), any()); |
| verify(streamsMetadataState).onChange(eq(newHostState), any(), any()); |
| } |
| |
| @Test |
| public void shouldTriggerImmediateRebalanceOnHostInfoChange() { |
| final Map<HostInfo, Set<TopicPartition>> oldHostState = mkMap( |
| mkEntry(new HostInfo("localhost", 9090), mkSet(t1p0, t1p1)), |
| mkEntry(new HostInfo("otherhost", 9090), mkSet(t2p0, t2p1)) |
| ); |
| |
| final Map<HostInfo, Set<TopicPartition>> newHostState = mkMap( |
| mkEntry(new HostInfo("newhost", 9090), mkSet(t1p0, t1p1)), |
| mkEntry(new HostInfo("otherhost", 9090), mkSet(t2p0, t2p1)) |
| ); |
| |
| createDefaultMockTaskManager(); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, "newhost:9090")); |
| |
| partitionAssignor.onAssignment(createAssignment(oldHostState), null); |
| |
| assertThat(referenceContainer.nextScheduledRebalanceMs.get(), is(0L)); |
| |
| partitionAssignor.onAssignment(createAssignment(newHostState), null); |
| |
| assertThat(referenceContainer.nextScheduledRebalanceMs.get(), is(Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void shouldTriggerImmediateRebalanceOnTasksRevoked() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| |
| final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); |
| final List<TopicPartition> allPartitions = asList(t1p0, t1p1, t1p2); |
| |
| subscriptions.put(CONSUMER_1, |
| new Subscription( |
| Collections.singletonList("topic1"), |
| getInfo(UUID_1, allTasks, EMPTY_TASKS).encode(), |
| allPartitions) |
| ); |
| subscriptions.put(CONSUMER_2, |
| new Subscription( |
| Collections.singletonList("topic1"), |
| getInfo(UUID_1, EMPTY_TASKS, allTasks).encode(), |
| emptyList()) |
| ); |
| |
| createMockTaskManager(allTasks, allTasks); |
| configurePartitionAssignorWith(singletonMap(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0L)); |
| |
| final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| // Verify at least one partition was revoked |
| assertThat(assignment.get(CONSUMER_1).partitions(), not(allPartitions)); |
| assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(emptyList())); |
| |
| // Verify that stateless revoked tasks would not be assigned as standbys |
| assertThat(AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()).activeTasks(), equalTo(emptyList())); |
| assertThat(AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()).standbyTasks(), equalTo(emptyMap())); |
| |
| partitionAssignor.onAssignment(assignment.get(CONSUMER_2), null); |
| |
| assertThat(referenceContainer.nextScheduledRebalanceMs.get(), is(0L)); |
| } |
| |
| @Test |
| public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() { |
| final Map<String, Object> props = configProps(); |
| props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); |
| props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, USER_END_POINT); |
| |
| final StreamsBuilder streamsBuilder = new StreamsBuilder(); |
| streamsBuilder.stream("topic1").groupByKey().count(); |
| builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()); |
| topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(props)); |
| |
| createDefaultMockTaskManager(); |
| adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( |
| singletonList(APPLICATION_ID + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog"), |
| singletonList(3)) |
| ); |
| |
| configurePartitionAssignorWith(props); |
| |
| subscriptions.put("consumer1", |
| new Subscription( |
| Collections.singletonList("topic1"), |
| getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()) |
| ); |
| subscriptions.put("consumer2", |
| new Subscription( |
| Collections.singletonList("topic1"), |
| getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode()) |
| ); |
| final Set<TopicPartition> allPartitions = mkSet(t1p0, t1p1, t1p2); |
| final Map<String, Assignment> assign = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| final Assignment consumer1Assignment = assign.get("consumer1"); |
| final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData()); |
| |
| final Set<TopicPartition> consumer1ActivePartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); |
| final Set<TopicPartition> consumer2ActivePartitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090)); |
| final Set<TopicPartition> consumer1StandbyPartitions = assignmentInfo.standbyPartitionByHost().get(new HostInfo("localhost", 8080)); |
| final Set<TopicPartition> consumer2StandbyPartitions = assignmentInfo.standbyPartitionByHost().get(new HostInfo("other", 9090)); |
| final HashSet<TopicPartition> allAssignedPartitions = new HashSet<>(consumer1ActivePartitions); |
| allAssignedPartitions.addAll(consumer2ActivePartitions); |
| assertThat(consumer1ActivePartitions, not(allPartitions)); |
| assertThat(consumer2ActivePartitions, not(allPartitions)); |
| assertThat(consumer1ActivePartitions, equalTo(consumer2StandbyPartitions)); |
| assertThat(consumer2ActivePartitions, equalTo(consumer1StandbyPartitions)); |
| assertThat(allAssignedPartitions, equalTo(allPartitions)); |
| } |
| |
| @Test |
| public void shouldThrowKafkaExceptionIfReferenceContainerNotConfigured() { |
| final Map<String, Object> config = configProps(); |
| config.remove(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR); |
| |
| final KafkaException expected = assertThrows( |
| KafkaException.class, |
| () -> partitionAssignor.configure(config) |
| ); |
| assertThat(expected.getMessage(), equalTo("ReferenceContainer is not specified")); |
| } |
| |
| @Test |
| public void shouldThrowKafkaExceptionIfReferenceContainerConfigIsNotTaskManagerInstance() { |
| final Map<String, Object> config = configProps(); |
| config.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, "i am not a reference container"); |
| |
| final KafkaException expected = assertThrows( |
| KafkaException.class, |
| () -> partitionAssignor.configure(config) |
| ); |
| assertThat( |
| expected.getMessage(), |
| equalTo("java.lang.String is not an instance of org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer") |
| ); |
| } |
| |
| @Test |
| public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V2() { |
| shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 2); |
| } |
| |
| @Test |
| public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V3() { |
| shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 3); |
| } |
| |
| @Test |
| public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV2V3() { |
| shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(2, 3); |
| } |
| |
| private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int smallestVersion, |
| final int otherVersion) { |
| subscriptions.put("consumer1", |
| new Subscription( |
| Collections.singletonList("topic1"), |
| getInfoForOlderVersion(smallestVersion, UUID_1, EMPTY_TASKS, EMPTY_TASKS).encode()) |
| ); |
| subscriptions.put("consumer2", |
| new Subscription( |
| Collections.singletonList("topic1"), |
| getInfoForOlderVersion(otherVersion, UUID_2, EMPTY_TASKS, EMPTY_TASKS).encode() |
| ) |
| ); |
| |
| configureDefault(); |
| |
| final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| assertThat(assignment.size(), equalTo(2)); |
| assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(smallestVersion)); |
| assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(smallestVersion)); |
| } |
| |
| @Test |
| public void shouldDownGradeSubscriptionToVersion1() { |
| createDefaultMockTaskManager(); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100)); |
| |
| final Set<String> topics = mkSet("topic1"); |
| final Subscription subscription = new Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); |
| |
| assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1)); |
| } |
| |
| @Test |
| public void shouldDownGradeSubscriptionToVersion2For0101() { |
| shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0101); |
| } |
| |
| @Test |
| public void shouldDownGradeSubscriptionToVersion2For0102() { |
| shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0102); |
| } |
| |
| @Test |
| public void shouldDownGradeSubscriptionToVersion2For0110() { |
| shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0110); |
| } |
| |
| @Test |
| public void shouldDownGradeSubscriptionToVersion2For10() { |
| shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_10); |
| } |
| |
| @Test |
| public void shouldDownGradeSubscriptionToVersion2For11() { |
| shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_11); |
| } |
| |
| private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue) { |
| createDefaultMockTaskManager(); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue)); |
| |
| final Set<String> topics = mkSet("topic1"); |
| final Subscription subscription = new Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); |
| |
| assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2)); |
| } |
| |
| @Test |
| public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenNewConsumerJoins() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| |
| final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); |
| |
| subscriptions.put(CONSUMER_1, |
| new Subscription( |
| Collections.singletonList("topic1"), |
| getInfo(UUID_1, allTasks, EMPTY_TASKS).encode(), |
| asList(t1p0, t1p1, t1p2)) |
| ); |
| subscriptions.put(CONSUMER_2, |
| new Subscription( |
| Collections.singletonList("topic1"), |
| getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS).encode(), |
| emptyList()) |
| ); |
| |
| createMockTaskManager(allTasks, allTasks); |
| configureDefaultPartitionAssignor(); |
| |
| final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| assertThat(assignment.size(), equalTo(2)); |
| |
| // The new consumer's assignment should be empty until c1 has the chance to revoke its partitions/tasks |
| assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(emptyList())); |
| |
| final AssignmentInfo actualAssignment = AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()); |
| assertThat(actualAssignment.version(), is(LATEST_SUPPORTED_VERSION)); |
| assertThat(actualAssignment.activeTasks(), empty()); |
| // Note we're not asserting anything about standbys. If the assignor gave an active task to CONSUMER_2, it would |
| // be converted to a standby, but we don't know whether the assignor will do that. |
| assertThat(actualAssignment.partitionsByHost(), anEmptyMap()); |
| assertThat(actualAssignment.standbyPartitionByHost(), anEmptyMap()); |
| assertThat(actualAssignment.errCode(), is(0)); |
| } |
| |
| @Test |
| public void shouldReturnInterleavedAssignmentForOnlyFutureInstancesDuringVersionProbing() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| |
| final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); |
| |
| subscriptions.put(CONSUMER_1, |
| new Subscription( |
| Collections.singletonList("topic1"), |
| encodeFutureSubscription(), |
| emptyList()) |
| ); |
| subscriptions.put(CONSUMER_2, |
| new Subscription( |
| Collections.singletonList("topic1"), |
| encodeFutureSubscription(), |
| emptyList()) |
| ); |
| |
| createMockTaskManager(allTasks, allTasks); |
| configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); |
| |
| final Map<String, Assignment> assignment = |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| assertThat(assignment.size(), equalTo(2)); |
| |
| assertThat(assignment.get(CONSUMER_1).partitions(), equalTo(asList(t1p0, t1p2))); |
| assertThat( |
| AssignmentInfo.decode(assignment.get(CONSUMER_1).userData()), |
| equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, asList(TASK_0_0, TASK_0_2), emptyMap(), emptyMap(), emptyMap(), 0))); |
| |
| |
| assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(Collections.singletonList(t1p1))); |
| assertThat( |
| AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()), |
| equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, Collections.singletonList(TASK_0_1), emptyMap(), emptyMap(), emptyMap(), 0))); |
| } |
| |
| @Test |
| public void shouldEncodeAssignmentErrorIfV1SubscriptionAndFutureSubscriptionIsMixed() { |
| shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1); |
| } |
| |
| @Test |
| public void shouldEncodeAssignmentErrorIfV2SubscriptionAndFutureSubscriptionIsMixed() { |
| shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2); |
| } |
| |
| @Test |
| public void shouldNotFailOnBranchedMultiLevelRepartitionConnectedTopology() { |
| // Test out a topology with 3 level of sub-topology as: |
| // 0 |
| // / \ |
| // 1 3 |
| // \ / |
| // 2 |
| // where each pair of the sub topology is connected by repartition topic. |
| // The purpose of this test is to verify the robustness of the stream partition assignor algorithm, |
| // especially whether it could build the repartition topic counts (step zero) with a complex topology. |
| // The traversal path 0 -> 1 -> 2 -> 3 hits the case where sub-topology 2 will be initialized while its |
| // parent 3 hasn't been initialized yet. |
| builder.addSource(null, "KSTREAM-SOURCE-0000000000", null, null, null, "input-stream"); |
| builder.addProcessor("KSTREAM-FLATMAPVALUES-0000000001", new MockApiProcessorSupplier<>(), "KSTREAM-SOURCE-0000000000"); |
| builder.addProcessor("KSTREAM-BRANCH-0000000002", new MockApiProcessorSupplier<>(), "KSTREAM-FLATMAPVALUES-0000000001"); |
| builder.addProcessor("KSTREAM-BRANCHCHILD-0000000003", new MockApiProcessorSupplier<>(), "KSTREAM-BRANCH-0000000002"); |
| builder.addProcessor("KSTREAM-BRANCHCHILD-0000000004", new MockApiProcessorSupplier<>(), "KSTREAM-BRANCH-0000000002"); |
| builder.addProcessor("KSTREAM-MAP-0000000005", new MockApiProcessorSupplier<>(), "KSTREAM-BRANCHCHILD-0000000003"); |
| |
| builder.addInternalTopic("odd_store-repartition", InternalTopicProperties.empty()); |
| builder.addProcessor("odd_store-repartition-filter", new MockApiProcessorSupplier<>(), "KSTREAM-MAP-0000000005"); |
| builder.addSink("odd_store-repartition-sink", "odd_store-repartition", null, null, null, "odd_store-repartition-filter"); |
| builder.addSource(null, "odd_store-repartition-source", null, null, null, "odd_store-repartition"); |
| builder.addProcessor("KSTREAM-REDUCE-0000000006", new MockApiProcessorSupplier<>(), "odd_store-repartition-source"); |
| builder.addProcessor("KTABLE-TOSTREAM-0000000010", new MockApiProcessorSupplier<>(), "KSTREAM-REDUCE-0000000006"); |
| builder.addProcessor("KSTREAM-PEEK-0000000011", new MockApiProcessorSupplier<>(), "KTABLE-TOSTREAM-0000000010"); |
| builder.addProcessor("KSTREAM-MAP-0000000012", new MockApiProcessorSupplier<>(), "KSTREAM-PEEK-0000000011"); |
| |
| builder.addInternalTopic("odd_store_2-repartition", InternalTopicProperties.empty()); |
| builder.addProcessor("odd_store_2-repartition-filter", new MockApiProcessorSupplier<>(), "KSTREAM-MAP-0000000012"); |
| builder.addSink("odd_store_2-repartition-sink", "odd_store_2-repartition", null, null, null, "odd_store_2-repartition-filter"); |
| builder.addSource(null, "odd_store_2-repartition-source", null, null, null, "odd_store_2-repartition"); |
| builder.addProcessor("KSTREAM-REDUCE-0000000013", new MockApiProcessorSupplier<>(), "odd_store_2-repartition-source"); |
| builder.addProcessor("KSTREAM-MAP-0000000017", new MockApiProcessorSupplier<>(), "KSTREAM-BRANCHCHILD-0000000004"); |
| |
| builder.addInternalTopic("even_store-repartition", InternalTopicProperties.empty()); |
| builder.addProcessor("even_store-repartition-filter", new MockApiProcessorSupplier<>(), "KSTREAM-MAP-0000000017"); |
| builder.addSink("even_store-repartition-sink", "even_store-repartition", null, null, null, "even_store-repartition-filter"); |
| builder.addSource(null, "even_store-repartition-source", null, null, null, "even_store-repartition"); |
| builder.addProcessor("KSTREAM-REDUCE-0000000018", new MockApiProcessorSupplier<>(), "even_store-repartition-source"); |
| builder.addProcessor("KTABLE-TOSTREAM-0000000022", new MockApiProcessorSupplier<>(), "KSTREAM-REDUCE-0000000018"); |
| builder.addProcessor("KSTREAM-PEEK-0000000023", new MockApiProcessorSupplier<>(), "KTABLE-TOSTREAM-0000000022"); |
| builder.addProcessor("KSTREAM-MAP-0000000024", new MockApiProcessorSupplier<>(), "KSTREAM-PEEK-0000000023"); |
| |
| builder.addInternalTopic("even_store_2-repartition", InternalTopicProperties.empty()); |
| builder.addProcessor("even_store_2-repartition-filter", new MockApiProcessorSupplier<>(), "KSTREAM-MAP-0000000024"); |
| builder.addSink("even_store_2-repartition-sink", "even_store_2-repartition", null, null, null, "even_store_2-repartition-filter"); |
| builder.addSource(null, "even_store_2-repartition-source", null, null, null, "even_store_2-repartition"); |
| builder.addProcessor("KSTREAM-REDUCE-0000000025", new MockApiProcessorSupplier<>(), "even_store_2-repartition-source"); |
| builder.addProcessor("KTABLE-JOINTHIS-0000000030", new MockApiProcessorSupplier<>(), "KSTREAM-REDUCE-0000000013"); |
| builder.addProcessor("KTABLE-JOINOTHER-0000000031", new MockApiProcessorSupplier<>(), "KSTREAM-REDUCE-0000000025"); |
| builder.addProcessor("KTABLE-MERGE-0000000029", new MockApiProcessorSupplier<>(), "KTABLE-JOINTHIS-0000000030", "KTABLE-JOINOTHER-0000000031"); |
| builder.addProcessor("KTABLE-TOSTREAM-0000000032", new MockApiProcessorSupplier<>(), "KTABLE-MERGE-0000000029"); |
| |
| final List<String> topics = asList("input-stream", "test-even_store-repartition", "test-even_store_2-repartition", "test-odd_store-repartition", "test-odd_store_2-repartition"); |
| |
| configureDefault(); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| topics, |
| defaultSubscriptionInfo.encode()) |
| ); |
| |
| final Cluster metadata = new Cluster( |
| "cluster", |
| Collections.singletonList(Node.noNode()), |
| Collections.singletonList(new PartitionInfo("input-stream", 0, Node.noNode(), new Node[0], new Node[0])), |
| emptySet(), |
| emptySet()); |
| |
| // This shall fail if we have bugs in the repartition topic creation due to the inconsistent order of sub-topologies. |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)); |
| } |
| |
| @Test |
| public void shouldGetAssignmentConfigs() { |
| createDefaultMockTaskManager(); |
| |
| final Map<String, Object> props = configProps(); |
| props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 11); |
| props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 33); |
| props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 44); |
| props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 55 * 60 * 1000L); |
| |
| partitionAssignor.configure(props); |
| |
| assertThat(partitionAssignor.acceptableRecoveryLag(), equalTo(11L)); |
| assertThat(partitionAssignor.maxWarmupReplicas(), equalTo(33)); |
| assertThat(partitionAssignor.numStandbyReplicas(), equalTo(44)); |
| assertThat(partitionAssignor.probingRebalanceIntervalMs(), equalTo(55 * 60 * 1000L)); |
| } |
| |
| @Test |
| public void shouldGetTime() { |
| time.setCurrentTimeMs(Long.MAX_VALUE); |
| |
| createDefaultMockTaskManager(); |
| final Map<String, Object> props = configProps(); |
| final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(props); |
| |
| assertThat(assignorConfiguration.referenceContainer().time.milliseconds(), equalTo(Long.MAX_VALUE)); |
| } |
| |
| @Test |
| public void shouldThrowIllegalStateExceptionIfAnyPartitionsMissingFromChangelogEndOffsets() { |
| final int changelogNumPartitions = 3; |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addProcessor("processor1", new MockApiProcessorSupplier<>(), "source1"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1"); |
| |
| adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( |
| singletonList(APPLICATION_ID + "-store1-changelog"), |
| singletonList(changelogNumPartitions - 1)) |
| ); |
| |
| configureDefault(); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| singletonList("topic1"), |
| defaultSubscriptionInfo.encode() |
| )); |
| assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions))); |
| } |
| |
| @Test |
| public void shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOffsets() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addProcessor("processor1", new MockApiProcessorSupplier<>(), "source1"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor1"); |
| |
| adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( |
| singletonList(APPLICATION_ID + "-store1-changelog"), |
| singletonList(3)) |
| ); |
| |
| configureDefault(); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| singletonList("topic1"), |
| defaultSubscriptionInfo.encode() |
| )); |
| assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions))); |
| } |
| |
| @Test |
| public void shouldSkipListOffsetsRequestForNewlyCreatedChangelogTopics() { |
| adminClient = mock(AdminClient.class); |
| final ListOffsetsResult result = mock(ListOffsetsResult.class); |
| final KafkaFutureImpl<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = new KafkaFutureImpl<>(); |
| allFuture.complete(emptyMap()); |
| |
| when(adminClient.listOffsets(emptyMap())).thenReturn(result); |
| |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addProcessor("processor1", new MockApiProcessorSupplier<>(), "source1"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1"); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| singletonList("topic1"), |
| defaultSubscriptionInfo.encode() |
| )); |
| |
| configureDefault(); |
| overwriteInternalTopicManagerWithMock(true); |
| |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)); |
| } |
| |
| @Test |
| public void shouldRequestEndOffsetsForPreexistingChangelogs() { |
| final Set<TopicPartition> changelogs = mkSet( |
| new TopicPartition(APPLICATION_ID + "-store-changelog", 0), |
| new TopicPartition(APPLICATION_ID + "-store-changelog", 1), |
| new TopicPartition(APPLICATION_ID + "-store-changelog", 2) |
| ); |
| adminClient = mock(AdminClient.class); |
| final ListOffsetsResult result = mock(ListOffsetsResult.class); |
| for (final TopicPartition entry : changelogs) { |
| final KafkaFutureImpl<ListOffsetsResultInfo> partitionFuture = new KafkaFutureImpl<>(); |
| final ListOffsetsResultInfo info = mock(ListOffsetsResultInfo.class); |
| when(info.offset()).thenReturn(Long.MAX_VALUE); |
| partitionFuture.complete(info); |
| when(result.partitionResult(entry)).thenReturn(partitionFuture); |
| } |
| |
| @SuppressWarnings("unchecked") |
| final ArgumentCaptor<Map<TopicPartition, OffsetSpec>> capturedChangelogs = ArgumentCaptor.forClass(Map.class); |
| |
| when(adminClient.listOffsets(capturedChangelogs.capture())).thenReturn(result); |
| |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addProcessor("processor1", new MockApiProcessorSupplier<>(), "source1"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor1"); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| singletonList("topic1"), |
| defaultSubscriptionInfo.encode() |
| )); |
| |
| configureDefault(); |
| overwriteInternalTopicManagerWithMock(false); |
| |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)); |
| |
| assertThat( |
| capturedChangelogs.getValue().keySet(), |
| equalTo(changelogs) |
| ); |
| } |
| |
| @Test |
| public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() { |
| final Set<TopicPartition> changelogs = mkSet( |
| new TopicPartition("topic1", 0), |
| new TopicPartition("topic1", 1), |
| new TopicPartition("topic1", 2) |
| ); |
| |
| final StreamsBuilder streamsBuilder = new StreamsBuilder(); |
| streamsBuilder.table("topic1", Materialized.as("store")); |
| |
| final Properties props = new Properties(); |
| props.putAll(configProps()); |
| props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); |
| builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build(props)); |
| topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(props)); |
| |
| subscriptions.put("consumer10", |
| new Subscription( |
| singletonList("topic1"), |
| defaultSubscriptionInfo.encode() |
| )); |
| |
| createDefaultMockTaskManager(); |
| configurePartitionAssignorWith(singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE)); |
| overwriteInternalTopicManagerWithMock(false); |
| |
| final Consumer<byte[], byte[]> consumerClient = referenceContainer.mainConsumer; |
| when(consumerClient.committed(changelogs)) |
| .thenReturn(changelogs.stream().collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(Long.MAX_VALUE)))); |
| |
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)); |
| } |
| |
| @Test |
| public void shouldEncodeMissingSourceTopicError() { |
| final Cluster emptyClusterMetadata = new Cluster( |
| "cluster", |
| Collections.singletonList(Node.noNode()), |
| emptyList(), |
| emptySet(), |
| emptySet() |
| ); |
| |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| configureDefault(); |
| |
| subscriptions.put("consumer", |
| new Subscription( |
| singletonList("topic"), |
| defaultSubscriptionInfo.encode() |
| )); |
| final Map<String, Assignment> assignments = partitionAssignor.assign(emptyClusterMetadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| assertThat(AssignmentInfo.decode(assignments.get("consumer").userData()).errCode(), |
| equalTo(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())); |
| } |
| |
| @Test |
| public void testUniqueField() { |
| createDefaultMockTaskManager(); |
| configureDefaultPartitionAssignor(); |
| final Set<String> topics = mkSet("input"); |
| |
| assertEquals(0, partitionAssignor.uniqueField()); |
| partitionAssignor.subscriptionUserData(topics); |
| assertEquals(1, partitionAssignor.uniqueField()); |
| partitionAssignor.subscriptionUserData(topics); |
| assertEquals(2, partitionAssignor.uniqueField()); |
| } |
| |
| @Test |
| public void testUniqueFieldOverflow() { |
| createDefaultMockTaskManager(); |
| configureDefaultPartitionAssignor(); |
| final Set<String> topics = mkSet("input"); |
| |
| for (int i = 0; i < 127; i++) { |
| partitionAssignor.subscriptionUserData(topics); |
| } |
| assertEquals(127, partitionAssignor.uniqueField()); |
| partitionAssignor.subscriptionUserData(topics); |
| assertEquals(-128, partitionAssignor.uniqueField()); |
| } |
| |
| @Test |
| public void shouldThrowTaskAssignmentExceptionWhenUnableToResolvePartitionCount() { |
| builder = new CorruptedInternalTopologyBuilder(); |
| topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(configProps())); |
| |
| final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder); |
| |
| final KStream<String, String> inputTopic = streamsBuilder.stream(singleton("topic1"), new ConsumedInternal<>()); |
| final KTable<String, String> inputTable = streamsBuilder.table("topic2", new ConsumedInternal<>(), new MaterializedInternal<>(Materialized.as("store"))); |
| inputTopic |
| .groupBy( |
| (k, v) -> k, |
| Grouped.with("GroupName", Serdes.String(), Serdes.String()) |
| ) |
| .windowedBy(TimeWindows.of(Duration.ofMinutes(10))) |
| .aggregate( |
| () -> "", |
| (k, v, a) -> a + k) |
| .leftJoin( |
| inputTable, |
| v -> v, |
| (x, y) -> x + y |
| ); |
| streamsBuilder.buildAndOptimizeTopology(); |
| |
| configureDefault(); |
| |
| subscriptions.put("consumer", |
| new Subscription( |
| singletonList("topic"), |
| defaultSubscriptionInfo.encode() |
| )); |
| final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| assertThat(AssignmentInfo.decode(assignments.get("consumer").userData()).errCode(), |
| equalTo(AssignorError.ASSIGNMENT_ERROR.code())); |
| } |
| |
| @Test |
| public void testClientTags() { |
| clientTags = mkMap(mkEntry("cluster", "cluster1"), mkEntry("zone", "az1")); |
| createDefaultMockTaskManager(); |
| configureDefaultPartitionAssignor(); |
| final Set<String> topics = mkSet("input"); |
| final Subscription subscription = new Subscription(new ArrayList<>(topics), |
| partitionAssignor.subscriptionUserData(topics)); |
| final SubscriptionInfo info = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, uniqueField, clientTags); |
| |
| assertEquals(singletonList("input"), subscription.topics()); |
| assertEquals(info, SubscriptionInfo.decode(subscription.userData())); |
| assertEquals(clientTags, partitionAssignor.clientTags()); |
| } |
| |
| private static class CorruptedInternalTopologyBuilder extends InternalTopologyBuilder { |
| private Map<Subtopology, TopicsInfo> corruptedTopicGroups; |
| |
| @Override |
| public synchronized Map<Subtopology, TopicsInfo> subtopologyToTopicsInfo() { |
| if (corruptedTopicGroups == null) { |
| corruptedTopicGroups = new HashMap<>(); |
| for (final Map.Entry<Subtopology, TopicsInfo> topicGroupEntry : super.subtopologyToTopicsInfo().entrySet()) { |
| final TopicsInfo originalInfo = topicGroupEntry.getValue(); |
| corruptedTopicGroups.put( |
| topicGroupEntry.getKey(), |
| new TopicsInfo( |
| emptySet(), |
| originalInfo.sourceTopics, |
| originalInfo.repartitionSourceTopics, |
| originalInfo.stateChangelogTopics |
| )); |
| } |
| } |
| |
| return corruptedTopicGroups; |
| } |
| } |
| |
| private static ByteBuffer encodeFutureSubscription() { |
| final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */); |
| buf.putInt(LATEST_SUPPORTED_VERSION + 1); |
| buf.putInt(LATEST_SUPPORTED_VERSION + 1); |
| return buf; |
| } |
| |
| private void shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) { |
| subscriptions.put("consumer1", |
| new Subscription( |
| Collections.singletonList("topic1"), |
| getInfoForOlderVersion(oldVersion, UUID_1, EMPTY_TASKS, EMPTY_TASKS).encode()) |
| ); |
| subscriptions.put("future-consumer", |
| new Subscription( |
| Collections.singletonList("topic1"), |
| encodeFutureSubscription()) |
| ); |
| configureDefault(); |
| |
| final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
| |
| assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).errCode(), equalTo(AssignorError.ASSIGNMENT_ERROR.code())); |
| assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()).errCode(), equalTo(AssignorError.ASSIGNMENT_ERROR.code())); |
| } |
| |
| private static Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) { |
| final AssignmentInfo info = new AssignmentInfo(LATEST_SUPPORTED_VERSION, emptyList(), emptyMap(), firstHostState, emptyMap(), 0); |
| return new Assignment(emptyList(), info.encode()); |
| } |
| |
| private static AssignmentInfo checkAssignment(final Set<String> expectedTopics, |
| final Assignment assignment) { |
| |
| // This assumed 1) DefaultPartitionGrouper is used, and 2) there is an only one topic group. |
| |
| final AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); |
| |
| // check if the number of assigned partitions == the size of active task id list |
| assertEquals(assignment.partitions().size(), info.activeTasks().size()); |
| |
| // check if active tasks are consistent |
| final List<TaskId> activeTasks = new ArrayList<>(); |
| final Set<String> activeTopics = new HashSet<>(); |
| for (final TopicPartition partition : assignment.partitions()) { |
| // since default grouper, taskid.partition == partition.partition() |
| activeTasks.add(new TaskId(0, partition.partition())); |
| activeTopics.add(partition.topic()); |
| } |
| assertEquals(activeTasks, info.activeTasks()); |
| |
| // check if active partitions cover all topics |
| assertEquals(expectedTopics, activeTopics); |
| |
| // check if standby tasks are consistent |
| final Set<String> standbyTopics = new HashSet<>(); |
| for (final Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks().entrySet()) { |
| final TaskId id = entry.getKey(); |
| final Set<TopicPartition> partitions = entry.getValue(); |
| for (final TopicPartition partition : partitions) { |
| // since default grouper, taskid.partition == partition.partition() |
| assertEquals(id.partition(), partition.partition()); |
| |
| standbyTopics.add(partition.topic()); |
| } |
| } |
| |
| if (!info.standbyTasks().isEmpty()) { |
| // check if standby partitions cover all topics |
| assertEquals(expectedTopics, standbyTopics); |
| } |
| |
| return info; |
| } |
| |
| private static void assertEquivalentAssignment(final Map<String, List<TaskId>> thisAssignment, |
| final Map<String, List<TaskId>> otherAssignment) { |
| assertEquals(thisAssignment.size(), otherAssignment.size()); |
| for (final Map.Entry<String, List<TaskId>> entry : thisAssignment.entrySet()) { |
| final String consumer = entry.getKey(); |
| assertTrue(otherAssignment.containsKey(consumer)); |
| |
| final List<TaskId> thisTaskList = entry.getValue(); |
| Collections.sort(thisTaskList); |
| final List<TaskId> otherTaskList = otherAssignment.get(consumer); |
| Collections.sort(otherTaskList); |
| |
| assertThat(thisTaskList, equalTo(otherTaskList)); |
| } |
| } |
| |
| /** |
| * Helper for building the input to createMockAdminClient in cases where we don't care about the actual offsets |
| * @param changelogTopics The names of all changelog topics in the topology |
| * @param topicsNumPartitions The number of partitions for the corresponding changelog topic, such that the number |
| * of partitions of the ith topic in changelogTopics is given by the ith element of topicsNumPartitions |
| */ |
| private static Map<TopicPartition, Long> getTopicPartitionOffsetsMap(final List<String> changelogTopics, |
| final List<Integer> topicsNumPartitions) { |
| if (changelogTopics.size() != topicsNumPartitions.size()) { |
| throw new IllegalStateException("Passed in " + changelogTopics.size() + " changelog topic names, but " + |
| topicsNumPartitions.size() + " different numPartitions for the topics"); |
| } |
| final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>(); |
| for (int i = 0; i < changelogTopics.size(); ++i) { |
| final String topic = changelogTopics.get(i); |
| final int numPartitions = topicsNumPartitions.get(i); |
| for (int partition = 0; partition < numPartitions; ++partition) { |
| changelogEndOffsets.put(new TopicPartition(topic, partition), Long.MAX_VALUE); |
| } |
| } |
| return changelogEndOffsets; |
| } |
| |
| private static SubscriptionInfo getInfoForOlderVersion(final int version, |
| final UUID processId, |
| final Set<TaskId> prevTasks, |
| final Set<TaskId> standbyTasks) { |
| return new SubscriptionInfo( |
| version, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0, EMPTY_CLIENT_TAGS); |
| } |
| |
| // Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets |
| private static Map<TaskId, Long> getTaskOffsetSums(final Collection<TaskId> activeTasks, final Collection<TaskId> standbyTasks) { |
| final Map<TaskId, Long> taskOffsetSums = activeTasks.stream().collect(Collectors.toMap(t -> t, t -> Task.LATEST_OFFSET)); |
| taskOffsetSums.putAll(standbyTasks.stream().collect(Collectors.toMap(t -> t, t -> 0L))); |
| return taskOffsetSums; |
| } |
| |
| // Stub end offsets sums for situations where we don't really care about computing exact lags |
| private static Map<TaskId, Long> getTaskEndOffsetSums(final Collection<TaskId> allStatefulTasks) { |
| return allStatefulTasks.stream().collect(Collectors.toMap(t -> t, t -> Long.MAX_VALUE)); |
| } |
| |
| } |