blob: 1dd082d8cf5c1361562714082a2348bb623923ed [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
 * Partition assignor, registered under the name "stream", that distributes stream tasks
 * (groups of topic partitions) over the consumers of a group.
 *
 * It is configured with the owning {@link StreamThread} instance, encodes that thread's
 * task information into the subscription, and decodes the task information that comes
 * back with the assignment.
 */
public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
private static class AssignedPartition implements Comparable<AssignedPartition> {
public final TaskId taskId;
public final TopicPartition partition;
public AssignedPartition(TaskId taskId, TopicPartition partition) {
this.taskId = taskId;
this.partition = partition;
public int compareTo(AssignedPartition that) {
return, that.partition);
private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
public int compare(TopicPartition p1, TopicPartition p2) {
int result = p1.topic().compareTo(p2.topic());
if (result != 0) {
return result;
} else {
return p1.partition() < p2.partition() ? -1 : (p1.partition() > p2.partition() ? 1 : 0);
// stream thread this assignor works for; supplied through configure()
private StreamThread streamThread;
// number of standby replicas handed to the task assignor; read from the configuration in configure()
private int numStandbyReplicas;
// topics info per topic group id, obtained from the thread's topology builder in configure()
private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
// task ids per assigned partition; replaced on every onAssignment()
private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
// task ids per state changelog topic; re-created on every assign()
private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
// per internal source topic, TaskIds whose partition field is read as the topic's partition count; re-created on every assign()
private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
// standby tasks with their partitions, as decoded in onAssignment()
private Map<TaskId, Set<TopicPartition>> standbyTasks;
// set in configure() only when a ZooKeeper connect string is configured, or injected through the setter; null otherwise
private InternalTopicManager internalTopicManager;
* We need to have the PartitionAssignor and its StreamThread to be mutually accessible
* since the former needs later's cached metadata while sending subscriptions,
* and the latter needs former's returned assignment when adding tasks.
public void configure(Map<String, ?> configs) {
numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
if (o == null) {
KafkaException ex = new KafkaException("StreamThread is not specified");
log.error(ex.getMessage(), ex);
throw ex;
if (!(o instanceof StreamThread)) {
KafkaException ex = new KafkaException(o.getClass().getName() + " is not an instance of " + StreamThread.class.getName());
log.error(ex.getMessage(), ex);
throw ex;
streamThread = (StreamThread) o;
this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
internalTopicManager = new InternalTopicManager(
(String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1);
public String name() {
return "stream";
public Subscription subscription(Set<String> topics) {
// Adds the following information to subscription
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Task ids of previously running tasks
// 3. Task ids of valid local states on the client's state directory.
Set<TaskId> prevTasks = streamThread.prevTasks();
Set<TaskId> standbyTasks = streamThread.cachedTasks();
SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
return new Subscription(new ArrayList<>(topics), data.encode());
/**
 * Computes, for every consumer in the group, the partitions it should consume together with
 * the encoded task information ({@code AssignmentInfo}) describing its active and standby tasks.
 *
 * NOTE(review): the body below looks truncated — closing braces are absent and several loops and
 * branches contain no statements. The inline notes mark each suspicious spot; compare against
 * version control before relying on this code.
 *
 * @param metadata cluster metadata used to look up the partitions of the source topics
 * @param subscriptions subscriptions keyed by consumer id; each one's user data is decoded as a SubscriptionInfo
 * @return the assignment for each consumer, keyed by consumer id
 */
public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
// This assigns tasks to consumer clients in two steps.
// 1. using TaskAssignor to assign tasks to consumer clients.
// - Assign a task to a client which was running it previously.
// If there is no such client, assign a task to a client which has its valid local state.
// - A client may have more than one stream threads.
// The assignor tries to assign tasks to a client proportionally to the number of threads.
// - We try not to assign the same set of tasks to two different clients
// We do the assignment in one-pass. The result may not satisfy above all.
// 2. within each client, tasks are assigned to consumer clients in round-robin manner.
Map<UUID, Set<String>> consumersByClient = new HashMap<>();
Map<UUID, ClientState<TaskId>> states = new HashMap<>();
// decode subscription info
for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
String consumerId = entry.getKey();
Subscription subscription = entry.getValue();
SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
// group the consumers by the process id of the client they belong to
Set<String> consumers = consumersByClient.get(info.processId);
if (consumers == null) {
consumers = new HashSet<>();
consumersByClient.put(info.processId, consumers);
// NOTE(review): consumerId is never added to `consumers` in the visible code, so every set in
// consumersByClient stays empty — a statement adding it appears to be missing.
ClientState<TaskId> state = states.get(info.processId);
if (state == null) {
state = new ClientState<>();
states.put(info.processId, state);
// NOTE(review): only info.processId is read from the decoded subscription; the task ids that
// subscription() encodes are not used to seed the client state here — confirm against version control.
// every subscribing consumer of a client adds one unit of capacity to that client
state.capacity = state.capacity + 1d;
// ensure the co-partitioning topics within the group have the same number of partitions,
// and enforce the number of partitions for those internal topics.
internalSourceTopicToTaskIds = new HashMap<>();
Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
Map<Integer, Set<String>> internalSourceTopicGroups = new HashMap<>();
for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics);
Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups();
ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups, metadata);
// for those internal source topics that do not have co-partition enforcement,
// set the number of partitions to the maximum of the depending sub-topologies source topics
for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
Set<String> internalTopics = entry.getValue().interSourceTopics;
for (String internalTopic : internalTopics) {
Set<TaskId> tasks = internalSourceTopicToTaskIds.get(internalTopic);
if (tasks == null) {
// stays -1 when no topic group writes to this topic or none of the writers' source topics has metadata
int numPartitions = -1;
for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> other : topicGroups.entrySet()) {
Set<String> otherSinkTopics = other.getValue().sinkTopics;
if (otherSinkTopics.contains(internalTopic)) {
for (String topic : other.getValue().sourceTopics) {
List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
if (infos != null && infos.size() > numPartitions)
numPartitions = infos.size();
// the partition count is carried in the partition field of a placeholder TaskId (read back below via task.partition)
internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions)));
Map<TopicPartition, PartitionInfo> internalPartitionInfos = new HashMap<>();
// if ZK is specified, prepare the internal source topic before calling partition grouper
if (internalTopicManager != null) {
log.debug("Starting to validate internal source topics in partition assignor.");
for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet()) {
String topic = entry.getKey();
// should have size 1 only
int numPartitions = -1;
for (TaskId task : entry.getValue()) {
numPartitions = task.partition;
internalTopicManager.makeReady(topic, numPartitions);
// wait until the topic metadata has been propagated to all brokers
// NOTE(review): as visible here the loop polls without a pause or an upper bound on attempts
List<PartitionInfo> partitions;
do {
partitions = streamThread.restoreConsumer.partitionsFor(topic);
} while (partitions == null || partitions.size() != numPartitions);
for (PartitionInfo partition : partitions)
internalPartitionInfos.put(new TopicPartition(partition.topic(), partition.partition()), partition);
// NOTE(review): the next line is not valid Java — it looks like the remains of a closing brace
// followed by a logging call carrying this message.
}"Completed validating internal source topics in partition assignor.");
// the partitions collected for the internal topics are merged into the metadata only when the topic manager is in use
Cluster metadataWithInternalTopics = metadata;
if (internalTopicManager != null)
metadataWithInternalTopics = metadata.withPartitions(internalPartitionInfos);
// get the tasks as partition groups from the partition grouper
Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(
sourceTopicGroups, metadataWithInternalTopics);
// add tasks to state change log topic subscribers
stateChangelogTopicToTaskIds = new HashMap<>();
for (TaskId task : partitionsForTask.keySet()) {
for (String topicName : topicGroups.get(task.topicGroupId).stateChangelogTopics) {
Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(topicName);
if (tasks == null) {
tasks = new HashSet<>();
stateChangelogTopicToTaskIds.put(topicName, tasks);
// NOTE(review): `task` is never added to `tasks` in the visible code, here or in the loop below
for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) {
Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName);
if (tasks == null) {
tasks = new HashSet<>();
internalSourceTopicToTaskIds.put(topicName, tasks);
// assign tasks to clients
states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
Map<String, Assignment> assignment = new HashMap<>();
for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
UUID processId = entry.getKey();
Set<String> consumers = entry.getValue();
ClientState<TaskId> state = states.get(processId);
ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
final int numActiveTasks = state.activeTasks.size();
// NOTE(review): both loops below have no visible body, so taskIds is never filled. The index test
// further down (j < numActiveTasks) implies active tasks are expected first, followed by the
// assigned tasks that are not active.
for (TaskId taskId : state.activeTasks) {
for (TaskId id : state.assignedTasks) {
if (!state.activeTasks.contains(id))
final int numConsumers = consumers.size();
Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
// NOTE(review): i is used as the starting offset for each consumer but is never advanced in the visible code
int i = 0;
for (String consumer : consumers) {
ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>();
final int numTaskIds = taskIds.size();
// this consumer takes the tasks at index i, i + numConsumers, i + 2 * numConsumers, ...
for (int j = i; j < numTaskIds; j += numConsumers) {
TaskId taskId = taskIds.get(j);
// indices below numActiveTasks are treated as active tasks, the rest as standby tasks
if (j < numActiveTasks) {
for (TopicPartition partition : partitionsForTask.get(taskId)) {
assignedPartitions.add(new AssignedPartition(taskId, partition));
} else {
Set<TopicPartition> standbyPartitions = standby.get(taskId);
if (standbyPartitions == null) {
standbyPartitions = new HashSet<>();
standby.put(taskId, standbyPartitions);
// NOTE(review): nothing is added to standbyPartitions in the visible code
List<TaskId> active = new ArrayList<>();
List<TopicPartition> activePartitions = new ArrayList<>();
// NOTE(review): the loop below has no visible body, so active and activePartitions stay empty.
// onAssignment() pairs the sorted partitions with the active task ids by position, so the
// order in which the two lists are filled matters.
for (AssignedPartition partition : assignedPartitions) {
AssignmentInfo data = new AssignmentInfo(active, standby);
assignment.put(consumer, new Assignment(activePartitions, data.encode()));
// if ZK is specified, validate the internal source topics and the state changelog topics
if (internalTopicManager != null) {
log.debug("Starting to validate changelog topics in partition assignor.");
// NOTE(review): the map is iterated right after being created empty — the statements that fill it appear to be missing
Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
String topic = entry.getKey();
// the expected number of partitions is the max value of TaskId.partition + 1
int numPartitions = 0;
for (TaskId task : entry.getValue()) {
if (numPartitions < task.partition + 1)
numPartitions = task.partition + 1;
internalTopicManager.makeReady(topic, numPartitions);
// wait until the topic metadata has been propagated to all brokers
List<PartitionInfo> partitions;
do {
partitions = streamThread.restoreConsumer.partitionsFor(topic);
} while (partitions == null || partitions.size() != numPartitions);
// NOTE(review): the next line is not valid Java — it looks like the remains of a closing brace
// followed by a logging call carrying this message.
}"Completed validating changelog topics in partition assignor.");
return assignment;
public void onAssignment(Assignment assignment) {
List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
Collections.sort(partitions, PARTITION_COMPARATOR);
AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
this.standbyTasks = info.standbyTasks;
Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
Iterator<TaskId> iter = info.activeTasks.iterator();
for (TopicPartition partition : partitions) {
Set<TaskId> taskIds = partitionToTaskIds.get(partition);
if (taskIds == null) {
taskIds = new HashSet<>();
partitionToTaskIds.put(partition, taskIds);
if (iter.hasNext()) {
} else {
TaskAssignmentException ex = new TaskAssignmentException(
"failed to find a task id for the partition=" + partition.toString() +
", partitions=" + partitions.size() + ", assignmentInfo=" + info.toString()
log.error(ex.getMessage(), ex);
throw ex;
this.partitionToTaskIds = partitionToTaskIds;
private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Set<String>> internalTopicGroups, Cluster metadata) {
Set<String> internalTopics = new HashSet<>();
for (Set<String> topics : internalTopicGroups.values())
for (Set<String> copartitionGroup : copartitionGroups) {
ensureCopartitioning(copartitionGroup, internalTopics, metadata);
private void ensureCopartitioning(Set<String> copartitionGroup, Set<String> internalTopics, Cluster metadata) {
int numPartitions = -1;
for (String topic : copartitionGroup) {
if (!internalTopics.contains(topic)) {
List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
if (infos == null)
throw new TopologyBuilderException("External source topic not found: " + topic);
if (numPartitions == -1) {
numPartitions = infos.size();
} else if (numPartitions != infos.size()) {
String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
throw new TopologyBuilderException("Topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]");
// enforce co-partitioning restrictions to internal topics reusing internalSourceTopicToTaskIds
for (String topic : internalTopics) {
if (copartitionGroup.contains(topic))
internalSourceTopicToTaskIds.put(topic, Collections.singleton(new TaskId(-1, numPartitions)));
/* For Test Only */
public Set<TaskId> tasksForState(String stateName) {
return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName));
public Set<TaskId> tasksForPartition(TopicPartition partition) {
return partitionToTaskIds.get(partition);
public Map<TaskId, Set<TopicPartition>> standbyTasks() {
return standbyTasks;
public void setInternalTopicManager(InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;