blob: 64f80f7963f9cb477d1da7a43018935fa9c3b0a8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import static java.util.Map.Entry.comparingByKey;
import static java.util.UUID.randomUUID;
import static org.apache.kafka.common.utils.Utils.filterMap;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsResult;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
private Logger log;
private String logPrefix;
private static class AssignedPartition implements Comparable<AssignedPartition> {
private final TaskId taskId;
private final TopicPartition partition;
AssignedPartition(final TaskId taskId, final TopicPartition partition) {
this.taskId = taskId;
this.partition = partition;
public int compareTo(final AssignedPartition that) {
return, that.partition);
public boolean equals(final Object o) {
if (!(o instanceof AssignedPartition)) {
return false;
final AssignedPartition other = (AssignedPartition) o;
return compareTo(other) == 0;
public int hashCode() {
// Only partition is important for compareTo, equals and hashCode.
return partition.hashCode();
private static class ClientMetadata {
private final HostInfo hostInfo;
private final ClientState state;
private final SortedSet<String> consumers;
ClientMetadata(final String endPoint, final Map<String, String> clientTags) {
// get the host info, or null if no endpoint is configured (ie endPoint == null)
hostInfo = HostInfo.buildFromEndpoint(endPoint);
// initialize the consumer memberIds
consumers = new TreeSet<>();
// initialize the client state with client tags
state = new ClientState(clientTags);
void addConsumer(final String consumerMemberId, final List<TopicPartition> ownedPartitions) {
state.addOwnedPartitions(ownedPartitions, consumerMemberId);
void addPreviousTasksAndOffsetSums(final String consumerId, final Map<TaskId, Long> taskOffsetSums) {
state.addPreviousTasksAndOffsetSums(consumerId, taskOffsetSums);
public String toString() {
return "ClientMetadata{" +
"hostInfo=" + hostInfo +
", consumers=" + consumers +
", state=" + state +
// keep track of any future consumers in a "dummy" Client since we can't decipher their subscription
private static final UUID FUTURE_ID = randomUUID();
protected static final Comparator<TopicPartition> PARTITION_COMPARATOR =
private String userEndPoint;
private AssignmentConfigs assignmentConfigs;
// for the main consumer, we need to use a supplier to break a cyclic setup dependency
private Supplier<Consumer<byte[], byte[]>> mainConsumerSupplier;
private Admin adminClient;
private TaskManager taskManager;
private StreamsMetadataState streamsMetadataState;
private PartitionGrouper partitionGrouper;
private AtomicInteger assignmentErrorCode;
private AtomicLong nextScheduledRebalanceMs;
private Queue<StreamsException> nonFatalExceptionsToHandle;
private Time time;
protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;
private InternalTopicManager internalTopicManager;
private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
private RebalanceProtocol rebalanceProtocol;
private AssignmentListener assignmentListener;
private Supplier<TaskAssignor> taskAssignorSupplier;
private byte uniqueField;
private Map<String, String> clientTags;
* We need to have the PartitionAssignor and its StreamThread to be mutually accessible since the former needs
* latter's cached metadata while sending subscriptions, and the latter needs former's returned assignment when
* adding tasks.
* @throws KafkaException if the stream thread is not specified
public void configure(final Map<String, ?> configs) {
final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);
logPrefix = assignorConfiguration.logPrefix();
log = new LogContext(logPrefix).logger(getClass());
usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion);
final ReferenceContainer referenceContainer = assignorConfiguration.referenceContainer();
mainConsumerSupplier = () -> Objects.requireNonNull(referenceContainer.mainConsumer, "Main consumer was not specified");
adminClient = Objects.requireNonNull(referenceContainer.adminClient, "Admin client was not specified");
taskManager = Objects.requireNonNull(referenceContainer.taskManager, "TaskManager was not specified");
streamsMetadataState = Objects.requireNonNull(referenceContainer.streamsMetadataState, "StreamsMetadataState was not specified");
assignmentErrorCode = referenceContainer.assignmentErrorCode;
nextScheduledRebalanceMs = referenceContainer.nextScheduledRebalanceMs;
nonFatalExceptionsToHandle = referenceContainer.nonFatalExceptionsToHandle;
time = Objects.requireNonNull(referenceContainer.time, "Time was not specified");
assignmentConfigs = assignorConfiguration.assignmentConfigs();
partitionGrouper = new PartitionGrouper();
userEndPoint = assignorConfiguration.userEndPoint();
internalTopicManager = assignorConfiguration.internalTopicManager();
copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer();
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
taskAssignorSupplier = assignorConfiguration::taskAssignor;
assignmentListener = assignorConfiguration.assignmentListener();
uniqueField = 0;
clientTags = referenceContainer.clientTags;
public String name() {
return "stream";
public List<RebalanceProtocol> supportedProtocols() {
final List<RebalanceProtocol> supportedProtocols = new ArrayList<>();
if (rebalanceProtocol == RebalanceProtocol.COOPERATIVE) {
return supportedProtocols;
public ByteBuffer subscriptionUserData(final Set<String> topics) {
// Adds the following information to subscription
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Map from task id to its overall lag
// 3. Unique Field to ensure a rebalance when a thread rejoins by forcing the user data to be different
final Set<String> currentNamedTopologies = taskManager.topologyMetadata().namedTopologiesView();
// If using NamedTopologies, filter out any that are no longer recognized/have been removed
final Map<TaskId, Long> taskOffsetSums = taskManager.topologyMetadata().hasNamedTopologies() ?
filterMap(taskManager.getTaskOffsetSums(), t -> currentNamedTopologies.contains(t.getKey().topologyName())) :
return new SubscriptionInfo(
private Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
final int errorCode) {
final Map<String, Assignment> assignment = new HashMap<>();
for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
for (final String consumerId : clientMetadata.consumers) {
assignment.put(consumerId, new Assignment(
return assignment;
* This assigns tasks to consumer clients in the following steps.
* 0. decode the subscriptions to assemble the metadata for each client and check for version probing
* 1. check all repartition source topics and use internal topic manager to make sure
* they have been created with the right number of partitions. Also verify and/or create
* any changelog topics with the correct number of partitions.
* 2. use the partition grouper to generate tasks along with their assigned partitions, then use
* the configured TaskAssignor to construct the mapping of tasks to clients.
* 3. construct the global mapping of host to partitions to enable query routing.
* 4. within each client, assign tasks to consumer clients.
public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscription) {
final Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
// ---------------- Step Zero ---------------- //
// construct the client metadata from the decoded subscription info
final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>();
final Set<TopicPartition> allOwnedPartitions = new HashSet<>();
int minReceivedMetadataVersion = LATEST_SUPPORTED_VERSION;
int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
boolean shutdownRequested = false;
boolean assignmentErrorFound = false;
int futureMetadataVersion = UNKNOWN;
for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
final String consumerId = entry.getKey();
final Subscription subscription = entry.getValue();
final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
final int usedVersion = info.version();
if (info.errorCode() == AssignorError.SHUTDOWN_REQUESTED.code()) {
shutdownRequested = true;
minReceivedMetadataVersion = updateMinReceivedVersion(usedVersion, minReceivedMetadataVersion);
minSupportedMetadataVersion = updateMinSupportedVersion(info.latestSupportedVersion(), minSupportedMetadataVersion);
final UUID processId;
futureMetadataVersion = usedVersion;
processId = FUTURE_ID;
if (!clientMetadataMap.containsKey(FUTURE_ID)) {
clientMetadataMap.put(FUTURE_ID, new ClientMetadata(null, Collections.emptyMap()));
} else {
processId = info.processId();
ClientMetadata clientMetadata = clientMetadataMap.get(processId);
// create the new client metadata if necessary
if (clientMetadata == null) {
clientMetadata = new ClientMetadata(info.userEndPoint(), info.clientTags());
clientMetadataMap.put(info.processId(), clientMetadata);
// add the consumer and any info in its subscription to the client
clientMetadata.addConsumer(consumerId, subscription.ownedPartitions());
final int prevSize = allOwnedPartitions.size();
if (allOwnedPartitions.size() < prevSize + subscription.ownedPartitions().size()) {
assignmentErrorFound = true;
clientMetadata.addPreviousTasksAndOffsetSums(consumerId, info.taskOffsetSums());
if (assignmentErrorFound) {
log.warn("The previous assignment contains a partition more than once. " +
"\t Mapping: {}", subscriptions);
try {
final boolean versionProbing =
checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);
log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
// ---------------- Step One ---------------- //
if (shutdownRequested) {
return new GroupAssignment(errorAssignment(clientMetadataMap, AssignorError.SHUTDOWN_REQUESTED.code()));
// parse the topology to determine the repartition source topics,
// making sure they are created with the number of partitions as
// the maximum of the depending sub-topologies source topics' number of partitions
final RepartitionTopics repartitionTopics = prepareRepartitionTopics(metadata);
final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = repartitionTopics.topicPartitionsInfo();
final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
// ---------------- Step Two ---------------- //
// construct the assignment of tasks to clients
final Map<Subtopology, TopicsInfo> topicGroups =
final Set<String> allSourceTopics = new HashSet<>();
final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
// get the tasks as partition groups from the partition grouper
final Map<TaskId, Set<TopicPartition>> partitionsForTask =
partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
final Set<TaskId> statefulTasks = new HashSet<>();
final boolean probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
// ---------------- Step Three ---------------- //
// construct the global partition assignment per host map
final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();
final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new HashMap<>();
if (minReceivedMetadataVersion >= 2) {
populatePartitionsByHostMaps(partitionsByHost, standbyPartitionsByHost, partitionsForTask, clientMetadataMap);
// ---------------- Step Four ---------------- //
// compute the assignment of tasks to threads within each client and build the final group assignment
final Map<String, Assignment> assignment = computeNewAssignment(
return new GroupAssignment(assignment);
} catch (final MissingSourceTopicException e) {
log.error("Caught an error in the task assignment. Returning an error assignment.", e);
return new GroupAssignment(
errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
} catch (final TaskAssignmentException e) {
log.error("Caught an error in the task assignment. Returning an error assignment.", e);
return new GroupAssignment(
errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())
* Verify the subscription versions are within the expected bounds and check for version probing.
* @return whether this was a version probing rebalance
private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
final int minSupportedMetadataVersion,
final int futureMetadataVersion) {
final boolean versionProbing;
if (futureMetadataVersion == UNKNOWN) {
versionProbing = false;
} else if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
versionProbing = true;"Received a future (version probing) subscription (version: {})."
+ " Sending assignment back (with supported version {}).",
} else {
throw new TaskAssignmentException(
"Received a future (version probing) subscription (version: " + futureMetadataVersion
+ ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion
+ ") at the same time."
if (minReceivedMetadataVersion < LATEST_SUPPORTED_VERSION) {"Downgrade metadata to version {}. Latest supported version is {}.",
if (minSupportedMetadataVersion < LATEST_SUPPORTED_VERSION) {"Downgrade latest supported metadata to version {}. Latest supported version is {}.",
return versionProbing;
* Computes and assembles all repartition topic metadata then creates the topics if necessary. Also verifies
* that all user input topics of each topology have been created ahead of time. If any such source topics are
* missing from a NamedTopology, the assignor will skip distributing its tasks until they have been created
* and invoke the exception handler (without killing the thread) once for each topology to alert the user of
* the missing topics.
* <p>
* For regular applications without named topologies, the assignor will instead send a shutdown signal to
* all clients so the user can identify and resolve the problem.
* @return application metadata such as partition info of repartition topics, missing external topics, etc
private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
final RepartitionTopics repartitionTopics = new RepartitionTopics(
final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
if (isMissingInputTopics) {
if (!taskManager.topologyMetadata().hasNamedTopologies()) {
throw new MissingSourceTopicException("Missing source topics.");
} else {
return repartitionTopics;
* Populates the taskForPartition and tasksForTopicGroup maps, and checks that partitions are assigned to exactly
* one task.
* @param taskForPartition a map from partition to the corresponding task. Populated here.
* @param tasksForTopicGroup a map from the topicGroupId to the set of corresponding tasks. Populated here.
* @param allSourceTopics a set of all source topics in the topology
* @param partitionsForTask a map from task to the set of input partitions
* @param fullMetadata the cluster metadata
private void populateTasksForMaps(final Map<TopicPartition, TaskId> taskForPartition,
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
final Set<String> allSourceTopics,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Cluster fullMetadata) {
// check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
final Set<TopicPartition> allAssignedPartitions = new HashSet<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
final TaskId id = entry.getKey();
final Set<TopicPartition> partitions = entry.getValue();
for (final TopicPartition partition : partitions) {
taskForPartition.put(partition, id);
if (allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is assigned to more than one tasks: {}", partition, partitionsForTask);
tasksForTopicGroup.computeIfAbsent(new Subtopology(id.subtopology(), id.topologyName()), k -> new HashSet<>()).add(id);
checkAllPartitions(allSourceTopics, partitionsForTask, allAssignedPartitions, fullMetadata);
// Logs a warning if any partitions are not assigned to a task, or a task has no assigned partitions
private void checkAllPartitions(final Set<String> allSourceTopics,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Set<TopicPartition> allAssignedPartitions,
final Cluster fullMetadata) {
for (final String topic : allSourceTopics) {
final List<PartitionInfo> partitionInfoList = fullMetadata.partitionsForTopic(topic);
if (partitionInfoList.isEmpty()) {
log.warn("No partitions found for topic {}", topic);
} else {
for (final PartitionInfo partitionInfo : partitionInfoList) {
final TopicPartition partition = new TopicPartition(partitionInfo.topic(),
if (!allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is not assigned to any tasks: {}"
+ " Possible causes of a partition not getting assigned"
+ " is that another topic defined in the topology has not been"
+ " created when starting your streams application,"
+ " resulting in no tasks created for this topology at all.", partition,
* Assigns a set of tasks to each client (Streams instance) using the configured task assignor, and also
* populate the stateful tasks that have been assigned to the clients
* @return true if a probing rebalance should be triggered
private boolean assignTasksToClients(final Cluster fullMetadata,
final Set<String> allSourceTopics,
final Map<Subtopology, TopicsInfo> topicGroups,
final Map<UUID, ClientMetadata> clientMetadataMap,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Set<TaskId> statefulTasks) {
if (!statefulTasks.isEmpty()) {
throw new TaskAssignmentException("The stateful tasks should not be populated before assigning tasks to clients");
final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
populateTasksForMaps(taskForPartition, tasksForTopicGroup, allSourceTopics, partitionsForTask, fullMetadata);
final ChangelogTopics changelogTopics = new ChangelogTopics(
final Map<UUID, ClientState> clientStates = new HashMap<>();
final boolean lagComputationSuccessful =
populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogTopics);"{} members participating in this rebalance: \n{}.",
.map(entry -> entry.getKey() + ": " + entry.getValue().consumers())
final Set<TaskId> allTasks = partitionsForTask.keySet();
log.debug("Assigning tasks {} including stateful {} to clients {} with number of replicas {}",
allTasks, statefulTasks, clientStates, numStandbyReplicas());
final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);
final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates,
assignmentConfigs);"{} assigned tasks {} including stateful {} to {} clients as: \n{}.",
.map(entry -> entry.getKey() + "=" + entry.getValue().currentAssignment())
return probingRebalanceNeeded;
private TaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) {
final TaskAssignor taskAssignor = taskAssignorSupplier.get();
if (taskAssignor instanceof StickyTaskAssignor) {
// special case: to preserve pre-existing behavior, we invoke the StickyTaskAssignor
// whether or not lag computation failed.
return taskAssignor;
} else if (lagComputationSuccessful) {
return taskAssignor;
} else {"Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
+ "trigger another rebalance to retry.");
return new FallbackPriorTaskAssignor();
* Builds a map from client to state, and readies each ClientState for assignment by adding any missing prev tasks
* and computing the per-task overall lag based on the fetched end offsets for each changelog.
* @param clientStates a map from each client to its state, including offset lags. Populated by this method.
* @param clientMetadataMap a map from each client to its full metadata
* @param taskForPartition map from topic partition to its corresponding task
* @param changelogTopics object that manages changelog topics
* @return whether we were able to successfully fetch the changelog end offsets and compute each client's lag
private boolean populateClientStatesMap(final Map<UUID, ClientState> clientStates,
final Map<UUID, ClientMetadata> clientMetadataMap,
final Map<TopicPartition, TaskId> taskForPartition,
final ChangelogTopics changelogTopics) {
boolean fetchEndOffsetsSuccessful;
Map<TaskId, Long> allTaskEndOffsetSums;
try {
// Make the listOffsets request first so it can fetch the offsets for non-source changelogs
// asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets;
// note that we would need to wrap all exceptions as Streams exception with partition-level fine-grained
// error messages
final ListOffsetsResult endOffsetsResult =
fetchEndOffsetsResult(changelogTopics.preExistingNonSourceTopicBasedPartitions(), adminClient);
final Map<TopicPartition, Long> sourceChangelogEndOffsets =
fetchCommittedOffsets(changelogTopics.preExistingSourceTopicBasedPartitions(), mainConsumerSupplier.get());
final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(
endOffsetsResult, changelogTopics.preExistingNonSourceTopicBasedPartitions());
allTaskEndOffsetSums = computeEndOffsetSumsByTask(
fetchEndOffsetsSuccessful = true;
} catch (final StreamsException | TimeoutException e) {"Failed to retrieve all end offsets for changelogs, and hence could not calculate the per-task lag; " +
"this is not a fatal error but would cause the assignor to fallback to a naive algorithm", e);
allTaskEndOffsetSums = changelogTopics.statefulTaskIds().stream().collect(Collectors.toMap(t -> t, t -> UNKNOWN_OFFSET_SUM));
fetchEndOffsetsSuccessful = false;
for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
final UUID uuid = entry.getKey();
final ClientState state = entry.getValue().state;
state.initializePrevTasks(taskForPartition, taskManager.topologyMetadata().hasNamedTopologies());
state.computeTaskLags(uuid, allTaskEndOffsetSums);
clientStates.put(uuid, state);
return fetchEndOffsetsSuccessful;
* @param endOffsets the listOffsets result from the adminClient
* @param sourceChangelogEndOffsets the end (committed) offsets of optimized source changelogs
* @param changelogTopics object that manages changelog topics
* @return Map from stateful task to its total end offset summed across all changelog partitions
private Map<TaskId, Long> computeEndOffsetSumsByTask(final Map<TopicPartition, ListOffsetsResultInfo> endOffsets,
final Map<TopicPartition, Long> sourceChangelogEndOffsets,
final ChangelogTopics changelogTopics) {
final Map<TaskId, Long> taskEndOffsetSums = new HashMap<>();
for (final TaskId taskId : changelogTopics.statefulTaskIds()) {
taskEndOffsetSums.put(taskId, 0L);
for (final TopicPartition changelogPartition : changelogTopics.preExistingPartitionsFor(taskId)) {
final long changelogPartitionEndOffset;
if (sourceChangelogEndOffsets.containsKey(changelogPartition)) {
changelogPartitionEndOffset = sourceChangelogEndOffsets.get(changelogPartition);
} else if (endOffsets.containsKey(changelogPartition)) {
changelogPartitionEndOffset = endOffsets.get(changelogPartition).offset();
} else {
log.debug("Fetched offsets did not contain the changelog {} of task {}", changelogPartition, taskId);
throw new IllegalStateException("Could not get end offset for " + changelogPartition);
final long newEndOffsetSum = taskEndOffsetSums.get(taskId) + changelogPartitionEndOffset;
if (newEndOffsetSum < 0) {
taskEndOffsetSums.put(taskId, Long.MAX_VALUE);
} else {
taskEndOffsetSums.put(taskId, newEndOffsetSum);
return taskEndOffsetSums;
* Populates the global partitionsByHost and standbyPartitionsByHost maps that are sent to each member
* @param partitionsByHost a map from host to the set of partitions hosted there. Populated here.
* @param standbyPartitionsByHost a map from host to the set of standby partitions hosted there. Populated here.
* @param partitionsForTask a map from task to its set of assigned partitions
* @param clientMetadataMap a map from client to its metadata and state
private void populatePartitionsByHostMaps(final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<UUID, ClientMetadata> clientMetadataMap) {
for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
final HostInfo hostInfo = entry.getValue().hostInfo;
// if application server is configured, also include host state map
if (hostInfo != null) {
final Set<TopicPartition> topicPartitions = new HashSet<>();
final Set<TopicPartition> standbyPartitions = new HashSet<>();
final ClientState state = entry.getValue().state;
for (final TaskId id : state.activeTasks()) {
for (final TaskId id : state.standbyTasks()) {
partitionsByHost.put(hostInfo, topicPartitions);
standbyPartitionsByHost.put(hostInfo, standbyPartitions);
* Computes the assignment of tasks to threads within each client and assembles the final assignment to send out.
* @return the final assignment for each StreamThread consumer
private Map<String, Assignment> computeNewAssignment(final Set<TaskId> statefulTasks,
final Map<UUID, ClientMetadata> clientsMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
final Set<TopicPartition> allOwnedPartitions,
final int minUserMetadataVersion,
final int minSupportedMetadataVersion,
final boolean versionProbing,
final boolean shouldTriggerProbingRebalance) {
boolean rebalanceRequired = shouldTriggerProbingRebalance || versionProbing;
final Map<String, Assignment> assignment = new HashMap<>();
// within the client, distribute tasks to its owned consumers
for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientsMetadata.entrySet()) {
final UUID clientId = clientEntry.getKey();
final ClientMetadata clientMetadata = clientEntry.getValue();
final ClientState state = clientMetadata.state;
final SortedSet<String> consumers = clientMetadata.consumers;
final Map<String, Integer> threadTaskCounts = new HashMap<>();
final Map<String, List<TaskId>> activeTaskStatefulAssignment = assignTasksToThreads(
final Map<String, List<TaskId>> standbyTaskAssignment = assignTasksToThreads(
final Map<String, List<TaskId>> activeTaskStatelessAssignment = assignTasksToThreads(
// Combine activeTaskStatefulAssignment and activeTaskStatelessAssignment together into
// activeTaskStatelessAssignment
final Map<String, List<TaskId>> activeTaskAssignment = activeTaskStatefulAssignment;
for (final Map.Entry<String, List<TaskId>> threadEntry : activeTaskStatelessAssignment.entrySet()) {
// Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance,
// note once we pick the first consumer within the process to trigger probing rebalance, other consumer
// would not set to trigger any more.
final boolean encodeNextProbingRebalanceTime = shouldTriggerProbingRebalance && clientId.equals(taskManager.processId());
final boolean tasksRevoked = addClientAssignments(
if (tasksRevoked || encodeNextProbingRebalanceTime) {
rebalanceRequired = true;
log.debug("Requested client {} to schedule a followup rebalance", clientId);
}"Client {} per-consumer assignment:\n" +
"\tprev owned active {}\n" +
"\tprev owned standby {}\n" +
"\tassigned active {}\n" +
"\trevoking active {}\n" +
"\tassigned standby {}\n",
if (rebalanceRequired) {
assignmentListener.onAssignmentComplete(false);"Finished unstable assignment of tasks, a followup rebalance will be scheduled.");
} else {
assignmentListener.onAssignmentComplete(true);"Finished stable assignment of tasks, no followup rebalances required.");
return assignment;
* Adds the encoded assignment for each StreamThread consumer in the client to the overall assignment map
* @return true if a followup rebalance will be required due to revoked tasks
private boolean addClientAssignments(final Set<TaskId> statefulTasks,
final Map<String, Assignment> assignment,
final ClientMetadata clientMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
final Set<TopicPartition> allOwnedPartitions,
final Map<String, List<TaskId>> activeTaskAssignments,
final Map<String, List<TaskId>> standbyTaskAssignments,
final int minUserMetadataVersion,
final int minSupportedMetadataVersion,
final boolean probingRebalanceNeeded) {
boolean followupRebalanceRequiredForRevokedTasks = false;
// We only want to encode a scheduled probing rebalance for a single member in this client
boolean shouldEncodeProbingRebalance = probingRebalanceNeeded;
// Loop through the consumers and build their assignment
for (final String consumer : clientMetadata.consumers) {
final List<TaskId> activeTasksForConsumer = activeTaskAssignments.get(consumer);
// These will be filled in by populateActiveTaskAndPartitionsLists below
final List<TopicPartition> activePartitionsList = new ArrayList<>();
final List<TaskId> assignedActiveList = new ArrayList<>();
final Set<TaskId> activeTasksRemovedPendingRevokation = populateActiveTaskAndPartitionsLists(
final Map<TaskId, Set<TopicPartition>> standbyTaskMap = buildStandbyTaskMap(
final AssignmentInfo info = new AssignmentInfo(
if (!activeTasksRemovedPendingRevokation.isEmpty()) {
// TODO: once KAFKA-10078 is resolved we can leave it to the client to trigger this rebalance"Requesting followup rebalance be scheduled immediately by {} due to tasks changing ownership.", consumer);
followupRebalanceRequiredForRevokedTasks = true;
// Don't bother to schedule a probing rebalance if an immediate one is already scheduled
shouldEncodeProbingRebalance = false;
} else if (shouldEncodeProbingRebalance) {
final long nextRebalanceTimeMs = time.milliseconds() + probingRebalanceIntervalMs();"Requesting followup rebalance be scheduled by {} for {} to probe for caught-up replica tasks.",
consumer, Utils.toLogDateTimeFormat(nextRebalanceTimeMs));
shouldEncodeProbingRebalance = false;
new Assignment(
return followupRebalanceRequiredForRevokedTasks;
* Populates the lists of active tasks and active task partitions for the consumer with a 1:1 mapping between them
* such that the nth task corresponds to the nth partition in the list. This means tasks with multiple partitions
* will be repeated in the list.
private Set<TaskId> populateActiveTaskAndPartitionsLists(final List<TopicPartition> activePartitionsList,
final List<TaskId> assignedActiveList,
final String consumer,
final ClientState clientState,
final List<TaskId> activeTasksForConsumer,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Set<TopicPartition> allOwnedPartitions) {
final List<AssignedPartition> assignedPartitions = new ArrayList<>();
final Set<TaskId> removedActiveTasks = new TreeSet<>();
for (final TaskId taskId : activeTasksForConsumer) {
// Populate the consumer for assigned tasks without considering revocation,
// this is for debugging purposes only
clientState.assignActiveToConsumer(taskId, consumer);
final List<AssignedPartition> assignedPartitionsForTask = new ArrayList<>();
for (final TopicPartition partition : partitionsForTask.get(taskId)) {
final String oldOwner = clientState.previousOwnerForPartition(partition);
final boolean newPartitionForConsumer = oldOwner == null || !oldOwner.equals(consumer);
// If the partition is new to this consumer but is still owned by another, remove from the assignment
// until it has been revoked and can safely be reassigned according to the COOPERATIVE protocol
if (newPartitionForConsumer && allOwnedPartitions.contains(partition)) {
"Removing task {} from {} active assignment until it is safely revoked in followup rebalance",
clientState.revokeActiveFromConsumer(taskId, consumer);
// Clear the assigned partitions list for this task if any partition can not safely be assigned,
// so as not to encode a partial task
// This has no effect on the assignment, as we'll never consult the ClientState again, but
// it does perform a useful assertion that the task was actually assigned.
} else {
assignedPartitionsForTask.add(new AssignedPartition(taskId, partition));
// assignedPartitionsForTask will either contain all partitions for the task or be empty, so just add all
// Add one copy of a task for each corresponding partition, so the receiver can determine the task <-> tp mapping
for (final AssignedPartition partition : assignedPartitions) {
return removedActiveTasks;
* @return map from task id to its assigned partitions for all standby tasks
private Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(final String consumer,
final Iterable<TaskId> standbyTasks,
final Iterable<TaskId> revokedTasks,
final Set<TaskId> allStatefulTasks,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final ClientState clientState) {
final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new HashMap<>();
for (final TaskId task : standbyTasks) {
clientState.assignStandbyToConsumer(task, consumer);
standbyTaskMap.put(task, partitionsForTask.get(task));
for (final TaskId task : revokedTasks) {
if (allStatefulTasks.contains(task)) {"Adding removed stateful active task {} as a standby for {} before it is revoked in followup rebalance",
task, consumer);
// This has no effect on the assignment, as we'll never consult the ClientState again, but
// it does perform a useful assertion that the it's legal to assign this task as a standby to this instance
clientState.assignStandbyToConsumer(task, consumer);
standbyTaskMap.put(task, partitionsForTask.get(task));
return standbyTaskMap;
* Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating
* balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by
* group id to spread subtopologies across threads and further balance the workload.
* Stateless tasks are simply spread across threads without taking into account previous ownership.
* threadLoad is a map that keeps track of task load per thread across multiple calls so active and standby
* tasks are evenly distributed
static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> tasksToAssign,
final boolean isStateful,
final SortedSet<String> consumers,
final ClientState state,
final Map<String, Integer> threadLoad) {
final Map<String, List<TaskId>> assignment = new HashMap<>();
for (final String consumer : consumers) {
assignment.put(consumer, new ArrayList<>());
final int totalTasks = threadLoad.values().stream().reduce(tasksToAssign.size(), Integer::sum);
final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
final Queue<String> consumersToFill = new LinkedList<>();
// keep track of tasks that we have to skip during the first pass in case we can reassign them later
// using tree-map to make sure the iteration ordering over keys are preserved
final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
if (!unassignedTasks.isEmpty()) {
// First assign tasks to previous owner, up to the min expected tasks/thread if these are stateful
for (final String consumer : consumers) {
final List<TaskId> threadAssignment = assignment.get(consumer);
// The number of tasks we have to assign here to hit minTasksPerThread
final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0);
if (isStateful) {
for (final TaskId task : state.prevTasksByLag(consumer)) {
if (unassignedTasks.contains(task)) {
if (threadAssignment.size() < tasksTargetCount) {
} else {
unassignedTaskToPreviousOwner.put(task, consumer);
if (threadAssignment.size() < tasksTargetCount) {
// Next interleave remaining unassigned tasks amongst unfilled consumers
while (!consumersToFill.isEmpty()) {
final TaskId task = unassignedTasks.poll();
if (task != null) {
final String consumer = consumersToFill.poll();
final List<TaskId> threadAssignment = assignment.get(consumer);
final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
if (threadTaskCount < minTasksPerThread) {
} else {
throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
// At this point all consumers are at the min or min + 1 capacity.
// The min + 1 case can occur for standbys where there's fewer standbys than consumers and after assigning
// the active tasks some consumers already have min + 1 one tasks assigned.
// The tasks still remaining should now be distributed over the consumers that are still at min capacity
if (!unassignedTasks.isEmpty()) {
for (final String consumer : consumers) {
final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
if (taskCount == minTasksPerThread) {
// Go over the tasks we skipped earlier and assign them to their previous owner when possible
for (final Map.Entry<TaskId, String> taskEntry : unassignedTaskToPreviousOwner.entrySet()) {
final TaskId task = taskEntry.getKey();
final String consumer = taskEntry.getValue();
if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) {
// Remove this consumer since we know it is now at minCapacity + 1
// Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity
for (final TaskId task : unassignedTasks) {
final String consumer = consumersToFill.poll();
final List<TaskId> threadAssignment = assignment.get(consumer);
// Update threadLoad
for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
final String consumer = taskEntry.getKey();
final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size();
threadLoad.put(consumer, totalCount);
return assignment;
private void validateMetadataVersions(final int receivedAssignmentMetadataVersion,
final int latestCommonlySupportedVersion) {
if (receivedAssignmentMetadataVersion > usedSubscriptionMetadataVersion) {
log.error("Leader sent back an assignment with version {} which was greater than our used version {}",
receivedAssignmentMetadataVersion, usedSubscriptionMetadataVersion);
throw new TaskAssignmentException(
"Sent a version " + usedSubscriptionMetadataVersion
+ " subscription but got an assignment with higher version "
+ receivedAssignmentMetadataVersion + "."
if (latestCommonlySupportedVersion > LATEST_SUPPORTED_VERSION) {
log.error("Leader sent back assignment with commonly supported version {} that is greater than our "
+ "actual latest supported version {}", latestCommonlySupportedVersion, LATEST_SUPPORTED_VERSION);
throw new TaskAssignmentException("Can't upgrade to metadata version greater than we support");
// Returns true if subscription version was changed, indicating version probing and need to rebalance again
protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMetadataVersion,
final int latestCommonlySupportedVersion) {
if (receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
// If the latest commonly supported version is now greater than our used version, this indicates we have just
// completed the rolling upgrade and can now update our subscription version for the final rebalance
if (latestCommonlySupportedVersion > usedSubscriptionMetadataVersion) {
"Sent a version {} subscription and group's latest commonly supported version is {} (successful "
"version probing and end of rolling upgrade). Upgrading subscription metadata version to " +
"{} for next rebalance.",
usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
return true;
// If we received a lower version than we sent, someone else in the group still hasn't upgraded. We
// should downgrade our subscription until everyone is on the latest version
if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion) {
"Sent a version {} subscription and got version {} assignment back (successful version probing). "
"Downgrade subscription metadata to commonly supported version {} and trigger new rebalance.",
usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
return true;
} else {
log.debug("Received an assignment version {} that is less than the earliest version that allows version " +
"probing {}. If this is not during a rolling upgrade from version 2.0 or below, this is an error.",
receivedAssignmentMetadataVersion, EARLIEST_PROBEABLE_VERSION);
return false;
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
if (info.errCode() != AssignorError.NONE.code()) {
// set flag to shutdown streams app
* latestCommonlySupportedVersion belongs to [usedSubscriptionMetadataVersion, LATEST_SUPPORTED_VERSION]
* receivedAssignmentMetadataVersion belongs to [EARLIEST_PROBEABLE_VERSION, usedSubscriptionMetadataVersion]
* usedSubscriptionMetadataVersion will be downgraded to receivedAssignmentMetadataVersion during a rolling
* bounce upgrade with version probing.
* usedSubscriptionMetadataVersion will be upgraded to latestCommonlySupportedVersion when all members have
* been bounced and it is safe to use the latest version.
final int receivedAssignmentMetadataVersion = info.version();
final int latestCommonlySupportedVersion = info.commonlySupportedVersion();
validateMetadataVersions(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion);
// version 1 field
final Map<TaskId, Set<TopicPartition>> activeTasks;
// version 2 fields
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo;
final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost;
final long encodedNextScheduledRebalanceMs;
switch (receivedAssignmentMetadataVersion) {
case 1:
validateActiveTaskEncoding(partitions, info, logPrefix);
activeTasks = getActiveTasks(partitions, info);
partitionsByHost = Collections.emptyMap();
standbyPartitionsByHost = Collections.emptyMap();
topicToPartitionInfo = Collections.emptyMap();
encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
case 2:
case 3:
case 4:
case 5:
validateActiveTaskEncoding(partitions, info, logPrefix);
activeTasks = getActiveTasks(partitions, info);
partitionsByHost = info.partitionsByHost();
standbyPartitionsByHost = Collections.emptyMap();
topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
case 6:
validateActiveTaskEncoding(partitions, info, logPrefix);
activeTasks = getActiveTasks(partitions, info);
partitionsByHost = info.partitionsByHost();
standbyPartitionsByHost = info.standbyPartitionByHost();
topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
case 7:
case 8:
case 9:
case 10:
case 11:
validateActiveTaskEncoding(partitions, info, logPrefix);
activeTasks = getActiveTasks(partitions, info);
partitionsByHost = info.partitionsByHost();
standbyPartitionsByHost = info.standbyPartitionByHost();
topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
throw new IllegalStateException(
"This code should never be reached."
+ " Please file a bug report at"
streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, topicToPartitionInfo);
// we do not capture any exceptions but just let the exception thrown from consumer.poll directly
// since when stream thread captures it, either we close all tasks as dirty or we close thread
taskManager.handleAssignment(activeTasks, info.standbyTasks());
private void maybeScheduleFollowupRebalance(final long encodedNextScheduledRebalanceMs,
final int receivedAssignmentMetadataVersion,
final int latestCommonlySupportedVersion,
final Set<HostInfo> groupHostInfo) {
if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {"Requested to schedule immediate rebalance due to version probing.");
} else if (!verifyHostInfo(groupHostInfo)) {"Requested to schedule immediate rebalance to update group with new host endpoint = {}.", userEndPoint);
} else if (encodedNextScheduledRebalanceMs == 0L) {"Requested to schedule immediate rebalance for new tasks to be safely revoked from current owner.");
} else if (encodedNextScheduledRebalanceMs < Long.MAX_VALUE) {
"Requested to schedule next probing rebalance at {} to try for a more balanced assignment.",
} else {"No followup rebalance was requested, resetting the rebalance schedule.");
* Verify that this client's host info was included in the map returned in the assignment, and trigger a
* rebalance if not. This may be necessary when using static membership, as a rejoining client will be handed
* back its original assignment to avoid an unnecessary rebalance. If the client's endpoint has changed, we need
* to force a rebalance for the other members in the group to get the updated host info for this client.
* @param groupHostInfo the HostInfo of all clients in the group
* @return false if the current host info does not match that in the group assignment
private boolean verifyHostInfo(final Set<HostInfo> groupHostInfo) {
if (userEndPoint != null && !groupHostInfo.isEmpty()) {
final HostInfo myHostInfo = HostInfo.buildFromEndpoint(userEndPoint);
return groupHostInfo.contains(myHostInfo);
} else {
return true;
// protected for upgrade test
protected static Map<TaskId, Set<TopicPartition>> getActiveTasks(final List<TopicPartition> partitions, final AssignmentInfo info) {
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
for (int i = 0; i < partitions.size(); i++) {
final TopicPartition partition = partitions.get(i);
final TaskId id = info.activeTasks().get(i);
activeTasks.computeIfAbsent(id, k1 -> new HashSet<>()).add(partition);
return activeTasks;
static Map<TopicPartition, PartitionInfo> getTopicPartitionInfo(final Map<HostInfo, Set<TopicPartition>> partitionsByHost) {
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
for (final Set<TopicPartition> value : partitionsByHost.values()) {
for (final TopicPartition topicPartition : value) {
new PartitionInfo(
new Node[0],
new Node[0]
return topicToPartitionInfo;
private static void validateActiveTaskEncoding(final List<TopicPartition> partitions, final AssignmentInfo info, final String logPrefix) {
// the number of assigned partitions should be the same as number of active tasks, which
// could be duplicated if one task has more than one assigned partitions
if (partitions.size() != info.activeTasks().size()) {
throw new TaskAssignmentException(
"%sNumber of assigned partitions %d is not equal to "
+ "the number of active taskIds %d, assignmentInfo=%s",
logPrefix, partitions.size(),
info.activeTasks().size(), info
private int updateMinReceivedVersion(final int usedVersion, final int minReceivedMetadataVersion) {
return Math.min(usedVersion, minReceivedMetadataVersion);
private int updateMinSupportedVersion(final int supportedVersion, final int minSupportedMetadataVersion) {
if (supportedVersion < minSupportedMetadataVersion) {
log.debug("Downgrade the current minimum supported version {} to the smaller seen supported version {}",
minSupportedMetadataVersion, supportedVersion);
return supportedVersion;
} else {
log.debug("Current minimum supported version remains at {}, last seen supported version was {}",
minSupportedMetadataVersion, supportedVersion);
return minSupportedMetadataVersion;
// following functions are for test only
void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;
RebalanceProtocol rebalanceProtocol() {
return rebalanceProtocol;
protected String userEndPoint() {
return userEndPoint;
protected TaskManager taskManager() {
return taskManager;
protected byte uniqueField() {
return uniqueField;
protected Map<String, String> clientTags() {
return clientTags;
protected void handleRebalanceStart(final Set<String> topics) {
long acceptableRecoveryLag() {
return assignmentConfigs.acceptableRecoveryLag;
int maxWarmupReplicas() {
return assignmentConfigs.maxWarmupReplicas;
int numStandbyReplicas() {
return assignmentConfigs.numStandbyReplicas;
long probingRebalanceIntervalMs() {
return assignmentConfigs.probingRebalanceIntervalMs;