KAFKA-16864; Optimize uniform (homogenous) assignor (#16088)
This patch optimizes the uniform (homogenous) assignor by no longer creating a copy of every member's assignment. Instead, the assignor copies a member's assignment only when that assignment is actually updated, which is effectively a copy-on-write scheme. This change reduces the overhead of the TargetAssignmentBuilder when it is run with the uniform (homogenous) assignor.
Trunk:
```
Benchmark (memberCount) (partitionsToMemberRatio) (topicCount) Mode Cnt Score Error Units
TargetAssignmentBuilderBenchmark.build 10000 10 100 avgt 5 24.535 ± 1.583 ms/op
TargetAssignmentBuilderBenchmark.build 10000 10 1000 avgt 5 24.094 ± 0.223 ms/op
JMH benchmarks done
```
```
Benchmark (assignmentType) (assignorType) (isRackAware) (memberCount) (partitionsToMemberRatio) (subscriptionType) (topicCount) Mode Cnt Score Error Units
ServerSideAssignorBenchmark.doAssignment INCREMENTAL UNIFORM false 10000 10 HOMOGENEOUS 100 avgt 5 14.697 ± 0.133 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL UNIFORM false 10000 10 HOMOGENEOUS 1000 avgt 5 15.073 ± 0.135 ms/op
JMH benchmarks done
```
Patch:
```
Benchmark (memberCount) (partitionsToMemberRatio) (topicCount) Mode Cnt Score Error Units
TargetAssignmentBuilderBenchmark.build 10000 10 100 avgt 5 3.376 ± 0.577 ms/op
TargetAssignmentBuilderBenchmark.build 10000 10 1000 avgt 5 3.731 ± 0.359 ms/op
JMH benchmarks done
```
```
Benchmark (assignmentType) (assignorType) (isRackAware) (memberCount) (partitionsToMemberRatio) (subscriptionType) (topicCount) Mode Cnt Score Error Units
ServerSideAssignorBenchmark.doAssignment INCREMENTAL UNIFORM false 10000 10 HOMOGENEOUS 100 avgt 5 1.975 ± 0.086 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL UNIFORM false 10000 10 HOMOGENEOUS 1000 avgt 5 2.026 ± 0.190 ms/op
JMH benchmarks done
```
Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
index 3ea1361..34e8256 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
@@ -18,29 +18,18 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
/**
- * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with
* all its members subscribed to the same set of topics.
- * It is optimized since the assignment can be done in fewer, less complicated steps compared to when
- * the subscriptions are different across the members.
*
* Assignments are done according to the following principles:
*
@@ -53,8 +42,17 @@
* The assignment builder prioritizes the properties in the following order:
* Balance > Stickiness.
*/
-public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder {
- private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {
+ private static final Class<?> UNMODIFIABLE_MAP_CLASS = Collections.unmodifiableMap(new HashMap<>()).getClass();
+ private static final Class<?> EMPTY_MAP_CLASS = Collections.emptyMap().getClass();
+
+ /**
+ * @return True if the provided map is an UnmodifiableMap or EmptyMap. Those classes are not
+ * public hence we cannot use the `instanceof` operator.
+ */
+ private static boolean isImmutableMap(Map<?, ?> map) {
+ return UNMODIFIABLE_MAP_CLASS.isInstance(map) || EMPTY_MAP_CLASS.isInstance(map);
+ }
/**
* The assignment specification which includes member metadata.
@@ -72,62 +70,53 @@
private final Set<Uuid> subscribedTopicIds;
/**
- * The number of members to receive an extra partition beyond the minimum quota.
- * Minimum Quota = Total Partitions / Total Members
- * Example: If there are 11 partitions to be distributed among 3 members,
- * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition.
+ * The members that are below their quota.
*/
- private int remainingMembersToGetAnExtraPartition;
-
- /**
- * Members mapped to the remaining number of partitions needed to meet the minimum quota.
- * Minimum quota = total partitions / total members.
- */
- private Map<String, Integer> potentiallyUnfilledMembers;
+ private final List<MemberWithRemainingQuota> unfilledMembers;
/**
* The partitions that still need to be assigned.
* Initially this contains all the subscribed topics' partitions.
*/
- private final Set<TopicIdPartition> unassignedPartitions;
+ private final List<TopicIdPartition> unassignedPartitions;
/**
* The target assignment.
*/
private final Map<String, MemberAssignment> targetAssignment;
+ /**
+ * The minimum number of partitions that a member must have.
+ * Minimum quota = total partitions / total members.
+ */
+ private int minimumMemberQuota;
+
+ /**
+ * The number of members to receive an extra partition beyond the minimum quota.
+ * Example: If there are 11 partitions to be distributed among 3 members,
+ * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition.
+ */
+ private int remainingMembersToGetAnExtraPartition;
+
OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.groupSpec = groupSpec;
this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>(groupSpec.members().values().iterator().next().subscribedTopicIds());
- this.potentiallyUnfilledMembers = new HashMap<>();
- this.unassignedPartitions = new HashSet<>();
+ this.unfilledMembers = new ArrayList<>();
+ this.unassignedPartitions = new ArrayList<>();
this.targetAssignment = new HashMap<>();
}
/**
- * Here's the step-by-step breakdown of the assignment process:
- *
- * <li> Compute the quotas of partitions for each member based on the total partitions and member count.</li>
- * <li> Initialize unassigned partitions with all the topic partitions that aren't present in the
- * current target assignment.</li>
- * <li> For existing assignments, retain partitions based on the determined quota. Add extras to unassigned partitions.</li>
- * <li> Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.</li>
- * <li> Proceed with a round-robin assignment according to quotas.
- * For each unassigned partition, locate the first compatible member from the potentially unfilled list.</li>
+ * Compute the new assignment for the group.
*/
- @Override
- protected GroupAssignment buildAssignment() throws PartitionAssignorException {
- int totalPartitionsCount = 0;
-
+ public GroupAssignment build() throws PartitionAssignorException {
if (subscribedTopicIds.isEmpty()) {
- LOG.debug("The subscription list is empty, returning an empty assignment");
return new GroupAssignment(Collections.emptyMap());
}
- // Check if the subscribed topicId is still valid.
- // Update unassigned partitions based on the current target assignment
- // and topic metadata.
+ // Compute the list of unassigned partitions.
+ int totalPartitionsCount = 0;
for (Uuid topicId : subscribedTopicIds) {
int partitionCount = subscribedTopicDescriber.numPartitions(topicId);
if (partitionCount == -1) {
@@ -144,216 +133,149 @@
}
}
- // The minimum required quota that each member needs to meet for a balanced assignment.
- // This is the same for all members.
- final int numberOfMembers = groupSpec.members().size();
- final int minQuota = totalPartitionsCount / numberOfMembers;
+ // Compute the minimum required quota per member and the number of members
+ // that should receive an extra partition.
+ int numberOfMembers = groupSpec.members().size();
+ minimumMemberQuota = totalPartitionsCount / numberOfMembers;
remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;
- groupSpec.members().keySet().forEach(memberId ->
- targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())
- ));
+ // Revoke the partitions that either are not part of the member's subscriptions or
+ // exceed the maximum quota assigned to each member.
+ maybeRevokePartitions();
- potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
- unassignedPartitionsRoundRobinAssignment();
-
- if (!unassignedPartitions.isEmpty()) {
- throw new PartitionAssignorException("Partitions were left unassigned");
- }
+ // Assign the unassigned partitions to the members with space.
+ assignRemainingPartitions();
return new GroupAssignment(targetAssignment);
}
/**
- * Retains a set of partitions from the existing assignment and includes them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and subscriptions are considered.
+ * Revoke the partitions that either are not part of the member's subscriptions or
+ * exceed the maximum quota assigned to each member.
*
- * <p> For each member:
- * <ol>
- * <li> Find the valid current assignment considering topic subscriptions and metadata</li>
- * <li> If the current assignment exists, retain partitions up to the minimum quota.</li>
- * <li> If the current assignment size is greater than the minimum quota and
- * there are members that could get an extra partition, assign the next partition as well.</li>
- * <li> Finally, if the member's current assignment size is less than the minimum quota,
- * add them to the potentially unfilled members map and track the number of remaining
- * partitions required to meet the quota.</li>
- * </ol>
- * </p>
- *
- * @return Members mapped to the remaining number of partitions needed to meet the minimum quota,
- * including members that are eligible to receive an extra partition.
+ * This method ensures that the original assignment is not copied if it is not
+ * altered.
*/
- private Map<String, Integer> assignStickyPartitions(int minQuota) {
- Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
+ private void maybeRevokePartitions() {
+ for (Map.Entry<String, AssignmentMemberSpec> entry : groupSpec.members().entrySet()) {
+ String memberId = entry.getKey();
+ AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+ Map<Uuid, Set<Integer>> oldAssignment = assignmentMemberSpec.assignedPartitions();
+ Map<Uuid, Set<Integer>> newAssignment = null;
- groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
- List<TopicIdPartition> validCurrentMemberAssignment = validCurrentMemberAssignment(
- assignmentMemberSpec.assignedPartitions()
- );
+ // The assignor expects to receive the assignment as an immutable map. It leverages
+ // this knowledge in order to avoid having to copy all assignments.
+ if (!isImmutableMap(oldAssignment)) {
+ throw new IllegalStateException("The assignor expect an immutable map.");
+ }
- int currentAssignmentSize = validCurrentMemberAssignment.size();
- // Number of partitions required to meet the minimum quota.
- int remaining = minQuota - currentAssignmentSize;
+ int quota = minimumMemberQuota;
+ if (remainingMembersToGetAnExtraPartition > 0) {
+ quota++;
+ remainingMembersToGetAnExtraPartition--;
+ }
- if (currentAssignmentSize > 0) {
- int retainedPartitionsCount = min(currentAssignmentSize, minQuota);
- IntStream.range(0, retainedPartitionsCount).forEach(i -> {
- TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i);
- addPartitionToAssignment(
- targetAssignment,
- memberId,
- topicIdPartition.topicId(),
- topicIdPartition.partitionId()
- );
- });
+ for (Map.Entry<Uuid, Set<Integer>> topicPartitions : oldAssignment.entrySet()) {
+ Uuid topicId = topicPartitions.getKey();
+ Set<Integer> partitions = topicPartitions.getValue();
- if (remaining < 0) {
- // The extra partition is located at the last index from the previous step.
- if (remainingMembersToGetAnExtraPartition > 0) {
- TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++);
- addPartitionToAssignment(
- targetAssignment,
- memberId,
- topicIdPartition.topicId(),
- topicIdPartition.partitionId()
- );
- remainingMembersToGetAnExtraPartition--;
+ if (subscribedTopicIds.contains(topicId)) {
+ if (partitions.size() <= quota) {
+ quota -= partitions.size();
+ } else {
+ for (Integer partition : partitions) {
+ if (quota > 0) {
+ quota--;
+ } else {
+ if (newAssignment == null) {
+ // If the new assignment is null, we create a deep copy of the
+ // original assignment so that we can alter it.
+ newAssignment = deepCopy(oldAssignment);
+ }
+ // Remove the partition from the new assignment.
+ Set<Integer> parts = newAssignment.get(topicId);
+ parts.remove(partition);
+ if (parts.isEmpty()) {
+ newAssignment.remove(topicId);
+ }
+ // Add the partition to the unassigned set to be re-assigned later on.
+ unassignedPartitions.add(new TopicIdPartition(topicId, partition));
+ }
+ }
}
- // Any previously owned partitions that weren't retained due to the quotas
- // are added to the unassigned partitions set.
- if (retainedPartitionsCount < currentAssignmentSize) {
- unassignedPartitions.addAll(validCurrentMemberAssignment.subList(
- retainedPartitionsCount,
- currentAssignmentSize
- ));
+ } else {
+ if (newAssignment == null) {
+ // If the new assignment is null, we create a deep copy of the
+ // original assignment so that we can alter it.
+ newAssignment = deepCopy(oldAssignment);
}
+ // Remove the entire topic.
+ newAssignment.remove(topicId);
}
}
- if (remaining >= 0) {
- potentiallyUnfilledMembers.put(memberId, remaining);
+ if (quota > 0) {
+ unfilledMembers.add(new MemberWithRemainingQuota(memberId, quota));
}
- });
- return potentiallyUnfilledMembers;
- }
-
- /**
- * Filters the current assignment of partitions for a given member based on certain criteria.
- *
- * Any partition that still belongs to the member's subscribed topics list is considered valid.
- *
- * @param currentMemberAssignment The map of topics to partitions currently assigned to the member.
- *
- * @return List of valid partitions after applying the filters.
- */
- private List<TopicIdPartition> validCurrentMemberAssignment(
- Map<Uuid, Set<Integer>> currentMemberAssignment
- ) {
- List<TopicIdPartition> validCurrentAssignmentList = new ArrayList<>();
- currentMemberAssignment.forEach((topicId, partitions) -> {
- if (subscribedTopicIds.contains(topicId)) {
- partitions.forEach(partition -> {
- TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition);
- validCurrentAssignmentList.add(topicIdPartition);
- });
+ if (newAssignment == null) {
+ targetAssignment.put(memberId, new MemberAssignment(oldAssignment));
} else {
- LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list");
- }
- });
-
- return validCurrentAssignmentList;
- }
-
- /**
- * Allocates the unassigned partitions to unfilled members in a round-robin fashion.
- */
- private void unassignedPartitionsRoundRobinAssignment() {
- Queue<String> roundRobinMembers = new LinkedList<>(potentiallyUnfilledMembers.keySet());
-
- // Partitions are sorted to ensure an even topic wise distribution across members.
- // This not only balances the load but also makes partition-to-member mapping more predictable.
- List<TopicIdPartition> sortedPartitionsList = unassignedPartitions.stream()
- .sorted(Comparator.comparing(TopicIdPartition::topicId).thenComparing(TopicIdPartition::partitionId))
- .collect(Collectors.toList());
-
- for (TopicIdPartition topicIdPartition : sortedPartitionsList) {
- boolean assigned = false;
-
- for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) {
- String memberId = roundRobinMembers.poll();
- if (potentiallyUnfilledMembers.containsKey(memberId)) {
- assigned = maybeAssignPartitionToMember(memberId, topicIdPartition);
- }
- // Only re-add the member to the end of the queue if it's still available for assignment.
- if (potentiallyUnfilledMembers.containsKey(memberId)) {
- roundRobinMembers.add(memberId);
- }
- }
-
- if (assigned) {
- unassignedPartitions.remove(topicIdPartition);
+ targetAssignment.put(memberId, new MemberAssignment(newAssignment));
}
}
}
/**
- * Assigns the specified partition to the given member and updates the potentially unfilled members map.
- * Only assign extra partitions once the member has met its minimum quota = total partitions / total members.
- *
- * <ol>
- * <li> If the minimum quota hasn't been met aka remaining > 0 directly assign the partition.
- * After assigning the partition, if the min quota has been met aka remaining = 0, remove the member
- * if there's no members left to receive an extra partition. Otherwise, keep it in the
- * potentially unfilled map. </li>
- * <li> If the minimum quota has been met and if there is potential to receive an extra partition, assign it.
- * Remove the member from the potentially unfilled map since it has already received the extra partition
- * and met the min quota. </li>
- * <li> Else, don't assign the partition. </li>
- * </ol>
- *
- * @param memberId The Id of the member to which the partition will be assigned.
- * @param topicIdPartition The topicIdPartition to be assigned.
- * @return true if the assignment was successful, false otherwise.
+ * Assign the unassigned partitions to the unfilled members.
*/
- private boolean maybeAssignPartitionToMember(String memberId, TopicIdPartition topicIdPartition) {
- int remaining = potentiallyUnfilledMembers.get(memberId);
- boolean shouldAssign = false;
+ private void assignRemainingPartitions() {
+ int unassignedPartitionIndex = 0;
- // If the member hasn't met the minimum quota, set the flag for assignment.
- // If member has met minimum quota and there's an extra partition available, set the flag for assignment.
- if (remaining > 0) {
- potentiallyUnfilledMembers.put(memberId, --remaining);
- shouldAssign = true;
+ for (MemberWithRemainingQuota unfilledMember : unfilledMembers) {
+ String memberId = unfilledMember.memberId;
+ int remainingQuota = unfilledMember.remainingQuota;
- // If the member meets the minimum quota due to this assignment,
- // check if any extra partitions are available.
- // Removing the member from the list reduces an iteration for when remaining = 0 but there's no extras left.
- if (remaining == 0 && remainingMembersToGetAnExtraPartition == 0) {
- potentiallyUnfilledMembers.remove(memberId);
+ Map<Uuid, Set<Integer>> newAssignment = targetAssignment.get(memberId).targetPartitions();
+ if (isImmutableMap(newAssignment)) {
+ // If the new assignment is immutable, we must create a deep copy of it
+ // before altering it.
+ newAssignment = deepCopy(newAssignment);
+ targetAssignment.put(memberId, new MemberAssignment(newAssignment));
}
- } else if (remaining == 0 && remainingMembersToGetAnExtraPartition > 0) {
- remainingMembersToGetAnExtraPartition--;
- // Each member can only receive one extra partition, once they meet the minimum quota and receive an extra
- // partition they can be removed from the potentially unfilled members map.
- potentiallyUnfilledMembers.remove(memberId);
- shouldAssign = true;
+
+ for (int i = 0; i < remainingQuota && unassignedPartitionIndex < unassignedPartitions.size(); i++) {
+ TopicIdPartition unassignedTopicIdPartition = unassignedPartitions.get(unassignedPartitionIndex);
+ unassignedPartitionIndex++;
+ newAssignment
+ .computeIfAbsent(unassignedTopicIdPartition.topicId(), __ -> new HashSet<>())
+ .add(unassignedTopicIdPartition.partitionId());
+ }
}
- // Assign the partition if flag is set.
- if (shouldAssign) {
- addPartitionToAssignment(
- targetAssignment,
- memberId,
- topicIdPartition.topicId(),
- topicIdPartition.partitionId()
- );
- return true;
+ if (unassignedPartitionIndex < unassignedPartitions.size()) {
+ throw new PartitionAssignorException("Partitions were left unassigned");
}
+ }
- // No assignment possible because the member met the minimum quota but
- // number of members to receive an extra partition is zero.
- return false;
+ private static Map<Uuid, Set<Integer>> deepCopy(Map<Uuid, Set<Integer>> map) {
+ Map<Uuid, Set<Integer>> copy = new HashMap<>(map.size());
+ for (Map.Entry<Uuid, Set<Integer>> entry : map.entrySet()) {
+ copy.put(entry.getKey(), new HashSet<>(entry.getValue()));
+ }
+ return copy;
+ }
+
+ private static class MemberWithRemainingQuota {
+ final String memberId;
+ final int remainingQuota;
+
+ MemberWithRemainingQuota(
+ String memberId,
+ int remainingQuota
+ ) {
+ this.memberId = memberId;
+ this.remainingQuota = remainingQuota;
+ }
}
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
index 7da7c2d..648b016 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
@@ -66,21 +66,19 @@
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
- AbstractUniformAssignmentBuilder assignmentBuilder;
-
if (groupSpec.members().isEmpty())
return new GroupAssignment(Collections.emptyMap());
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the "
+ "optimized assignment algorithm");
- assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
+ return new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber)
+ .build();
} else {
LOG.debug("Detected that the members are subscribed to different sets of topics, invoking the "
+ "general assignment algorithm");
- assignmentBuilder = new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
+ return new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber)
+ .buildAssignment();
}
-
- return assignmentBuilder.buildAssignment();
}
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
index 74bb303..ffc5455 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
@@ -22,12 +22,13 @@
import java.util.AbstractMap;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
-import java.util.TreeSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -42,13 +43,13 @@
);
}
- public static Map.Entry<Uuid, Set<Integer>> mkSortedTopicAssignment(
+ public static Map.Entry<Uuid, Set<Integer>> mkOrderedTopicAssignment(
Uuid topicId,
Integer... partitions
) {
return new AbstractMap.SimpleEntry<>(
topicId,
- new TreeSet<>(Arrays.asList(partitions))
+ new LinkedHashSet<>(Arrays.asList(partitions))
);
}
@@ -56,18 +57,18 @@
public static Map<Uuid, Set<Integer>> mkAssignment(Map.Entry<Uuid, Set<Integer>>... entries) {
Map<Uuid, Set<Integer>> assignment = new HashMap<>();
for (Map.Entry<Uuid, Set<Integer>> entry : entries) {
- assignment.put(entry.getKey(), entry.getValue());
+ assignment.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
}
- return assignment;
+ return Collections.unmodifiableMap(assignment);
}
@SafeVarargs
- public static Map<Uuid, Set<Integer>> mkSortedAssignment(Map.Entry<Uuid, Set<Integer>>... entries) {
+ public static Map<Uuid, Set<Integer>> mkOrderedAssignment(Map.Entry<Uuid, Set<Integer>>... entries) {
Map<Uuid, Set<Integer>> assignment = new LinkedHashMap<>();
for (Map.Entry<Uuid, Set<Integer>> entry : entries) {
- assignment.put(entry.getKey(), entry.getValue());
+ assignment.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
}
- return assignment;
+ return Collections.unmodifiableMap(assignment);
}
/**
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java
index 77bad7a..900ba31 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java
@@ -69,8 +69,8 @@
import java.util.stream.Stream;
import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
-import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedAssignment;
-import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedTopicAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newCurrentAssignmentRecord;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord;
@@ -297,7 +297,7 @@
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
- Map<Uuid, Set<Integer>> partitions = mkSortedAssignment(
+ Map<Uuid, Set<Integer>> partitions = mkOrderedAssignment(
mkTopicAssignment(topicId1, 11, 12, 13),
mkTopicAssignment(topicId2, 21, 22, 23)
);
@@ -379,14 +379,14 @@
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
- Map<Uuid, Set<Integer>> assigned = mkSortedAssignment(
- mkSortedTopicAssignment(topicId1, 11, 12, 13),
- mkSortedTopicAssignment(topicId2, 21, 22, 23)
+ Map<Uuid, Set<Integer>> assigned = mkOrderedAssignment(
+ mkOrderedTopicAssignment(topicId1, 11, 12, 13),
+ mkOrderedTopicAssignment(topicId2, 21, 22, 23)
);
- Map<Uuid, Set<Integer>> revoking = mkSortedAssignment(
- mkSortedTopicAssignment(topicId1, 14, 15, 16),
- mkSortedTopicAssignment(topicId2, 24, 25, 26)
+ Map<Uuid, Set<Integer>> revoking = mkOrderedAssignment(
+ mkOrderedTopicAssignment(topicId1, 14, 15, 16),
+ mkOrderedTopicAssignment(topicId2, 24, 25, 26)
);
CoordinatorRecord expectedRecord = new CoordinatorRecord(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
index f21bd63..fdc4f59 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
@@ -34,6 +34,7 @@
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
@@ -158,12 +159,11 @@
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
- mkTopicAssignment(topic1Uuid, 0, 2),
- mkTopicAssignment(topic3Uuid, 1)
+ mkTopicAssignment(topic1Uuid, 0),
+ mkTopicAssignment(topic3Uuid, 0, 1)
));
expectedAssignment.put(memberB, mkAssignment(
- mkTopicAssignment(topic1Uuid, 1),
- mkTopicAssignment(topic3Uuid, 0)
+ mkTopicAssignment(topic1Uuid, 1, 2)
));
GroupSpec groupSpec = new GroupSpecImpl(
@@ -295,30 +295,25 @@
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
- Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
- mkAssignment(
- mkTopicAssignment(topic1Uuid, 0, 1),
- mkTopicAssignment(topic2Uuid, 0, 1)
- )
- );
+
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
- currentAssignmentForA
+ mkOrderedAssignment(
+ mkTopicAssignment(topic1Uuid, 0, 1),
+ mkTopicAssignment(topic2Uuid, 0, 1)
+ )
));
- Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
- mkAssignment(
- mkTopicAssignment(topic1Uuid, 2),
- mkTopicAssignment(topic2Uuid, 2)
- )
- );
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
- currentAssignmentForB
+ mkOrderedAssignment(
+ mkTopicAssignment(topic1Uuid, 2),
+ mkTopicAssignment(topic2Uuid, 2)
+ )
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@@ -366,40 +361,34 @@
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
- Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
- mkAssignment(
- mkTopicAssignment(topic1Uuid, 0, 2),
- mkTopicAssignment(topic2Uuid, 0)
- )
- );
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
- currentAssignmentForA
+ mkOrderedAssignment(
+ mkTopicAssignment(topic1Uuid, 0, 2),
+ mkTopicAssignment(topic2Uuid, 0)
+ )
));
- Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
- mkAssignment(
- mkTopicAssignment(topic1Uuid, 1),
- mkTopicAssignment(topic2Uuid, 1, 2)
- )
- );
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
- currentAssignmentForB
+ mkOrderedAssignment(
+ mkTopicAssignment(topic1Uuid, 1),
+ mkTopicAssignment(topic2Uuid, 1, 2)
+ )
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
- mkTopicAssignment(topic1Uuid, 0, 2, 3, 5),
- mkTopicAssignment(topic2Uuid, 0, 4)
+ mkTopicAssignment(topic1Uuid, 0, 2, 3),
+ mkTopicAssignment(topic2Uuid, 0, 3, 4)
));
expectedAssignment.put(memberB, mkAssignment(
- mkTopicAssignment(topic1Uuid, 1, 4),
- mkTopicAssignment(topic2Uuid, 1, 2, 3)
+ mkTopicAssignment(topic1Uuid, 1, 4, 5),
+ mkTopicAssignment(topic2Uuid, 1, 2)
));
GroupSpec groupSpec = new GroupSpecImpl(
@@ -436,26 +425,24 @@
Map<String, AssignmentMemberSpec> members = new HashMap<>();
- Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(mkAssignment(
- mkTopicAssignment(topic1Uuid, 0, 2),
- mkTopicAssignment(topic2Uuid, 0)
- ));
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
- currentAssignmentForA
+ mkOrderedAssignment(
+ mkTopicAssignment(topic1Uuid, 0, 2),
+ mkTopicAssignment(topic2Uuid, 0)
+ )
));
- Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(mkAssignment(
- mkTopicAssignment(topic1Uuid, 1),
- mkTopicAssignment(topic2Uuid, 1, 2)
- ));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
- currentAssignmentForB
+ mkOrderedAssignment(
+ mkTopicAssignment(topic1Uuid, 1),
+ mkTopicAssignment(topic2Uuid, 1, 2)
+ )
));
// Add a new member to trigger a re-assignment.
@@ -512,38 +499,36 @@
Map<String, AssignmentMemberSpec> members = new HashMap<>();
- Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
- mkTopicAssignment(topic1Uuid, 0),
- mkTopicAssignment(topic2Uuid, 0)
- );
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
- currentAssignmentForA
+ mkAssignment(
+ mkTopicAssignment(topic1Uuid, 0),
+ mkTopicAssignment(topic2Uuid, 0)
+ )
));
- Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
- mkTopicAssignment(topic1Uuid, 1),
- mkTopicAssignment(topic2Uuid, 1)
- );
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
- currentAssignmentForB
+ mkAssignment(
+ mkTopicAssignment(topic1Uuid, 1),
+ mkTopicAssignment(topic2Uuid, 1)
+ )
));
// Member C was removed
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
- mkTopicAssignment(topic1Uuid, 0, 2),
- mkTopicAssignment(topic2Uuid, 0)
+ mkTopicAssignment(topic1Uuid, 0),
+ mkTopicAssignment(topic2Uuid, 0, 2)
));
expectedAssignment.put(memberB, mkAssignment(
- mkTopicAssignment(topic1Uuid, 1),
- mkTopicAssignment(topic2Uuid, 1, 2)
+ mkTopicAssignment(topic1Uuid, 1, 2),
+ mkTopicAssignment(topic2Uuid, 1)
));
GroupSpec groupSpec = new GroupSpecImpl(
@@ -581,26 +566,24 @@
// Initial subscriptions were [T1, T2]
Map<String, AssignmentMemberSpec> members = new HashMap<>();
- Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
- mkTopicAssignment(topic1Uuid, 0),
- mkTopicAssignment(topic2Uuid, 0)
- );
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic2Uuid),
- currentAssignmentForA
+ mkAssignment(
+ mkTopicAssignment(topic1Uuid, 0),
+ mkTopicAssignment(topic2Uuid, 0)
+ )
));
- Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
- mkTopicAssignment(topic1Uuid, 1),
- mkTopicAssignment(topic2Uuid, 1)
- );
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic2Uuid),
- currentAssignmentForB
+ mkAssignment(
+ mkTopicAssignment(topic1Uuid, 1),
+ mkTopicAssignment(topic2Uuid, 1)
+ )
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
index 2349350..77a38ab 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
@@ -268,7 +268,7 @@
assignmentMemberSpec.instanceId(),
assignmentMemberSpec.rackId(),
assignmentMemberSpec.subscribedTopicIds(),
- memberAssignment.targetPartitions()
+ Collections.unmodifiableMap(memberAssignment.targetPartitions())
));
});
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index 511db01..7d67f07 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -184,10 +184,7 @@
for (Map.Entry<String, MemberAssignment> entry : groupAssignment.members().entrySet()) {
String memberId = entry.getKey();
Map<Uuid, Set<Integer>> topicPartitions = entry.getValue().targetPartitions();
-
- Assignment assignment = new Assignment(topicPartitions);
-
- initialTargetAssignment.put(memberId, assignment);
+ initialTargetAssignment.put(memberId, new Assignment(topicPartitions));
}
return initialTargetAssignment;