blob: fc7a13de261a0d9b3f4126bee48697da622eb952 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
public class StreamsPartitionAssignor implements PartitionAssignor, Configurable {
// Sentinel meaning "not determined yet"; used for partition counts and for the future metadata version.
private final static int UNKNOWN = -1;
// Metadata versions referred to by name in this class.
private final static int VERSION_ONE = 1;
private final static int VERSION_TWO = 2;
private final static int VERSION_THREE = 3;
private final static int VERSION_FOUR = 4;
// Lowest metadata version for which version probing is possible. assign() and
// maybeUpdateSubscriptionVersion() compare against this constant, so it has to be declared here.
// NOTE(review): the value is inferred from the log/exception texts in this file ("pre Kafka 2.0",
// "rolling upgrade from version 2.0 or below") together with upgrade.from=1.1 mapping to VERSION_TWO;
// confirm that version 3 is indeed the first probeable version.
private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE;
// Not read anywhere in the visible part of this class; protected, so presumably used by subclasses/tests.
protected final Set<Integer> supportedVersions = new HashSet<>();
// Both are initialised in configure(); they are null before that call.
private Logger log;
private String logPrefix;
/**
 * Error codes carried inside an assignment.
 *
 * <p>{@code NONE} is what {@code onAssignment} compares the received error code against, and
 * {@code INCOMPLETE_SOURCE_TOPIC_METADATA} is what {@code assign} reports when a source topic is missing
 * from the cluster metadata.
 *
 * <p>NOTE(review): the name of the constant for code 2 is not visible in this copy of the file;
 * {@code VERSION_PROBING} was chosen because {@code onAssignment} checks the error code in connection with
 * a version probing rebalance. Confirm against the callers.
 */
public enum Error {
    NONE(0),
    INCOMPLETE_SOURCE_TOPIC_METADATA(1),
    VERSION_PROBING(2);

    private final int code;

    Error(final int code) {
        this.code = code;
    }

    /** @return the numeric code of this error */
    public int code() {
        return code;
    }

    /**
     * Maps a numeric code back to its constant.
     *
     * @param code the numeric error code
     * @return the matching constant
     * @throws IllegalArgumentException if {@code code} is not 0, 1 or 2
     */
    public static Error fromCode(final int code) {
        switch (code) {
            case 0:
                return NONE;
            case 1:
                return INCOMPLETE_SOURCE_TOPIC_METADATA;
            case 2:
                return VERSION_PROBING;
            default:
                throw new IllegalArgumentException("Unknown error code: " + code);
        }
    }
}
private static class AssignedPartition implements Comparable<AssignedPartition> {
public final TaskId taskId;
public final TopicPartition partition;
AssignedPartition(final TaskId taskId,
final TopicPartition partition) {
this.taskId = taskId;
this.partition = partition;
public int compareTo(final AssignedPartition that) {
return, that.partition);
public boolean equals(final Object o) {
if (!(o instanceof AssignedPartition)) {
return false;
final AssignedPartition other = (AssignedPartition) o;
return compareTo(other) == 0;
public int hashCode() {
// Only partition is important for compareTo, equals and hashCode.
return partition.hashCode();
private static class ClientMetadata {
final HostInfo hostInfo;
final Set<String> consumers;
final ClientState state;
ClientMetadata(final String endPoint) {
// get the host info if possible
if (endPoint != null) {
final String host = getHost(endPoint);
final Integer port = getPort(endPoint);
if (host == null || port == null) {
throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
hostInfo = new HostInfo(host, port);
} else {
hostInfo = null;
// initialize the consumer memberIds
consumers = new HashSet<>();
// initialize the client state
state = new ClientState();
void addConsumer(final String consumerMemberId,
final SubscriptionInfo info) {
state.addPreviousActiveTasks(consumerMemberId, info.prevTasks());
state.addPreviousStandbyTasks(consumerMemberId, info.standbyTasks());
public String toString() {
return "ClientMetadata{" +
"hostInfo=" + hostInfo +
", consumers=" + consumers +
", state=" + state +
static class InternalTopicMetadata {
public final InternalTopicConfig config;
public int numPartitions;
InternalTopicMetadata(final InternalTopicConfig config) {
this.config = config;
this.numPartitions = UNKNOWN;
public String toString() {
return "InternalTopicMetadata(" +
"config=" + config +
", numPartitions=" + numPartitions +
// StreamsConfig subclass that configure() instantiates from the raw config map handed to the assignor.
// NOTE(review): the closing braces of this class and its constructor are missing from this copy of the file.
private static final class InternalStreamsConfig extends StreamsConfig {
private InternalStreamsConfig(final Map<?, ?> props) {
// The meaning of the boolean flag is defined by StreamsConfig, which is not visible here
// (presumably it suppresses logging of the config values - confirm).
super(props, false);
protected static final Comparator<TopicPartition> PARTITION_COMPARATOR = (p1, p2) -> {
final int result = p1.topic().compareTo(p2.topic());
if (result != 0) {
return result;
} else {
return, p2.partition());
// Endpoint taken from StreamsConfig.APPLICATION_SERVER_CONFIG in configure(); stays null if none was stored.
private String userEndPoint;
// Read from StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG in configure() and passed to the task assignor.
private int numStandbyReplicas;
// Handed in through the internal config entry TASK_MANAGER_FOR_PARTITION_ASSIGNOR.
private TaskManager taskManager;
// Instantiated from StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG; groups source partitions into tasks.
private PartitionGrouper partitionGrouper;
// Handed in through the internal config entry ASSIGNMENT_ERROR_CODE.
private AtomicInteger assignmentErrorCode;
// Metadata version used for subscriptions; configure() lowers it for `upgrade.from`, and
// maybeUpdateSubscriptionVersion() adjusts it after an assignment is received.
protected int usedSubscriptionMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
// Both are created at the end of configure().
private InternalTopicManager internalTopicManager;
private CopartitionedTopicsValidator copartitionedTopicsValidator;
// Returns the userEndPoint field as-is; it is null unless configure() stored a non-empty endpoint.
// NOTE(review): the closing brace of this method is missing from this copy of the file.
protected String userEndPoint() {
return userEndPoint;
// Returns the TaskManager stored by configure().
// NOTE(review): the name is spelled "taskManger" (sic); it is protected, so renaming it could break
// subclasses or tests outside this file. The closing brace is missing from this copy of the file.
protected TaskManager taskManger() {
return taskManager;
/**
 * We need to have the PartitionAssignor and its StreamThread to be mutually accessible
 * since the former needs the latter's cached metadata while sending subscriptions,
 * and the latter needs the former's returned assignment when adding tasks.
 *
 * @throws KafkaException if the task manager or the assignment error code holder is missing from the
 *                        configs or is of the wrong type
 */
public void configure(final Map<String, ?> configs) {
final StreamsConfig streamsConfig = new InternalStreamsConfig(configs);
// Setting the logger with the passed in client thread name
logPrefix = String.format("stream-thread [%s] ", streamsConfig.getString(CommonClientConfigs.CLIENT_ID_CONFIG));
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom != null) {
switch (upgradeFrom) {
case StreamsConfig.UPGRADE_FROM_0100:"Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
usedSubscriptionMetadataVersion = VERSION_ONE;
case StreamsConfig.UPGRADE_FROM_0101:
case StreamsConfig.UPGRADE_FROM_0102:
case StreamsConfig.UPGRADE_FROM_0110:
case StreamsConfig.UPGRADE_FROM_10:
case StreamsConfig.UPGRADE_FROM_11:"Downgrading metadata version from {} to 2 for upgrade from {}.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION, upgradeFrom);
usedSubscriptionMetadataVersion = VERSION_TWO;
throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
if (o == null) {
final KafkaException fatalException = new KafkaException("TaskManager is not specified");
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
if (!(o instanceof TaskManager)) {
final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName()));
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
taskManager = (TaskManager) o;
final Object ai = configs.get(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE);
if (ai == null) {
final KafkaException fatalException = new KafkaException("assignmentErrorCode is not specified");
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
if (!(ai instanceof AtomicInteger)) {
final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s",
ai.getClass().getName(), AtomicInteger.class.getName()));
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
assignmentErrorCode = (AtomicInteger) ai;
numStandbyReplicas = streamsConfig.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
partitionGrouper = streamsConfig.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
final String userEndPoint = streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (userEndPoint != null && !userEndPoint.isEmpty()) {
try {
final String host = getHost(userEndPoint);
final Integer port = getPort(userEndPoint);
if (host == null || port == null) {
throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair" +
" but received %s",
logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
} catch (final NumberFormatException nfe) {
throw new ConfigException(String.format("%s Invalid port supplied in %s for config %s",
logPrefix, userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
this.userEndPoint = userEndPoint;
internalTopicManager = new InternalTopicManager(taskManager.adminClient, streamsConfig);
copartitionedTopicsValidator = new CopartitionedTopicsValidator(logPrefix);
// Returns the constant name "stream" for this assignor.
// NOTE(review): the closing brace of this method is missing from this copy of the file.
public String name() {
return "stream";
// Builds the Subscription for the given topics; its user data is an encoded SubscriptionInfo.
public Subscription subscription(final Set<String> topics) {
// Adds the following information to subscription
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Task ids of previously running tasks
// 3. Task ids of valid local states on the client's state directory.
final Set<TaskId> previousActiveTasks = taskManager.prevActiveTaskIds();
final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
// NOTE(review): the argument list of this constructor call (and the closing brace of the method) is
// missing from this copy of the file. The SubscriptionInfo constructor is not visible here, so the
// arguments cannot be restored with confidence; judging from the comment above they presumably include
// the client UUID, previousActiveTasks and standbyTasks - confirm against SubscriptionInfo.
final SubscriptionInfo data = new SubscriptionInfo(
return new Subscription(new ArrayList<>(topics), data.encode());
// Logs that `topic` is unknown and builds one Assignment per consumer of every client, keyed by
// consumer id. Called from assign() when a source topic is missing from the cluster metadata.
private Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
final String topic,
final int errorCode) {
log.error("{} is unknown yet during rebalance," +
" please make sure they have been pre-created before starting the Streams application.", topic);
final Map<String, Assignment> assignment = new HashMap<>();
for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
for (final String consumerId : clientMetadata.consumers) {
// NOTE(review): the remaining arguments of the Assignment and AssignmentInfo constructors are missing
// from this copy of the file, as are the closing braces. `errorCode` is presumably passed into the
// AssignmentInfo so that the receiver can react to it - confirm against AssignmentInfo.
assignment.put(consumerId, new Assignment(
new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION,
return assignment;
/**
 * This assigns tasks to consumer clients in the following steps.
 *
 * 0. check all repartition source topics and use internal topic manager to make sure
 *    they have been created with the right number of partitions.
 *
 * 1. using user customized partition grouper to generate tasks along with their
 *    assigned partitions; also make sure that the task's corresponding changelog topics
 *    have been created with the right number of partitions.
 *
 * 2. using TaskAssignor to assign tasks to consumer clients.
 *    - Assign a task to a client which was running it previously.
 *      If there is no such client, assign a task to a client which has its valid local state.
 *    - A client may have more than one stream thread.
 *      The assignor tries to assign tasks to a client proportionally to the number of threads.
 *    - We try not to assign the same set of tasks to two different clients.
 *    We do the assignment in one pass. The result may not satisfy all of the above.
 *
 * 3. within each client, tasks are assigned to consumer clients in round-robin manner.
 */
// Leader-side entry point. Decodes every member's subscription into per-client metadata, determines the
// partition counts of repartition and changelog topics, lets the StickyTaskAssignor distribute tasks
// over the clients and finally returns one Assignment per consumer member id.
// NOTE(review): this copy of the method has lost lines - all closing braces, the heads of several
// logging calls and several argument lists. The comments below describe only what is still visible;
// the method has to be restored from the upstream source before it can be changed safely.
public Map<String, Assignment> assign(final Cluster metadata,
final Map<String, Subscription> subscriptions) {
// construct the client metadata from the decoded subscription info
final Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
// NOTE(review): nothing visible adds to futureConsumers, although it is declared here and
// versionProbingAssignment() takes such a set - the statement that fills it appears to be lost.
final Set<String> futureConsumers = new HashSet<>();
// both minima start at the latest supported version and are lowered while scanning the subscriptions
int minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
int minSupportedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
int futureMetadataVersion = UNKNOWN;
for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
final String consumerId = entry.getKey();
final Subscription subscription = entry.getValue();
final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
final int usedVersion = info.version();
// a version above our latest supported one marks a subscription from a newer ("future") member
if (usedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
futureMetadataVersion = usedVersion;
if (usedVersion < minReceivedMetadataVersion) {
minReceivedMetadataVersion = usedVersion;
final int supportedVersion = info.latestSupportedVersion();
if (supportedVersion < minSupportedMetadataVersion) {
log.debug("Downgrade the current minimum supported version {} to the smaller seen supported version {}",
minSupportedMetadataVersion, supportedVersion);
minSupportedMetadataVersion = supportedVersion;
} else {
log.debug("Current minimum supported version remains at {}, last seen supported version was {}",
minSupportedMetadataVersion, supportedVersion);
// create the new client metadata if necessary
ClientMetadata clientMetadata = clientsMetadata.get(info.processId());
if (clientMetadata == null) {
clientMetadata = new ClientMetadata(info.userEndPoint());
clientsMetadata.put(info.processId(), clientMetadata);
// add the consumer to the client
clientMetadata.addConsumer(consumerId, info);
// versionProbing is true when a future-version subscription was seen and every received version
// is at least EARLIEST_PROBEABLE_VERSION; a mix with older versions is rejected with an exception
final boolean versionProbing;
if (futureMetadataVersion != UNKNOWN) {
// NOTE(review): the head of the logging call and its arguments are missing on the next line
if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {"Received a future (version probing) subscription (version: {}). Sending empty assignment back (with supported version {}).",
versionProbing = true;
} else {
throw new IllegalStateException("Received a future (version probing) subscription (version: " + futureMetadataVersion
+ ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion + ") at the same time.");
} else {
versionProbing = false;
// NOTE(review): the heads of the logging calls and their arguments are missing on the next two lines
if (minReceivedMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) {"Downgrade metadata to version {}. Latest supported version is {}.",
if (minSupportedMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) {"Downgrade latest supported metadata to version {}. Latest supported version is {}.",
log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata);
// ---------------- Step Zero ---------------- //
// parse the topology to determine the repartition source topics,
// making sure they are created with the number of partitions as
// the maximum of the depending sub-topologies source topics' number of partitions
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = taskManager.builder().topicGroups();
final Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (final String topic : topicsInfo.sourceTopics) {
// a source topic that is neither a repartition topic nor present in the cluster metadata aborts
// the assignment with an error assignment for all consumers
if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
!metadata.topics().contains(topic)) {
// NOTE(review): the log message below contains the typo "durign" and its arguments are missing
log.error("Missing source topic {} durign assignment. Returning error {}.",
return errorAssignment(clientsMetadata, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code);
for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
// NOTE(review): the map key is missing on the next line
repartitionTopicMetadata.put(, new InternalTopicMetadata(topic));
// repeat until no repartition topic is left with an UNKNOWN partition count; a topic may depend on
// another repartition topic whose count only gets resolved in an earlier pass
boolean numPartitionsNeeded;
do {
numPartitionsNeeded = false;
for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
// try set the number of partitions for this repartition topic if it is not set yet
if (numPartitions == UNKNOWN) {
for (final InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
if (otherSinkTopics.contains(topicName)) {
// if this topic is one of the sink topics of this topology,
// use the maximum of all its source topic partitions as the number of partitions
for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
final Integer numPartitionsCandidate;
// It is possible the sourceTopic is another internal topic, i.e,
// map().join().join(map())
if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numPartitions;
} else {
numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName);
// NOTE(review): numPartitionsCandidate is an Integer that is auto-unboxed in the comparison below;
// whether partitionCountForTopic can return null is not visible here - confirm.
if (numPartitionsCandidate > numPartitions) {
numPartitions = numPartitionsCandidate;
// if we still have not find the right number of partitions,
// another iteration is needed
if (numPartitions == UNKNOWN) {
numPartitionsNeeded = true;
} else {
repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
} while (numPartitionsNeeded);
// ensure the co-partitioning topics within the group have the same number of partitions,
// and enforce the number of partitions for those repartition topics to be the same if they
// are co-partitioned as well.
ensureCopartitioning(taskManager.builder().copartitionGroups(), repartitionTopicMetadata, metadata);
// make sure the repartition source topics exist with the right number of partitions,
// create these topics if necessary
// NOTE(review): the statement that performs the creation described above is not visible in this copy.
// augment the metadata with the newly computed number of partitions for all the
// repartition source topics
final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
for (final Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
final String topic = entry.getKey();
final int numPartitions = entry.getValue().numPartitions;
for (int partition = 0; partition < numPartitions; partition++) {
allRepartitionTopicPartitions.put(new TopicPartition(topic, partition),
new PartitionInfo(topic, partition, null, new Node[0], new Node[0]));
final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
// ---------------- Step One ---------------- //
// get the tasks as partition groups from the partition grouper
// NOTE(review): nothing visible adds to allSourceTopics, yet it is iterated further down.
final Set<String> allSourceTopics = new HashSet<>();
final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
final Map<TaskId, Set<TopicPartition>> partitionsForTask = partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
// check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
// NOTE(review): nothing visible adds to allAssignedPartitions either; the checks below only warn.
final Set<TopicPartition> allAssignedPartitions = new HashSet<>();
final Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
final Set<TopicPartition> partitions = entry.getValue();
for (final TopicPartition partition : partitions) {
if (allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is assigned to more than one tasks: {}", partition, partitionsForTask);
final TaskId id = entry.getKey();
// group the task ids by their topic group id
tasksByTopicGroup.computeIfAbsent(id.topicGroupId, k -> new HashSet<>()).add(id);
for (final String topic : allSourceTopics) {
final List<PartitionInfo> partitionInfoList = fullMetadata.partitionsForTopic(topic);
if (!partitionInfoList.isEmpty()) {
for (final PartitionInfo partitionInfo : partitionInfoList) {
final TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
if (!allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is not assigned to any tasks: {}"
+ " Possible causes of a partition not getting assigned"
+ " is that another topic defined in the topology has not been"
+ " created when starting your streams application,"
+ " resulting in no tasks created for this topology at all.", partition, partitionsForTask);
} else {
log.warn("No partitions found for topic {}", topic);
// add tasks to state change log topic subscribers
final Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
final int topicGroupId = entry.getKey();
final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
for (final InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
// the expected number of partitions is the max value of TaskId.partition + 1
int numPartitions = UNKNOWN;
if (tasksByTopicGroup.get(topicGroupId) != null) {
for (final TaskId task : tasksByTopicGroup.get(topicGroupId)) {
if (numPartitions < task.partition + 1) {
numPartitions = task.partition + 1;
final InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
topicMetadata.numPartitions = numPartitions;
// NOTE(review): the map key is missing on the next line
changelogTopicMetadata.put(, topicMetadata);
} else {
log.debug("No tasks found for topic group {}", topicGroupId);
log.debug("Created state changelog topics {} from the parsed topology.", changelogTopicMetadata.values());
// ---------------- Step Two ---------------- //
// assign tasks to clients
final Map<UUID, ClientState> states = new HashMap<>();
for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
states.put(entry.getKey(), entry.getValue().state);
log.debug("Assigning tasks {} to clients {} with number of replicas {}",
partitionsForTask.keySet(), states, numStandbyReplicas);
final StickyTaskAssignor<UUID> taskAssignor = new StickyTaskAssignor<>(states, partitionsForTask.keySet());
// NOTE(review): the head of the logging call that follows assign(...) is missing on the next line
taskAssignor.assign(numStandbyReplicas);"Assigned tasks to clients as {}.", states);
// ---------------- Step Three ---------------- //
// construct the global partition assignment per host map
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>();
// the per-host map is only populated when every received subscription uses metadata version 2 or higher
if (minReceivedMetadataVersion >= 2) {
for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
final HostInfo hostInfo = entry.getValue().hostInfo;
if (hostInfo != null) {
final Set<TopicPartition> topicPartitions = new HashSet<>();
final ClientState state = entry.getValue().state;
// NOTE(review): the body of this loop is missing; as visible, topicPartitions stays empty
for (final TaskId id : state.activeTasks()) {
partitionsByHostState.put(hostInfo, topicPartitions);
// NOTE(review): the argument lists of both calls below are missing from this copy of the file
final Map<String, Assignment> assignment;
if (versionProbing) {
assignment = versionProbingAssignment(
} else {
assignment = computeNewAssignment(
return assignment;
// Distributes each client's active and standby tasks over the client's consumers and returns the
// resulting map from consumer id to Assignment.
// NOTE(review): this copy of the method has lost lines - closing braces, the increment of
// consumerTaskIndex and the arguments/target of the final `new Assignment(` expression.
private Map<String, Assignment> computeNewAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
final int minUserMetadataVersion,
final int minSupportedMetadataVersion) {
final Map<String, Assignment> assignment = new HashMap<>();
// within the client, distribute tasks to its owned consumers
for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
final Set<String> consumers = entry.getValue().consumers;
final ClientState state = entry.getValue().state;
// one task list per consumer, for active and for standby tasks respectively
final List<List<TaskId>> interleavedActive = interleaveTasksByGroupId(state.activeTasks(), consumers.size());
final List<List<TaskId>> interleavedStandby = interleaveTasksByGroupId(state.standbyTasks(), consumers.size());
// index of the current consumer into the interleaved lists
int consumerTaskIndex = 0;
for (final String consumer : consumers) {
final List<TaskId> activeTasks = interleavedActive.get(consumerTaskIndex);
// These will be filled in by buildAssignedActiveTaskAndPartitionsList below
final List<TopicPartition> activePartitionsList = new ArrayList<>();
final List<TaskId> assignedActiveList = new ArrayList<>();
buildAssignedActiveTaskAndPartitionsList(activeTasks, activePartitionsList, assignedActiveList, partitionsForTask);
final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
if (!state.standbyTasks().isEmpty()) {
final List<TaskId> assignedStandbyList = interleavedStandby.get(consumerTaskIndex);
for (final TaskId taskId : assignedStandbyList) {
standby.computeIfAbsent(taskId, k -> new HashSet<>()).addAll(partitionsForTask.get(taskId));
// finally, encode the assignment before sending back to coordinator
// NOTE(review): the `assignment.put(consumer, ...)` wrapper and the constructor arguments are not visible
new Assignment(
new AssignmentInfo(
return assignment;
// Builds the assignment used for a version probing rebalance: consumers keep the active and standby
// tasks they claimed in their subscription, and every consumer in futureConsumers receives an
// Assignment built from an AssignmentInfo that carries only the two version numbers.
// NOTE(review): this copy of the method has lost lines (closing braces, argument lists).
private static Map<String, Assignment> versionProbingAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
final Set<String> futureConsumers,
final int minUserMetadataVersion,
final int minSupportedMetadataVersion) {
final Map<String, Assignment> assignment = new HashMap<>();
// assign previously assigned tasks to "old consumers"
for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
for (final String consumerId : clientMetadata.consumers) {
// NOTE(review): as visible, the statements below read as the body of this `if`, which contradicts the
// "old consumers" comment above; the body of the `if` (presumably a `continue;` that skips future
// consumers) appears to be lost - confirm.
if (futureConsumers.contains(consumerId)) {
// Return the same active tasks that were claimed in the subscription
final List<TaskId> activeTasks = new ArrayList<>(clientMetadata.state.prevActiveTasksForConsumer(consumerId));
// These will be filled in by buildAssignedActiveTaskAndPartitionsList below
final List<TopicPartition> activePartitionsList = new ArrayList<>();
final List<TaskId> assignedActiveList = new ArrayList<>();
buildAssignedActiveTaskAndPartitionsList(activeTasks, activePartitionsList, assignedActiveList, partitionsForTask);
// Return the same standby tasks that were claimed in the subscription
final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
for (final TaskId taskId : clientMetadata.state.prevStandbyTasksForConsumer(consumerId)) {
// NOTE(review): partitionsForTask.get(taskId) is stored without a null check, so a previously owned
// standby task that no longer exists would map to null - confirm whether that can happen.
standbyTasks.put(taskId, partitionsForTask.get(taskId));
// NOTE(review): the constructor arguments of the next two lines are missing from this copy
assignment.put(consumerId, new Assignment(
new AssignmentInfo(
// add empty assignment for "future version" clients (ie, empty version probing response)
for (final String consumerId : futureConsumers) {
// NOTE(review): the first argument of the Assignment constructor is missing from this copy
assignment.put(consumerId, new Assignment(
new AssignmentInfo(minUserMetadataVersion, minSupportedMetadataVersion).encode()
return assignment;
private static void buildAssignedActiveTaskAndPartitionsList(final List<TaskId> activeTasks,
final List<TopicPartition> activePartitionsList,
final List<TaskId> assignedActiveList,
final Map<TaskId, Set<TopicPartition>> partitionsForTask) {
final List<AssignedPartition> assignedPartitions = new ArrayList<>();
// Build up list of all assigned partition-task pairs
for (final TaskId taskId : activeTasks) {
for (final TopicPartition partition : partitionsForTask.get(taskId)) {
assignedPartitions.add(new AssignedPartition(taskId, partition));
// Add one copy of a task for each corresponding partition, so the receiver can determine the task <-> tp mapping
for (final AssignedPartition partition : assignedPartitions) {
// visible for testing
List<List<TaskId>> interleaveTasksByGroupId(final Collection<TaskId> taskIds, final int numberThreads) {
final LinkedList<TaskId> sortedTasks = new LinkedList<>(taskIds);
final List<List<TaskId>> taskIdsForConsumerAssignment = new ArrayList<>(numberThreads);
for (int i = 0; i < numberThreads; i++) {
taskIdsForConsumerAssignment.add(new ArrayList<>());
while (!sortedTasks.isEmpty()) {
for (final List<TaskId> taskIdList : taskIdsForConsumerAssignment) {
final TaskId taskId = sortedTasks.poll();
if (taskId == null) {
return taskIdsForConsumerAssignment;
private void validateMetadataVersions(final int receivedAssignmentMetadataVersion,
final int latestCommonlySupportedVersion) {
if (receivedAssignmentMetadataVersion > usedSubscriptionMetadataVersion) {
log.error("Leader sent back an assignment with version {} which was greater than our used version {}",
receivedAssignmentMetadataVersion, usedSubscriptionMetadataVersion);
throw new TaskAssignmentException(
"Sent a version " + usedSubscriptionMetadataVersion
+ " subscription but got an assignment with higher version "
+ receivedAssignmentMetadataVersion + "."
if (latestCommonlySupportedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
log.error("Leader sent back assignment with commonly supported version {} that is greater than our "
+ "actual latest supported version {}", latestCommonlySupportedVersion,
throw new TaskAssignmentException("Can't upgrade to metadata version greater than we support");
// Returns true if subscription version was changed, indicating version probing and need to rebalance again
protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMetadataVersion,
final int latestCommonlySupportedVersion) {
if (receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
// If the latest commonly supported version is now greater than our used version, this indicates we have just
// completed the rolling upgrade and can now update our subscription version for the final rebalance
if (latestCommonlySupportedVersion > usedSubscriptionMetadataVersion) {
"Sent a version {} subscription and group's latest commonly supported version is {} (successful "
"version probing and end of rolling upgrade). Upgrading subscription metadata version to " +
"{} for next rebalance.",
usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
return true;
// If we received a lower version than we sent, someone else in the group still hasn't upgraded. We
// should downgrade our subscription until everyone is on the latest version
if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion) {
"Sent a version {} subscription and got version {} assignment back (successful version probing). "
"Downgrade subscription metadata to commonly supported version {} and trigger new rebalance.",
usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
return true;
} else {
log.debug("Received an assignment version {} that is less than the earliest version that allows version " +
"probing {}. If this is not during a rolling upgrade from version 2.0 or below, this is an error.",
receivedAssignmentMetadataVersion, EARLIEST_PROBEABLE_VERSION);
return false;
/**
 * @throws TaskAssignmentException if there is no task id for one of the partitions specified
 */
// Member-side callback: decodes the AssignmentInfo from the assignment's user data, validates the
// received versions, processes the assignment according to its metadata version and hands the active
// and standby tasks to the task manager.
// NOTE(review): this copy of the method has lost lines - closing braces, the bodies of the two `if`
// statements, the `case` labels of the switch and the delimiters of the block comment in the middle.
public void onAssignment(final Assignment assignment) {
final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
if (info.errCode() != Error.NONE.code) {
// set flag to shutdown streams app
// NOTE(review): the statement that sets the flag is not visible; the next six lines are the text of
// a block comment whose opening and closing delimiters were lost.
* latestCommonlySupportedVersion belongs to [usedSubscriptionMetadataVersion, LATEST_SUPPORTED_VERSION]
* receivedAssignmentMetadataVersion belongs to [EARLIEST_PROBEABLE_VERSION, usedSubscriptionMetadataVersion]
* usedSubscriptionMetadataVersion will be downgraded to receivedAssignmentMetadataVersion during a rolling
* bounce upgrade with version probing.
* usedSubscriptionMetadataVersion will be upgraded to latestCommonlySupportedVersion when all members have
* been bounced and it is safe to use the latest version.
final int receivedAssignmentMetadataVersion = info.version();
final int latestCommonlySupportedVersion = info.commonlySupportedVersion();
validateMetadataVersions(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion);
// Check if this was a version probing rebalance and check the error code to trigger another rebalance if so
// NOTE(review): the body of this `if` is not visible in this copy of the file
if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
// version 1 field
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
// version 2 fields
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
// NOTE(review): the `case`/`default` labels and `break` statements of this switch are missing; the
// visible statements are the version-one branch, the version-two branch and the fallback throw.
switch (receivedAssignmentMetadataVersion) {
processVersionOneAssignment(info, partitions, activeTasks);
partitionsByHost = Collections.emptyMap();
processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
partitionsByHost = info.partitionsByHost();
// NOTE(review): the message below ends in "at" - whatever followed it is missing from this copy
throw new IllegalStateException("This code should never be reached. Please file a bug report at");
// NOTE(review): topicToPartitionInfo and partitionsByHost are not used by any visible statement
taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
/**
 * Populates {@code activeTasks} from the index-aligned lists of the assignment: the partition at
 * position {@code i} of {@code partitions} belongs to the task id at position {@code i} of
 * {@code info.activeTasks()}. Partitions are grouped into one set per task id.
 *
 * @param info        decoded assignment info supplying the list of active task ids
 * @param partitions  assigned partitions, matched by index against {@code info.activeTasks()}
 * @param activeTasks output map (task id to its partitions); entries are added in place
 * @throws TaskAssignmentException if the number of partitions differs from the number of active task ids
 */
private void processVersionOneAssignment(final AssignmentInfo info,
final List<TopicPartition> partitions,
final Map<TaskId, Set<TopicPartition>> activeTasks) {
// the number of assigned partitions should equal the number of active task ids; a task id
// is repeated in that list when the task has more than one assigned partition
if (partitions.size() != info.activeTasks().size()) {
throw new TaskAssignmentException(
String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" +
", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks().size(), info.toString())
// pair the two lists by index and group the partitions under their task id
for (int i = 0; i < partitions.size(); i++) {
final TopicPartition partition = partitions.get(i);
final TaskId id = info.activeTasks().get(i);
// create the partition set on first sight of a task id, then add the partition to it
activeTasks.computeIfAbsent(id, k -> new HashSet<>()).add(partition);
/**
 * Processes a version-two style assignment: fills {@code activeTasks} exactly as
 * {@code processVersionOneAssignment} does, then walks every partition listed in
 * {@code info.partitionsByHost()} and builds a {@code PartitionInfo} for it.
 *
 * @param info                 decoded assignment info
 * @param partitions           assigned partitions, matched by index against the active task ids
 * @param activeTasks          output map (task id to its partitions); entries are added in place
 * @param topicToPartitionInfo map from topic partition to partition info (its use is not visible
 *                             in the lines shown; see the review note in the body)
 */
private void processVersionTwoAssignment(final AssignmentInfo info,
final List<TopicPartition> partitions,
final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
// the active-task part is identical to the version-one format
processVersionOneAssignment(info, partitions, activeTasks);
// process partitions by host
final Map<HostInfo, Set<TopicPartition>> partitionsByHost = info.partitionsByHost();
for (final Set<TopicPartition> value : partitionsByHost.values()) {
for (final TopicPartition topicPartition : value) {
// NOTE(review): the start of this statement is not visible here; presumably the new PartitionInfo
// is stored in topicToPartitionInfo under topicPartition -- verify against the complete file.
// Only topic and partition number are set; the third argument is null and both Node arrays are empty.
new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, new Node[0], new Node[0]));
// for testing
/**
 * Delegates to {@code processVersionTwoAssignment} with the same arguments, unchanged.
 *
 * @param info                 decoded assignment info
 * @param partitions           assigned partitions
 * @param activeTasks          output map (task id to its partitions)
 * @param topicToPartitionInfo map from topic partition to partition info, forwarded as-is
 */
protected void processLatestVersionAssignment(final AssignmentInfo info,
final List<TopicPartition> partitions,
final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
* Internal helper function that validates and creates the given internal Kafka topics
* @param topicPartitions Map that contains the topic names to be created with the number of partitions
// Walks the given internal-topic metadata, rejects any topic whose partition count is negative
// (reported as "number of partitions not defined") with a StreamsException, and collects the
// topic configs in topicsToMakeReady.
// NOTE(review): in the text under review the argument list on the throw line and the key passed
// to topicsToMakeReady.put(...) are truncated, and the body of the final isEmpty() check is not
// visible; restore these from the complete file.
private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) {
log.debug("Starting to validate internal topics {} in partition assignor.", topicPartitions);
// first construct the topics to make ready
final Map<String, InternalTopicConfig> topicsToMakeReady = new HashMap<>();
for (final InternalTopicMetadata metadata : topicPartitions.values()) {
final InternalTopicConfig topic = metadata.config;
final int numPartitions = metadata.numPartitions;
// a negative partition count means the count was never resolved for this topic
if (numPartitions < 0) {
throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", logPrefix,;
topicsToMakeReady.put(, topic);
// only act when at least one topic was collected (the action itself is not visible here)
if (!topicsToMakeReady.isEmpty()) {
log.debug("Completed validating internal topics {} in partition assignor.", topicPartitions);
/**
 * Validates every co-partition group in turn by passing it to
 * {@code copartitionedTopicsValidator.validate(...)} together with the repartition-topic
 * metadata and the cluster metadata.
 *
 * @param copartitionGroups                 groups of topic names that must be co-partitioned
 * @param allRepartitionTopicsNumPartitions metadata of the repartition topics, keyed by topic name
 * @param metadata                          cluster metadata forwarded to the validator
 */
private void ensureCopartitioning(final Collection<Set<String>> copartitionGroups,
final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
final Cluster metadata) {
for (final Set<String> copartitionGroup : copartitionGroups) {
copartitionedTopicsValidator.validate(copartitionGroup, allRepartitionTopicsNumPartitions, metadata);
/**
 * Checks that the topics of a co-partition group agree on their number of partitions and
 * writes the agreed number into the metadata of the group's repartition topics.
 * NOTE(review): closing braces are not visible in the text under review; the block structure
 * described in the comments below is inferred from the statements and should be confirmed.
 */
static class CopartitionedTopicsValidator {
// prefix prepended to every exception message built by this class
private final String logPrefix;
private final Logger log;
/**
 * @param logPrefix prefix used for exception messages and for the logger's LogContext
 */
CopartitionedTopicsValidator(final String logPrefix) {
this.logPrefix = logPrefix;
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());
/**
 * Validates one co-partition group and aligns its repartition topics.
 *
 * Topics of the group that are not keys of {@code allRepartitionTopicsNumPartitions} are looked
 * up in the cluster metadata and must all report the same partition count. If the group has no
 * such topic, the largest {@code numPartitions} among the group's repartition topics is used
 * instead. The resulting count is then assigned to every repartition topic of the group.
 *
 * @param copartitionGroup                  topic names that must be co-partitioned
 * @param allRepartitionTopicsNumPartitions repartition-topic metadata keyed by topic name;
 *                                          {@code numPartitions} of group members is overwritten
 * @param metadata                          cluster metadata used to look up partition counts
 * @throws IllegalStateException if the cluster metadata has no partition count for a topic
 * @throws org.apache.kafka.streams.errors.TopologyException if two looked-up topics differ in partition count
 */
void validate(final Set<String> copartitionGroup,
final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
final Cluster metadata) {
// UNKNOWN marks "no reference partition count established yet"
int numPartitions = UNKNOWN;
for (final String topic : copartitionGroup) {
// only topics that are not repartition topics are looked up in the cluster metadata
if (!allRepartitionTopicsNumPartitions.containsKey(topic)) {
final Integer partitions = metadata.partitionCountForTopic(topic);
// a null count means the cluster metadata does not know the topic
if (partitions == null) {
final String str = String.format("%sTopic not found: %s", logPrefix, topic);
throw new IllegalStateException(str);
// the first looked-up topic sets the reference count; later ones must match it
if (numPartitions == UNKNOWN) {
numPartitions = partitions;
// partitions is unboxed for this comparison, so values (not references) are compared
} else if (numPartitions != partitions) {
final String[] topics = copartitionGroup.toArray(new String[0]);
throw new org.apache.kafka.streams.errors.TopologyException(String.format("%sTopics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
// if all topics for this co-partition group are repartition topics,
// then set the number of partitions to be the maximum of the number of partitions.
// (assumes UNKNOWN is smaller than any valid partition count -- verify its definition)
if (numPartitions == UNKNOWN) {
for (final Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
if (copartitionGroup.contains(entry.getKey())) {
final int partitions = entry.getValue().numPartitions;
if (partitions > numPartitions) {
numPartitions = partitions;
// enforce co-partitioning restrictions to repartition topics by updating their number of partitions
for (final Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) {
if (copartitionGroup.contains(entry.getKey())) {
entry.getValue().numPartitions = numPartitions;
protected void setAssignmentErrorCode(final Integer errorCode) {
// following functions are for test only
void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;