| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.streams.processor.internals; |
| |
| import org.apache.kafka.clients.consumer.internals.PartitionAssignor; |
| import org.apache.kafka.common.Cluster; |
| import org.apache.kafka.common.KafkaException; |
| import org.apache.kafka.common.Node; |
| import org.apache.kafka.common.PartitionInfo; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.config.ConfigException; |
| import org.apache.kafka.common.utils.Utils; |
| import org.apache.kafka.streams.KeyValue; |
| import org.apache.kafka.streams.StreamsBuilder; |
| import org.apache.kafka.streams.StreamsConfig; |
| import org.apache.kafka.streams.TopologyWrapper; |
| import org.apache.kafka.streams.kstream.JoinWindows; |
| import org.apache.kafka.streams.kstream.KStream; |
| import org.apache.kafka.streams.kstream.KTable; |
| import org.apache.kafka.streams.kstream.KeyValueMapper; |
| import org.apache.kafka.streams.kstream.Materialized; |
| import org.apache.kafka.streams.kstream.ValueJoiner; |
| import org.apache.kafka.streams.processor.TaskId; |
| import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; |
| import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; |
| import org.apache.kafka.streams.state.HostInfo; |
| import org.apache.kafka.test.MockClientSupplier; |
| import org.apache.kafka.test.MockInternalTopicManager; |
| import org.apache.kafka.test.MockKeyValueStoreBuilder; |
| import org.apache.kafka.test.MockProcessorSupplier; |
| import org.easymock.Capture; |
| import org.easymock.EasyMock; |
| import org.junit.Test; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import static java.time.Duration.ofMillis; |
| import static java.util.Arrays.asList; |
| import static org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo.LATEST_SUPPORTED_VERSION; |
| import static org.hamcrest.CoreMatchers.equalTo; |
| import static org.hamcrest.CoreMatchers.not; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.fail; |
| |
| @SuppressWarnings("unchecked") |
| public class StreamsPartitionAssignorTest { |
| |
| private final TopicPartition t1p0 = new TopicPartition("topic1", 0); |
| private final TopicPartition t1p1 = new TopicPartition("topic1", 1); |
| private final TopicPartition t1p2 = new TopicPartition("topic1", 2); |
| private final TopicPartition t1p3 = new TopicPartition("topic1", 3); |
| private final TopicPartition t2p0 = new TopicPartition("topic2", 0); |
| private final TopicPartition t2p1 = new TopicPartition("topic2", 1); |
| private final TopicPartition t2p2 = new TopicPartition("topic2", 2); |
| private final TopicPartition t2p3 = new TopicPartition("topic2", 3); |
| private final TopicPartition t3p0 = new TopicPartition("topic3", 0); |
| private final TopicPartition t3p1 = new TopicPartition("topic3", 1); |
| private final TopicPartition t3p2 = new TopicPartition("topic3", 2); |
| private final TopicPartition t3p3 = new TopicPartition("topic3", 3); |
| |
| private final Set<String> allTopics = Utils.mkSet("topic1", "topic2"); |
| |
| private final List<PartitionInfo> infos = asList( |
| new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]) |
| ); |
| |
| private final Cluster metadata = new Cluster( |
| "cluster", |
| Collections.singletonList(Node.noNode()), |
| infos, |
| Collections.emptySet(), |
| Collections.emptySet()); |
| |
| private final TaskId task0 = new TaskId(0, 0); |
| private final TaskId task1 = new TaskId(0, 1); |
| private final TaskId task2 = new TaskId(0, 2); |
| private final TaskId task3 = new TaskId(0, 3); |
| private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor(); |
| private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); |
| private final InternalTopologyBuilder builder = new InternalTopologyBuilder(); |
| private final StreamsConfig streamsConfig = new StreamsConfig(configProps()); |
| private final String userEndPoint = "localhost:8080"; |
| private final String applicationId = "stream-partition-assignor-test"; |
| |
| private final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); |
| |
| private Map<String, Object> configProps() { |
| final Map<String, Object> configurationMap = new HashMap<>(); |
| configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); |
| configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint); |
| configurationMap.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager); |
| configurationMap.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, new AtomicInteger()); |
| return configurationMap; |
| } |
| |
| private void configurePartitionAssignor(final Map<String, Object> props) { |
| final Map<String, Object> configurationMap = configProps(); |
| configurationMap.putAll(props); |
| partitionAssignor.configure(configurationMap); |
| } |
| |
| private void mockTaskManager(final Set<TaskId> prevTasks, |
| final Set<TaskId> cachedTasks, |
| final UUID processId, |
| final InternalTopologyBuilder builder) { |
| EasyMock.expect(taskManager.builder()).andReturn(builder).anyTimes(); |
| EasyMock.expect(taskManager.prevActiveTaskIds()).andReturn(prevTasks).anyTimes(); |
| EasyMock.expect(taskManager.cachedTasksIds()).andReturn(cachedTasks).anyTimes(); |
| EasyMock.expect(taskManager.processId()).andReturn(processId).anyTimes(); |
| EasyMock.replay(taskManager); |
| } |
| |
| @Test |
| public void shouldInterleaveTasksByGroupId() { |
| final TaskId taskIdA0 = new TaskId(0, 0); |
| final TaskId taskIdA1 = new TaskId(0, 1); |
| final TaskId taskIdA2 = new TaskId(0, 2); |
| final TaskId taskIdA3 = new TaskId(0, 3); |
| |
| final TaskId taskIdB0 = new TaskId(1, 0); |
| final TaskId taskIdB1 = new TaskId(1, 1); |
| final TaskId taskIdB2 = new TaskId(1, 2); |
| |
| final TaskId taskIdC0 = new TaskId(2, 0); |
| final TaskId taskIdC1 = new TaskId(2, 1); |
| |
| final List<TaskId> expectedSubList1 = asList(taskIdA0, taskIdA3, taskIdB2); |
| final List<TaskId> expectedSubList2 = asList(taskIdA1, taskIdB0, taskIdC0); |
| final List<TaskId> expectedSubList3 = asList(taskIdA2, taskIdB1, taskIdC1); |
| final List<List<TaskId>> embeddedList = asList(expectedSubList1, expectedSubList2, expectedSubList3); |
| |
| final List<TaskId> tasks = asList(taskIdC0, taskIdC1, taskIdB0, taskIdB1, taskIdB2, taskIdA0, taskIdA1, taskIdA2, taskIdA3); |
| Collections.shuffle(tasks); |
| |
| final List<List<TaskId>> interleavedTaskIds = partitionAssignor.interleaveTasksByGroupId(tasks, 3); |
| |
| assertThat(interleavedTaskIds, equalTo(embeddedList)); |
| } |
| |
| @Test |
| public void testSubscription() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); |
| |
| final Set<TaskId> prevTasks = Utils.mkSet( |
| new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)); |
| final Set<TaskId> cachedTasks = Utils.mkSet( |
| new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1), |
| new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)); |
| |
| final UUID processId = UUID.randomUUID(); |
| mockTaskManager(prevTasks, cachedTasks, processId, builder); |
| |
| configurePartitionAssignor(Collections.emptyMap()); |
| final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2")); |
| |
| Collections.sort(subscription.topics()); |
| assertEquals(asList("topic1", "topic2"), subscription.topics()); |
| |
| final Set<TaskId> standbyTasks = new HashSet<>(cachedTasks); |
| standbyTasks.removeAll(prevTasks); |
| |
| final SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null); |
| assertEquals(info.encode(), subscription.userData()); |
| } |
| |
| @Test |
| public void testAssignBasic() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); |
| final List<String> topics = asList("topic1", "topic2"); |
| final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); |
| |
| final Set<TaskId> prevTasks10 = Utils.mkSet(task0); |
| final Set<TaskId> prevTasks11 = Utils.mkSet(task1); |
| final Set<TaskId> prevTasks20 = Utils.mkSet(task2); |
| final Set<TaskId> standbyTasks10 = Utils.mkSet(task1); |
| final Set<TaskId> standbyTasks11 = Utils.mkSet(task2); |
| final Set<TaskId> standbyTasks20 = Utils.mkSet(task0); |
| |
| final UUID uuid1 = UUID.randomUUID(); |
| final UUID uuid2 = UUID.randomUUID(); |
| |
| mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder); |
| configurePartitionAssignor(Collections.emptyMap()); |
| |
| partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| subscriptions.put("consumer10", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode())); |
| subscriptions.put("consumer11", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode())); |
| subscriptions.put("consumer20", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode())); |
| |
| |
| final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); |
| |
| // check assigned partitions |
| assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)), |
| Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions()))); |
| assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions())); |
| |
| // check assignment info |
| |
| // the first consumer |
| final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); |
| final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks()); |
| |
| // the second consumer |
| final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); |
| allActiveTasks.addAll(info11.activeTasks()); |
| |
| assertEquals(Utils.mkSet(task0, task1), allActiveTasks); |
| |
| // the third consumer |
| final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); |
| allActiveTasks.addAll(info20.activeTasks()); |
| |
| assertEquals(3, allActiveTasks.size()); |
| assertEquals(allTasks, new HashSet<>(allActiveTasks)); |
| |
| assertEquals(3, allActiveTasks.size()); |
| assertEquals(allTasks, allActiveTasks); |
| } |
| |
| @Test |
| public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor", new MockProcessorSupplier(), "source1"); |
| builder.addProcessor("processorII", new MockProcessorSupplier(), "source2"); |
| |
| final List<PartitionInfo> localInfos = asList( |
| new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic1", 3, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo("topic2", 3, Node.noNode(), new Node[0], new Node[0]) |
| ); |
| |
| final Cluster localMetadata = new Cluster( |
| "cluster", |
| Collections.singletonList(Node.noNode()), |
| localInfos, |
| Collections.emptySet(), |
| Collections.emptySet()); |
| |
| final List<String> topics = asList("topic1", "topic2"); |
| |
| final TaskId taskIdA0 = new TaskId(0, 0); |
| final TaskId taskIdA1 = new TaskId(0, 1); |
| final TaskId taskIdA2 = new TaskId(0, 2); |
| final TaskId taskIdA3 = new TaskId(0, 3); |
| |
| final TaskId taskIdB0 = new TaskId(1, 0); |
| final TaskId taskIdB1 = new TaskId(1, 1); |
| final TaskId taskIdB2 = new TaskId(1, 2); |
| final TaskId taskIdB3 = new TaskId(1, 3); |
| |
| final UUID uuid1 = UUID.randomUUID(); |
| |
| mockTaskManager(new HashSet<>(), new HashSet<>(), uuid1, builder); |
| configurePartitionAssignor(Collections.emptyMap()); |
| |
| partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| subscriptions.put("consumer10", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode())); |
| subscriptions.put("consumer11", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode())); |
| |
| final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(localMetadata, subscriptions); |
| |
| // check assigned partitions |
| assertEquals(Utils.mkSet(Utils.mkSet(t2p2, t1p0, t1p2, t2p0), Utils.mkSet(t1p1, t2p1, t1p3, t2p3)), |
| Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions()))); |
| |
| // the first consumer |
| final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); |
| |
| final List<TaskId> expectedInfo10TaskIds = asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3); |
| assertEquals(expectedInfo10TaskIds, info10.activeTasks()); |
| |
| // the second consumer |
| final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); |
| final List<TaskId> expectedInfo11TaskIds = asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2); |
| |
| assertEquals(expectedInfo11TaskIds, info11.activeTasks()); |
| } |
| |
| @Test |
| public void testAssignWithPartialTopology() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor2"); |
| final List<String> topics = asList("topic1", "topic2"); |
| final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); |
| |
| final UUID uuid1 = UUID.randomUUID(); |
| |
| mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder); |
| configurePartitionAssignor(Collections.singletonMap(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class)); |
| |
| partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| subscriptions.put("consumer10", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode())); |
| |
| |
| // will throw exception if it fails |
| final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); |
| |
| // check assignment info |
| final AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10")); |
| final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks()); |
| |
| assertEquals(3, allActiveTasks.size()); |
| assertEquals(allTasks, new HashSet<>(allActiveTasks)); |
| } |
| |
| |
| @Test |
| public void testAssignEmptyMetadata() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); |
| final List<String> topics = asList("topic1", "topic2"); |
| final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); |
| |
| final Set<TaskId> prevTasks10 = Utils.mkSet(task0); |
| final Set<TaskId> standbyTasks10 = Utils.mkSet(task1); |
| final Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), |
| Collections.emptySet(), |
| Collections.emptySet(), |
| Collections.emptySet()); |
| final UUID uuid1 = UUID.randomUUID(); |
| |
| mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder); |
| configurePartitionAssignor(Collections.emptyMap()); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| subscriptions.put("consumer10", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode())); |
| |
| // initially metadata is empty |
| Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyMetadata, subscriptions); |
| |
| // check assigned partitions |
| assertEquals(Collections.emptySet(), |
| new HashSet<>(assignments.get("consumer10").partitions())); |
| |
| // check assignment info |
| AssignmentInfo info10 = checkAssignment(Collections.emptySet(), assignments.get("consumer10")); |
| final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks()); |
| |
| assertEquals(0, allActiveTasks.size()); |
| assertEquals(Collections.emptySet(), new HashSet<>(allActiveTasks)); |
| |
| // then metadata gets populated |
| assignments = partitionAssignor.assign(metadata, subscriptions); |
| // check assigned partitions |
| assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0, t1p0, t2p0, t1p1, t2p1, t1p2, t2p2)), |
| Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()))); |
| |
| // the first consumer |
| info10 = checkAssignment(allTopics, assignments.get("consumer10")); |
| allActiveTasks.addAll(info10.activeTasks()); |
| |
| assertEquals(3, allActiveTasks.size()); |
| assertEquals(allTasks, new HashSet<>(allActiveTasks)); |
| |
| assertEquals(3, allActiveTasks.size()); |
| assertEquals(allTasks, allActiveTasks); |
| } |
| |
| @Test |
| public void testAssignWithNewTasks() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addSource(null, "source3", null, null, null, "topic3"); |
| builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3"); |
| final List<String> topics = asList("topic1", "topic2", "topic3"); |
| final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3); |
| |
| // assuming that previous tasks do not have topic3 |
| final Set<TaskId> prevTasks10 = Utils.mkSet(task0); |
| final Set<TaskId> prevTasks11 = Utils.mkSet(task1); |
| final Set<TaskId> prevTasks20 = Utils.mkSet(task2); |
| |
| final UUID uuid1 = UUID.randomUUID(); |
| final UUID uuid2 = UUID.randomUUID(); |
| mockTaskManager(prevTasks10, Collections.emptySet(), uuid1, builder); |
| configurePartitionAssignor(Collections.emptyMap()); |
| |
| partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| subscriptions.put("consumer10", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.emptySet(), userEndPoint).encode())); |
| subscriptions.put("consumer11", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.emptySet(), userEndPoint).encode())); |
| subscriptions.put("consumer20", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.emptySet(), userEndPoint).encode())); |
| |
| final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); |
| |
| // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match |
| // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and |
| // then later ones will be re-assigned to other hosts due to load balancing |
| AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer10").userData()); |
| final Set<TaskId> allActiveTasks = new HashSet<>(info.activeTasks()); |
| final Set<TopicPartition> allPartitions = new HashSet<>(assignments.get("consumer10").partitions()); |
| |
| info = AssignmentInfo.decode(assignments.get("consumer11").userData()); |
| allActiveTasks.addAll(info.activeTasks()); |
| allPartitions.addAll(assignments.get("consumer11").partitions()); |
| |
| info = AssignmentInfo.decode(assignments.get("consumer20").userData()); |
| allActiveTasks.addAll(info.activeTasks()); |
| allPartitions.addAll(assignments.get("consumer20").partitions()); |
| |
| assertEquals(allTasks, allActiveTasks); |
| assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions); |
| } |
| |
| @Test |
| public void testAssignWithStates() { |
| builder.setApplicationId(applicationId); |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| |
| builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor-1"); |
| |
| builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor-2"); |
| builder.addStateStore(new MockKeyValueStoreBuilder("store3", false), "processor-2"); |
| |
| final List<String> topics = asList("topic1", "topic2"); |
| |
| final TaskId task00 = new TaskId(0, 0); |
| final TaskId task01 = new TaskId(0, 1); |
| final TaskId task02 = new TaskId(0, 2); |
| final TaskId task10 = new TaskId(1, 0); |
| final TaskId task11 = new TaskId(1, 1); |
| final TaskId task12 = new TaskId(1, 2); |
| final List<TaskId> tasks = asList(task00, task01, task02, task10, task11, task12); |
| |
| final UUID uuid1 = UUID.randomUUID(); |
| final UUID uuid2 = UUID.randomUUID(); |
| |
| mockTaskManager( |
| Collections.emptySet(), |
| Collections.emptySet(), |
| uuid1, |
| builder); |
| configurePartitionAssignor(Collections.emptyMap()); |
| |
| partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| subscriptions.put("consumer10", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode())); |
| subscriptions.put("consumer11", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode())); |
| subscriptions.put("consumer20", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode())); |
| |
| final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); |
| |
| // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match |
| assertEquals(2, assignments.get("consumer10").partitions().size()); |
| assertEquals(2, assignments.get("consumer11").partitions().size()); |
| assertEquals(2, assignments.get("consumer20").partitions().size()); |
| |
| final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); |
| final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); |
| final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); |
| |
| assertEquals(2, info10.activeTasks().size()); |
| assertEquals(2, info11.activeTasks().size()); |
| assertEquals(2, info20.activeTasks().size()); |
| |
| final Set<TaskId> allTasks = new HashSet<>(); |
| allTasks.addAll(info10.activeTasks()); |
| allTasks.addAll(info11.activeTasks()); |
| allTasks.addAll(info20.activeTasks()); |
| assertEquals(new HashSet<>(tasks), allTasks); |
| |
| // check tasks for state topics |
| final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); |
| |
| assertEquals(Utils.mkSet(task00, task01, task02), tasksForState("store1", tasks, topicGroups)); |
| assertEquals(Utils.mkSet(task10, task11, task12), tasksForState("store2", tasks, topicGroups)); |
| assertEquals(Utils.mkSet(task10, task11, task12), tasksForState("store3", tasks, topicGroups)); |
| } |
| |
| private Set<TaskId> tasksForState(final String storeName, |
| final List<TaskId> tasks, |
| final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups) { |
| final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName); |
| |
| final Set<TaskId> ids = new HashSet<>(); |
| for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) { |
| final Set<String> stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet(); |
| |
| if (stateChangelogTopics.contains(changelogTopic)) { |
| for (final TaskId id : tasks) { |
| if (id.topicGroupId == entry.getKey()) { |
| ids.add(id); |
| } |
| } |
| } |
| } |
| return ids; |
| } |
| |
| @Test |
| public void testAssignWithStandbyReplicas() { |
| final Map<String, Object> props = configProps(); |
| props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); |
| final StreamsConfig streamsConfig = new StreamsConfig(props); |
| |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addSource(null, "source2", null, null, null, "topic2"); |
| builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); |
| final List<String> topics = asList("topic1", "topic2"); |
| final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); |
| |
| |
| final Set<TaskId> prevTasks00 = Utils.mkSet(task0); |
| final Set<TaskId> prevTasks01 = Utils.mkSet(task1); |
| final Set<TaskId> prevTasks02 = Utils.mkSet(task2); |
| final Set<TaskId> standbyTasks01 = Utils.mkSet(task1); |
| final Set<TaskId> standbyTasks02 = Utils.mkSet(task2); |
| final Set<TaskId> standbyTasks00 = Utils.mkSet(task0); |
| |
| final UUID uuid1 = UUID.randomUUID(); |
| final UUID uuid2 = UUID.randomUUID(); |
| |
| mockTaskManager(prevTasks00, standbyTasks01, uuid1, builder); |
| |
| configurePartitionAssignor(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); |
| |
| partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| subscriptions.put("consumer10", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, userEndPoint).encode())); |
| subscriptions.put("consumer11", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks01, standbyTasks02, userEndPoint).encode())); |
| subscriptions.put("consumer20", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode())); |
| |
| final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); |
| |
| // the first consumer |
| final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); |
| final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks()); |
| final Set<TaskId> allStandbyTasks = new HashSet<>(info10.standbyTasks().keySet()); |
| |
| // the second consumer |
| final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); |
| allActiveTasks.addAll(info11.activeTasks()); |
| allStandbyTasks.addAll(info11.standbyTasks().keySet()); |
| |
| assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks().keySet(), info10.standbyTasks().keySet()); |
| |
| // check active tasks assigned to the first client |
| assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks)); |
| assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks)); |
| |
| // the third consumer |
| final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); |
| allActiveTasks.addAll(info20.activeTasks()); |
| allStandbyTasks.addAll(info20.standbyTasks().keySet()); |
| |
| // all task ids are in the active tasks and also in the standby tasks |
| |
| assertEquals(3, allActiveTasks.size()); |
| assertEquals(allTasks, allActiveTasks); |
| |
| assertEquals(3, allStandbyTasks.size()); |
| assertEquals(allTasks, allStandbyTasks); |
| } |
| |
| @Test |
| public void testOnAssignment() { |
| configurePartitionAssignor(Collections.emptyMap()); |
| |
| final List<TaskId> activeTaskList = asList(task0, task3); |
| final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); |
| final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); |
| final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap( |
| new HostInfo("localhost", 9090), |
| Utils.mkSet(t3p0, t3p3)); |
| activeTasks.put(task0, Utils.mkSet(t3p0)); |
| activeTasks.put(task3, Utils.mkSet(t3p3)); |
| standbyTasks.put(task1, Utils.mkSet(t3p1)); |
| standbyTasks.put(task2, Utils.mkSet(t3p2)); |
| |
| final AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, hostState); |
| final PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode()); |
| |
| final Capture<Cluster> capturedCluster = EasyMock.newCapture(); |
| taskManager.setPartitionsByHostState(hostState); |
| EasyMock.expectLastCall(); |
| taskManager.setAssignmentMetadata(activeTasks, standbyTasks); |
| EasyMock.expectLastCall(); |
| taskManager.setClusterMetadata(EasyMock.capture(capturedCluster)); |
| EasyMock.expectLastCall(); |
| EasyMock.replay(taskManager); |
| |
| partitionAssignor.onAssignment(assignment); |
| |
| EasyMock.verify(taskManager); |
| |
| assertEquals(Collections.singleton(t3p0.topic()), capturedCluster.getValue().topics()); |
| assertEquals(2, capturedCluster.getValue().partitionsForTopic(t3p0.topic()).size()); |
| } |
| |
| @Test |
| public void testAssignWithInternalTopics() { |
| builder.setApplicationId(applicationId); |
| builder.addInternalTopic("topicX"); |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); |
| builder.addSink("sink1", "topicX", null, null, null, "processor1"); |
| builder.addSource(null, "source2", null, null, null, "topicX"); |
| builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); |
| final List<String> topics = asList("topic1", applicationId + "-topicX"); |
| final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); |
| |
| final UUID uuid1 = UUID.randomUUID(); |
| mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder); |
| configurePartitionAssignor(Collections.emptyMap()); |
| final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer); |
| partitionAssignor.setInternalTopicManager(internalTopicManager); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| final Set<TaskId> emptyTasks = Collections.emptySet(); |
| subscriptions.put("consumer10", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())); |
| |
| partitionAssignor.assign(metadata, subscriptions); |
| |
| // check prepared internal topics |
| assertEquals(1, internalTopicManager.readyTopics.size()); |
| assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get(applicationId + "-topicX")); |
| } |
| |
| @Test |
| public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() { |
| final String applicationId = "test"; |
| builder.setApplicationId(applicationId); |
| builder.addInternalTopic("topicX"); |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); |
| builder.addSink("sink1", "topicX", null, null, null, "processor1"); |
| builder.addSource(null, "source2", null, null, null, "topicX"); |
| builder.addInternalTopic("topicZ"); |
| builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); |
| builder.addSink("sink2", "topicZ", null, null, null, "processor2"); |
| builder.addSource(null, "source3", null, null, null, "topicZ"); |
| final List<String> topics = asList("topic1", "test-topicX", "test-topicZ"); |
| final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); |
| |
| final UUID uuid1 = UUID.randomUUID(); |
| mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder); |
| |
| configurePartitionAssignor(Collections.emptyMap()); |
| final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer); |
| partitionAssignor.setInternalTopicManager(internalTopicManager); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| final Set<TaskId> emptyTasks = Collections.emptySet(); |
| subscriptions.put("consumer10", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())); |
| |
| partitionAssignor.assign(metadata, subscriptions); |
| |
| // check prepared internal topics |
| assertEquals(2, internalTopicManager.readyTopics.size()); |
| assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicZ")); |
| } |
| |
| @Test |
| public void shouldGenerateTasksForAllCreatedPartitions() { |
| final StreamsBuilder builder = new StreamsBuilder(); |
| |
| // KStream with 3 partitions |
| final KStream<Object, Object> stream1 = builder |
| .stream("topic1") |
| // force creation of internal repartition topic |
| .map((KeyValueMapper<Object, Object, KeyValue<Object, Object>>) KeyValue::new); |
| |
| // KTable with 4 partitions |
| final KTable<Object, Long> table1 = builder |
| .table("topic3") |
| // force creation of internal repartition topic |
| .groupBy(KeyValue::new) |
| .count(); |
| |
| // joining the stream and the table |
| // this triggers the enforceCopartitioning() routine in the StreamsPartitionAssignor, |
| // forcing the stream.map to get repartitioned to a topic with four partitions. |
| stream1.join( |
| table1, |
| (ValueJoiner) (value1, value2) -> null); |
| |
| final UUID uuid = UUID.randomUUID(); |
| final String client = "client1"; |
| final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); |
| internalTopologyBuilder.setApplicationId(applicationId); |
| |
| mockTaskManager( |
| Collections.emptySet(), |
| Collections.emptySet(), |
| UUID.randomUUID(), |
| internalTopologyBuilder); |
| configurePartitionAssignor(Collections.emptyMap()); |
| |
| final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( |
| streamsConfig, |
| mockClientSupplier.restoreConsumer); |
| partitionAssignor.setInternalTopicManager(mockInternalTopicManager); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| final Set<TaskId> emptyTasks = Collections.emptySet(); |
| subscriptions.put( |
| client, |
| new PartitionAssignor.Subscription( |
| asList("topic1", "topic3"), |
| new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode() |
| ) |
| ); |
| |
| final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions); |
| |
| final Map<String, Integer> expectedCreatedInternalTopics = new HashMap<>(); |
| expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4); |
| expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4); |
| expectedCreatedInternalTopics.put(applicationId + "-topic3-STATE-STORE-0000000002-changelog", 4); |
| expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-MAP-0000000001-repartition", 4); |
| |
| // check if all internal topics were created as expected |
| assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics)); |
| |
| final List<TopicPartition> expectedAssignment = asList( |
| new TopicPartition("topic1", 0), |
| new TopicPartition("topic1", 1), |
| new TopicPartition("topic1", 2), |
| new TopicPartition("topic3", 0), |
| new TopicPartition("topic3", 1), |
| new TopicPartition("topic3", 2), |
| new TopicPartition("topic3", 3), |
| new TopicPartition(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 0), |
| new TopicPartition(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 1), |
| new TopicPartition(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 2), |
| new TopicPartition(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 3), |
| new TopicPartition(applicationId + "-KSTREAM-MAP-0000000001-repartition", 0), |
| new TopicPartition(applicationId + "-KSTREAM-MAP-0000000001-repartition", 1), |
| new TopicPartition(applicationId + "-KSTREAM-MAP-0000000001-repartition", 2), |
| new TopicPartition(applicationId + "-KSTREAM-MAP-0000000001-repartition", 3) |
| ); |
| |
| // check if we created a task for all expected topicPartitions. |
| assertThat(new HashSet<>(assignment.get(client).partitions()), equalTo(new HashSet<>(expectedAssignment))); |
| } |
| |
| @Test |
| public void shouldAddUserDefinedEndPointToSubscription() { |
| builder.setApplicationId(applicationId); |
| builder.addSource(null, "source", null, null, null, "input"); |
| builder.addProcessor("processor", new MockProcessorSupplier(), "source"); |
| builder.addSink("sink", "output", null, null, null, "processor"); |
| |
| final UUID uuid1 = UUID.randomUUID(); |
| mockTaskManager( |
| Collections.emptySet(), |
| Collections.emptySet(), |
| uuid1, |
| builder); |
| configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint)); |
| final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input")); |
| final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData()); |
| assertEquals("localhost:8080", subscriptionInfo.userEndPoint()); |
| } |
| |
| @Test |
| public void shouldMapUserEndPointToTopicPartitions() { |
| builder.setApplicationId(applicationId); |
| builder.addSource(null, "source", null, null, null, "topic1"); |
| builder.addProcessor("processor", new MockProcessorSupplier(), "source"); |
| builder.addSink("sink", "output", null, null, null, "processor"); |
| |
| final List<String> topics = Collections.singletonList("topic1"); |
| |
| final UUID uuid1 = UUID.randomUUID(); |
| |
| mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder); |
| configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint)); |
| |
| partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| final Set<TaskId> emptyTasks = Collections.emptySet(); |
| subscriptions.put("consumer1", |
| new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())); |
| |
| final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); |
| final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1"); |
| final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData()); |
| final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); |
| assertEquals( |
| Utils.mkSet( |
| new TopicPartition("topic1", 0), |
| new TopicPartition("topic1", 1), |
| new TopicPartition("topic1", 2)), |
| topicPartitions); |
| } |
| |
| @Test |
| public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() { |
| builder.setApplicationId(applicationId); |
| |
| mockTaskManager(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), builder); |
| partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); |
| |
| try { |
| configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost")); |
| fail("expected to an exception due to invalid config"); |
| } catch (final ConfigException e) { |
| // pass |
| } |
| } |
| |
| @Test |
| public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() { |
| builder.setApplicationId(applicationId); |
| |
| try { |
| configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:j87yhk")); |
| fail("expected to an exception due to invalid config"); |
| } catch (final ConfigException e) { |
| // pass |
| } |
| } |
| |
| @Test |
| public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() { |
| final StreamsBuilder builder = new StreamsBuilder(); |
| |
| final KStream<Object, Object> stream1 = builder |
| |
| // Task 1 (should get created): |
| .stream("topic1") |
| // force repartitioning for aggregation |
| .selectKey((key, value) -> null) |
| .groupByKey() |
| |
| // Task 2 (should get created): |
| // create repartioning and changelog topic as task 1 exists |
| .count(Materialized.as("count")) |
| |
| // force repartitioning for join, but second join input topic unknown |
| // -> internal repartitioning topic should not get created |
| .toStream() |
| .map((KeyValueMapper<Object, Long, KeyValue<Object, Object>>) (key, value) -> null); |
| |
| builder |
| // Task 3 (should not get created because input topic unknown) |
| .stream("unknownTopic") |
| |
| // force repartitioning for join, but input topic unknown |
| // -> thus should not create internal repartitioning topic |
| .selectKey((key, value) -> null) |
| |
| // Task 4 (should not get created because input topics unknown) |
| // should not create any of both input repartition topics or any of both changelog topics |
| .join( |
| stream1, |
| (ValueJoiner) (value1, value2) -> null, |
| JoinWindows.of(ofMillis(0)) |
| ); |
| |
| final UUID uuid = UUID.randomUUID(); |
| final String client = "client1"; |
| |
| final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); |
| internalTopologyBuilder.setApplicationId(applicationId); |
| |
| mockTaskManager( |
| Collections.emptySet(), |
| Collections.emptySet(), |
| UUID.randomUUID(), |
| internalTopologyBuilder); |
| configurePartitionAssignor(Collections.emptyMap()); |
| |
| final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( |
| streamsConfig, |
| mockClientSupplier.restoreConsumer); |
| partitionAssignor.setInternalTopicManager(mockInternalTopicManager); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| final Set<TaskId> emptyTasks = Collections.emptySet(); |
| subscriptions.put( |
| client, |
| new PartitionAssignor.Subscription( |
| Collections.singletonList("unknownTopic"), |
| new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode() |
| ) |
| ); |
| |
| final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions); |
| |
| assertThat(mockInternalTopicManager.readyTopics.isEmpty(), equalTo(true)); |
| |
| assertThat(assignment.get(client).partitions().isEmpty(), equalTo(true)); |
| } |
| |
| @Test |
| public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() { |
| final TopicPartition partitionOne = new TopicPartition("topic", 1); |
| final TopicPartition partitionTwo = new TopicPartition("topic", 2); |
| final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap( |
| new HostInfo("localhost", 9090), Utils.mkSet(partitionOne, partitionTwo)); |
| |
| configurePartitionAssignor(Collections.emptyMap()); |
| |
| taskManager.setPartitionsByHostState(hostState); |
| EasyMock.expectLastCall(); |
| EasyMock.replay(taskManager); |
| |
| partitionAssignor.onAssignment(createAssignment(hostState)); |
| |
| EasyMock.verify(taskManager); |
| } |
| |
| @Test |
| public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() { |
| final StreamsBuilder builder = new StreamsBuilder(); |
| |
| builder.stream("topic1").groupByKey().count(); |
| final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); |
| internalTopologyBuilder.setApplicationId(applicationId); |
| |
| |
| final UUID uuid = UUID.randomUUID(); |
| mockTaskManager( |
| Collections.emptySet(), |
| Collections.emptySet(), |
| uuid, |
| internalTopologyBuilder); |
| |
| final Map<String, Object> props = new HashMap<>(); |
| props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); |
| props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint); |
| configurePartitionAssignor(props); |
| partitionAssignor.setInternalTopicManager(new MockInternalTopicManager( |
| streamsConfig, |
| mockClientSupplier.restoreConsumer)); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| final Set<TaskId> emptyTasks = Collections.emptySet(); |
| subscriptions.put( |
| "consumer1", |
| new PartitionAssignor.Subscription( |
| Collections.singletonList("topic1"), |
| new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode() |
| ) |
| ); |
| |
| subscriptions.put( |
| "consumer2", |
| new PartitionAssignor.Subscription( |
| Collections.singletonList("topic1"), |
| new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode() |
| ) |
| ); |
| final Set<TopicPartition> allPartitions = Utils.mkSet(t1p0, t1p1, t1p2); |
| final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, subscriptions); |
| final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1"); |
| final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData()); |
| final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); |
| final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090)); |
| final HashSet<TopicPartition> allAssignedPartitions = new HashSet<>(consumer1partitions); |
| allAssignedPartitions.addAll(consumer2Partitions); |
| assertThat(consumer1partitions, not(allPartitions)); |
| assertThat(consumer2Partitions, not(allPartitions)); |
| assertThat(allAssignedPartitions, equalTo(allPartitions)); |
| } |
| |
| @Test |
| public void shouldThrowKafkaExceptionIfTaskMangerNotConfigured() { |
| final Map<String, Object> config = configProps(); |
| config.remove(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR); |
| |
| try { |
| partitionAssignor.configure(config); |
| fail("Should have thrown KafkaException"); |
| } catch (final KafkaException expected) { |
| assertThat(expected.getMessage(), equalTo("TaskManager is not specified")); |
| } |
| } |
| |
| @Test |
| public void shouldThrowKafkaExceptionIfTaskMangerConfigIsNotTaskManagerInstance() { |
| final Map<String, Object> config = configProps(); |
| config.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, "i am not a task manager"); |
| |
| try { |
| partitionAssignor.configure(config); |
| fail("Should have thrown KafkaException"); |
| } catch (final KafkaException expected) { |
| assertThat(expected.getMessage(), |
| equalTo("java.lang.String is not an instance of org.apache.kafka.streams.processor.internals.TaskManager")); |
| } |
| } |
| |
| @Test |
| public void shouldThrowKafkaExceptionAssignmentErrorCodeNotConfigured() { |
| final Map<String, Object> config = configProps(); |
| config.remove(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE); |
| |
| try { |
| partitionAssignor.configure(config); |
| fail("Should have thrown KafkaException"); |
| } catch (final KafkaException expected) { |
| assertThat(expected.getMessage(), equalTo("assignmentErrorCode is not specified")); |
| } |
| } |
| |
| @Test |
| public void shouldThrowKafkaExceptionIfVersionProbingFlagConfigIsNotAtomicInteger() { |
| final Map<String, Object> config = configProps(); |
| config.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, "i am not an AtomicInteger"); |
| |
| try { |
| partitionAssignor.configure(config); |
| fail("Should have thrown KafkaException"); |
| } catch (final KafkaException expected) { |
| assertThat(expected.getMessage(), |
| equalTo("java.lang.String is not an instance of java.util.concurrent.atomic.AtomicInteger")); |
| } |
| } |
| |
| @Test |
| public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V2() { |
| shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 2); |
| } |
| |
| @Test |
| public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V3() { |
| shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 3); |
| } |
| |
| @Test |
| public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV2V3() { |
| shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(2, 3); |
| } |
| |
| private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int smallestVersion, |
| final int otherVersion) { |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| final Set<TaskId> emptyTasks = Collections.emptySet(); |
| subscriptions.put( |
| "consumer1", |
| new PartitionAssignor.Subscription( |
| Collections.singletonList("topic1"), |
| new SubscriptionInfo(smallestVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() |
| ) |
| ); |
| subscriptions.put( |
| "consumer2", |
| new PartitionAssignor.Subscription( |
| Collections.singletonList("topic1"), |
| new SubscriptionInfo(otherVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() |
| ) |
| ); |
| |
| mockTaskManager( |
| emptyTasks, |
| emptyTasks, |
| UUID.randomUUID(), |
| builder); |
| partitionAssignor.configure(configProps()); |
| final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions); |
| |
| assertThat(assignment.size(), equalTo(2)); |
| assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(smallestVersion)); |
| assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(smallestVersion)); |
| } |
| |
| @Test |
| public void shouldDownGradeSubscriptionToVersion1() { |
| final Set<TaskId> emptyTasks = Collections.emptySet(); |
| |
| mockTaskManager( |
| emptyTasks, |
| emptyTasks, |
| UUID.randomUUID(), |
| builder); |
| configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100)); |
| |
| final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); |
| |
| assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1)); |
| } |
| |
| @Test |
| public void shouldDownGradeSubscriptionToVersion2For0101() { |
| shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0101); |
| } |
| |
| @Test |
| public void shouldDownGradeSubscriptionToVersion2For0102() { |
| shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0102); |
| } |
| |
| @Test |
| public void shouldDownGradeSubscriptionToVersion2For0110() { |
| shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0110); |
| } |
| |
| @Test |
| public void shouldDownGradeSubscriptionToVersion2For10() { |
| shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_10); |
| } |
| |
| @Test |
| public void shouldDownGradeSubscriptionToVersion2For11() { |
| shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_11); |
| } |
| |
| private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue) { |
| final Set<TaskId> emptyTasks = Collections.emptySet(); |
| |
| mockTaskManager( |
| emptyTasks, |
| emptyTasks, |
| UUID.randomUUID(), |
| builder); |
| configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue)); |
| |
| final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); |
| |
| assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2)); |
| } |
| |
| @Test |
| public void shouldReturnUnchangedAssignmentForOldInstancesAndEmptyAssignmentForFutureInstances() { |
| builder.addSource(null, "source1", null, null, null, "topic1"); |
| |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); |
| |
| final Set<TaskId> activeTasks = Utils.mkSet(task0, task1); |
| final Set<TaskId> standbyTasks = Utils.mkSet(task2); |
| final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new HashMap<TaskId, Set<TopicPartition>>() { |
| { |
| put(task2, Collections.singleton(t1p2)); |
| } |
| }; |
| |
| subscriptions.put( |
| "consumer1", |
| new PartitionAssignor.Subscription( |
| Collections.singletonList("topic1"), |
| new SubscriptionInfo(UUID.randomUUID(), activeTasks, standbyTasks, null).encode() |
| ) |
| ); |
| subscriptions.put( |
| "future-consumer", |
| new PartitionAssignor.Subscription( |
| Collections.singletonList("topic1"), |
| encodeFutureSubscription() |
| ) |
| ); |
| |
| mockTaskManager( |
| allTasks, |
| allTasks, |
| UUID.randomUUID(), |
| builder); |
| partitionAssignor.configure(configProps()); |
| final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions); |
| |
| assertThat(assignment.size(), equalTo(2)); |
| assertThat( |
| AssignmentInfo.decode(assignment.get("consumer1").userData()), |
| equalTo(new AssignmentInfo( |
| new ArrayList<>(activeTasks), |
| standbyTaskMap, |
| Collections.emptyMap() |
| ))); |
| assertThat(assignment.get("consumer1").partitions(), equalTo(asList(t1p0, t1p1))); |
| |
| assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), |
| equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION))); |
| assertThat(assignment.get("future-consumer").partitions().size(), equalTo(0)); |
| } |
| |
| @Test |
| public void shouldThrowIfV1SubscriptionAndFutureSubscriptionIsMixed() { |
| shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1); |
| } |
| |
| @Test |
| public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() { |
| shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2); |
| } |
| |
| private ByteBuffer encodeFutureSubscription() { |
| final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ |
| + 4 /* supported version */); |
| buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1); |
| buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1); |
| return buf; |
| } |
| |
| private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) { |
| final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); |
| final Set<TaskId> emptyTasks = Collections.emptySet(); |
| subscriptions.put( |
| "consumer1", |
| new PartitionAssignor.Subscription( |
| Collections.singletonList("topic1"), |
| new SubscriptionInfo(oldVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() |
| ) |
| ); |
| subscriptions.put( |
| "future-consumer", |
| new PartitionAssignor.Subscription( |
| Collections.singletonList("topic1"), |
| encodeFutureSubscription() |
| ) |
| ); |
| |
| mockTaskManager( |
| emptyTasks, |
| emptyTasks, |
| UUID.randomUUID(), |
| builder); |
| partitionAssignor.configure(configProps()); |
| |
| try { |
| partitionAssignor.assign(metadata, subscriptions); |
| fail("Should have thrown IllegalStateException"); |
| } catch (final IllegalStateException expected) { |
| // pass |
| } |
| } |
| |
| private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) { |
| final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(), |
| Collections.emptyMap(), |
| firstHostState); |
| |
| return new PartitionAssignor.Assignment( |
| Collections.emptyList(), info.encode()); |
| } |
| |
| private AssignmentInfo checkAssignment(final Set<String> expectedTopics, |
| final PartitionAssignor.Assignment assignment) { |
| |
| // This assumed 1) DefaultPartitionGrouper is used, and 2) there is an only one topic group. |
| |
| final AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); |
| |
| // check if the number of assigned partitions == the size of active task id list |
| assertEquals(assignment.partitions().size(), info.activeTasks().size()); |
| |
| // check if active tasks are consistent |
| final List<TaskId> activeTasks = new ArrayList<>(); |
| final Set<String> activeTopics = new HashSet<>(); |
| for (final TopicPartition partition : assignment.partitions()) { |
| // since default grouper, taskid.partition == partition.partition() |
| activeTasks.add(new TaskId(0, partition.partition())); |
| activeTopics.add(partition.topic()); |
| } |
| assertEquals(activeTasks, info.activeTasks()); |
| |
| // check if active partitions cover all topics |
| assertEquals(expectedTopics, activeTopics); |
| |
| // check if standby tasks are consistent |
| final Set<String> standbyTopics = new HashSet<>(); |
| for (final Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks().entrySet()) { |
| final TaskId id = entry.getKey(); |
| final Set<TopicPartition> partitions = entry.getValue(); |
| for (final TopicPartition partition : partitions) { |
| // since default grouper, taskid.partition == partition.partition() |
| assertEquals(id.partition, partition.partition()); |
| |
| standbyTopics.add(partition.topic()); |
| } |
| } |
| |
| if (info.standbyTasks().size() > 0) { |
| // check if standby partitions cover all topics |
| assertEquals(expectedTopics, standbyTopics); |
| } |
| |
| return info; |
| } |
| } |