blob: d6f83df00572c32842dcf38495dcac1165a98a75 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.common.TopicIdPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
/**
* The general uniform assignment builder is used to generate the target assignment for a consumer group with
* at least one of its members subscribed to a different set of topics.
*
* Assignments are done according to the following principles:
*
* <li> Balance: Ensure partitions are distributed equally among all members.
* The difference in assignment sizes between any two members
* should not exceed one partition. </li>
* <li> Stickiness: Minimize partition movements among members by retaining
* as much of the existing assignment as possible. </li>
*
* This assignment builder prioritizes the above properties in the following order:
* Balance > Stickiness.
*/
public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder {
private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
/**
* The member metadata obtained from the assignment specification.
*/
private final Map<String, AssignmentMemberSpec> members;
/**
* The topic and partition metadata describer.
*/
private final SubscribedTopicDescriber subscribedTopicDescriber;
/**
* The set of all the topic Ids that the consumer group is subscribed to.
*/
private final Set<Uuid> subscribedTopicIds;
/**
* List of subscribed members for each topic.
*/
private final Map<Uuid, List<String>> membersPerTopic;
/**
* The new assignment that will be returned.
*/
private final Map<String, MemberAssignment> targetAssignment;
/**
* The partitions that still need to be assigned.
*/
private final Set<TopicIdPartition> unassignedPartitions;
/**
* All the partitions that have been retained from the existing assignment.
*/
private final Set<TopicIdPartition> assignedStickyPartitions;
/**
* Manages assignments to members based on their current assignment size and maximum allowed assignment size.
*/
private final AssignmentManager assignmentManager;
/**
* Members below their maximum assignment size, sorted by their current assignment sizes (ties broken by member Id).
*/
private final TreeSet<String> sortedMembersByAssignmentSize;
/**
* Tracks the owner of each partition in the target assignment.
*/
private final Map<TopicIdPartition, String> partitionOwnerInTargetAssignment;
/**
* Handles all operations related to partition movements during a reassignment for balancing the target assignment.
*/
private final PartitionMovements partitionMovements;
/**
 * Creates a builder and indexes the subscriptions of the group.
 *
 * <p>While walking the members this constructor collects the union of subscribed topic Ids,
 * records the subscribers of each topic and registers an empty target assignment for every member.
 * The registration is done once per member and does not depend on the member's subscriptions, so a
 * member with an empty subscription still has an entry that per-member lookups on the target
 * assignment (for instance the ones made for every member while balancing) can rely on.
 *
 * @param groupSpec                The group specification the member metadata is read from.
 * @param subscribedTopicDescriber The topic and partition metadata describer.
 * @throws PartitionAssignorException If the describer reports a partition count of -1 for a subscribed topic.
 */
public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
    this.members = groupSpec.members();
    this.subscribedTopicDescriber = subscribedTopicDescriber;
    this.subscribedTopicIds = new HashSet<>();
    this.membersPerTopic = new HashMap<>();
    this.targetAssignment = new HashMap<>();

    members.forEach((memberId, memberMetadata) -> {
        // Register the member exactly once and before looking at its topics: members without any
        // subscription must still be present in the target assignment.
        targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()));

        memberMetadata.subscribedTopicIds().forEach(topicId -> {
            // Check if the subscribed topic exists.
            int partitionCount = subscribedTopicDescriber.numPartitions(topicId);
            if (partitionCount == -1) {
                throw new PartitionAssignorException(
                    "Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata."
                );
            }
            subscribedTopicIds.add(topicId);
            membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId);
        });
    });

    this.unassignedPartitions = new HashSet<>(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber));
    this.assignedStickyPartitions = new HashSet<>();
    this.assignmentManager = new AssignmentManager(this.members, this.subscribedTopicDescriber);
    // The sorted view is produced by the assignment manager so that it shares its size bookkeeping.
    this.sortedMembersByAssignmentSize = assignmentManager.sortMembersByAssignmentSize(members.keySet());
    this.partitionOwnerInTargetAssignment = new HashMap<>();
    this.partitionMovements = new PartitionMovements();
}
/**
 * Computes the target assignment in three phases:
 * <ol>
 * <li> Keep the partitions of the existing assignments (sticky partitions). </li>
 * <li> Hand out the partitions that are still unassigned, least loaded member first. </li>
 * <li> Move partitions between members until the assignment is balanced. </li>
 * </ol>
 *
 * @return The target assignment of the group, or an empty assignment when no topic is subscribed to.
 */
@Override
protected GroupAssignment buildAssignment() {
    boolean nothingSubscribed = subscribedTopicIds.isEmpty();
    if (!nothingSubscribed) {
        // Phase 1: stickiness.
        assignStickyPartitions();
        // Phase 2: everything that is not owned yet.
        unassignedPartitionsAssignment();
        // Phase 3: even out the load.
        balance();
        return new GroupAssignment(targetAssignment);
    }

    LOG.info("The subscription list is empty, returning an empty assignment");
    return new GroupAssignment(Collections.emptyMap());
}
/**
 * Orders topic partitions for assignment. The sort keys are, in order of precedence:
 * <ol>
 * <li> totalPartitions / number of subscribed members of the topic, descending. </li>
 * <li> Number of subscribers of the topic, ascending. </li>
 * <li> Partition Id, ascending; this key only exists to make the outcome more predictable. </li>
 * </ol>
 *
 * @param topicIdPartitions The topic partitions that need to be sorted.
 * @return A new list holding the given topic partitions in sorted order.
 */
private List<TopicIdPartition> sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) {
    // Descending on the partitions-per-subscriber ratio: the operands are swapped on purpose.
    Comparator<TopicIdPartition> byRatioDescending = (TopicIdPartition left, TopicIdPartition right) -> {
        double rightRatio = (double) subscribedTopicDescriber.numPartitions(right.topicId())
            / membersPerTopic.get(right.topicId()).size();
        double leftRatio = (double) subscribedTopicDescriber.numPartitions(left.topicId())
            / membersPerTopic.get(left.topicId()).size();
        return Double.compare(rightRatio, leftRatio);
    };

    Comparator<TopicIdPartition> ordering = byRatioDescending
        .thenComparingInt(candidate -> membersPerTopic.get(candidate.topicId()).size())
        .thenComparingInt(TopicIdPartition::partitionId);

    return topicIdPartitions.stream().sorted(ordering).collect(Collectors.toList());
}
/**
 * Retains partitions from the existing assignment of every member.
 *
 * <p>A partition is kept by its current owner when the owner is still subscribed to the partition's topic.
 * The topic metadata is not consulted here; unknown subscribed topics are rejected by the constructor.
 * Every retained partition is added to the target assignment and recorded as a sticky partition.
 * Partitions of topics the member is no longer subscribed to are skipped.
 */
private void assignStickyPartitions() {
    members.forEach((memberId, assignmentMemberSpec) ->
        assignmentMemberSpec.assignedPartitions().forEach((topicId, currentAssignment) -> {
            if (assignmentMemberSpec.subscribedTopicIds().contains(topicId)) {
                currentAssignment.forEach(partition -> {
                    TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition);
                    assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId);
                    assignedStickyPartitions.add(topicIdPartition);
                });
            } else {
                // Parameterized so that the message is only built when debug logging is enabled.
                LOG.debug("The topic {} is no longer present in the subscribed topics list", topicId);
            }
        })
    );
}
/**
 * Distributes the partitions that have no owner yet.
 * <ul>
 * <li> The partitions are processed in the order produced by {@code sortTopicIdPartitions}. </li>
 * <li> For each partition the subscribers of its topic are sorted by ascending target assignment size
 * and the partition goes to the first one that accepts it. </li>
 * </ul>
 */
private void unassignedPartitionsAssignment() {
    for (TopicIdPartition pending : sortTopicIdPartitions(unassignedPartitions)) {
        List<String> subscribers = membersPerTopic.get(pending.topicId());
        // Re-sorted for every partition because the previous assignment changed the sizes.
        TreeSet<String> candidates = assignmentManager.sortMembersByAssignmentSize(subscribers);
        for (String candidate : candidates) {
            boolean accepted = assignmentManager.maybeAssignPartitionToMember(pending, candidate);
            if (accepted) {
                break;
            }
        }
    }
}
/**
 * A topic is subject to reassignment only when more than one member is subscribed to it.
 *
 * @param topicId The topic Id.
 * @return true if the topic can participate in reassignment, false otherwise.
 */
private boolean canTopicParticipateInReassignment(Uuid topicId) {
    List<String> subscribers = membersPerTopic.get(topicId);
    return subscribers.size() > 1;
}
/**
 * A member is subject to reassignment when it owns fewer partitions than it could, or when at least
 * one of the topics it currently owns partitions of is itself subject to reassignment.
 * Owning more partitions than the maximum is reported as an error in the log.
 *
 * @param memberId The member Id.
 * @return true if the member can participate in reassignment, false otherwise.
 */
private boolean canMemberParticipateInReassignment(String memberId) {
    Set<Uuid> ownedTopicIds = targetAssignment.get(memberId).targetPartitions().keySet();
    int assigned = assignmentManager.targetAssignmentSize(memberId);
    int capacity = assignmentManager.maxAssignmentSize(memberId);

    if (assigned > capacity) {
        LOG.error("The member {} is assigned more partitions than the maximum possible.", memberId);
    } else if (assigned < capacity) {
        // There is still room for more partitions.
        return true;
    }

    return ownedTopicIds.stream().anyMatch(this::canTopicParticipateInReassignment);
}
/**
 * Tells whether the current target assignment is balanced.
 *
 * The assignment is balanced when the smallest and the largest assignment among the sorted members
 * differ by at most one partition. Otherwise it is still considered balanced as long as no member
 * below its maximum size could take over a partition from a member that owns at least two partitions more.
 *
 * @return true if the assignment is balanced, false otherwise.
 */
private boolean isBalanced() {
    int smallest = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.first());
    int largest = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.last());
    // A spread of at most one partition cannot be improved.
    if (largest - 1 <= smallest) {
        return true;
    }

    // Starting with the least loaded members, look for a partition that a member could take from a heavier owner.
    for (String candidate : sortedMembersByAssignmentSize) {
        int candidateSize = assignmentManager.targetAssignmentSize(candidate);
        // A member that owns everything it is subscribed to cannot receive anything.
        if (candidateSize == assignmentManager.maxAssignmentSize(candidate)) {
            continue;
        }

        for (Uuid topicId : members.get(candidate).subscribedTopicIds()) {
            Set<Integer> ownedPartitions = targetAssignment.get(candidate).targetPartitions().get(topicId);
            for (int partitionId = 0; partitionId < subscribedTopicDescriber.numPartitions(topicId); partitionId++) {
                if (ownedPartitions != null && ownedPartitions.contains(partitionId)) {
                    // Already owned by the candidate.
                    continue;
                }
                TopicIdPartition foreignPartition = new TopicIdPartition(topicId, partitionId);
                String owner = partitionOwnerInTargetAssignment.get(foreignPartition);
                int ownerSize = assignmentManager.targetAssignmentSize(owner);
                if (candidateSize + 1 < ownerSize) {
                    LOG.debug("{} can be moved from member {} to member {} for a more balanced assignment.",
                        foreignPartition, owner, candidate);
                    return false;
                }
            }
        }
    }
    return true;
}
/**
 * Evens out the target assignment once the initial round of assignments is done.
 *
 * The set of unassigned partitions is reused as the set of partitions that may be reassigned:
 * it is refilled with every partition and then reduced to the partitions of topics that can
 * participate in a reassignment. Members that cannot participate are taken out of the sorted members.
 *
 * @throws PartitionAssignorException If a partition is still unassigned when this method is called.
 */
private void balance() {
    if (!unassignedPartitions.isEmpty())
        throw new PartitionAssignorException("Some partitions were left unassigned");

    // Start from all the partitions ...
    unassignedPartitions.addAll(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber));

    // ... and drop the ones that are fixed because their topic cannot be reassigned.
    for (Uuid topicId : subscribedTopicIds) {
        if (canTopicParticipateInReassignment(topicId)) {
            continue;
        }
        for (int partitionId = 0; partitionId < subscribedTopicDescriber.numPartitions(topicId); partitionId++) {
            unassignedPartitions.remove(new TopicIdPartition(topicId, partitionId));
        }
    }

    // Only the members that are subject to reassignment stay in the sorted set.
    members.keySet().stream()
        .filter(memberId -> !canMemberParticipateInReassignment(memberId))
        .forEach(sortedMembersByAssignmentSize::remove);

    // With nothing left to reassign there is no point in re-balancing.
    if (unassignedPartitions.isEmpty()) {
        return;
    }
    performReassignments();
}
/**
* Performs reassignments of partitions to balance the load across members.
* This method iteratively reassigns partitions until no further moves can improve the balance.
*
* Each pass of the do-while loop walks the reassignable partitions (the content of
* unassignedPartitions at that time) in the order given by sortTopicIdPartitions. A partition is
* moved when its current owner holds at least two partitions more than some other subscriber of
* the partition's topic. Another pass is started whenever a pass moved at least one partition.
*
* The balance check runs at the beginning of an iteration and only when the previous iteration
* moved a partition, so a move made for the last partition of a pass is only checked by the next pass.
*
* NOTE(review): every move ends in addPartitionToTargetAssignment, which removes the moved partition
* from unassignedPartitions, so later passes no longer see partitions moved by earlier passes.
* Confirm that this is intended.
*
* @throws PartitionAssignorException if there are inconsistencies in expected members per partition
* or if a partition is expected to already be assigned but isn't.
*/
private void performReassignments() {
// modified: at least one move happened during the current pass (drives the do-while).
boolean modified;
// reassignmentOccurred: the previous loop iteration moved a partition (gates the balance check).
boolean reassignmentOccurred;
// Repeat reassignment until no partition can be moved to improve the balance.
do {
modified = false;
reassignmentOccurred = false;
// Reassign all reassignable partitions sorted in descending order
// by totalPartitions/number of subscribed members,
// until the full list is processed or a balance is achieved.
// The sorted list is a snapshot, so the loop is not affected by changes to unassignedPartitions.
List<TopicIdPartition> reassignablePartitions = sortTopicIdPartitions(unassignedPartitions);
for (TopicIdPartition reassignablePartition : reassignablePartitions) {
// Only check if there is any change in balance if any moves were made.
if (reassignmentOccurred && isBalanced()) {
return;
}
reassignmentOccurred = false;
// The topicIdPartition must have at least two members.
if (membersPerTopic.get(reassignablePartition.topicId()).size() <= 1)
throw new PartitionAssignorException(String.format("Expected more than one potential member for " +
"topicIdPartition '%s'", reassignablePartition)
);
// The topicIdPartition must have a current target owner.
String currentTargetOwner = partitionOwnerInTargetAssignment.get(reassignablePartition);
if (currentTargetOwner == null)
throw new PartitionAssignorException(String.format("Expected topicIdPartition '%s' to be assigned " +
"to a member", reassignablePartition)
);
// otherMember only serves to detect an imbalance; the new owner is chosen by reassignPartition,
// which picks the first subscribed member of sortedMembersByAssignmentSize.
for (String otherMember : membersPerTopic.get(reassignablePartition.topicId())) {
if (assignmentManager.targetAssignmentSize(currentTargetOwner) > assignmentManager.targetAssignmentSize(otherMember) + 1) {
reassignPartition(reassignablePartition);
modified = true;
reassignmentOccurred = true;
// At most one move per partition and iteration.
break;
}
}
}
} while (modified);
}
/**
 * Reassigns a partition to an eligible member with the fewest current target assignments.
 * <ul>
 * <li> Walks the sorted members in ascending order of assignment size. </li>
 * <li> Selects the first member that is subscribed to the partition's topic. </li>
 * </ul>
 *
 * @param partition The partition to reassign.
 * @throws PartitionAssignorException If none of the sorted members is subscribed to the partition's topic.
 */
private void reassignPartition(TopicIdPartition partition) {
    // The set is ordered by assignment size, hence the first match is the least loaded candidate.
    String newOwner = sortedMembersByAssignmentSize.stream()
        .filter(candidate -> members.get(candidate).subscribedTopicIds().contains(partition.topicId()))
        .findFirst()
        .orElseThrow(() -> new PartitionAssignorException(
            "No suitable new owner was found for the partition " + partition
        ));

    reassignPartition(partition, newOwner);
}
/**
 * Moves a partition to the given member while taking earlier movements into account.
 * <p>
 * The steps are:
 * <ol>
 * <li> Look up the member that currently owns the partition in the target assignment. </li>
 * <li> Ask the partition movements tracker which partition should actually be moved;
 * this may be a different partition of the same topic. </li>
 * <li> Move that partition to the new member. </li>
 * </ol>
 *
 * @param partition The {@link TopicIdPartition} to be reassigned.
 * @param newMember The Id of the member to which the partition should be reassigned.
 */
private void reassignPartition(TopicIdPartition partition, String newMember) {
    String currentOwner = partitionOwnerInTargetAssignment.get(partition);
    TopicIdPartition actualPartition =
        partitionMovements.computeActualPartitionToBeMoved(partition, currentOwner, newMember);
    processPartitionMovement(actualPartition, newMember);
}
/**
 * Moves the partition from its current owner in the target assignment to the new member.
 * The movement is recorded first, then the partition is revoked from the current owner and
 * finally added to the new member.
 *
 * @param topicIdPartition The partition to move.
 * @param newMember        The Id of the member that receives the partition.
 */
private void processPartitionMovement(TopicIdPartition topicIdPartition, String newMember) {
    final String previousOwner = partitionOwnerInTargetAssignment.get(topicIdPartition);

    partitionMovements.movePartition(topicIdPartition, previousOwner, newMember);

    assignmentManager.removePartitionFromTargetAssignment(topicIdPartition, previousOwner);
    assignmentManager.addPartitionToTargetAssignment(topicIdPartition, newMember);
}
/**
 * This class represents a pair of member Ids involved in a partition reassignment.
 * Each pair contains a source and a destination member Id.
 * It normally corresponds to a particular partition or topic, and indicates that the particular partition or some
 * partition of the particular topic was moved from the source member to the destination member during the rebalance.
 *
 * Instances are immutable and are used as map keys, so {@code equals} and {@code hashCode} are
 * kept consistent with each other, including for null member Ids.
 */
private static class MemberPair {
    private final String srcMemberId;
    private final String dstMemberId;

    MemberPair(String srcMemberId, String dstMemberId) {
        this.srcMemberId = srcMemberId;
        this.dstMemberId = dstMemberId;
    }

    @Override
    public int hashCode() {
        // Same value as the 31-based combination of the two member Id hashes, with null hashing to 0.
        return Objects.hash(srcMemberId, dstMemberId);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null || getClass() != obj.getClass())
            return false;
        MemberPair otherPair = (MemberPair) obj;
        // Null-safe, like hashCode.
        return Objects.equals(this.srcMemberId, otherPair.srcMemberId)
            && Objects.equals(this.dstMemberId, otherPair.dstMemberId);
    }

    @Override
    public String toString() {
        return "MemberPair(" +
            "srcMemberId='" + srcMemberId + '\'' +
            ", dstMemberId='" + dstMemberId + '\'' +
            ')';
    }
}
/**
* This class maintains some data structures to simplify lookup of partition movements among members.
* During a partition rebalance, it keeps track of partition movements corresponding to each topic,
* and also possible movement (in the form of a <code>MemberPair</code> object) for each partition.
*/
private static class PartitionMovements {
private final Map<Uuid, Map<MemberPair, Set<TopicIdPartition>>> partitionMovementsByTopic = new HashMap<>();
private final Map<TopicIdPartition, MemberPair> partitionMovementsByPartition = new HashMap<>();
/**
 * Forgets the movement recorded for the given partition in both lookup structures.
 * Map entries that become empty are dropped as well.
 *
 * @param partition The partition whose movement record is removed; a record is expected to exist.
 * @return The member pair that was recorded for the partition.
 */
private MemberPair removeMovementRecordOfPartition(TopicIdPartition partition) {
    final MemberPair recordedPair = partitionMovementsByPartition.remove(partition);
    final Uuid topicId = partition.topicId();

    Map<MemberPair, Set<TopicIdPartition>> movementsOfTopic = partitionMovementsByTopic.get(topicId);
    Set<TopicIdPartition> movedPartitions = movementsOfTopic.get(recordedPair);
    movedPartitions.remove(partition);

    if (movedPartitions.isEmpty()) {
        movementsOfTopic.remove(recordedPair);
    }
    if (movementsOfTopic.isEmpty()) {
        partitionMovementsByTopic.remove(topicId);
    }
    return recordedPair;
}
/**
 * Records a movement of the given partition in both lookup structures, creating the
 * per-topic and per-pair containers on first use.
 *
 * @param partition The partition that was moved.
 * @param pair      The source and destination members of the movement.
 */
private void addPartitionMovementRecord(TopicIdPartition partition, MemberPair pair) {
    partitionMovementsByPartition.put(partition, pair);

    partitionMovementsByTopic
        .computeIfAbsent(partition.topicId(), topicId -> new HashMap<>())
        .computeIfAbsent(pair, movement -> new HashSet<>())
        .add(partition);
}
/**
 * Records that the given partition moves from {@code oldOwner} to {@code newOwner}.
 *
 * <p>If the partition was already moved earlier, the two movements are collapsed into one that
 * starts at the original source. When the partition returns to its original source the record is
 * simply dropped.
 *
 * @param partition The partition that is being moved.
 * @param oldOwner  The member that owns the partition before the move.
 * @param newOwner  The member that owns the partition after the move.
 * @throws PartitionAssignorException If an earlier movement of the partition did not end at {@code oldOwner},
 *                                    i.e. the movement records disagree with the partition ownership.
 */
private void movePartition(TopicIdPartition partition, String oldOwner, String newOwner) {
    if (!partitionMovementsByPartition.containsKey(partition)) {
        // First movement of this partition.
        addPartitionMovementRecord(partition, new MemberPair(oldOwner, newOwner));
        return;
    }

    // This partition was previously moved.
    MemberPair existingPair = removeMovementRecordOfPartition(partition);
    // The previous movement must have ended at the member the partition is leaving now.
    if (!existingPair.dstMemberId.equals(oldOwner)) {
        throw new PartitionAssignorException("Mismatch in partition movement record with respect to " +
            "partition ownership during a rebalance"
        );
    }
    if (!existingPair.srcMemberId.equals(newOwner)) {
        // The partition is not moving back to its previous member.
        addPartitionMovementRecord(partition, new MemberPair(existingPair.srcMemberId, newOwner));
    }
}
/**
 * Determines which partition should really be moved when {@code partition} is proposed to go
 * from {@code oldOwner} to {@code newOwner}.
 *
 * If some partition of the same topic was moved in the opposite direction earlier, that partition
 * is returned so that it can be moved instead. For a partition that was itself moved before,
 * the source of that earlier movement is used as the old owner of the lookup.
 *
 * @param partition The {@link TopicIdPartition} object representing the partition to be moved.
 * @param oldOwner  The memberId of the current owner of the partition.
 * @param newOwner  The memberId of the proposed new owner of the partition.
 * @return The {@link TopicIdPartition} that should be moved. This is the original partition when no
 *         movement is recorded for the topic or no opposite movement exists.
 * @throws PartitionAssignorException if the old owner does not match the expected value for the partition.
 */
private TopicIdPartition computeActualPartitionToBeMoved(
    TopicIdPartition partition,
    String oldOwner,
    String newOwner
) {
    Map<MemberPair, Set<TopicIdPartition>> movementsOfTopic = partitionMovementsByTopic.get(partition.topicId());
    if (movementsOfTopic == null) {
        // Nothing of this topic was moved so far.
        return partition;
    }

    String originalOwner = oldOwner;
    MemberPair earlierMovement = partitionMovementsByPartition.get(partition);
    if (earlierMovement != null) {
        if (!oldOwner.equals(earlierMovement.dstMemberId)) {
            throw new PartitionAssignorException("Old owner does not match expected value for partition: " + partition);
        }
        originalOwner = earlierMovement.srcMemberId;
    }

    Set<TopicIdPartition> oppositeMovements = movementsOfTopic.get(new MemberPair(newOwner, originalOwner));
    return oppositeMovements == null ? partition : oppositeMovements.iterator().next();
}
}
/**
* Manages assignments to members based on their current assignment size and maximum allowed assignment size.
*/
private class AssignmentManager {
private final Map<String, MemberAssignmentData> membersWithAssignmentSizes = new HashMap<>();
/**
 * Size bookkeeping of a single member: the number of partitions currently assigned to it
 * and the maximum number it can be assigned. Identity is defined by the member Id only.
 */
private class MemberAssignmentData {
    final String memberId;
    int currentAssignmentSize = 0;
    int maxAssignmentSize;

    /**
     * Creates the bookkeeping entry of a member.
     *
     * @param memberId The Id of the member.
     */
    MemberAssignmentData(String memberId) {
        this.memberId = memberId;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (o.getClass() != getClass()) {
            return false;
        }
        return memberId.equals(((MemberAssignmentData) o).memberId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(memberId);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MemberAssignmentData(");
        text.append("memberId='").append(memberId).append('\'');
        text.append(", currentAssignmentSize=").append(currentAssignmentSize);
        text.append(", maxAssignmentSize=").append(maxAssignmentSize);
        text.append(')');
        return text.toString();
    }
}
/**
 * Creates the size bookkeeping of every member. The maximum assignment size of a member is the
 * total number of partitions of the topics it is subscribed to; the current size starts at zero.
 *
 * @param members                  The members of the group, keyed by member Id.
 * @param subscribedTopicDescriber The describer used to look up the partition count of a topic.
 */
public AssignmentManager(Map<String, AssignmentMemberSpec> members, SubscribedTopicDescriber subscribedTopicDescriber) {
    for (Map.Entry<String, AssignmentMemberSpec> memberEntry : members.entrySet()) {
        int capacity = 0;
        for (Uuid topicId : memberEntry.getValue().subscribedTopicIds()) {
            capacity += subscribedTopicDescriber.numPartitions(topicId);
        }

        MemberAssignmentData data = membersWithAssignmentSizes
            .computeIfAbsent(memberEntry.getKey(), MemberAssignmentData::new);
        data.maxAssignmentSize = capacity;
        data.currentAssignmentSize = 0;
    }
}
/**
 * Looks up how many partitions are currently assigned to a member.
 *
 * @param memberId The member Id.
 * @return The current assignment size of the member, or 0 (with a warning in the log) for an unknown member.
 */
private int targetAssignmentSize(String memberId) {
    MemberAssignmentData data = membersWithAssignmentSizes.get(memberId);
    if (data != null) {
        return data.currentAssignmentSize;
    }
    LOG.warn("Member Id {} not found", memberId);
    return 0;
}
/**
 * Looks up how many partitions a member can be assigned at most.
 *
 * @param memberId The member Id.
 * @return The maximum assignment size of the member, or 0 (with a warning in the log) for an unknown member.
 */
private int maxAssignmentSize(String memberId) {
    MemberAssignmentData data = membersWithAssignmentSizes.get(memberId);
    if (data != null) {
        return data.maxAssignmentSize;
    }
    LOG.warn("Member Id {} not found", memberId);
    return 0;
}
/**
 * Tells whether a member cannot take any more partitions.
 *
 * @param memberId The member Id.
 * @return true if the current assignment size of the member has reached (or exceeds) its maximum.
 */
private boolean isMemberAtMaxCapacity(String memberId) {
    int assigned = targetAssignmentSize(memberId);
    int capacity = maxAssignmentSize(memberId);
    return assigned >= capacity;
}
/**
 * Adds one to the current target assignment size of a member.
 * An unknown member is reported with a warning in the log and otherwise ignored.
 *
 * @param memberId The member Id.
 */
private void incrementTargetAssignmentSize(String memberId) {
    MemberAssignmentData data = membersWithAssignmentSizes.get(memberId);
    if (data != null) {
        data.currentAssignmentSize += 1;
    } else {
        LOG.warn("Member Id {} not found", memberId);
    }
}
/**
 * Subtracts one from the current target assignment size of a member, unless the size is not
 * greater than zero. An unknown member is reported with a warning in the log and otherwise ignored.
 *
 * @param memberId The member Id.
 */
private void decrementTargetAssignmentSize(String memberId) {
    MemberAssignmentData data = membersWithAssignmentSizes.get(memberId);
    if (data == null) {
        LOG.warn("Member Id {} not found", memberId);
        return;
    }
    if (data.currentAssignmentSize <= 0) {
        // Never go below zero.
        return;
    }
    data.currentAssignmentSize -= 1;
}
/**
 * Assigns the partition to the member when the member is subscribed to the partition's topic
 * and has not reached its maximum assignment size yet.
 *
 * @param topicIdPartition The partition to be assigned.
 * @param memberId         The Id of the member.
 * @return true if the partition was assigned, false otherwise.
 */
private boolean maybeAssignPartitionToMember(
    TopicIdPartition topicIdPartition,
    String memberId
) {
    boolean subscribed = members.get(memberId).subscribedTopicIds().contains(topicIdPartition.topicId());
    // The capacity is only looked at for subscribed members.
    if (!subscribed || isMemberAtMaxCapacity(memberId)) {
        return false;
    }

    addPartitionToTargetAssignment(topicIdPartition, memberId);
    return true;
}
/**
* Assigns a partition to a member, updates the current assignment size,
* and updates relevant data structures.
*
* Besides the target assignment itself this records the member as the owner of the partition,
* refreshes the member's position in sortedMembersByAssignmentSize and removes the partition
* from unassignedPartitions. The call is unconditional: neither the member's subscription nor its
* maximum assignment size is checked here.
*
* @param topicIdPartition The partition to be assigned.
* @param memberId Member that the partition needs to be added to.
*/
private void addPartitionToTargetAssignment(TopicIdPartition topicIdPartition, String memberId) {
// addPartitionToAssignment is not defined in this class; presumably inherited from the
// abstract builder and adding the partition to the member's entry - verify there.
addPartitionToAssignment(
targetAssignment,
memberId,
topicIdPartition.topicId(),
topicIdPartition.partitionId()
);
partitionOwnerInTargetAssignment.put(topicIdPartition, memberId);
// Remove the member from the sorted set BEFORE its size changes: the set's comparator reads the
// mutable current assignment size, so the lookup only works while the size is still the old one.
sortedMembersByAssignmentSize.remove(memberId);
assignmentManager.incrementTargetAssignmentSize(memberId);
// Re-add the member with its new size, unless it has reached its maximum assignment size.
if (!isMemberAtMaxCapacity(memberId)) {
sortedMembersByAssignmentSize.add(memberId);
}
unassignedPartitions.remove(topicIdPartition);
}
/**
* Revokes the partition from a member, updates the current target assignment size,
* and other relevant data structures.
*
* The ownership record is only removed when it still points to the given member. The member's
* size is decremented (down to zero at most) even when the partition was not found in its
* target assignment. Unlike the add operation, unassignedPartitions is left untouched.
*
* @param topicIdPartition The partition to be revoked.
* @param memberId Member that the partition needs to be revoked from.
*/
private void removePartitionFromTargetAssignment(TopicIdPartition topicIdPartition, String memberId) {
Map<Uuid, Set<Integer>> targetPartitionsMap = targetAssignment.get(memberId).targetPartitions();
Set<Integer> partitionsSet = targetPartitionsMap.get(topicIdPartition.topicId());
// Remove the partition from the assignment, if there are no more partitions from a particular topic,
// remove the topic from the assignment as well.
if (partitionsSet != null) {
partitionsSet.remove(topicIdPartition.partitionId());
if (partitionsSet.isEmpty()) {
targetPartitionsMap.remove(topicIdPartition.topicId());
}
}
// Two-argument remove: the entry is only dropped if the partition is mapped to this member.
partitionOwnerInTargetAssignment.remove(topicIdPartition, memberId);
// Remove the member from the sorted set BEFORE its size changes: the set's comparator reads the
// mutable current assignment size, so the lookup only works while the size is still the old one.
sortedMembersByAssignmentSize.remove(memberId);
assignmentManager.decrementTargetAssignmentSize(memberId);
// Re-add the member with its new size, unless it is (still) at its maximum assignment size.
if (!isMemberAtMaxCapacity(memberId)) {
sortedMembersByAssignmentSize.add(memberId);
}
}
/**
 * Builds a sorted set of the given members, ordered by ascending current target assignment size
 * and, for equal sizes, by member Id. Members that have reached their maximum assignment size are left out.
 *
 * The ordering looks the sizes up at comparison time rather than capturing them.
 *
 * @param memberIds Member Ids that need to be sorted.
 * @return A set that maintains the order of members by assignment size.
 */
private TreeSet<String> sortMembersByAssignmentSize(Collection<String> memberIds) {
    Comparator<String> bySizeThenId = (left, right) -> {
        int bySize = Integer.compare(
            membersWithAssignmentSizes.get(left).currentAssignmentSize,
            membersWithAssignmentSizes.get(right).currentAssignmentSize
        );
        return bySize != 0 ? bySize : left.compareTo(right);
    };

    TreeSet<String> sortedMembers = new TreeSet<>(bySizeThenId);
    for (String memberId : memberIds) {
        MemberAssignmentData data = membersWithAssignmentSizes.get(memberId);
        if (data.currentAssignmentSize < data.maxAssignmentSize) {
            sortedMembers.add(memberId);
        }
    }
    return sortedMembers;
}
}
}