KAFKA-16625; Reverse lookup map from topic partitions to members (#15974)

This patch speeds up the computation of the unassigned partitions by exposing the inverted target assignment. It allows the assignor to check whether a partition is assigned or not.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 5a41140..76a94fd 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1901,6 +1901,7 @@
                     .withSubscriptionMetadata(subscriptionMetadata)
                     .withSubscriptionType(subscriptionType)
                     .withTargetAssignment(group.targetAssignment())
+                    .withInvertedTargetAssignment(group.invertedTargetAssignment())
                     .withTopicsImage(metadataImage.topics())
                     .addOrUpdateMember(updatedMember.memberId(), updatedMember);
             TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java
deleted file mode 100644
index ec65dc6..0000000
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.coordinator.group.assignor;
-
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * The assignment specification for a consumer group.
- */
-public class AssignmentSpec {
-    /**
-     * The member metadata keyed by member Id.
-     */
-    private final Map<String, AssignmentMemberSpec> members;
-
-    /**
-     * The subscription type followed by the group.
-     */
-    private final SubscriptionType subscriptionType;
-
-    public AssignmentSpec(
-        Map<String, AssignmentMemberSpec> members,
-        SubscriptionType subscriptionType
-    ) {
-        Objects.requireNonNull(members);
-        this.members = members;
-        this.subscriptionType = subscriptionType;
-    }
-
-    /**
-     * @return Member metadata keyed by member Id.
-     */
-    public Map<String, AssignmentMemberSpec> members() {
-        return members;
-    }
-
-    /**
-     * @return The group's subscription type.
-     */
-    public SubscriptionType subscriptionType() {
-        return subscriptionType;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        AssignmentSpec that = (AssignmentSpec) o;
-        return subscriptionType == that.subscriptionType &&
-            members.equals(that.members);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(members, subscriptionType);
-    }
-
-    public String toString() {
-        return "AssignmentSpec(members=" + members + ", subscriptionType=" + subscriptionType.toString() + ')';
-    }
-}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java
index f8d165e..d6f83df 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java
@@ -107,8 +107,8 @@
      */
     private final PartitionMovements partitionMovements;
 
-    public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
-        this.members = assignmentSpec.members();
+    public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+        this.members = groupSpec.members();
         this.subscribedTopicDescriber = subscribedTopicDescriber;
         this.subscribedTopicIds = new HashSet<>();
         this.membersPerTopic = new HashMap<>();
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java
new file mode 100644
index 0000000..296dedb
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+
+/**
+ * The group metadata specifications required to compute the target assignment.
+ */
+public interface GroupSpec {
+    /**
+     * @return Member metadata keyed by member Id.
+     */
+    Map<String, AssignmentMemberSpec> members();
+
+    /**
+     * @return The group's subscription type.
+     */
+    SubscriptionType subscriptionType();
+
+    /**
+     * @return True, if the partition is currently assigned to a member.
+     *         False, otherwise.
+     */
+    boolean isPartitionAssigned(Uuid topicId, int partitionId);
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java
new file mode 100644
index 0000000..0194727
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The assignment specification for a consumer group.
+ */
+public class GroupSpecImpl implements GroupSpec {
+    /**
+     * The member metadata keyed by member Id.
+     */
+    private final Map<String, AssignmentMemberSpec> members;
+
+    /**
+     * The subscription type followed by the group.
+     */
+    private final SubscriptionType subscriptionType;
+
+    /**
+     * Reverse lookup map representing topic partitions with
+     * their current member assignments.
+     */
+    private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
+
+    public GroupSpecImpl(
+        Map<String, AssignmentMemberSpec> members,
+        SubscriptionType subscriptionType,
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment
+    ) {
+        Objects.requireNonNull(members);
+        Objects.requireNonNull(subscriptionType);
+        Objects.requireNonNull(invertedTargetAssignment);
+        this.members = members;
+        this.subscriptionType = subscriptionType;
+        this.invertedTargetAssignment = invertedTargetAssignment;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Map<String, AssignmentMemberSpec> members() {
+        return members;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public SubscriptionType subscriptionType() {
+        return subscriptionType;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isPartitionAssigned(Uuid topicId, int partitionId) {
+        Map<Integer, String> partitionMap = invertedTargetAssignment.get(topicId);
+        if (partitionMap == null) {
+            return false;
+        }
+        return partitionMap.containsKey(partitionId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        GroupSpecImpl that = (GroupSpecImpl) o;
+        return subscriptionType == that.subscriptionType &&
+            members.equals(that.members) &&
+            invertedTargetAssignment.equals(that.invertedTargetAssignment);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = members.hashCode();
+        result = 31 * result + subscriptionType.hashCode();
+        result = 31 * result + invertedTargetAssignment.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "GroupSpecImpl(members=" + members +
+            ", subscriptionType=" + subscriptionType +
+            ", invertedTargetAssignment=" + invertedTargetAssignment +
+            ')';
+    }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
index a143637..3ea1361 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
@@ -59,7 +59,7 @@
     /**
      * The assignment specification which includes member metadata.
      */
-    private final AssignmentSpec assignmentSpec;
+    private final GroupSpec groupSpec;
 
     /**
      * The topic and partition metadata describer.
@@ -89,18 +89,19 @@
      * The partitions that still need to be assigned.
      * Initially this contains all the subscribed topics' partitions.
      */
-    private Set<TopicIdPartition> unassignedPartitions;
+    private final Set<TopicIdPartition> unassignedPartitions;
 
     /**
      * The target assignment.
      */
     private final Map<String, MemberAssignment> targetAssignment;
 
-    OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
-        this.assignmentSpec = assignmentSpec;
+    OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+        this.groupSpec = groupSpec;
         this.subscribedTopicDescriber = subscribedTopicDescriber;
-        this.subscribedTopicIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+        this.subscribedTopicIds = new HashSet<>(groupSpec.members().values().iterator().next().subscribedTopicIds());
         this.potentiallyUnfilledMembers = new HashMap<>();
+        this.unassignedPartitions = new HashSet<>();
         this.targetAssignment = new HashMap<>();
     }
 
@@ -108,9 +109,9 @@
      * Here's the step-by-step breakdown of the assignment process:
      *
      * <li> Compute the quotas of partitions for each member based on the total partitions and member count.</li>
-     * <li> Initialize unassigned partitions to all the topic partitions and
-     *      remove partitions from the list as and when they are assigned.</li>
-     * <li> For existing assignments, retain partitions based on the determined quota.</li>
+     * <li> Initialize unassigned partitions with all the topic partitions that aren't present in the
+     *      current target assignment.</li>
+     * <li> For existing assignments, retain partitions based on the determined quota. Add extras to unassigned partitions.</li>
      * <li> Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.</li>
      * <li> Proceed with a round-robin assignment according to quotas.
      *      For each unassigned partition, locate the first compatible member from the potentially unfilled list.</li>
@@ -124,6 +125,9 @@
             return new GroupAssignment(Collections.emptyMap());
         }
 
+        // Check if the subscribed topicId is still valid.
+        // Update unassigned partitions based on the current target assignment
+        // and topic metadata.
         for (Uuid topicId : subscribedTopicIds) {
             int partitionCount = subscribedTopicDescriber.numPartitions(topicId);
             if (partitionCount == -1) {
@@ -131,21 +135,25 @@
                     "Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata."
                 );
             } else {
+                for (int i = 0; i < partitionCount; i++) {
+                    if (!groupSpec.isPartitionAssigned(topicId, i)) {
+                        unassignedPartitions.add(new TopicIdPartition(topicId, i));
+                    }
+                }
                 totalPartitionsCount += partitionCount;
             }
         }
 
         // The minimum required quota that each member needs to meet for a balanced assignment.
         // This is the same for all members.
-        final int numberOfMembers = assignmentSpec.members().size();
+        final int numberOfMembers = groupSpec.members().size();
         final int minQuota = totalPartitionsCount / numberOfMembers;
         remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;
 
-        assignmentSpec.members().keySet().forEach(memberId ->
+        groupSpec.members().keySet().forEach(memberId ->
             targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())
         ));
 
-        unassignedPartitions = topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber);
         potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
 
         unassignedPartitionsRoundRobinAssignment();
@@ -179,7 +187,7 @@
     private Map<String, Integer> assignStickyPartitions(int minQuota) {
         Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
 
-        assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> {
+        groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
             List<TopicIdPartition> validCurrentMemberAssignment = validCurrentMemberAssignment(
                 assignmentMemberSpec.assignedPartitions()
             );
@@ -198,20 +206,28 @@
                         topicIdPartition.topicId(),
                         topicIdPartition.partitionId()
                     );
-                    unassignedPartitions.remove(topicIdPartition);
                 });
 
-                // The extra partition is located at the last index from the previous step.
-                if (remaining < 0 && remainingMembersToGetAnExtraPartition > 0) {
-                    TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount);
-                    addPartitionToAssignment(
-                        targetAssignment,
-                        memberId,
-                        topicIdPartition.topicId(),
-                        topicIdPartition.partitionId()
-                    );
-                    unassignedPartitions.remove(topicIdPartition);
-                    remainingMembersToGetAnExtraPartition--;
+                if (remaining < 0) {
+                    // The extra partition is located at the last index from the previous step.
+                    if (remainingMembersToGetAnExtraPartition > 0) {
+                        TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++);
+                        addPartitionToAssignment(
+                            targetAssignment,
+                            memberId,
+                            topicIdPartition.topicId(),
+                            topicIdPartition.partitionId()
+                        );
+                        remainingMembersToGetAnExtraPartition--;
+                    }
+                    // Any previously owned partitions that weren't retained due to the quotas
+                    // are added to the unassigned partitions set.
+                    if (retainedPartitionsCount < currentAssignmentSize) {
+                        unassignedPartitions.addAll(validCurrentMemberAssignment.subList(
+                            retainedPartitionsCount,
+                            currentAssignmentSize
+                        ));
+                    }
                 }
             }
 
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java
index 27b38cc..13b0ee3 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java
@@ -34,12 +34,12 @@
     /**
      * Assigns partitions to group members based on the given assignment specification and topic metadata.
      *
-     * @param assignmentSpec           The assignment spec which includes member metadata.
+     * @param groupSpec           The assignment spec which includes member metadata.
      * @param subscribedTopicDescriber The topic and partition metadata describer.
      * @return The new assignment for the group.
      */
     GroupAssignment assign(
-        AssignmentSpec assignmentSpec,
+        GroupSpec groupSpec,
         SubscribedTopicDescriber subscribedTopicDescriber
     ) throws PartitionAssignorException;
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java
index 00fcd7d..482ad02 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java
@@ -19,7 +19,7 @@
 import org.apache.kafka.common.errors.ApiException;
 
 /**
- * Exception thrown by {@link PartitionAssignor#assign(AssignmentSpec)}. The exception
+ * Exception thrown by {@link PartitionAssignor#assign(GroupSpec, SubscribedTopicDescriber)}}. The exception
  * is only used internally.
  */
 public class PartitionAssignorException extends ApiException {
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java
index a1631f5..8393353 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java
@@ -81,20 +81,20 @@
      * Returns a map of topic Ids to a list of members subscribed to them,
      * based on the given assignment specification and metadata.
      *
-     * @param assignmentSpec                The specification for member assignments.
+     * @param groupSpec                     The specification for member assignments.
      * @param subscribedTopicDescriber      The metadata describer for subscribed topics and clusters.
      * @return A map of topic Ids to a list of member Ids subscribed to them.
      *
      * @throws PartitionAssignorException If a member is subscribed to a non-existent topic.
      */
     private Map<Uuid, Collection<String>> membersPerTopic(
-        final AssignmentSpec assignmentSpec,
+        final GroupSpec groupSpec,
         final SubscribedTopicDescriber subscribedTopicDescriber
     ) {
         Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+        Map<String, AssignmentMemberSpec> membersData = groupSpec.members();
 
-        if (assignmentSpec.subscriptionType().equals(HOMOGENEOUS)) {
+        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
             Set<String> allMembers = membersData.keySet();
             Collection<Uuid> topics = membersData.values().iterator().next().subscribedTopicIds();
 
@@ -139,7 +139,7 @@
      */
     @Override
     public GroupAssignment assign(
-        final AssignmentSpec assignmentSpec,
+        final GroupSpec groupSpec,
         final SubscribedTopicDescriber subscribedTopicDescriber
     ) throws PartitionAssignorException {
 
@@ -147,7 +147,7 @@
 
         // Step 1
         Map<Uuid, Collection<String>> membersPerTopic = membersPerTopic(
-            assignmentSpec,
+            groupSpec,
             subscribedTopicDescriber
         );
 
@@ -162,7 +162,7 @@
             List<MemberWithRemainingAssignments> potentiallyUnfilledMembers = new ArrayList<>();
 
             for (String memberId : membersForTopic) {
-                Set<Integer> assignedPartitionsForTopic = assignmentSpec.members().get(memberId)
+                Set<Integer> assignedPartitionsForTopic = groupSpec.members().get(memberId)
                     .assignedPartitions().getOrDefault(topicId, Collections.emptySet());
 
                 int currentAssignmentSize = assignedPartitionsForTopic.size();
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
index aff9365..caa0de4 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
@@ -57,28 +57,28 @@
      * Perform the group assignment given the current members and
      * topics metadata.
      *
-     * @param assignmentSpec                The assignment specification that included member metadata.
+     * @param groupSpec                     The assignment specification that included member metadata.
      * @param subscribedTopicDescriber      The topic and cluster metadata describer {@link SubscribedTopicDescriber}.
      * @return The new target assignment for the group.
      */
     @Override
     public GroupAssignment assign(
-        AssignmentSpec assignmentSpec,
+        GroupSpec groupSpec,
         SubscribedTopicDescriber subscribedTopicDescriber
     ) throws PartitionAssignorException {
         AbstractUniformAssignmentBuilder assignmentBuilder;
 
-        if (assignmentSpec.members().isEmpty())
+        if (groupSpec.members().isEmpty())
             return new GroupAssignment(Collections.emptyMap());
 
-        if (assignmentSpec.subscriptionType().equals(HOMOGENEOUS)) {
+        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
             LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the "
                 + "optimized assignment algorithm");
-            assignmentBuilder = new OptimizedUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber);
+            assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
         } else {
             LOG.debug("Detected that the members are subscribed to different sets of topics, invoking the "
                 + "general assignment algorithm");
-            assignmentBuilder = new GeneralUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber);
+            assignmentBuilder = new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
         }
 
         return assignmentBuilder.buildAssignment();
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index dd3a6f2..007971a 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -171,6 +171,12 @@
     private final TimelineHashMap<String, Assignment> targetAssignment;
 
     /**
+     * Reverse lookup map representing topic partitions with
+     * their current member assignments.
+     */
+    private final TimelineHashMap<Uuid, TimelineHashMap<Integer, String>> invertedTargetAssignment;
+
+    /**
      * The current partition epoch maps each topic-partitions to their current epoch where
      * the epoch is the epoch of their owners. When a member revokes a partition, it removes
      * its epochs from this map. When a member gets a partition, it adds its epochs to this map.
@@ -221,6 +227,7 @@
         this.subscriptionType = new TimelineObject<>(snapshotRegistry, HOMOGENEOUS);
         this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
         this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
         this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
         this.metrics = Objects.requireNonNull(metrics);
         this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
@@ -517,21 +524,89 @@
     }
 
     /**
-     * Updates target assignment of a member.
+     * @return An immutable map containing all the topic partitions
+     *         with their current member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment() {
+        return Collections.unmodifiableMap(invertedTargetAssignment);
+    }
+
+    /**
+     * Updates the target assignment of a member.
      *
      * @param memberId              The member id.
      * @param newTargetAssignment   The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
+        updateInvertedTargetAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())),
+            newTargetAssignment
+        );
         targetAssignment.put(memberId, newTargetAssignment);
     }
 
     /**
+     * Updates the reverse lookup map of the target assignment.
+     *
+     * @param memberId              The member Id.
+     * @param oldTargetAssignment   The old target assignment.
+     * @param newTargetAssignment   The new target assignment.
+     */
+    private void updateInvertedTargetAssignment(
+        String memberId,
+        Assignment oldTargetAssignment,
+        Assignment newTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<Uuid> allTopicIds = new HashSet<>();
+        allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+        allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+        for (Uuid topicId : allTopicIds) {
+            Set<Integer> oldPartitions = oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+            Set<Integer> newPartitions = newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+            TimelineHashMap<Integer, String> topicPartitionAssignment = invertedTargetAssignment.computeIfAbsent(
+                topicId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment only if the partition is currently
+            // still assigned to the member in question.
+            // If p0 was moved from A to B, and the target assignment map was updated for B first, we don't want to
+            // remove the key p0 from the inverted map and undo the action when A eventually tries to update its assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && memberId.equals(topicPartitionAssignment.get(partition))) {
+                    topicPartitionAssignment.remove(partition);
+                }
+            }
+
+            // Add partitions that are in the new assignment but not in the old assignment.
+            for (Integer partition : newPartitions) {
+                if (!oldPartitions.contains(partition)) {
+                    topicPartitionAssignment.put(partition, memberId);
+                }
+            }
+
+            if (topicPartitionAssignment.isEmpty()) {
+                invertedTargetAssignment.remove(topicId);
+            } else {
+                invertedTargetAssignment.put(topicId, topicPartitionAssignment);
+            }
+        }
+    }
+
+    /**
      * Removes the target assignment of a member.
      *
      * @param memberId The member id.
      */
     public void removeTargetAssignment(String memberId) {
+        updateInvertedTargetAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
+            Assignment.EMPTY
+        );
         targetAssignment.remove(memberId);
     }
 
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
index 09a44b1..57d6039 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
@@ -19,7 +19,7 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.coordinator.group.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
-import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
 import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
 import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
 import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
@@ -127,6 +127,12 @@
     private Map<String, Assignment> targetAssignment = Collections.emptyMap();
 
     /**
+     * Reverse lookup map representing topic partitions with
+     * their current member assignments.
+     */
+    private Map<Uuid, Map<Integer, String>> invertedTargetAssignment = Collections.emptyMap();
+
+    /**
      * The topics image.
      */
     private TopicsImage topicsImage = TopicsImage.EMPTY;
@@ -225,6 +231,19 @@
     }
 
     /**
+     * Adds the existing topic partition assignments.
+     *
+     * @param invertedTargetAssignment   The reverse lookup map of the current target assignment.
+     * @return This object.
+     */
+    public TargetAssignmentBuilder withInvertedTargetAssignment(
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment
+    ) {
+        this.invertedTargetAssignment = invertedTargetAssignment;
+        return this;
+    }
+
+    /**
      * Adds the topics image.
      *
      * @param topicsImage    The topics image.
@@ -317,7 +336,11 @@
 
         // Compute the assignment.
         GroupAssignment newGroupAssignment = assignor.assign(
-            new AssignmentSpec(Collections.unmodifiableMap(memberSpecs), subscriptionType),
+            new GroupSpecImpl(
+                Collections.unmodifiableMap(memberSpecs),
+                subscriptionType,
+                invertedTargetAssignment
+            ),
             new SubscribedTopicMetadata(topicMetadataMap)
         );
 
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
index 8b6a09b..74bb303 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
 import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
 
 import java.util.AbstractMap;
@@ -82,4 +83,33 @@
             assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember);
         });
     }
+
+    /**
+     * Generate a reverse look up map of partition to member target assignments from the given member spec.
+     *
+     * @param memberSpec        A map where the key is the member Id and the value is an
+     *                          AssignmentMemberSpec object containing the member's partition assignments.
+     * @return Map of topic partition to member assignments.
+     */
+    public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
+        Map<String, AssignmentMemberSpec> memberSpec
+    ) {
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : memberSpec.entrySet()) {
+            String memberId = memberEntry.getKey();
+            Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().assignedPartitions();
+
+            for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
+                Uuid topicId = topicEntry.getKey();
+                Set<Integer> partitions = topicEntry.getValue();
+
+                Map<Integer, String> partitionMap = invertedTargetAssignment.computeIfAbsent(topicId, k -> new HashMap<>());
+
+                for (Integer partitionId : partitions) {
+                    partitionMap.put(partitionId, memberId);
+                }
+            }
+        }
+        return invertedTargetAssignment;
+    }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java
index 736ee43..1fdbb31 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupSpec;
 import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
@@ -45,7 +45,7 @@
     }
 
     @Override
-    public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
+    public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
         return prepareGroupAssignment;
     }
 
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java
index 05b8cdc..cf15ee5 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.coordinator.group;
 
-import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupSpec;
 import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
 import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
@@ -34,8 +34,8 @@
     }
 
     @Override
-    public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
-        return new GroupAssignment(assignmentSpec.members().entrySet()
+    public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+        return new GroupAssignment(groupSpec.members().entrySet()
             .stream()
             .collect(Collectors.toMap(
                 Map.Entry::getKey,
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java
index 32f9618..f5a7be2 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java
@@ -32,6 +32,7 @@
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
 import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -79,8 +80,16 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
-        GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            Collections.emptyMap()
+        );
+
+        GroupAssignment groupAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         assertEquals(Collections.emptyMap(), groupAssignment.members());
     }
@@ -113,10 +122,14 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            Collections.emptyMap()
+        );
 
         assertThrows(PartitionAssignorException.class,
-            () -> assignor.assign(assignmentSpec, subscribedTopicMetadata));
+            () -> assignor.assign(groupSpec, subscribedTopicMetadata));
     }
 
     @Test
@@ -149,10 +162,17 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            Collections.emptyMap()
+        );
         SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
 
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
@@ -202,10 +222,17 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            Collections.emptyMap()
+        );
         SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
 
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         // Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment.
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@@ -285,10 +312,17 @@
             currentAssignmentForC
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            invertedTargetAssignment(members)
+        );
         SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
 
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
@@ -362,10 +396,17 @@
             currentAssignmentForB
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            invertedTargetAssignment(members)
+        );
         SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
 
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
@@ -429,10 +470,17 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            invertedTargetAssignment(members)
+        );
         SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
 
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
@@ -496,10 +544,17 @@
 
         // Member C was removed
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            invertedTargetAssignment(members)
+        );
         SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
 
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
@@ -554,10 +609,17 @@
             currentAssignmentForB
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            invertedTargetAssignment(members)
+        );
         SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
 
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java
new file mode 100644
index 0000000..4060b1a
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class GroupSpecImplTest {
+
+    private Map<String, AssignmentMemberSpec> members;
+    private SubscriptionType subscriptionType;
+    private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
+    private GroupSpecImpl groupSpec;
+    private Uuid topicId;
+
+    @BeforeEach
+    void setUp() {
+        members = new HashMap<>();
+
+        subscriptionType = SubscriptionType.HOMOGENEOUS;
+        invertedTargetAssignment = new HashMap<>();
+        topicId = Uuid.randomUuid();
+
+        members.put("test-member",  new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            new HashSet<>(Collections.singletonList(topicId)),
+            Collections.emptyMap())
+        );
+
+        groupSpec = new GroupSpecImpl(
+            members,
+            subscriptionType,
+            invertedTargetAssignment
+        );
+    }
+
+    @Test
+    void testMembers() {
+        assertEquals(members, groupSpec.members());
+    }
+
+    @Test
+    void testSubscriptionType() {
+        assertEquals(subscriptionType, groupSpec.subscriptionType());
+    }
+
+    @Test
+    void testIsPartitionAssigned() {
+        Map<Integer, String> partitionMap = new HashMap<>();
+        partitionMap.put(1, "test-member");
+        invertedTargetAssignment.put(topicId, partitionMap);
+
+        assertTrue(groupSpec.isPartitionAssigned(topicId, 1));
+        assertFalse(groupSpec.isPartitionAssigned(topicId, 2));
+        assertFalse(groupSpec.isPartitionAssigned(Uuid.randomUuid(), 2));
+    }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
index 4fe748c..f21bd63 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
@@ -35,6 +35,7 @@
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
 import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -77,8 +78,16 @@
             )
         );
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            Collections.emptyMap()
+        );
+
+        GroupAssignment groupAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         assertEquals(Collections.emptyMap(), groupAssignment.members());
     }
@@ -107,10 +116,14 @@
             )
         );
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            Collections.emptyMap()
+        );
 
         assertThrows(PartitionAssignorException.class,
-            () -> assignor.assign(assignmentSpec, subscribedTopicMetadata));
+            () -> assignor.assign(groupSpec, subscribedTopicMetadata));
     }
 
     @Test
@@ -143,11 +156,6 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 2),
@@ -158,6 +166,18 @@
             mkTopicAssignment(topic3Uuid, 0)
         ));
 
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            Collections.emptyMap()
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
+
         assertAssignment(expectedAssignment, computedAssignment);
         checkValidityAndBalance(members, computedAssignment);
     }
@@ -192,11 +212,6 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
         // Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment.
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
@@ -209,6 +224,18 @@
             Collections.emptyMap()
         );
 
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            Collections.emptyMap()
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
+
         assertAssignment(expectedAssignment, computedAssignment);
         checkValidityAndBalance(members, computedAssignment);
     }
@@ -236,10 +263,17 @@
             ));
         }
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            Collections.emptyMap()
+        );
         SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
 
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         checkValidityAndBalance(members, computedAssignment);
     }
@@ -261,7 +295,6 @@
         ));
 
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
-
         Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
             mkAssignment(
                 mkTopicAssignment(topic1Uuid, 0, 1),
@@ -288,11 +321,6 @@
             currentAssignmentForB
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 1),
@@ -303,6 +331,18 @@
             mkTopicAssignment(topic2Uuid, 1, 2)
         ));
 
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            invertedTargetAssignment(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
+
         assertAssignment(expectedAssignment, computedAssignment);
         checkValidityAndBalance(members, computedAssignment);
     }
@@ -352,11 +392,6 @@
             currentAssignmentForB
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 2, 3, 5),
@@ -367,6 +402,18 @@
             mkTopicAssignment(topic2Uuid, 1, 2, 3)
         ));
 
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            invertedTargetAssignment(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
+
         assertAssignment(expectedAssignment, computedAssignment);
         checkValidityAndBalance(members, computedAssignment);
     }
@@ -419,11 +466,6 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 2)
@@ -436,6 +478,18 @@
             mkTopicAssignment(topic2Uuid, 0, 2)
         ));
 
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            invertedTargetAssignment(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
+
         assertAssignment(expectedAssignment, computedAssignment);
         checkValidityAndBalance(members, computedAssignment);
     }
@@ -482,11 +536,6 @@
 
         // Member C was removed
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 2),
@@ -497,6 +546,18 @@
             mkTopicAssignment(topic2Uuid, 1, 2)
         ));
 
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            invertedTargetAssignment(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
+
         assertAssignment(expectedAssignment, computedAssignment);
         checkValidityAndBalance(members, computedAssignment);
     }
@@ -542,11 +603,6 @@
             currentAssignmentForB
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
             mkTopicAssignment(topic2Uuid, 0)
@@ -555,6 +611,18 @@
             mkTopicAssignment(topic2Uuid, 1)
         ));
 
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            invertedTargetAssignment(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
+
         assertAssignment(expectedAssignment, computedAssignment);
         checkValidityAndBalance(members, computedAssignment);
     }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
index b52681d..7acb9bd 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
@@ -32,6 +32,7 @@
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
 import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
 import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -74,8 +75,16 @@
             )
         );
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            Collections.emptyMap()
+        );
+
+        GroupAssignment groupAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         assertEquals(Collections.emptyMap(), groupAssignment.members());
     }
@@ -104,10 +113,14 @@
             )
         );
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            Collections.emptyMap()
+        );
 
         assertThrows(PartitionAssignorException.class,
-            () -> assignor.assign(assignmentSpec, subscribedTopicMetadata));
+            () -> assignor.assign(groupSpec, subscribedTopicMetadata));
     }
 
     @Test
@@ -126,8 +139,6 @@
             createPartitionRacks(2)
         ));
 
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
 
         members.put(consumerA, new AssignmentMemberSpec(
@@ -144,16 +155,23 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            Collections.emptyMap()
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
         expectedAssignment.put(consumerA, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 1),
             mkTopicAssignment(topic3Uuid, 0)
         ));
-
         expectedAssignment.put(consumerB, mkAssignment(
             mkTopicAssignment(topic1Uuid, 2),
             mkTopicAssignment(topic3Uuid, 1)
@@ -184,8 +202,6 @@
             createPartitionRacks(2)
         ));
 
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
 
         members.put(consumerA, new AssignmentMemberSpec(
@@ -209,20 +225,26 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            Collections.emptyMap()
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
         expectedAssignment.put(consumerA, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 1, 2),
             mkTopicAssignment(topic2Uuid, 0, 1)
         ));
-
         expectedAssignment.put(consumerB, mkAssignment(
             mkTopicAssignment(topic3Uuid, 0)
         ));
-
         expectedAssignment.put(consumerC, mkAssignment(
             mkTopicAssignment(topic2Uuid, 2),
             mkTopicAssignment(topic3Uuid, 1)
@@ -247,8 +269,6 @@
             createPartitionRacks(2)
         ));
 
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
 
         members.put(consumerA, new AssignmentMemberSpec(
@@ -272,8 +292,17 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            Collections.emptyMap()
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
         // Topic 3 has 2 partitions but three consumers subscribed to it - one of them will not get a partition.
@@ -281,12 +310,10 @@
             mkTopicAssignment(topic1Uuid, 0),
             mkTopicAssignment(topic3Uuid, 0)
         ));
-
         expectedAssignment.put(consumerB, mkAssignment(
             mkTopicAssignment(topic1Uuid, 1),
             mkTopicAssignment(topic3Uuid, 1)
         ));
-
         expectedAssignment.put(consumerC, mkAssignment(
             mkTopicAssignment(topic1Uuid, 2)
         ));
@@ -310,15 +337,12 @@
             createPartitionRacks(2)
         ));
 
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
 
         Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
             mkTopicAssignment(topic1Uuid, 0),
             mkTopicAssignment(topic2Uuid, 0)
         );
-
         members.put(consumerA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
@@ -330,7 +354,6 @@
             mkTopicAssignment(topic1Uuid, 1),
             mkTopicAssignment(topic2Uuid, 1)
         );
-
         members.put(consumerB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
@@ -346,16 +369,23 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            invertedTargetAssignment(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
         expectedAssignment.put(consumerA, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0),
             mkTopicAssignment(topic2Uuid, 0)
         ));
-
         expectedAssignment.put(consumerB, mkAssignment(
             mkTopicAssignment(topic1Uuid, 1),
             mkTopicAssignment(topic2Uuid, 1)
@@ -383,15 +413,12 @@
             createPartitionRacks(4)
         ));
 
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
 
         Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 1),
             mkTopicAssignment(topic2Uuid, 0, 1)
         );
-
         members.put(consumerA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
@@ -403,7 +430,6 @@
             mkTopicAssignment(topic1Uuid, 2),
             mkTopicAssignment(topic2Uuid, 2)
         );
-
         members.put(consumerB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
@@ -411,16 +437,23 @@
             currentAssignmentForB
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            invertedTargetAssignment(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
         expectedAssignment.put(consumerA, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 1),
             mkTopicAssignment(topic2Uuid, 0, 1)
         ));
-
         expectedAssignment.put(consumerB, mkAssignment(
             mkTopicAssignment(topic1Uuid, 2, 3),
             mkTopicAssignment(topic2Uuid, 2, 3)
@@ -445,15 +478,12 @@
             createPartitionRacks(3)
         ));
 
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
 
         Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 1),
             mkTopicAssignment(topic2Uuid, 0, 1)
         );
-
         members.put(consumerA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
@@ -465,7 +495,6 @@
             mkTopicAssignment(topic1Uuid, 2),
             mkTopicAssignment(topic2Uuid, 2)
         );
-
         members.put(consumerB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
@@ -481,21 +510,27 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            invertedTargetAssignment(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
         expectedAssignment.put(consumerA, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0),
             mkTopicAssignment(topic2Uuid, 0)
         ));
-
         expectedAssignment.put(consumerB, mkAssignment(
             mkTopicAssignment(topic1Uuid, 2),
             mkTopicAssignment(topic2Uuid, 2)
         ));
-
         expectedAssignment.put(consumerC, mkAssignment(
             mkTopicAssignment(topic1Uuid, 1),
             mkTopicAssignment(topic2Uuid, 1)
@@ -521,15 +556,12 @@
             createPartitionRacks(3)
         ));
 
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
 
         Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 1),
             mkTopicAssignment(topic2Uuid, 0, 1)
         );
-
         members.put(consumerA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
@@ -541,7 +573,6 @@
             mkTopicAssignment(topic1Uuid, 2),
             mkTopicAssignment(topic2Uuid, 2)
         );
-
         members.put(consumerB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
@@ -557,21 +588,27 @@
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            invertedTargetAssignment(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
         expectedAssignment.put(consumerA, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 1),
             mkTopicAssignment(topic2Uuid, 0, 1)
         ));
-
         expectedAssignment.put(consumerB, mkAssignment(
             mkTopicAssignment(topic1Uuid, 2),
             mkTopicAssignment(topic2Uuid, 2)
         ));
-
         expectedAssignment.put(consumerC, mkAssignment(
             mkTopicAssignment(topic1Uuid, 3)
         ));
@@ -595,8 +632,6 @@
             createPartitionRacks(3)
         ));
 
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
         // Consumer A was removed
 
@@ -604,7 +639,6 @@
             mkTopicAssignment(topic1Uuid, 2),
             mkTopicAssignment(topic2Uuid, 2)
         );
-
         members.put(consumerB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
@@ -612,11 +646,19 @@
             currentAssignmentForB
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HOMOGENEOUS,
+            invertedTargetAssignment(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
         expectedAssignment.put(consumerB, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 1, 2),
             mkTopicAssignment(topic2Uuid, 0, 1, 2)
@@ -647,18 +689,14 @@
             createPartitionRacks(2)
         ));
 
-        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
-        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
-
         // Let initial subscriptions be A -> T1, T2 // B -> T2 // C -> T2, T3
         // Change the subscriptions to A -> T1 // B -> T1, T2, T3 // C -> T2
+        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
 
         Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 1, 2),
             mkTopicAssignment(topic2Uuid, 0)
         );
-
         members.put(consumerA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
@@ -669,7 +707,6 @@
         Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
             mkTopicAssignment(topic2Uuid, 1)
         );
-
         members.put(consumerB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
@@ -681,7 +718,6 @@
             mkTopicAssignment(topic2Uuid, 2),
             mkTopicAssignment(topic3Uuid, 0, 1)
         );
-
         members.put(consumerC, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
@@ -689,21 +725,27 @@
             currentAssignmentForC
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
-        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
+            HETEROGENEOUS,
+            invertedTargetAssignment(members)
+        );
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
         expectedAssignment.put(consumerA, mkAssignment(
             mkTopicAssignment(topic1Uuid, 0, 1)
         ));
-
         expectedAssignment.put(consumerB, mkAssignment(
             mkTopicAssignment(topic1Uuid, 2),
             mkTopicAssignment(topic2Uuid, 0, 1),
             mkTopicAssignment(topic3Uuid, 0, 1)
         ));
-
         expectedAssignment.put(consumerC, mkAssignment(
             mkTopicAssignment(topic2Uuid, 2)
         ));
@@ -711,7 +753,10 @@
         assertAssignment(expectedAssignment, computedAssignment);
     }
 
-    private void assertAssignment(Map<String, Map<Uuid, Set<Integer>>> expectedAssignment, GroupAssignment computedGroupAssignment) {
+    private void assertAssignment(
+        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment,
+        GroupAssignment computedGroupAssignment
+    ) {
         assertEquals(expectedAssignment.size(), computedGroupAssignment.members().size());
         for (String memberId : computedGroupAssignment.members().keySet()) {
             Map<Uuid, Set<Integer>> computedAssignmentForMember = computedGroupAssignment.members().get(memberId).targetPartitions();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index 4ae14c2..784bb60 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -835,6 +835,90 @@
     }
 
     @Test
+    public void testUpdateInvertedAssignment() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard);
+        Uuid topicId = Uuid.randomUuid();
+        String memberId1 = "member1";
+        String memberId2 = "member2";
+
+        // Initial assignment for member1
+        Assignment initialAssignment = new Assignment(Collections.singletonMap(
+            topicId,
+            new HashSet<>(Collections.singletonList(0))
+        ));
+        consumerGroup.updateTargetAssignment(memberId1, initialAssignment);
+
+        // Verify that partition 0 is assigned to member1.
+        assertEquals(
+            mkMap(
+                mkEntry(topicId, mkMap(mkEntry(0, memberId1)))
+            ),
+            consumerGroup.invertedTargetAssignment()
+        );
+
+        // New assignment for member1
+        Assignment newAssignment = new Assignment(Collections.singletonMap(
+            topicId,
+            new HashSet<>(Collections.singletonList(1))
+        ));
+        consumerGroup.updateTargetAssignment(memberId1, newAssignment);
+
+        // Verify that partition 0 is no longer assigned and partition 1 is assigned to member1
+        assertEquals(
+            mkMap(
+                mkEntry(topicId, mkMap(mkEntry(1, memberId1)))
+            ),
+            consumerGroup.invertedTargetAssignment()
+        );
+
+        // New assignment for member2 to add partition 1
+        Assignment newAssignment2 = new Assignment(Collections.singletonMap(
+            topicId,
+            new HashSet<>(Collections.singletonList(1))
+        ));
+        consumerGroup.updateTargetAssignment(memberId2, newAssignment2);
+
+        // Verify that partition 1 is assigned to member2
+        assertEquals(
+            mkMap(
+                mkEntry(topicId, mkMap(mkEntry(1, memberId2)))
+            ),
+            consumerGroup.invertedTargetAssignment()
+        );
+
+        // New assignment for member1 to revoke partition 1 and assign partition 0
+        Assignment newAssignment1 = new Assignment(Collections.singletonMap(
+            topicId,
+            new HashSet<>(Collections.singletonList(0))
+        ));
+        consumerGroup.updateTargetAssignment(memberId1, newAssignment1);
+
+        // Verify that partition 1 is still assigned to member2 and partition 0 is assigned to member1
+        assertEquals(
+            mkMap(
+                mkEntry(topicId, mkMap(
+                    mkEntry(0, memberId1),
+                    mkEntry(1, memberId2)
+                ))
+            ),
+            consumerGroup.invertedTargetAssignment()
+        );
+
+        // Test remove target assignment for member1
+        consumerGroup.removeTargetAssignment(memberId1);
+
+        // Verify that partition 0 is no longer assigned and partition 1 is still assigned to member2
+        assertEquals(
+            mkMap(
+                mkEntry(topicId, mkMap(mkEntry(1, memberId2)))
+            ),
+            consumerGroup.invertedTargetAssignment()
+        );
+    }
+
+    @Test
     public void testMetadataRefreshDeadline() {
         MockTime time = new MockTime();
         ConsumerGroup group = createConsumerGroup("group-foo");
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
index 3a03a16..d5ba038 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
@@ -17,9 +17,10 @@
 package org.apache.kafka.coordinator.group.consumer;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.AssignmentTestUtil;
 import org.apache.kafka.coordinator.group.MetadataImageBuilder;
 import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
-import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
 import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
 import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
 import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
@@ -207,8 +208,11 @@
             SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap);
             SubscriptionType subscriptionType = HOMOGENEOUS;
 
+            // Prepare the member assignments per topic partition.
+            Map<Uuid, Map<Integer, String>> invertedTargetAssignment = AssignmentTestUtil.invertedTargetAssignment(memberSpecs);
+
             // Prepare the expected assignment spec.
-            AssignmentSpec assignmentSpec = new AssignmentSpec(memberSpecs, subscriptionType);
+            GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, subscriptionType, invertedTargetAssignment);
 
             // We use `any` here to always return an assignment but use `verify` later on
             // to ensure that the input was correct.
@@ -222,6 +226,7 @@
                 .withSubscriptionMetadata(subscriptionMetadata)
                 .withSubscriptionType(subscriptionType)
                 .withTargetAssignment(targetAssignment)
+                .withInvertedTargetAssignment(invertedTargetAssignment)
                 .withTopicsImage(topicsImage);
 
             // Add the updated members or delete the deleted members.
@@ -239,7 +244,7 @@
             // Verify that the assignor was called once with the expected
             // assignment spec.
             verify(assignor, times(1))
-                .assign(assignmentSpec, subscribedTopicMetadata);
+                .assign(groupSpec, subscribedTopicMetadata);
 
             return result;
         }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
new file mode 100644
index 0000000..2d2edf2
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.image.MetadataDelta;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class AssignorBenchmarkUtils {
+    /**
+     * Generate a reverse look up map of partition to member target assignments from the given member spec.
+     *
+     * @param groupAssignment       The group assignment.
+     * @return Map of topic partition to member assignments.
+     */
+    public static Map<Uuid, Map<Integer, String>> computeInvertedTargetAssignment(
+        GroupAssignment groupAssignment
+    ) {
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
+        for (Map.Entry<String, MemberAssignment> memberEntry : groupAssignment.members().entrySet()) {
+            String memberId = memberEntry.getKey();
+            Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().targetPartitions();
+
+            for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
+                Uuid topicId = topicEntry.getKey();
+                Set<Integer> partitions = topicEntry.getValue();
+
+                Map<Integer, String> partitionMap = invertedTargetAssignment.computeIfAbsent(topicId, k -> new HashMap<>());
+
+                for (Integer partitionId : partitions) {
+                    partitionMap.put(partitionId, memberId);
+                }
+            }
+        }
+        return invertedTargetAssignment;
+    }
+
+    public static void addTopic(
+        MetadataDelta delta,
+        Uuid topicId,
+        String topicName,
+        int numPartitions
+    ) {
+        // For testing purposes, the following criteria are used:
+        // - Number of replicas for each partition: 2
+        // - Number of brokers available in the cluster: 4
+        delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
+        for (int i = 0; i < numPartitions; i++) {
+            delta.replay(new PartitionRecord()
+                .setTopicId(topicId)
+                .setPartitionId(i)
+                .setReplicas(Arrays.asList(i % 4, (i + 1) % 4)));
+        }
+    }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
index eb9c4ee..2349350 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
@@ -18,7 +18,7 @@
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
-import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
 import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
 import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
@@ -29,10 +29,12 @@
 import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
 import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
 import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
 import org.apache.kafka.image.TopicsImage;
+
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -119,7 +121,7 @@
 
     private static final int MAX_BUCKET_COUNT = 5;
 
-    private AssignmentSpec assignmentSpec;
+    private GroupSpecImpl groupSpec;
 
     private SubscribedTopicDescriber subscribedTopicDescriber;
 
@@ -160,7 +162,13 @@
                 partitionsPerTopicCount,
                 partitionRacks
             ));
-            TargetAssignmentBuilderBenchmark.addTopic(delta, topicUuid, topicName, partitionsPerTopicCount);
+
+            AssignorBenchmarkUtils.addTopic(
+                delta,
+                topicUuid,
+                topicName,
+                partitionsPerTopicCount
+            );
         }
 
         topicsImage = delta.apply(MetadataProvenance.EMPTY).topics();
@@ -207,7 +215,7 @@
             }
         }
 
-        this.assignmentSpec = new AssignmentSpec(members, subscriptionType);
+        this.groupSpec = new GroupSpecImpl(members, subscriptionType, Collections.emptyMap());
     }
 
     private Optional<String> rackId(int memberIndex) {
@@ -243,16 +251,23 @@
     }
 
     private void simulateIncrementalRebalance() {
-        GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+        GroupAssignment initialAssignment = partitionAssignor.assign(groupSpec, subscribedTopicDescriber);
         Map<String, MemberAssignment> members = initialAssignment.members();
 
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(initialAssignment);
+
         Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>();
-        members.forEach((memberId, memberAssignment) -> {
-            AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId);
+
+        groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
+            MemberAssignment memberAssignment = members.getOrDefault(
+                memberId,
+                new MemberAssignment(Collections.emptyMap())
+            );
+
             updatedMembers.put(memberId, new AssignmentMemberSpec(
-                memberSpec.instanceId(),
-                memberSpec.rackId(),
-                memberSpec.subscribedTopicIds(),
+                assignmentMemberSpec.instanceId(),
+                assignmentMemberSpec.rackId(),
+                assignmentMemberSpec.subscribedTopicIds(),
                 memberAssignment.targetPartitions()
             ));
         });
@@ -272,13 +287,13 @@
             Collections.emptyMap()
         ));
 
-        assignmentSpec = new AssignmentSpec(updatedMembers, subscriptionType);
+        groupSpec = new GroupSpecImpl(updatedMembers, subscriptionType, invertedTargetAssignment);
     }
 
     @Benchmark
     @Threads(1)
     @OutputTimeUnit(TimeUnit.MILLISECONDS)
     public void doAssignment() {
-        partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+        partitionAssignor.assign(groupSpec, subscribedTopicDescriber);
     }
 }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index 3244f7e..511db01 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -17,10 +17,8 @@
 package org.apache.kafka.jmh.assignor;
 
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.metadata.PartitionRecord;
-import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
-import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
 import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
 import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
@@ -36,6 +34,7 @@
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
 import org.apache.kafka.image.TopicsImage;
+
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -51,7 +50,6 @@
 import org.openjdk.jmh.annotations.Warmup;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -90,7 +88,9 @@
 
     private TargetAssignmentBuilder targetAssignmentBuilder;
 
-    private AssignmentSpec assignmentSpec;
+    private GroupSpecImpl groupSpec;
+
+    private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
 
     private final List<String> allTopicNames = new ArrayList<>();
 
@@ -104,7 +104,7 @@
 
         subscriptionMetadata = generateMockSubscriptionMetadata();
         Map<String, ConsumerGroupMember> members = generateMockMembers();
-        Map<String, Assignment> existingTargetAssignment = generateMockInitialTargetAssignment();
+        Map<String, Assignment> existingTargetAssignment = generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment();
 
         ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("newMember")
             .setSubscribedTopicNames(allTopicNames)
@@ -113,8 +113,9 @@
         targetAssignmentBuilder = new TargetAssignmentBuilder(GROUP_ID, GROUP_EPOCH, partitionAssignor)
             .withMembers(members)
             .withSubscriptionMetadata(subscriptionMetadata)
-            .withTargetAssignment(existingTargetAssignment)
             .withSubscriptionType(HOMOGENEOUS)
+            .withTargetAssignment(existingTargetAssignment)
+            .withInvertedTargetAssignment(invertedTargetAssignment)
             .withTopicsImage(topicsImage)
             .addOrUpdateMember(newMember.memberId(), newMember);
     }
@@ -148,14 +149,20 @@
                 Collections.emptyMap()
             );
             subscriptionMetadata.put(topicName, metadata);
-            addTopic(delta, topicId, topicName, partitionsPerTopicCount);
+
+            AssignorBenchmarkUtils.addTopic(
+                delta,
+                topicId,
+                topicName,
+                partitionsPerTopicCount
+            );
         }
 
         topicsImage = delta.apply(MetadataProvenance.EMPTY).topics();
         return subscriptionMetadata;
     }
 
-    private Map<String, Assignment> generateMockInitialTargetAssignment() {
+    private Map<String, Assignment> generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment() {
         Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>(topicCount);
         subscriptionMetadata.forEach((topicName, topicMetadata) ->
             topicMetadataMap.put(
@@ -167,9 +174,10 @@
         createAssignmentSpec();
 
         GroupAssignment groupAssignment = partitionAssignor.assign(
-            assignmentSpec,
+            groupSpec,
             new SubscribedTopicMetadata(topicMetadataMap)
         );
+        invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(groupAssignment);
 
         Map<String, Assignment> initialTargetAssignment = new HashMap<>(memberCount);
 
@@ -198,25 +206,7 @@
                 Collections.emptyMap()
             ));
         }
-        assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
-    }
-
-    public static void addTopic(
-        MetadataDelta delta,
-        Uuid topicId,
-        String topicName,
-        int numPartitions
-    ) {
-        // For testing purposes, the following criteria are used:
-        // - Number of replicas for each partition: 2
-        // - Number of brokers available in the cluster: 4
-        delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
-        for (int i = 0; i < numPartitions; i++) {
-            delta.replay(new PartitionRecord()
-                .setTopicId(topicId)
-                .setPartitionId(i)
-                .setReplicas(Arrays.asList(i % 4, (i + 1) % 4)));
-        }
+        groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap());
     }
 
     @Benchmark