/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
public class StreamPartitionAssignorTest {
// Topic partitions used as fixtures by the tests below.
private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
private final TopicPartition t3p0 = new TopicPartition("topic3", 0);
private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
private final TopicPartition t3p3 = new TopicPartition("topic3", 3);

// Topics that checkAssignment expects both the active and the standby partitions to cover.
private final Set<String> allTopics = Utils.mkSet("topic1", "topic2");

// Partition metadata: topic1 and topic2 have three partitions each, topic3 has four.
private final List<PartitionInfo> infos = Arrays.asList(
    new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
    new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
    new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
    new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
    new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
    new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
    new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
    new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
    new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
    new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
);

// Cluster metadata handed to StreamPartitionAssignor.assign() in every assignment test.
private final Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());

// Task ids of topic group 0, one per partition index 0..3.
private final TaskId task0 = new TaskId(0, 0);
private final TaskId task1 = new TaskId(0, 1);
private final TaskId task2 = new TaskId(0, 2);
private final TaskId task3 = new TaskId(0, 3);
/**
 * Builds the base Streams configuration used by the tests in this class.
 *
 * A plain {@link Properties} object is populated explicitly rather than through
 * double-brace initialization, which would create an anonymous subclass holding a
 * hidden reference to the enclosing test instance.
 *
 * @return a new, mutable {@link Properties} instance on every call, so a test may add
 *         or override settings without affecting other tests
 */
private Properties configProps() {
    final Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
    props.setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
    props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
    return props;
}
// Passed as both serializer arguments of every MockProducer created by the tests; never reassigned.
private final ByteArraySerializer serializer = new ByteArraySerializer();
/**
 * Verifies that subscription() reports the requested topics and encodes, as user data,
 * the process id, the thread's previous tasks, and its cached tasks minus the previous
 * ones as standby tasks.
 */
@SuppressWarnings("unchecked")
@Test
public void testSubscription() throws Exception {
    final String clientId = "client-id";
    final UUID processId = UUID.randomUUID();

    // values the stubbed StreamThread below returns from prevTasks() / cachedTasks()
    final Set<TaskId> prevTasks = Utils.mkSet(
        new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
    final Set<TaskId> cachedTasks = Utils.mkSet(
        new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
        new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));

    // topology: two sources feeding a single processor
    TopologyBuilder topology = new TopologyBuilder();
    topology.addSource("source1", "topic1");
    topology.addSource("source2", "topic2");
    topology.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

    StreamsConfig streamsConfig = new StreamsConfig(configProps());
    MockProducer<byte[], byte[]> mockProducer = new MockProducer<>(true, serializer, serializer);
    MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);

    StreamThread streamThread = new StreamThread(topology, streamsConfig, mockProducer, mockConsumer, restoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
        @Override
        public Set<TaskId> cachedTasks() {
            return cachedTasks;
        }

        @Override
        public Set<TaskId> prevTasks() {
            return prevTasks;
        }
    };

    StreamPartitionAssignor assignor = new StreamPartitionAssignor();
    assignor.configure(streamsConfig.getConsumerConfigs(streamThread, "test", clientId));

    PartitionAssignor.Subscription actual = assignor.subscription(Utils.mkSet("topic1", "topic2"));

    // sort the reported topics in place so the comparison does not depend on their order
    Collections.sort(actual.topics());
    assertEquals(Utils.mkList("topic1", "topic2"), actual.topics());

    // expected standby tasks: everything cached that was not a previous task
    Set<TaskId> expectedStandby = new HashSet<>(cachedTasks);
    expectedStandby.removeAll(prevTasks);

    SubscriptionInfo expected = new SubscriptionInfo(processId, prevTasks, expectedStandby);
    assertEquals(expected.encode(), actual.userData());
}
/**
 * Assigns topic1/topic2 (three partitions each) to three consumers: consumer10 and
 * consumer11 subscribe with the same process id (uuid1), consumer20 with uuid2.
 * Checks the assigned partitions and that the active tasks of all consumers together
 * are exactly task0..task2.
 */
@Test
public void testAssignBasic() throws Exception {
    StreamsConfig config = new StreamsConfig(configProps());

    MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
    MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("source1", "topic1");
    builder.addSource("source2", "topic2");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
    List<String> topics = Utils.mkList("topic1", "topic2");
    Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

    final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
    final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
    final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
    final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
    final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
    final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);

    UUID uuid1 = UUID.randomUUID();
    UUID uuid2 = UUID.randomUUID();
    String client1 = "client1";

    StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());

    StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
    partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));

    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    subscriptions.put("consumer10",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
    subscriptions.put("consumer11",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
    subscriptions.put("consumer20",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));

    Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

    // check assigned partitions: consumer10 and consumer11 hold partitions 0 and 1 of both
    // topics between them (in either order); consumer20 holds partition 2 of both topics
    assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
        Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
    assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));

    // check assignment info
    Set<TaskId> allActiveTasks = new HashSet<>();

    // the first consumer
    AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
    allActiveTasks.addAll(info10.activeTasks);

    // the second consumer
    AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
    allActiveTasks.addAll(info11.activeTasks);

    // the two consumers sharing uuid1 own task0 and task1 together
    assertEquals(Utils.mkSet(task0, task1), allActiveTasks);

    // the third consumer
    AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
    allActiveTasks.addAll(info20.activeTasks);

    // every task is active on exactly the expected set of consumers
    assertEquals(3, allActiveTasks.size());
    assertEquals(allTasks, allActiveTasks);
}
/**
 * Adds topic3 (four partitions) to the topology while the subscriptions only report
 * previous tasks task0..task2, and checks that the assignment as a whole still covers
 * task0..task3 and every partition of all three topics.
 */
@Test
public void testAssignWithNewTasks() throws Exception {
    StreamsConfig config = new StreamsConfig(configProps());

    MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
    MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("source1", "topic1");
    builder.addSource("source2", "topic2");
    builder.addSource("source3", "topic3");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
    List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
    Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);

    // assuming that previous tasks do not have topic3
    final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
    final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
    final Set<TaskId> prevTasks20 = Utils.mkSet(task2);

    UUID uuid1 = UUID.randomUUID();
    UUID uuid2 = UUID.randomUUID();
    String client1 = "client1";

    StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());

    StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
    partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));

    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    subscriptions.put("consumer10",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode()));
    subscriptions.put("consumer11",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode()));
    subscriptions.put("consumer20",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode()));

    Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

    // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match
    // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and
    // then later ones will be re-assigned to other hosts due to load balancing
    Set<TaskId> allActiveTasks = new HashSet<>();
    Set<TopicPartition> allPartitions = new HashSet<>();

    // accumulate the active tasks and partitions handed to each consumer
    for (String consumerId : Arrays.asList("consumer10", "consumer11", "consumer20")) {
        PartitionAssignor.Assignment assignment = assignments.get(consumerId);
        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        allActiveTasks.addAll(info.activeTasks);
        allPartitions.addAll(assignment.partitions());
    }

    assertEquals(allTasks, allActiveTasks);
    assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions);
}
/**
 * Builds two independent source/processor chains, each with state stores attached, and
 * checks that every consumer receives two partitions and two active tasks, and that
 * tasksForState() maps each store to the tasks of its chain.
 */
@Test
public void testAssignWithStates() throws Exception {
    StreamsConfig config = new StreamsConfig(configProps());

    MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
    MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);

    TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("source1", "topic1");
    builder.addSource("source2", "topic2");

    builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
    builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");

    builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
    builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
    builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");

    List<String> topics = Utils.mkList("topic1", "topic2");

    TaskId task00 = new TaskId(0, 0);
    TaskId task01 = new TaskId(0, 1);
    TaskId task02 = new TaskId(0, 2);
    TaskId task10 = new TaskId(1, 0);
    TaskId task11 = new TaskId(1, 1);
    TaskId task12 = new TaskId(1, 2);

    UUID uuid1 = UUID.randomUUID();
    UUID uuid2 = UUID.randomUUID();
    String client1 = "client1";

    StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());

    StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
    partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));

    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    subscriptions.put("consumer10",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
    subscriptions.put("consumer11",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
    subscriptions.put("consumer20",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));

    Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

    // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match
    for (String consumerId : Arrays.asList("consumer10", "consumer11", "consumer20")) {
        PartitionAssignor.Assignment assignment = assignments.get(consumerId);
        assertEquals(2, assignment.partitions().size());
        assertEquals(2, AssignmentInfo.decode(assignment.userData()).activeTasks.size());
    }

    // check tasks for state topics
    assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
    assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
    assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
}
/**
 * Same setup as the basic assignment test but with NUM_STANDBY_REPLICAS_CONFIG set to 1.
 * Checks that task0..task2 each appear among the active tasks and among the standby
 * tasks once the assignments of all three consumers are combined.
 */
@Test
public void testAssignWithStandbyReplicas() throws Exception {
    Properties props = configProps();
    props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
    StreamsConfig config = new StreamsConfig(props);

    MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
    MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("source1", "topic1");
    builder.addSource("source2", "topic2");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
    List<String> topics = Utils.mkList("topic1", "topic2");
    Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

    final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
    final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
    final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
    final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
    final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
    final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);

    UUID uuid1 = UUID.randomUUID();
    UUID uuid2 = UUID.randomUUID();
    String client1 = "client1";

    StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());

    StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
    partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));

    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    subscriptions.put("consumer10",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
    subscriptions.put("consumer11",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
    subscriptions.put("consumer20",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));

    Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

    Set<TaskId> allActiveTasks = new HashSet<>();
    Set<TaskId> allStandbyTasks = new HashSet<>();

    // the first consumer
    AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
    allActiveTasks.addAll(info10.activeTasks);
    allStandbyTasks.addAll(info10.standbyTasks.keySet());

    // the second consumer
    AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
    allActiveTasks.addAll(info11.activeTasks);
    allStandbyTasks.addAll(info11.standbyTasks.keySet());

    // consumer10 and consumer11 subscribed with the same process id (uuid1): together they
    // are active for task0 and task1 and standby only for task2
    assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
    assertEquals(Utils.mkSet(task2), allStandbyTasks);

    // the third consumer
    AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
    allActiveTasks.addAll(info20.activeTasks);
    allStandbyTasks.addAll(info20.standbyTasks.keySet());

    // all task ids are in the active tasks and also in the standby tasks
    assertEquals(3, allActiveTasks.size());
    assertEquals(allTasks, allActiveTasks);
    assertEquals(3, allStandbyTasks.size());
    assertEquals(allTasks, allStandbyTasks);
}
/**
 * Decodes the user data of an assignment and checks it against the assigned partitions.
 *
 * The checks assume that 1) DefaultPartitionGrouper is used and 2) there is only one
 * topic group, so every task id is (0, partition index).
 *
 * @param assignment the assignment produced for a single consumer
 * @return the decoded assignment info, for further checks by the caller
 */
private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
    final AssignmentInfo decoded = AssignmentInfo.decode(assignment.userData());
    final List<TopicPartition> assigned = assignment.partitions();

    // one active task id is expected per assigned partition
    assertEquals(assigned.size(), decoded.activeTasks.size());

    // rebuild the expected active task list from the partitions, in partition order
    final List<TaskId> expectedActive = new ArrayList<>();
    final Set<String> coveredTopics = new HashSet<>();
    for (TopicPartition tp : assigned) {
        // default grouper: taskId.partition == partition index
        expectedActive.add(new TaskId(0, tp.partition()));
        coveredTopics.add(tp.topic());
    }
    assertEquals(expectedActive, decoded.activeTasks);

    // the active partitions must span every topic
    assertEquals(allTopics, coveredTopics);

    // every standby partition must carry the partition index of its task id
    final Set<String> standbyTopics = new HashSet<>();
    for (Map.Entry<TaskId, Set<TopicPartition>> standby : decoded.standbyTasks.entrySet()) {
        final TaskId standbyId = standby.getKey();
        for (TopicPartition tp : standby.getValue()) {
            assertEquals(standbyId.partition, tp.partition());
            standbyTopics.add(tp.topic());
        }
    }

    // topic coverage of standby partitions is only checked when standby tasks exist
    if (!decoded.standbyTasks.isEmpty()) {
        assertEquals(allTopics, standbyTopics);
    }

    return decoded;
}
/**
 * Feeds a hand-built assignment (active tasks task0 and task3 on partitions t1p0 and
 * topic2-3, plus two standby tasks) into onAssignment() and checks that the assignor
 * exposes the same partition-to-task mapping and standby tasks afterwards.
 */
@Test
public void testOnAssignment() throws Exception {
    final String clientId = "client1";
    final UUID processId = UUID.randomUUID();
    final TopicPartition t2p3 = new TopicPartition("topic2", 3);

    TopologyBuilder topology = new TopologyBuilder();
    topology.addSource("source1", "topic1");
    topology.addSource("source2", "topic2");
    topology.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

    StreamsConfig streamsConfig = new StreamsConfig(configProps());
    MockProducer<byte[], byte[]> mockProducer = new MockProducer<>(true, serializer, serializer);
    MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);

    StreamThread streamThread = new StreamThread(topology, streamsConfig, mockProducer, mockConsumer, restoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime());

    StreamPartitionAssignor assignor = new StreamPartitionAssignor();
    assignor.configure(streamsConfig.getConsumerConfigs(streamThread, "test", clientId));

    // standby tasks to encode into the assignment's user data
    Map<TaskId, Set<TopicPartition>> expectedStandby = new HashMap<>();
    expectedStandby.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
    expectedStandby.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));

    // active tasks are listed in the same order as the assigned partitions
    List<TaskId> activeTasks = Utils.mkList(task0, task3);
    List<TopicPartition> activePartitions = Utils.mkList(t1p0, t2p3);

    AssignmentInfo encodedInfo = new AssignmentInfo(activeTasks, expectedStandby);
    assignor.onAssignment(new PartitionAssignor.Assignment(activePartitions, encodedInfo.encode()));

    assertEquals(Utils.mkSet(task0), assignor.tasksForPartition(t1p0));
    assertEquals(Utils.mkSet(task3), assignor.tasksForPartition(t2p3));
    assertEquals(expectedStandby, assignor.standbyTasks());
}
/**
 * Builds a topology that writes to and reads from the internal topic "topicX" and checks
 * that assign() asks the internal topic manager to prepare exactly one topic,
 * "test-topicX", with as many partitions as there are tasks in allTasks.
 */
@Test
public void testAssignWithInternalTopics() throws Exception {
    StreamsConfig config = new StreamsConfig(configProps());

    MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
    MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);

    TopologyBuilder builder = new TopologyBuilder();
    builder.addInternalTopic("topicX");
    builder.addSource("source1", "topic1");
    builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
    builder.addSink("sink1", "topicX", "processor1");
    builder.addSource("source2", "topicX");
    builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
    List<String> topics = Utils.mkList("topic1", "test-topicX");
    Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

    UUID uuid1 = UUID.randomUUID();
    String client1 = "client1";

    StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());

    StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
    partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
    MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(mockRestoreConsumer);
    partitionAssignor.setInternalTopicManager(internalTopicManager);

    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
    subscriptions.put("consumer10",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));

    // only the side effect on the internal topic manager is checked, so the result is not kept
    partitionAssignor.assign(metadata, subscriptions);

    // check prepared internal topics
    assertEquals(1, internalTopicManager.readyTopics.size());
    // compare boxed values so that a missing topic is reported as an assertion failure
    // rather than a NullPointerException from unboxing
    assertEquals(Integer.valueOf(allTasks.size()), internalTopicManager.readyTopics.get("test-topicX"));
}
private class MockInternalTopicManager extends InternalTopicManager {
public Map<String, Integer> readyTopics = new HashMap<>();
public MockConsumer<byte[], byte[]> restoreConsumer;
public MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) {
super();
this.restoreConsumer = restoreConsumer;
}
@Override
public void makeReady(String topic, int numPartitions) {
readyTopics.put(topic, numPartitions);
List<PartitionInfo> partitions = new ArrayList<>();
for (int i = 0; i < numPartitions; i++) {
partitions.add(new PartitionInfo(topic, i, null, null, null));
}
restoreConsumer.updatePartitions(topic, partitions);
}
}
}