KAFKA-16625; Reverse lookup map from topic partitions to members (#15974)
This patch speeds up the computation of the unassigned partitions by exposing the inverted target assignment. It allows the assignor to check whether a partition is assigned or not.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 5a41140..76a94fd 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1901,6 +1901,7 @@
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
+ .withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java
deleted file mode 100644
index ec65dc6..0000000
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.coordinator.group.assignor;
-
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * The assignment specification for a consumer group.
- */
-public class AssignmentSpec {
- /**
- * The member metadata keyed by member Id.
- */
- private final Map<String, AssignmentMemberSpec> members;
-
- /**
- * The subscription type followed by the group.
- */
- private final SubscriptionType subscriptionType;
-
- public AssignmentSpec(
- Map<String, AssignmentMemberSpec> members,
- SubscriptionType subscriptionType
- ) {
- Objects.requireNonNull(members);
- this.members = members;
- this.subscriptionType = subscriptionType;
- }
-
- /**
- * @return Member metadata keyed by member Id.
- */
- public Map<String, AssignmentMemberSpec> members() {
- return members;
- }
-
- /**
- * @return The group's subscription type.
- */
- public SubscriptionType subscriptionType() {
- return subscriptionType;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- AssignmentSpec that = (AssignmentSpec) o;
- return subscriptionType == that.subscriptionType &&
- members.equals(that.members);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(members, subscriptionType);
- }
-
- public String toString() {
- return "AssignmentSpec(members=" + members + ", subscriptionType=" + subscriptionType.toString() + ')';
- }
-}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java
index f8d165e..d6f83df 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java
@@ -107,8 +107,8 @@
*/
private final PartitionMovements partitionMovements;
- public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
- this.members = assignmentSpec.members();
+ public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+ this.members = groupSpec.members();
this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>();
this.membersPerTopic = new HashMap<>();
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java
new file mode 100644
index 0000000..296dedb
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+
+/**
+ * The group metadata specifications required to compute the target assignment.
+ */
+public interface GroupSpec {
+ /**
+ * @return Member metadata keyed by member Id.
+ */
+ Map<String, AssignmentMemberSpec> members();
+
+ /**
+ * @return The group's subscription type.
+ */
+ SubscriptionType subscriptionType();
+
+ /**
+ * @return True, if the partition is currently assigned to a member.
+ * False, otherwise.
+ */
+ boolean isPartitionAssigned(Uuid topicId, int partitionId);
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java
new file mode 100644
index 0000000..0194727
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The assignment specification for a consumer group.
+ */
+public class GroupSpecImpl implements GroupSpec {
+ /**
+ * The member metadata keyed by member Id.
+ */
+ private final Map<String, AssignmentMemberSpec> members;
+
+ /**
+ * The subscription type followed by the group.
+ */
+ private final SubscriptionType subscriptionType;
+
+ /**
+ * Reverse lookup map representing topic partitions with
+ * their current member assignments.
+ */
+ private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
+
+ public GroupSpecImpl(
+ Map<String, AssignmentMemberSpec> members,
+ SubscriptionType subscriptionType,
+ Map<Uuid, Map<Integer, String>> invertedTargetAssignment
+ ) {
+ Objects.requireNonNull(members);
+ Objects.requireNonNull(subscriptionType);
+ Objects.requireNonNull(invertedTargetAssignment);
+ this.members = members;
+ this.subscriptionType = subscriptionType;
+ this.invertedTargetAssignment = invertedTargetAssignment;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Map<String, AssignmentMemberSpec> members() {
+ return members;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SubscriptionType subscriptionType() {
+ return subscriptionType;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isPartitionAssigned(Uuid topicId, int partitionId) {
+ Map<Integer, String> partitionMap = invertedTargetAssignment.get(topicId);
+ if (partitionMap == null) {
+ return false;
+ }
+ return partitionMap.containsKey(partitionId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ GroupSpecImpl that = (GroupSpecImpl) o;
+ return subscriptionType == that.subscriptionType &&
+ members.equals(that.members) &&
+ invertedTargetAssignment.equals(that.invertedTargetAssignment);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = members.hashCode();
+ result = 31 * result + subscriptionType.hashCode();
+ result = 31 * result + invertedTargetAssignment.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "GroupSpecImpl(members=" + members +
+ ", subscriptionType=" + subscriptionType +
+ ", invertedTargetAssignment=" + invertedTargetAssignment +
+ ')';
+ }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
index a143637..3ea1361 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
@@ -59,7 +59,7 @@
/**
* The assignment specification which includes member metadata.
*/
- private final AssignmentSpec assignmentSpec;
+ private final GroupSpec groupSpec;
/**
* The topic and partition metadata describer.
@@ -89,18 +89,19 @@
* The partitions that still need to be assigned.
* Initially this contains all the subscribed topics' partitions.
*/
- private Set<TopicIdPartition> unassignedPartitions;
+ private final Set<TopicIdPartition> unassignedPartitions;
/**
* The target assignment.
*/
private final Map<String, MemberAssignment> targetAssignment;
- OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
- this.assignmentSpec = assignmentSpec;
+ OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+ this.groupSpec = groupSpec;
this.subscribedTopicDescriber = subscribedTopicDescriber;
- this.subscribedTopicIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+ this.subscribedTopicIds = new HashSet<>(groupSpec.members().values().iterator().next().subscribedTopicIds());
this.potentiallyUnfilledMembers = new HashMap<>();
+ this.unassignedPartitions = new HashSet<>();
this.targetAssignment = new HashMap<>();
}
@@ -108,9 +109,9 @@
* Here's the step-by-step breakdown of the assignment process:
*
* <li> Compute the quotas of partitions for each member based on the total partitions and member count.</li>
- * <li> Initialize unassigned partitions to all the topic partitions and
- * remove partitions from the list as and when they are assigned.</li>
- * <li> For existing assignments, retain partitions based on the determined quota.</li>
+ * <li> Initialize unassigned partitions with all the topic partitions that aren't present in the
+ * current target assignment.</li>
+ * <li> For existing assignments, retain partitions based on the determined quota. Add extras to unassigned partitions.</li>
* <li> Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.</li>
* <li> Proceed with a round-robin assignment according to quotas.
* For each unassigned partition, locate the first compatible member from the potentially unfilled list.</li>
@@ -124,6 +125,9 @@
return new GroupAssignment(Collections.emptyMap());
}
+ // Check if the subscribed topicId is still valid.
+ // Update unassigned partitions based on the current target assignment
+ // and topic metadata.
for (Uuid topicId : subscribedTopicIds) {
int partitionCount = subscribedTopicDescriber.numPartitions(topicId);
if (partitionCount == -1) {
@@ -131,21 +135,25 @@
"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata."
);
} else {
+ for (int i = 0; i < partitionCount; i++) {
+ if (!groupSpec.isPartitionAssigned(topicId, i)) {
+ unassignedPartitions.add(new TopicIdPartition(topicId, i));
+ }
+ }
totalPartitionsCount += partitionCount;
}
}
// The minimum required quota that each member needs to meet for a balanced assignment.
// This is the same for all members.
- final int numberOfMembers = assignmentSpec.members().size();
+ final int numberOfMembers = groupSpec.members().size();
final int minQuota = totalPartitionsCount / numberOfMembers;
remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;
- assignmentSpec.members().keySet().forEach(memberId ->
+ groupSpec.members().keySet().forEach(memberId ->
targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())
));
- unassignedPartitions = topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber);
potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
unassignedPartitionsRoundRobinAssignment();
@@ -179,7 +187,7 @@
private Map<String, Integer> assignStickyPartitions(int minQuota) {
Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
- assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> {
+ groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
List<TopicIdPartition> validCurrentMemberAssignment = validCurrentMemberAssignment(
assignmentMemberSpec.assignedPartitions()
);
@@ -198,20 +206,28 @@
topicIdPartition.topicId(),
topicIdPartition.partitionId()
);
- unassignedPartitions.remove(topicIdPartition);
});
- // The extra partition is located at the last index from the previous step.
- if (remaining < 0 && remainingMembersToGetAnExtraPartition > 0) {
- TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount);
- addPartitionToAssignment(
- targetAssignment,
- memberId,
- topicIdPartition.topicId(),
- topicIdPartition.partitionId()
- );
- unassignedPartitions.remove(topicIdPartition);
- remainingMembersToGetAnExtraPartition--;
+ if (remaining < 0) {
+ // The extra partition is located at the last index from the previous step.
+ if (remainingMembersToGetAnExtraPartition > 0) {
+ TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++);
+ addPartitionToAssignment(
+ targetAssignment,
+ memberId,
+ topicIdPartition.topicId(),
+ topicIdPartition.partitionId()
+ );
+ remainingMembersToGetAnExtraPartition--;
+ }
+ // Any previously owned partitions that weren't retained due to the quotas
+ // are added to the unassigned partitions set.
+ if (retainedPartitionsCount < currentAssignmentSize) {
+ unassignedPartitions.addAll(validCurrentMemberAssignment.subList(
+ retainedPartitionsCount,
+ currentAssignmentSize
+ ));
+ }
}
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java
index 27b38cc..13b0ee3 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java
@@ -34,12 +34,12 @@
/**
* Assigns partitions to group members based on the given assignment specification and topic metadata.
*
- * @param assignmentSpec The assignment spec which includes member metadata.
+ * @param groupSpec The assignment spec which includes member metadata.
* @param subscribedTopicDescriber The topic and partition metadata describer.
* @return The new assignment for the group.
*/
GroupAssignment assign(
- AssignmentSpec assignmentSpec,
+ GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException;
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java
index 00fcd7d..482ad02 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java
@@ -19,7 +19,7 @@
import org.apache.kafka.common.errors.ApiException;
/**
- * Exception thrown by {@link PartitionAssignor#assign(AssignmentSpec)}. The exception
+ * Exception thrown by {@link PartitionAssignor#assign(GroupSpec, SubscribedTopicDescriber)}}. The exception
* is only used internally.
*/
public class PartitionAssignorException extends ApiException {
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java
index a1631f5..8393353 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java
@@ -81,20 +81,20 @@
* Returns a map of topic Ids to a list of members subscribed to them,
* based on the given assignment specification and metadata.
*
- * @param assignmentSpec The specification for member assignments.
+ * @param groupSpec The specification for member assignments.
* @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters.
* @return A map of topic Ids to a list of member Ids subscribed to them.
*
* @throws PartitionAssignorException If a member is subscribed to a non-existent topic.
*/
private Map<Uuid, Collection<String>> membersPerTopic(
- final AssignmentSpec assignmentSpec,
+ final GroupSpec groupSpec,
final SubscribedTopicDescriber subscribedTopicDescriber
) {
Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
- Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+ Map<String, AssignmentMemberSpec> membersData = groupSpec.members();
- if (assignmentSpec.subscriptionType().equals(HOMOGENEOUS)) {
+ if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
Set<String> allMembers = membersData.keySet();
Collection<Uuid> topics = membersData.values().iterator().next().subscribedTopicIds();
@@ -139,7 +139,7 @@
*/
@Override
public GroupAssignment assign(
- final AssignmentSpec assignmentSpec,
+ final GroupSpec groupSpec,
final SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
@@ -147,7 +147,7 @@
// Step 1
Map<Uuid, Collection<String>> membersPerTopic = membersPerTopic(
- assignmentSpec,
+ groupSpec,
subscribedTopicDescriber
);
@@ -162,7 +162,7 @@
List<MemberWithRemainingAssignments> potentiallyUnfilledMembers = new ArrayList<>();
for (String memberId : membersForTopic) {
- Set<Integer> assignedPartitionsForTopic = assignmentSpec.members().get(memberId)
+ Set<Integer> assignedPartitionsForTopic = groupSpec.members().get(memberId)
.assignedPartitions().getOrDefault(topicId, Collections.emptySet());
int currentAssignmentSize = assignedPartitionsForTopic.size();
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
index aff9365..caa0de4 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
@@ -57,28 +57,28 @@
* Perform the group assignment given the current members and
* topics metadata.
*
- * @param assignmentSpec The assignment specification that included member metadata.
+ * @param groupSpec The assignment specification that included member metadata.
* @param subscribedTopicDescriber The topic and cluster metadata describer {@link SubscribedTopicDescriber}.
* @return The new target assignment for the group.
*/
@Override
public GroupAssignment assign(
- AssignmentSpec assignmentSpec,
+ GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
AbstractUniformAssignmentBuilder assignmentBuilder;
- if (assignmentSpec.members().isEmpty())
+ if (groupSpec.members().isEmpty())
return new GroupAssignment(Collections.emptyMap());
- if (assignmentSpec.subscriptionType().equals(HOMOGENEOUS)) {
+ if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the "
+ "optimized assignment algorithm");
- assignmentBuilder = new OptimizedUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber);
+ assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
} else {
LOG.debug("Detected that the members are subscribed to different sets of topics, invoking the "
+ "general assignment algorithm");
- assignmentBuilder = new GeneralUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber);
+ assignmentBuilder = new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
}
return assignmentBuilder.buildAssignment();
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index dd3a6f2..007971a 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -171,6 +171,12 @@
private final TimelineHashMap<String, Assignment> targetAssignment;
/**
+ * Reverse lookup map representing topic partitions with
+ * their current member assignments.
+ */
+ private final TimelineHashMap<Uuid, TimelineHashMap<Integer, String>> invertedTargetAssignment;
+
+ /**
* The current partition epoch maps each topic-partitions to their current epoch where
* the epoch is the epoch of their owners. When a member revokes a partition, it removes
* its epochs from this map. When a member gets a partition, it adds its epochs to this map.
@@ -221,6 +227,7 @@
this.subscriptionType = new TimelineObject<>(snapshotRegistry, HOMOGENEOUS);
this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.invertedTargetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
this.metrics = Objects.requireNonNull(metrics);
this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
@@ -517,21 +524,89 @@
}
/**
- * Updates target assignment of a member.
+ * @return An immutable map containing all the topic partitions
+ * with their current member assignments.
+ */
+ public Map<Uuid, Map<Integer, String>> invertedTargetAssignment() {
+ return Collections.unmodifiableMap(invertedTargetAssignment);
+ }
+
+ /**
+ * Updates the target assignment of a member.
*
* @param memberId The member id.
* @param newTargetAssignment The new target assignment.
*/
public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
+ updateInvertedTargetAssignment(
+ memberId,
+ targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())),
+ newTargetAssignment
+ );
targetAssignment.put(memberId, newTargetAssignment);
}
/**
+ * Updates the reverse lookup map of the target assignment.
+ *
+ * @param memberId The member Id.
+ * @param oldTargetAssignment The old target assignment.
+ * @param newTargetAssignment The new target assignment.
+ */
+ private void updateInvertedTargetAssignment(
+ String memberId,
+ Assignment oldTargetAssignment,
+ Assignment newTargetAssignment
+ ) {
+ // Combine keys from both old and new assignments.
+ Set<Uuid> allTopicIds = new HashSet<>();
+ allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+ allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+ for (Uuid topicId : allTopicIds) {
+ Set<Integer> oldPartitions = oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+ Set<Integer> newPartitions = newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+ TimelineHashMap<Integer, String> topicPartitionAssignment = invertedTargetAssignment.computeIfAbsent(
+ topicId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size()))
+ );
+
+ // Remove partitions that aren't present in the new assignment only if the partition is currently
+ // still assigned to the member in question.
+ // If p0 was moved from A to B, and the target assignment map was updated for B first, we don't want to
+ // remove the key p0 from the inverted map and undo the action when A eventually tries to update its assignment.
+ for (Integer partition : oldPartitions) {
+ if (!newPartitions.contains(partition) && memberId.equals(topicPartitionAssignment.get(partition))) {
+ topicPartitionAssignment.remove(partition);
+ }
+ }
+
+ // Add partitions that are in the new assignment but not in the old assignment.
+ for (Integer partition : newPartitions) {
+ if (!oldPartitions.contains(partition)) {
+ topicPartitionAssignment.put(partition, memberId);
+ }
+ }
+
+ if (topicPartitionAssignment.isEmpty()) {
+ invertedTargetAssignment.remove(topicId);
+ } else {
+ invertedTargetAssignment.put(topicId, topicPartitionAssignment);
+ }
+ }
+ }
+
+ /**
* Removes the target assignment of a member.
*
* @param memberId The member id.
*/
public void removeTargetAssignment(String memberId) {
+ updateInvertedTargetAssignment(
+ memberId,
+ targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
+ Assignment.EMPTY
+ );
targetAssignment.remove(memberId);
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
index 09a44b1..57d6039 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
@@ -19,7 +19,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.CoordinatorRecord;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
-import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
@@ -127,6 +127,12 @@
private Map<String, Assignment> targetAssignment = Collections.emptyMap();
/**
+ * Reverse lookup map representing topic partitions with
+ * their current member assignments.
+ */
+ private Map<Uuid, Map<Integer, String>> invertedTargetAssignment = Collections.emptyMap();
+
+ /**
* The topics image.
*/
private TopicsImage topicsImage = TopicsImage.EMPTY;
@@ -225,6 +231,19 @@
}
/**
+ * Adds the existing topic partition assignments.
+ *
+ * @param invertedTargetAssignment The reverse lookup map of the current target assignment.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withInvertedTargetAssignment(
+ Map<Uuid, Map<Integer, String>> invertedTargetAssignment
+ ) {
+ this.invertedTargetAssignment = invertedTargetAssignment;
+ return this;
+ }
+
+ /**
* Adds the topics image.
*
* @param topicsImage The topics image.
@@ -317,7 +336,11 @@
// Compute the assignment.
GroupAssignment newGroupAssignment = assignor.assign(
- new AssignmentSpec(Collections.unmodifiableMap(memberSpecs), subscriptionType),
+ new GroupSpecImpl(
+ Collections.unmodifiableMap(memberSpecs),
+ subscriptionType,
+ invertedTargetAssignment
+ ),
new SubscribedTopicMetadata(topicMetadataMap)
);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
index 8b6a09b..74bb303 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import java.util.AbstractMap;
@@ -82,4 +83,33 @@
assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember);
});
}
+
+ /**
+ * Generate a reverse look up map of partition to member target assignments from the given member spec.
+ *
+ * @param memberSpec A map where the key is the member Id and the value is an
+ * AssignmentMemberSpec object containing the member's partition assignments.
+ * @return Map of topic partition to member assignments.
+ */
+ public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
+ Map<String, AssignmentMemberSpec> memberSpec
+ ) {
+ Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
+ for (Map.Entry<String, AssignmentMemberSpec> memberEntry : memberSpec.entrySet()) {
+ String memberId = memberEntry.getKey();
+ Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().assignedPartitions();
+
+ for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
+ Uuid topicId = topicEntry.getKey();
+ Set<Integer> partitions = topicEntry.getValue();
+
+ Map<Integer, String> partitionMap = invertedTargetAssignment.computeIfAbsent(topicId, k -> new HashMap<>());
+
+ for (Integer partitionId : partitions) {
+ partitionMap.put(partitionId, memberId);
+ }
+ }
+ }
+ return invertedTargetAssignment;
+ }
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java
index 736ee43..1fdbb31 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java
@@ -17,7 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
-import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
@@ -45,7 +45,7 @@
}
@Override
- public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
+ public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
return prepareGroupAssignment;
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java
index 05b8cdc..cf15ee5 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
-import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
@@ -34,8 +34,8 @@
}
@Override
- public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
- return new GroupAssignment(assignmentSpec.members().entrySet()
+ public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+ return new GroupAssignment(groupSpec.members().entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java
index 32f9618..f5a7be2 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java
@@ -32,6 +32,7 @@
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -79,8 +80,16 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
- GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ Collections.emptyMap()
+ );
+
+ GroupAssignment groupAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
assertEquals(Collections.emptyMap(), groupAssignment.members());
}
@@ -113,10 +122,14 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ Collections.emptyMap()
+ );
assertThrows(PartitionAssignorException.class,
- () -> assignor.assign(assignmentSpec, subscribedTopicMetadata));
+ () -> assignor.assign(groupSpec, subscribedTopicMetadata));
}
@Test
@@ -149,10 +162,17 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ Collections.emptyMap()
+ );
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
@@ -202,10 +222,17 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ Collections.emptyMap()
+ );
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
// Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@@ -285,10 +312,17 @@
currentAssignmentForC
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ invertedTargetAssignment(members)
+ );
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
@@ -362,10 +396,17 @@
currentAssignmentForB
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ invertedTargetAssignment(members)
+ );
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
@@ -429,10 +470,17 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ invertedTargetAssignment(members)
+ );
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
@@ -496,10 +544,17 @@
// Member C was removed
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ invertedTargetAssignment(members)
+ );
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
@@ -554,10 +609,17 @@
currentAssignmentForB
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ invertedTargetAssignment(members)
+ );
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java
new file mode 100644
index 0000000..4060b1a
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class GroupSpecImplTest {
+
+ private Map<String, AssignmentMemberSpec> members;
+ private SubscriptionType subscriptionType;
+ private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
+ private GroupSpecImpl groupSpec;
+ private Uuid topicId;
+
+ @BeforeEach
+ void setUp() {
+ members = new HashMap<>();
+
+ subscriptionType = SubscriptionType.HOMOGENEOUS;
+ invertedTargetAssignment = new HashMap<>();
+ topicId = Uuid.randomUuid();
+
+ members.put("test-member", new AssignmentMemberSpec(
+ Optional.empty(),
+ Optional.empty(),
+ new HashSet<>(Collections.singletonList(topicId)),
+ Collections.emptyMap())
+ );
+
+ groupSpec = new GroupSpecImpl(
+ members,
+ subscriptionType,
+ invertedTargetAssignment
+ );
+ }
+
+ @Test
+ void testMembers() {
+ assertEquals(members, groupSpec.members());
+ }
+
+ @Test
+ void testSubscriptionType() {
+ assertEquals(subscriptionType, groupSpec.subscriptionType());
+ }
+
+ @Test
+ void testIsPartitionAssigned() {
+ Map<Integer, String> partitionMap = new HashMap<>();
+ partitionMap.put(1, "test-member");
+ invertedTargetAssignment.put(topicId, partitionMap);
+
+ assertTrue(groupSpec.isPartitionAssigned(topicId, 1));
+ assertFalse(groupSpec.isPartitionAssigned(topicId, 2));
+ assertFalse(groupSpec.isPartitionAssigned(Uuid.randomUuid(), 2));
+ }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
index 4fe748c..f21bd63 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
@@ -35,6 +35,7 @@
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -77,8 +78,16 @@
)
);
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ Collections.emptyMap()
+ );
+
+ GroupAssignment groupAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
assertEquals(Collections.emptyMap(), groupAssignment.members());
}
@@ -107,10 +116,14 @@
)
);
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ Collections.emptyMap()
+ );
assertThrows(PartitionAssignorException.class,
- () -> assignor.assign(assignmentSpec, subscribedTopicMetadata));
+ () -> assignor.assign(groupSpec, subscribedTopicMetadata));
}
@Test
@@ -143,11 +156,6 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
@@ -158,6 +166,18 @@
mkTopicAssignment(topic3Uuid, 0)
));
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ Collections.emptyMap()
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
+
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@@ -192,11 +212,6 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
// Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
@@ -209,6 +224,18 @@
Collections.emptyMap()
);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ Collections.emptyMap()
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
+
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@@ -236,10 +263,17 @@
));
}
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ Collections.emptyMap()
+ );
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
checkValidityAndBalance(members, computedAssignment);
}
@@ -261,7 +295,6 @@
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
-
Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
@@ -288,11 +321,6 @@
currentAssignmentForB
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
@@ -303,6 +331,18 @@
mkTopicAssignment(topic2Uuid, 1, 2)
));
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ invertedTargetAssignment(members)
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
+
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@@ -352,11 +392,6 @@
currentAssignmentForB
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2, 3, 5),
@@ -367,6 +402,18 @@
mkTopicAssignment(topic2Uuid, 1, 2, 3)
));
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ invertedTargetAssignment(members)
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
+
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@@ -419,11 +466,6 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2)
@@ -436,6 +478,18 @@
mkTopicAssignment(topic2Uuid, 0, 2)
));
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ invertedTargetAssignment(members)
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
+
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@@ -482,11 +536,6 @@
// Member C was removed
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
@@ -497,6 +546,18 @@
mkTopicAssignment(topic2Uuid, 1, 2)
));
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ invertedTargetAssignment(members)
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
+
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@@ -542,11 +603,6 @@
currentAssignmentForB
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
-
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic2Uuid, 0)
@@ -555,6 +611,18 @@
mkTopicAssignment(topic2Uuid, 1)
));
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ invertedTargetAssignment(members)
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
+
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
index b52681d..7acb9bd 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
@@ -32,6 +32,7 @@
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -74,8 +75,16 @@
)
);
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ Collections.emptyMap()
+ );
+
+ GroupAssignment groupAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
assertEquals(Collections.emptyMap(), groupAssignment.members());
}
@@ -104,10 +113,14 @@
)
);
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ Collections.emptyMap()
+ );
assertThrows(PartitionAssignorException.class,
- () -> assignor.assign(assignmentSpec, subscribedTopicMetadata));
+ () -> assignor.assign(groupSpec, subscribedTopicMetadata));
}
@Test
@@ -126,8 +139,6 @@
createPartitionRacks(2)
));
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(consumerA, new AssignmentMemberSpec(
@@ -144,16 +155,23 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ Collections.emptyMap()
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic3Uuid, 0)
));
-
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic3Uuid, 1)
@@ -184,8 +202,6 @@
createPartitionRacks(2)
));
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(consumerA, new AssignmentMemberSpec(
@@ -209,20 +225,26 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ Collections.emptyMap()
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2),
mkTopicAssignment(topic2Uuid, 0, 1)
));
-
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic3Uuid, 0)
));
-
expectedAssignment.put(consumerC, mkAssignment(
mkTopicAssignment(topic2Uuid, 2),
mkTopicAssignment(topic3Uuid, 1)
@@ -247,8 +269,6 @@
createPartitionRacks(2)
));
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(consumerA, new AssignmentMemberSpec(
@@ -272,8 +292,17 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ Collections.emptyMap()
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
// Topic 3 has 2 partitions but three consumers subscribed to it - one of them will not get a partition.
@@ -281,12 +310,10 @@
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic3Uuid, 0)
));
-
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic3Uuid, 1)
));
-
expectedAssignment.put(consumerC, mkAssignment(
mkTopicAssignment(topic1Uuid, 2)
));
@@ -310,15 +337,12 @@
createPartitionRacks(2)
));
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
);
-
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@@ -330,7 +354,6 @@
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
);
-
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@@ -346,16 +369,23 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ invertedTargetAssignment(members)
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
));
-
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
@@ -383,15 +413,12 @@
createPartitionRacks(4)
));
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
);
-
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@@ -403,7 +430,6 @@
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
);
-
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@@ -411,16 +437,23 @@
currentAssignmentForB
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ invertedTargetAssignment(members)
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
));
-
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2, 3),
mkTopicAssignment(topic2Uuid, 2, 3)
@@ -445,15 +478,12 @@
createPartitionRacks(3)
));
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
);
-
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@@ -465,7 +495,6 @@
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
);
-
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@@ -481,21 +510,27 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ invertedTargetAssignment(members)
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
));
-
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
));
-
expectedAssignment.put(consumerC, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
@@ -521,15 +556,12 @@
createPartitionRacks(3)
));
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
);
-
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@@ -541,7 +573,6 @@
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
);
-
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@@ -557,21 +588,27 @@
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ invertedTargetAssignment(members)
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
));
-
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
));
-
expectedAssignment.put(consumerC, mkAssignment(
mkTopicAssignment(topic1Uuid, 3)
));
@@ -595,8 +632,6 @@
createPartitionRacks(3)
));
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
// Consumer A was removed
@@ -604,7 +639,6 @@
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
);
-
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@@ -612,11 +646,19 @@
currentAssignmentForB
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ invertedTargetAssignment(members)
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2),
mkTopicAssignment(topic2Uuid, 0, 1, 2)
@@ -647,18 +689,14 @@
createPartitionRacks(2)
));
- SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
-
- Map<String, AssignmentMemberSpec> members = new TreeMap<>();
-
// Let initial subscriptions be A -> T1, T2 // B -> T2 // C -> T2, T3
// Change the subscriptions to A -> T1 // B -> T1, T2, T3 // C -> T2
+ Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2),
mkTopicAssignment(topic2Uuid, 0)
);
-
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@@ -669,7 +707,6 @@
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic2Uuid, 1)
);
-
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@@ -681,7 +718,6 @@
mkTopicAssignment(topic2Uuid, 2),
mkTopicAssignment(topic3Uuid, 0, 1)
);
-
members.put(consumerC, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@@ -689,21 +725,27 @@
currentAssignmentForC
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
- GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HETEROGENEOUS,
+ invertedTargetAssignment(members)
+ );
+ SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+ GroupAssignment computedAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
-
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1)
));
-
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 0, 1),
mkTopicAssignment(topic3Uuid, 0, 1)
));
-
expectedAssignment.put(consumerC, mkAssignment(
mkTopicAssignment(topic2Uuid, 2)
));
@@ -711,7 +753,10 @@
assertAssignment(expectedAssignment, computedAssignment);
}
- private void assertAssignment(Map<String, Map<Uuid, Set<Integer>>> expectedAssignment, GroupAssignment computedGroupAssignment) {
+ private void assertAssignment(
+ Map<String, Map<Uuid, Set<Integer>>> expectedAssignment,
+ GroupAssignment computedGroupAssignment
+ ) {
assertEquals(expectedAssignment.size(), computedGroupAssignment.members().size());
for (String memberId : computedGroupAssignment.members().keySet()) {
Map<Uuid, Set<Integer>> computedAssignmentForMember = computedGroupAssignment.members().get(memberId).targetPartitions();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index 4ae14c2..784bb60 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -835,6 +835,90 @@
}
@Test
+ public void testUpdateInvertedAssignment() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+ GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
+ ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard);
+ Uuid topicId = Uuid.randomUuid();
+ String memberId1 = "member1";
+ String memberId2 = "member2";
+
+ // Initial assignment for member1
+ Assignment initialAssignment = new Assignment(Collections.singletonMap(
+ topicId,
+ new HashSet<>(Collections.singletonList(0))
+ ));
+ consumerGroup.updateTargetAssignment(memberId1, initialAssignment);
+
+ // Verify that partition 0 is assigned to member1.
+ assertEquals(
+ mkMap(
+ mkEntry(topicId, mkMap(mkEntry(0, memberId1)))
+ ),
+ consumerGroup.invertedTargetAssignment()
+ );
+
+ // New assignment for member1
+ Assignment newAssignment = new Assignment(Collections.singletonMap(
+ topicId,
+ new HashSet<>(Collections.singletonList(1))
+ ));
+ consumerGroup.updateTargetAssignment(memberId1, newAssignment);
+
+ // Verify that partition 0 is no longer assigned and partition 1 is assigned to member1
+ assertEquals(
+ mkMap(
+ mkEntry(topicId, mkMap(mkEntry(1, memberId1)))
+ ),
+ consumerGroup.invertedTargetAssignment()
+ );
+
+ // New assignment for member2 to add partition 1
+ Assignment newAssignment2 = new Assignment(Collections.singletonMap(
+ topicId,
+ new HashSet<>(Collections.singletonList(1))
+ ));
+ consumerGroup.updateTargetAssignment(memberId2, newAssignment2);
+
+ // Verify that partition 1 is assigned to member2
+ assertEquals(
+ mkMap(
+ mkEntry(topicId, mkMap(mkEntry(1, memberId2)))
+ ),
+ consumerGroup.invertedTargetAssignment()
+ );
+
+ // New assignment for member1 to revoke partition 1 and assign partition 0
+ Assignment newAssignment1 = new Assignment(Collections.singletonMap(
+ topicId,
+ new HashSet<>(Collections.singletonList(0))
+ ));
+ consumerGroup.updateTargetAssignment(memberId1, newAssignment1);
+
+ // Verify that partition 1 is still assigned to member2 and partition 0 is assigned to member1
+ assertEquals(
+ mkMap(
+ mkEntry(topicId, mkMap(
+ mkEntry(0, memberId1),
+ mkEntry(1, memberId2)
+ ))
+ ),
+ consumerGroup.invertedTargetAssignment()
+ );
+
+ // Test remove target assignment for member1
+ consumerGroup.removeTargetAssignment(memberId1);
+
+ // Verify that partition 0 is no longer assigned and partition 1 is still assigned to member2
+ assertEquals(
+ mkMap(
+ mkEntry(topicId, mkMap(mkEntry(1, memberId2)))
+ ),
+ consumerGroup.invertedTargetAssignment()
+ );
+ }
+
+ @Test
public void testMetadataRefreshDeadline() {
MockTime time = new MockTime();
ConsumerGroup group = createConsumerGroup("group-foo");
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
index 3a03a16..d5ba038 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
@@ -17,9 +17,10 @@
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.AssignmentTestUtil;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
-import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
@@ -207,8 +208,11 @@
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap);
SubscriptionType subscriptionType = HOMOGENEOUS;
+ // Prepare the member assignments per topic partition.
+ Map<Uuid, Map<Integer, String>> invertedTargetAssignment = AssignmentTestUtil.invertedTargetAssignment(memberSpecs);
+
// Prepare the expected assignment spec.
- AssignmentSpec assignmentSpec = new AssignmentSpec(memberSpecs, subscriptionType);
+ GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, subscriptionType, invertedTargetAssignment);
// We use `any` here to always return an assignment but use `verify` later on
// to ensure that the input was correct.
@@ -222,6 +226,7 @@
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(targetAssignment)
+ .withInvertedTargetAssignment(invertedTargetAssignment)
.withTopicsImage(topicsImage);
// Add the updated members or delete the deleted members.
@@ -239,7 +244,7 @@
// Verify that the assignor was called once with the expected
// assignment spec.
verify(assignor, times(1))
- .assign(assignmentSpec, subscribedTopicMetadata);
+ .assign(groupSpec, subscribedTopicMetadata);
return result;
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
new file mode 100644
index 0000000..2d2edf2
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.image.MetadataDelta;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class AssignorBenchmarkUtils {
+ /**
+ * Generate a reverse look up map of partition to member target assignments from the given member spec.
+ *
+ * @param groupAssignment The group assignment.
+ * @return Map of topic partition to member assignments.
+ */
+ public static Map<Uuid, Map<Integer, String>> computeInvertedTargetAssignment(
+ GroupAssignment groupAssignment
+ ) {
+ Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
+ for (Map.Entry<String, MemberAssignment> memberEntry : groupAssignment.members().entrySet()) {
+ String memberId = memberEntry.getKey();
+ Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().targetPartitions();
+
+ for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
+ Uuid topicId = topicEntry.getKey();
+ Set<Integer> partitions = topicEntry.getValue();
+
+ Map<Integer, String> partitionMap = invertedTargetAssignment.computeIfAbsent(topicId, k -> new HashMap<>());
+
+ for (Integer partitionId : partitions) {
+ partitionMap.put(partitionId, memberId);
+ }
+ }
+ }
+ return invertedTargetAssignment;
+ }
+
+ public static void addTopic(
+ MetadataDelta delta,
+ Uuid topicId,
+ String topicName,
+ int numPartitions
+ ) {
+ // For testing purposes, the following criteria are used:
+ // - Number of replicas for each partition: 2
+ // - Number of brokers available in the cluster: 4
+ delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
+ for (int i = 0; i < numPartitions; i++) {
+ delta.replay(new PartitionRecord()
+ .setTopicId(topicId)
+ .setPartitionId(i)
+ .setReplicas(Arrays.asList(i % 4, (i + 1) % 4)));
+ }
+ }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
index eb9c4ee..2349350 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
@@ -18,7 +18,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
-import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
@@ -29,10 +29,12 @@
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.TopicsImage;
+
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -119,7 +121,7 @@
private static final int MAX_BUCKET_COUNT = 5;
- private AssignmentSpec assignmentSpec;
+ private GroupSpecImpl groupSpec;
private SubscribedTopicDescriber subscribedTopicDescriber;
@@ -160,7 +162,13 @@
partitionsPerTopicCount,
partitionRacks
));
- TargetAssignmentBuilderBenchmark.addTopic(delta, topicUuid, topicName, partitionsPerTopicCount);
+
+ AssignorBenchmarkUtils.addTopic(
+ delta,
+ topicUuid,
+ topicName,
+ partitionsPerTopicCount
+ );
}
topicsImage = delta.apply(MetadataProvenance.EMPTY).topics();
@@ -207,7 +215,7 @@
}
}
- this.assignmentSpec = new AssignmentSpec(members, subscriptionType);
+ this.groupSpec = new GroupSpecImpl(members, subscriptionType, Collections.emptyMap());
}
private Optional<String> rackId(int memberIndex) {
@@ -243,16 +251,23 @@
}
private void simulateIncrementalRebalance() {
- GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+ GroupAssignment initialAssignment = partitionAssignor.assign(groupSpec, subscribedTopicDescriber);
Map<String, MemberAssignment> members = initialAssignment.members();
+ Map<Uuid, Map<Integer, String>> invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(initialAssignment);
+
Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>();
- members.forEach((memberId, memberAssignment) -> {
- AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId);
+
+ groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
+ MemberAssignment memberAssignment = members.getOrDefault(
+ memberId,
+ new MemberAssignment(Collections.emptyMap())
+ );
+
updatedMembers.put(memberId, new AssignmentMemberSpec(
- memberSpec.instanceId(),
- memberSpec.rackId(),
- memberSpec.subscribedTopicIds(),
+ assignmentMemberSpec.instanceId(),
+ assignmentMemberSpec.rackId(),
+ assignmentMemberSpec.subscribedTopicIds(),
memberAssignment.targetPartitions()
));
});
@@ -272,13 +287,13 @@
Collections.emptyMap()
));
- assignmentSpec = new AssignmentSpec(updatedMembers, subscriptionType);
+ groupSpec = new GroupSpecImpl(updatedMembers, subscriptionType, invertedTargetAssignment);
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void doAssignment() {
- partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+ partitionAssignor.assign(groupSpec, subscribedTopicDescriber);
}
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index 3244f7e..511db01 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -17,10 +17,8 @@
package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.metadata.PartitionRecord;
-import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
-import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
@@ -36,6 +34,7 @@
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.TopicsImage;
+
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -51,7 +50,6 @@
import org.openjdk.jmh.annotations.Warmup;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -90,7 +88,9 @@
private TargetAssignmentBuilder targetAssignmentBuilder;
- private AssignmentSpec assignmentSpec;
+ private GroupSpecImpl groupSpec;
+
+ private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
private final List<String> allTopicNames = new ArrayList<>();
@@ -104,7 +104,7 @@
subscriptionMetadata = generateMockSubscriptionMetadata();
Map<String, ConsumerGroupMember> members = generateMockMembers();
- Map<String, Assignment> existingTargetAssignment = generateMockInitialTargetAssignment();
+ Map<String, Assignment> existingTargetAssignment = generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment();
ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("newMember")
.setSubscribedTopicNames(allTopicNames)
@@ -113,8 +113,9 @@
targetAssignmentBuilder = new TargetAssignmentBuilder(GROUP_ID, GROUP_EPOCH, partitionAssignor)
.withMembers(members)
.withSubscriptionMetadata(subscriptionMetadata)
- .withTargetAssignment(existingTargetAssignment)
.withSubscriptionType(HOMOGENEOUS)
+ .withTargetAssignment(existingTargetAssignment)
+ .withInvertedTargetAssignment(invertedTargetAssignment)
.withTopicsImage(topicsImage)
.addOrUpdateMember(newMember.memberId(), newMember);
}
@@ -148,14 +149,20 @@
Collections.emptyMap()
);
subscriptionMetadata.put(topicName, metadata);
- addTopic(delta, topicId, topicName, partitionsPerTopicCount);
+
+ AssignorBenchmarkUtils.addTopic(
+ delta,
+ topicId,
+ topicName,
+ partitionsPerTopicCount
+ );
}
topicsImage = delta.apply(MetadataProvenance.EMPTY).topics();
return subscriptionMetadata;
}
- private Map<String, Assignment> generateMockInitialTargetAssignment() {
+ private Map<String, Assignment> generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment() {
Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>(topicCount);
subscriptionMetadata.forEach((topicName, topicMetadata) ->
topicMetadataMap.put(
@@ -167,9 +174,10 @@
createAssignmentSpec();
GroupAssignment groupAssignment = partitionAssignor.assign(
- assignmentSpec,
+ groupSpec,
new SubscribedTopicMetadata(topicMetadataMap)
);
+ invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(groupAssignment);
Map<String, Assignment> initialTargetAssignment = new HashMap<>(memberCount);
@@ -198,25 +206,7 @@
Collections.emptyMap()
));
}
- assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
- }
-
- public static void addTopic(
- MetadataDelta delta,
- Uuid topicId,
- String topicName,
- int numPartitions
- ) {
- // For testing purposes, the following criteria are used:
- // - Number of replicas for each partition: 2
- // - Number of brokers available in the cluster: 4
- delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
- for (int i = 0; i < numPartitions; i++) {
- delta.replay(new PartitionRecord()
- .setTopicId(topicId)
- .setPartitionId(i)
- .setReplicas(Arrays.asList(i % 4, (i + 1) % 4)));
- }
+ groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap());
}
@Benchmark