KAFKA-16864; Optimize uniform (homogenous) assignor (#16088)

This patch optimizes uniform (homogenous) assignor by avoiding creating a copy of all the assignments. Instead, the assignor creates a copy only if the assignment is updated. It is a sort of copy-on-write. This change reduces the overhead of the TargetAssignmentBuilder when ran with the uniform (homogenous) assignor.

Trunk:

```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  24.535 ± 1.583  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  24.094 ± 0.223  ms/op
JMH benchmarks done
```

```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt   Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS           100  avgt    5  14.697 ± 0.133  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5  15.073 ± 0.135  ms/op
JMH benchmarks done
```

Patch:

```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  3.376 ± 0.577  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  3.731 ± 0.359  ms/op
JMH benchmarks done
```

```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt  Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS           100  avgt    5  1.975 ± 0.086  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5  2.026 ± 0.190  ms/op
JMH benchmarks done
```

Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
index 3ea1361..34e8256 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
@@ -18,29 +18,18 @@
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 
 /**
- * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with
  * all its members subscribed to the same set of topics.
- * It is optimized since the assignment can be done in fewer, less complicated steps compared to when
- * the subscriptions are different across the members.
  *
  * Assignments are done according to the following principles:
  *
@@ -53,8 +42,17 @@
  * The assignment builder prioritizes the properties in the following order:
  *      Balance > Stickiness.
  */
-public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder {
-    private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {
+    private static final Class<?> UNMODIFIABLE_MAP_CLASS = Collections.unmodifiableMap(new HashMap<>()).getClass();
+    private static final Class<?> EMPTY_MAP_CLASS = Collections.emptyMap().getClass();
+
+    /**
+     * @return True if the provided map is an UnmodifiableMap or EmptyMap. Those classes are not
+     * public hence we cannot use the `instanceof` operator.
+     */
+    private static boolean isImmutableMap(Map<?, ?> map) {
+        return UNMODIFIABLE_MAP_CLASS.isInstance(map) || EMPTY_MAP_CLASS.isInstance(map);
+    }
 
     /**
      * The assignment specification which includes member metadata.
@@ -72,62 +70,53 @@
     private final Set<Uuid> subscribedTopicIds;
 
     /**
-     * The number of members to receive an extra partition beyond the minimum quota.
-     * Minimum Quota = Total Partitions / Total Members
-     * Example: If there are 11 partitions to be distributed among 3 members,
-     *          each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition.
+     * The members that are below their quota.
      */
-    private int remainingMembersToGetAnExtraPartition;
-
-    /**
-     * Members mapped to the remaining number of partitions needed to meet the minimum quota.
-     * Minimum quota = total partitions / total members.
-     */
-    private Map<String, Integer> potentiallyUnfilledMembers;
+    private final List<MemberWithRemainingQuota> unfilledMembers;
 
     /**
      * The partitions that still need to be assigned.
      * Initially this contains all the subscribed topics' partitions.
      */
-    private final Set<TopicIdPartition> unassignedPartitions;
+    private final List<TopicIdPartition> unassignedPartitions;
 
     /**
      * The target assignment.
      */
     private final Map<String, MemberAssignment> targetAssignment;
 
+    /**
+     * The minimum number of partitions that a member must have.
+     * Minimum quota = total partitions / total members.
+     */
+    private int minimumMemberQuota;
+
+    /**
+     * The number of members to receive an extra partition beyond the minimum quota.
+     * Example: If there are 11 partitions to be distributed among 3 members,
+     *          each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition.
+     */
+    private int remainingMembersToGetAnExtraPartition;
+
     OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
         this.groupSpec = groupSpec;
         this.subscribedTopicDescriber = subscribedTopicDescriber;
         this.subscribedTopicIds = new HashSet<>(groupSpec.members().values().iterator().next().subscribedTopicIds());
-        this.potentiallyUnfilledMembers = new HashMap<>();
-        this.unassignedPartitions = new HashSet<>();
+        this.unfilledMembers = new ArrayList<>();
+        this.unassignedPartitions = new ArrayList<>();
         this.targetAssignment = new HashMap<>();
     }
 
     /**
-     * Here's the step-by-step breakdown of the assignment process:
-     *
-     * <li> Compute the quotas of partitions for each member based on the total partitions and member count.</li>
-     * <li> Initialize unassigned partitions with all the topic partitions that aren't present in the
-     *      current target assignment.</li>
-     * <li> For existing assignments, retain partitions based on the determined quota. Add extras to unassigned partitions.</li>
-     * <li> Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.</li>
-     * <li> Proceed with a round-robin assignment according to quotas.
-     *      For each unassigned partition, locate the first compatible member from the potentially unfilled list.</li>
+     * Compute the new assignment for the group.
      */
-    @Override
-    protected GroupAssignment buildAssignment() throws PartitionAssignorException {
-        int totalPartitionsCount = 0;
-
+    public GroupAssignment build() throws PartitionAssignorException {
         if (subscribedTopicIds.isEmpty()) {
-            LOG.debug("The subscription list is empty, returning an empty assignment");
             return new GroupAssignment(Collections.emptyMap());
         }
 
-        // Check if the subscribed topicId is still valid.
-        // Update unassigned partitions based on the current target assignment
-        // and topic metadata.
+        // Compute the list of unassigned partitions.
+        int totalPartitionsCount = 0;
         for (Uuid topicId : subscribedTopicIds) {
             int partitionCount = subscribedTopicDescriber.numPartitions(topicId);
             if (partitionCount == -1) {
@@ -144,216 +133,149 @@
             }
         }
 
-        // The minimum required quota that each member needs to meet for a balanced assignment.
-        // This is the same for all members.
-        final int numberOfMembers = groupSpec.members().size();
-        final int minQuota = totalPartitionsCount / numberOfMembers;
+        // Compute the minimum required quota per member and the number of members
+        // that should receive an extra partition.
+        int numberOfMembers = groupSpec.members().size();
+        minimumMemberQuota = totalPartitionsCount / numberOfMembers;
         remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;
 
-        groupSpec.members().keySet().forEach(memberId ->
-            targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())
-        ));
+        // Revoke the partitions that either are not part of the member's subscriptions or
+        // exceed the maximum quota assigned to each member.
+        maybeRevokePartitions();
 
-        potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-        unassignedPartitionsRoundRobinAssignment();
-
-        if (!unassignedPartitions.isEmpty()) {
-            throw new PartitionAssignorException("Partitions were left unassigned");
-        }
+        // Assign the unassigned partitions to the members with space.
+        assignRemainingPartitions();
 
         return new GroupAssignment(targetAssignment);
     }
 
     /**
-     * Retains a set of partitions from the existing assignment and includes them in the target assignment.
-     * Only relevant partitions that exist in the current topic metadata and subscriptions are considered.
+     * Revoke the partitions that either are not part of the member's subscriptions or
+     * exceed the maximum quota assigned to each member.
      *
-     * <p> For each member:
-     * <ol>
-     *     <li> Find the valid current assignment considering topic subscriptions and metadata</li>
-     *     <li> If the current assignment exists, retain partitions up to the minimum quota.</li>
-     *     <li> If the current assignment size is greater than the minimum quota and
-     *          there are members that could get an extra partition, assign the next partition as well.</li>
-     *     <li> Finally, if the member's current assignment size is less than the minimum quota,
-     *          add them to the potentially unfilled members map and track the number of remaining
-     *          partitions required to meet the quota.</li>
-     * </ol>
-     * </p>
-     *
-     * @return  Members mapped to the remaining number of partitions needed to meet the minimum quota,
-     *          including members that are eligible to receive an extra partition.
+     * This method ensures that the original assignment is not copied if it is not
+     * altered.
      */
-    private Map<String, Integer> assignStickyPartitions(int minQuota) {
-        Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
+    private void maybeRevokePartitions() {
+        for (Map.Entry<String, AssignmentMemberSpec> entry : groupSpec.members().entrySet()) {
+            String memberId = entry.getKey();
+            AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+            Map<Uuid, Set<Integer>> oldAssignment = assignmentMemberSpec.assignedPartitions();
+            Map<Uuid, Set<Integer>> newAssignment = null;
 
-        groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-            List<TopicIdPartition> validCurrentMemberAssignment = validCurrentMemberAssignment(
-                assignmentMemberSpec.assignedPartitions()
-            );
+            // The assignor expects to receive the assignment as an immutable map. It leverages
+            // this knowledge in order to avoid having to copy all assignments.
+            if (!isImmutableMap(oldAssignment)) {
+                throw new IllegalStateException("The assignor expect an immutable map.");
+            }
 
-            int currentAssignmentSize = validCurrentMemberAssignment.size();
-            // Number of partitions required to meet the minimum quota.
-            int remaining = minQuota - currentAssignmentSize;
+            int quota = minimumMemberQuota;
+            if (remainingMembersToGetAnExtraPartition > 0) {
+                quota++;
+                remainingMembersToGetAnExtraPartition--;
+            }
 
-            if (currentAssignmentSize > 0) {
-                int retainedPartitionsCount = min(currentAssignmentSize, minQuota);
-                IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-                    TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i);
-                    addPartitionToAssignment(
-                        targetAssignment,
-                        memberId,
-                        topicIdPartition.topicId(),
-                        topicIdPartition.partitionId()
-                    );
-                });
+            for (Map.Entry<Uuid, Set<Integer>> topicPartitions : oldAssignment.entrySet()) {
+                Uuid topicId = topicPartitions.getKey();
+                Set<Integer> partitions = topicPartitions.getValue();
 
-                if (remaining < 0) {
-                    // The extra partition is located at the last index from the previous step.
-                    if (remainingMembersToGetAnExtraPartition > 0) {
-                        TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++);
-                        addPartitionToAssignment(
-                            targetAssignment,
-                            memberId,
-                            topicIdPartition.topicId(),
-                            topicIdPartition.partitionId()
-                        );
-                        remainingMembersToGetAnExtraPartition--;
+                if (subscribedTopicIds.contains(topicId)) {
+                    if (partitions.size() <= quota) {
+                        quota -= partitions.size();
+                    } else {
+                        for (Integer partition : partitions) {
+                            if (quota > 0) {
+                                quota--;
+                            } else {
+                                if (newAssignment == null) {
+                                    // If the new assignment is null, we create a deep copy of the
+                                    // original assignment so that we can alter it.
+                                    newAssignment = deepCopy(oldAssignment);
+                                }
+                                // Remove the partition from the new assignment.
+                                Set<Integer> parts = newAssignment.get(topicId);
+                                parts.remove(partition);
+                                if (parts.isEmpty()) {
+                                    newAssignment.remove(topicId);
+                                }
+                                // Add the partition to the unassigned set to be re-assigned later on.
+                                unassignedPartitions.add(new TopicIdPartition(topicId, partition));
+                            }
+                        }
                     }
-                    // Any previously owned partitions that weren't retained due to the quotas
-                    // are added to the unassigned partitions set.
-                    if (retainedPartitionsCount < currentAssignmentSize) {
-                        unassignedPartitions.addAll(validCurrentMemberAssignment.subList(
-                            retainedPartitionsCount,
-                            currentAssignmentSize
-                        ));
+                } else {
+                    if (newAssignment == null) {
+                        // If the new assignment is null, we create a deep copy of the
+                        // original assignment so that we can alter it.
+                        newAssignment = deepCopy(oldAssignment);
                     }
+                    // Remove the entire topic.
+                    newAssignment.remove(topicId);
                 }
             }
 
-            if (remaining >= 0) {
-                potentiallyUnfilledMembers.put(memberId, remaining);
+            if (quota > 0) {
+                unfilledMembers.add(new MemberWithRemainingQuota(memberId, quota));
             }
-        });
 
-        return potentiallyUnfilledMembers;
-    }
-
-    /**
-     * Filters the current assignment of partitions for a given member based on certain criteria.
-     *
-     * Any partition that still belongs to the member's subscribed topics list is considered valid.
-     *
-     * @param currentMemberAssignment       The map of topics to partitions currently assigned to the member.
-     *
-     * @return List of valid partitions after applying the filters.
-     */
-    private List<TopicIdPartition> validCurrentMemberAssignment(
-        Map<Uuid, Set<Integer>> currentMemberAssignment
-    ) {
-        List<TopicIdPartition> validCurrentAssignmentList = new ArrayList<>();
-        currentMemberAssignment.forEach((topicId, partitions) -> {
-            if (subscribedTopicIds.contains(topicId)) {
-                partitions.forEach(partition -> {
-                    TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition);
-                    validCurrentAssignmentList.add(topicIdPartition);
-                });
+            if (newAssignment == null) {
+                targetAssignment.put(memberId, new MemberAssignment(oldAssignment));
             } else {
-                LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list");
-            }
-        });
-
-        return validCurrentAssignmentList;
-    }
-
-    /**
-     * Allocates the unassigned partitions to unfilled members in a round-robin fashion.
-     */
-    private void unassignedPartitionsRoundRobinAssignment() {
-        Queue<String> roundRobinMembers = new LinkedList<>(potentiallyUnfilledMembers.keySet());
-
-        // Partitions are sorted to ensure an even topic wise distribution across members.
-        // This not only balances the load but also makes partition-to-member mapping more predictable.
-        List<TopicIdPartition> sortedPartitionsList = unassignedPartitions.stream()
-            .sorted(Comparator.comparing(TopicIdPartition::topicId).thenComparing(TopicIdPartition::partitionId))
-            .collect(Collectors.toList());
-
-        for (TopicIdPartition topicIdPartition : sortedPartitionsList) {
-            boolean assigned = false;
-
-            for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) {
-                String memberId = roundRobinMembers.poll();
-                if (potentiallyUnfilledMembers.containsKey(memberId)) {
-                    assigned = maybeAssignPartitionToMember(memberId, topicIdPartition);
-                }
-                // Only re-add the member to the end of the queue if it's still available for assignment.
-                if (potentiallyUnfilledMembers.containsKey(memberId)) {
-                    roundRobinMembers.add(memberId);
-                }
-            }
-
-            if (assigned) {
-                unassignedPartitions.remove(topicIdPartition);
+                targetAssignment.put(memberId, new MemberAssignment(newAssignment));
             }
         }
     }
 
     /**
-     * Assigns the specified partition to the given member and updates the potentially unfilled members map.
-     * Only assign extra partitions once the member has met its minimum quota = total partitions / total members.
-     *
-     * <ol>
-     *     <li> If the minimum quota hasn't been met aka remaining > 0 directly assign the partition.
-     *          After assigning the partition, if the min quota has been met aka remaining = 0, remove the member
-     *          if there's no members left to receive an extra partition. Otherwise, keep it in the
-     *          potentially unfilled map. </li>
-     *     <li> If the minimum quota has been met and if there is potential to receive an extra partition, assign it.
-     *          Remove the member from the potentially unfilled map since it has already received the extra partition
-     *          and met the min quota. </li>
-     *     <li> Else, don't assign the partition. </li>
-     * </ol>
-     *
-     * @param memberId              The Id of the member to which the partition will be assigned.
-     * @param topicIdPartition      The topicIdPartition to be assigned.
-     * @return true if the assignment was successful, false otherwise.
+     * Assign the unassigned partitions to the unfilled members.
      */
-    private boolean maybeAssignPartitionToMember(String memberId, TopicIdPartition topicIdPartition) {
-        int remaining = potentiallyUnfilledMembers.get(memberId);
-        boolean shouldAssign = false;
+    private void assignRemainingPartitions() {
+        int unassignedPartitionIndex = 0;
 
-        // If the member hasn't met the minimum quota, set the flag for assignment.
-        // If member has met minimum quota and there's an extra partition available, set the flag for assignment.
-        if (remaining > 0) {
-            potentiallyUnfilledMembers.put(memberId, --remaining);
-            shouldAssign = true;
+        for (MemberWithRemainingQuota unfilledMember : unfilledMembers) {
+            String memberId = unfilledMember.memberId;
+            int remainingQuota = unfilledMember.remainingQuota;
 
-            // If the member meets the minimum quota due to this assignment,
-            // check if any extra partitions are available.
-            // Removing the member from the list reduces an iteration for when remaining = 0 but there's no extras left.
-            if (remaining == 0 && remainingMembersToGetAnExtraPartition == 0) {
-                potentiallyUnfilledMembers.remove(memberId);
+            Map<Uuid, Set<Integer>> newAssignment = targetAssignment.get(memberId).targetPartitions();
+            if (isImmutableMap(newAssignment)) {
+                // If the new assignment is immutable, we must create a deep copy of it
+                // before altering it.
+                newAssignment = deepCopy(newAssignment);
+                targetAssignment.put(memberId, new MemberAssignment(newAssignment));
             }
-        } else if (remaining == 0 && remainingMembersToGetAnExtraPartition > 0) {
-            remainingMembersToGetAnExtraPartition--;
-            // Each member can only receive one extra partition, once they meet the minimum quota and receive an extra
-            // partition they can be removed from the potentially unfilled members map.
-            potentiallyUnfilledMembers.remove(memberId);
-            shouldAssign = true;
+
+            for (int i = 0; i < remainingQuota && unassignedPartitionIndex < unassignedPartitions.size(); i++) {
+                TopicIdPartition unassignedTopicIdPartition = unassignedPartitions.get(unassignedPartitionIndex);
+                unassignedPartitionIndex++;
+                newAssignment
+                    .computeIfAbsent(unassignedTopicIdPartition.topicId(), __ -> new HashSet<>())
+                    .add(unassignedTopicIdPartition.partitionId());
+            }
         }
 
-        // Assign the partition if flag is set.
-        if (shouldAssign) {
-            addPartitionToAssignment(
-                targetAssignment,
-                memberId,
-                topicIdPartition.topicId(),
-                topicIdPartition.partitionId()
-            );
-            return true;
+        if (unassignedPartitionIndex < unassignedPartitions.size()) {
+            throw new PartitionAssignorException("Partitions were left unassigned");
         }
+    }
 
-        // No assignment possible because the member met the minimum quota but
-        // number of members to receive an extra partition is zero.
-        return false;
+    private static Map<Uuid, Set<Integer>> deepCopy(Map<Uuid, Set<Integer>> map) {
+        Map<Uuid, Set<Integer>> copy = new HashMap<>(map.size());
+        for (Map.Entry<Uuid, Set<Integer>> entry : map.entrySet()) {
+            copy.put(entry.getKey(), new HashSet<>(entry.getValue()));
+        }
+        return copy;
+    }
+
+    private static class MemberWithRemainingQuota {
+        final String memberId;
+        final int remainingQuota;
+
+        MemberWithRemainingQuota(
+            String memberId,
+            int remainingQuota
+        ) {
+            this.memberId = memberId;
+            this.remainingQuota = remainingQuota;
+        }
     }
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
index 7da7c2d..648b016 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
@@ -66,21 +66,19 @@
         GroupSpec groupSpec,
         SubscribedTopicDescriber subscribedTopicDescriber
     ) throws PartitionAssignorException {
-        AbstractUniformAssignmentBuilder assignmentBuilder;
-
         if (groupSpec.members().isEmpty())
             return new GroupAssignment(Collections.emptyMap());
 
         if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
             LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the "
                 + "optimized assignment algorithm");
-            assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
+            return new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber)
+                .build();
         } else {
             LOG.debug("Detected that the members are subscribed to different sets of topics, invoking the "
                 + "general assignment algorithm");
-            assignmentBuilder = new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
+            return new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber)
+                .buildAssignment();
         }
-
-        return assignmentBuilder.buildAssignment();
     }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
index 74bb303..ffc5455 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
@@ -22,12 +22,13 @@
 
 import java.util.AbstractMap;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -42,13 +43,13 @@
         );
     }
 
-    public static Map.Entry<Uuid, Set<Integer>> mkSortedTopicAssignment(
+    public static Map.Entry<Uuid, Set<Integer>> mkOrderedTopicAssignment(
         Uuid topicId,
         Integer... partitions
     ) {
         return new AbstractMap.SimpleEntry<>(
             topicId,
-            new TreeSet<>(Arrays.asList(partitions))
+            new LinkedHashSet<>(Arrays.asList(partitions))
         );
     }
 
@@ -56,18 +57,18 @@
     public static Map<Uuid, Set<Integer>> mkAssignment(Map.Entry<Uuid, Set<Integer>>... entries) {
         Map<Uuid, Set<Integer>> assignment = new HashMap<>();
         for (Map.Entry<Uuid, Set<Integer>> entry : entries) {
-            assignment.put(entry.getKey(), entry.getValue());
+            assignment.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
         }
-        return assignment;
+        return Collections.unmodifiableMap(assignment);
     }
 
     @SafeVarargs
-    public static Map<Uuid, Set<Integer>> mkSortedAssignment(Map.Entry<Uuid, Set<Integer>>... entries) {
+    public static Map<Uuid, Set<Integer>> mkOrderedAssignment(Map.Entry<Uuid, Set<Integer>>... entries) {
         Map<Uuid, Set<Integer>> assignment = new LinkedHashMap<>();
         for (Map.Entry<Uuid, Set<Integer>> entry : entries) {
-            assignment.put(entry.getKey(), entry.getValue());
+            assignment.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
         }
-        return assignment;
+        return Collections.unmodifiableMap(assignment);
     }
 
     /**
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java
index 77bad7a..900ba31 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java
@@ -69,8 +69,8 @@
 import java.util.stream.Stream;
 
 import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
-import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedAssignment;
-import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedTopicAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedTopicAssignment;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
 import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newCurrentAssignmentRecord;
 import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord;
@@ -297,7 +297,7 @@
         Uuid topicId1 = Uuid.randomUuid();
         Uuid topicId2 = Uuid.randomUuid();
 
-        Map<Uuid, Set<Integer>> partitions = mkSortedAssignment(
+        Map<Uuid, Set<Integer>> partitions = mkOrderedAssignment(
             mkTopicAssignment(topicId1, 11, 12, 13),
             mkTopicAssignment(topicId2, 21, 22, 23)
         );
@@ -379,14 +379,14 @@
         Uuid topicId1 = Uuid.randomUuid();
         Uuid topicId2 = Uuid.randomUuid();
 
-        Map<Uuid, Set<Integer>> assigned = mkSortedAssignment(
-            mkSortedTopicAssignment(topicId1, 11, 12, 13),
-            mkSortedTopicAssignment(topicId2, 21, 22, 23)
+        Map<Uuid, Set<Integer>> assigned = mkOrderedAssignment(
+            mkOrderedTopicAssignment(topicId1, 11, 12, 13),
+            mkOrderedTopicAssignment(topicId2, 21, 22, 23)
         );
 
-        Map<Uuid, Set<Integer>> revoking = mkSortedAssignment(
-            mkSortedTopicAssignment(topicId1, 14, 15, 16),
-            mkSortedTopicAssignment(topicId2, 24, 25, 26)
+        Map<Uuid, Set<Integer>> revoking = mkOrderedAssignment(
+            mkOrderedTopicAssignment(topicId1, 14, 15, 16),
+            mkOrderedTopicAssignment(topicId2, 24, 25, 26)
         );
 
         CoordinatorRecord expectedRecord = new CoordinatorRecord(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
index f21bd63..fdc4f59 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
@@ -34,6 +34,7 @@
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
 import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
@@ -158,12 +159,11 @@
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
-            mkTopicAssignment(topic1Uuid, 0, 2),
-            mkTopicAssignment(topic3Uuid, 1)
+            mkTopicAssignment(topic1Uuid, 0),
+            mkTopicAssignment(topic3Uuid, 0, 1)
         ));
         expectedAssignment.put(memberB, mkAssignment(
-            mkTopicAssignment(topic1Uuid, 1),
-            mkTopicAssignment(topic3Uuid, 0)
+            mkTopicAssignment(topic1Uuid, 1, 2)
         ));
 
         GroupSpec groupSpec = new GroupSpecImpl(
@@ -295,30 +295,25 @@
         ));
 
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
-        Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
-            mkAssignment(
-                mkTopicAssignment(topic1Uuid, 0, 1),
-                mkTopicAssignment(topic2Uuid, 0, 1)
-            )
-        );
+
         members.put(memberA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForA
+            mkOrderedAssignment(
+                mkTopicAssignment(topic1Uuid, 0, 1),
+                mkTopicAssignment(topic2Uuid, 0, 1)
+            )
         ));
 
-        Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
-            mkAssignment(
-                mkTopicAssignment(topic1Uuid, 2),
-                mkTopicAssignment(topic2Uuid, 2)
-            )
-        );
         members.put(memberB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForB
+            mkOrderedAssignment(
+                mkTopicAssignment(topic1Uuid, 2),
+                mkTopicAssignment(topic2Uuid, 2)
+            )
         ));
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@@ -366,40 +361,34 @@
 
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
 
-        Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
-            mkAssignment(
-                mkTopicAssignment(topic1Uuid, 0, 2),
-                mkTopicAssignment(topic2Uuid, 0)
-            )
-        );
         members.put(memberA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForA
+            mkOrderedAssignment(
+                mkTopicAssignment(topic1Uuid, 0, 2),
+                mkTopicAssignment(topic2Uuid, 0)
+            )
         ));
 
-        Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
-            mkAssignment(
-                mkTopicAssignment(topic1Uuid, 1),
-                mkTopicAssignment(topic2Uuid, 1, 2)
-            )
-        );
         members.put(memberB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForB
+            mkOrderedAssignment(
+                mkTopicAssignment(topic1Uuid, 1),
+                mkTopicAssignment(topic2Uuid, 1, 2)
+            )
         ));
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
-            mkTopicAssignment(topic1Uuid, 0, 2, 3, 5),
-            mkTopicAssignment(topic2Uuid, 0, 4)
+            mkTopicAssignment(topic1Uuid, 0, 2, 3),
+            mkTopicAssignment(topic2Uuid, 0, 3, 4)
         ));
         expectedAssignment.put(memberB, mkAssignment(
-            mkTopicAssignment(topic1Uuid, 1, 4),
-            mkTopicAssignment(topic2Uuid, 1, 2, 3)
+            mkTopicAssignment(topic1Uuid, 1, 4, 5),
+            mkTopicAssignment(topic2Uuid, 1, 2)
         ));
 
         GroupSpec groupSpec = new GroupSpecImpl(
@@ -436,26 +425,24 @@
 
         Map<String, AssignmentMemberSpec> members = new HashMap<>();
 
-        Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(mkAssignment(
-            mkTopicAssignment(topic1Uuid, 0, 2),
-            mkTopicAssignment(topic2Uuid, 0)
-        ));
         members.put(memberA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForA
+            mkOrderedAssignment(
+                mkTopicAssignment(topic1Uuid, 0, 2),
+                mkTopicAssignment(topic2Uuid, 0)
+            )
         ));
 
-        Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(mkAssignment(
-            mkTopicAssignment(topic1Uuid, 1),
-            mkTopicAssignment(topic2Uuid, 1, 2)
-        ));
         members.put(memberB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForB
+            mkOrderedAssignment(
+                mkTopicAssignment(topic1Uuid, 1),
+                mkTopicAssignment(topic2Uuid, 1, 2)
+            )
         ));
 
         // Add a new member to trigger a re-assignment.
@@ -512,38 +499,36 @@
 
         Map<String, AssignmentMemberSpec> members = new HashMap<>();
 
-        Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
-            mkTopicAssignment(topic1Uuid, 0),
-            mkTopicAssignment(topic2Uuid, 0)
-        );
         members.put(memberA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForA
+            mkAssignment(
+                mkTopicAssignment(topic1Uuid, 0),
+                mkTopicAssignment(topic2Uuid, 0)
+            )
         ));
 
-        Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
-            mkTopicAssignment(topic1Uuid, 1),
-            mkTopicAssignment(topic2Uuid, 1)
-        );
         members.put(memberB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForB
+            mkAssignment(
+                mkTopicAssignment(topic1Uuid, 1),
+                mkTopicAssignment(topic2Uuid, 1)
+            )
         ));
 
         // Member C was removed
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
-            mkTopicAssignment(topic1Uuid, 0, 2),
-            mkTopicAssignment(topic2Uuid, 0)
+            mkTopicAssignment(topic1Uuid, 0),
+            mkTopicAssignment(topic2Uuid, 0, 2)
         ));
         expectedAssignment.put(memberB, mkAssignment(
-            mkTopicAssignment(topic1Uuid, 1),
-            mkTopicAssignment(topic2Uuid, 1, 2)
+            mkTopicAssignment(topic1Uuid, 1, 2),
+            mkTopicAssignment(topic2Uuid, 1)
         ));
 
         GroupSpec groupSpec = new GroupSpecImpl(
@@ -581,26 +566,24 @@
         // Initial subscriptions were [T1, T2]
         Map<String, AssignmentMemberSpec> members = new HashMap<>();
 
-        Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
-            mkTopicAssignment(topic1Uuid, 0),
-            mkTopicAssignment(topic2Uuid, 0)
-        );
         members.put(memberA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             Collections.singleton(topic2Uuid),
-            currentAssignmentForA
+            mkAssignment(
+                mkTopicAssignment(topic1Uuid, 0),
+                mkTopicAssignment(topic2Uuid, 0)
+            )
         ));
 
-        Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
-            mkTopicAssignment(topic1Uuid, 1),
-            mkTopicAssignment(topic2Uuid, 1)
-        );
         members.put(memberB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             Collections.singleton(topic2Uuid),
-            currentAssignmentForB
+            mkAssignment(
+                mkTopicAssignment(topic1Uuid, 1),
+                mkTopicAssignment(topic2Uuid, 1)
+            )
         ));
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
index 2349350..77a38ab 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
@@ -268,7 +268,7 @@
                 assignmentMemberSpec.instanceId(),
                 assignmentMemberSpec.rackId(),
                 assignmentMemberSpec.subscribedTopicIds(),
-                memberAssignment.targetPartitions()
+                Collections.unmodifiableMap(memberAssignment.targetPartitions())
             ));
         });
 
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index 511db01..7d67f07 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -184,10 +184,7 @@
         for (Map.Entry<String, MemberAssignment> entry : groupAssignment.members().entrySet()) {
             String memberId = entry.getKey();
             Map<Uuid, Set<Integer>> topicPartitions = entry.getValue().targetPartitions();
-
-            Assignment assignment = new Assignment(topicPartitions);
-
-            initialTargetAssignment.put(memberId, assignment);
+            initialTargetAssignment.put(memberId, new Assignment(topicPartitions));
         }
 
         return initialTargetAssignment;