MINOR: ConsumerGroup#getOrMaybeCreateMember should not add the member to the group (#15847)
While reviewing https://github.com/apache/kafka/pull/15785, I noticed that the member is added to the group directly in `ConsumerGroup#getOrMaybeCreateMember`. This does not hurt but confuses people because the state must not be mutated at this point. It should only be mutated when records are replayed. I think that it is better to remove it in order to make it clear.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 952dd43..20b4428 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -328,7 +328,8 @@
}
/**
- * Gets or creates a member.
+ * Gets or creates a new member but without adding it to the group. Adding a member
+ * is done via the {@link ConsumerGroup#updateMember(ConsumerGroupMember)} method.
*
* @param memberId The member id.
* @param createIfNotExists Booleans indicating whether the member must be
@@ -341,16 +342,15 @@
boolean createIfNotExists
) {
ConsumerGroupMember member = members.get(memberId);
- if (member == null) {
- if (!createIfNotExists) {
- throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
- memberId, groupId));
- }
- member = new ConsumerGroupMember.Builder(memberId).build();
- members.put(memberId, member);
+ if (member != null) return member;
+
+ if (!createIfNotExists) {
+ throw new UnknownMemberIdException(
+ String.format("Member %s is not a member of group %s.", memberId, groupId)
+ );
}
- return member;
+ return new ConsumerGroupMember.Builder(memberId).build();
}
/**
@@ -366,7 +366,7 @@
}
/**
- * Updates the member.
+ * Adds or updates the member.
*
* @param newMember The new member state.
*/
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
index b7d95fe..d85ab46 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
@@ -252,14 +252,16 @@
if (updatedMemberOrNull == null) {
memberSpecs.remove(memberId);
} else {
- ConsumerGroupMember member = members.get(memberId);
- Assignment assignment;
+ Assignment assignment = targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+
// A new static member joins and needs to replace an existing departed one.
- if (member == null && staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
- assignment = targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()), Assignment.EMPTY);
- } else {
- assignment = targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+ if (updatedMemberOrNull.instanceId() != null) {
+ String previousMemberId = staticMembers.get(updatedMemberOrNull.instanceId());
+ if (previousMemberId != null && !previousMemberId.equals(memberId)) {
+ assignment = targetAssignment.getOrDefault(previousMemberId, Assignment.EMPTY);
+ }
}
+
memberSpecs.put(memberId, createAssignmentMemberSpec(
updatedMemberOrNull,
assignment,
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 4b3d76b..cd5e0a0 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -2125,7 +2125,6 @@
// the transaction is committed.
context.commitOffset(10L, "group", "bar", 1, 211L, 1, context.time.milliseconds());
-
// Fetching offsets with "require stable" (Long.MAX_VALUE) should return the committed offset for
// foo-0 and the UNSTABLE_OFFSET_COMMIT error for foo-1 and bar-0.
assertEquals(Arrays.asList(
@@ -2184,7 +2183,7 @@
// Create consumer group.
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
// Create member.
- group.getOrMaybeCreateMember("member", true);
+ group.updateMember(new ConsumerGroupMember.Builder("member").build());
// Commit offset.
context.commitOffset("group", "foo", 0, 100L, 1);
@@ -2277,7 +2276,7 @@
public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
- group.getOrMaybeCreateMember("member", true);
+ group.updateMember(new ConsumerGroupMember.Builder("member").build());
// Fetch offsets case.
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index e265d15..d0a75d9 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -78,11 +78,14 @@
ConsumerGroup consumerGroup = createConsumerGroup("foo");
ConsumerGroupMember member;
- // Create a group.
+ // Create a member.
member = consumerGroup.getOrMaybeCreateMember("member-id", true);
assertEquals("member-id", member.memberId());
- // Get that group back.
+ // Add member to the group.
+ consumerGroup.updateMember(member);
+
+ // Get that member back.
member = consumerGroup.getOrMaybeCreateMember("member-id", false);
assertEquals("member-id", member.memberId());
@@ -138,7 +141,8 @@
public void testRemoveMember() {
ConsumerGroup consumerGroup = createConsumerGroup("foo");
- consumerGroup.getOrMaybeCreateMember("member", true);
+ ConsumerGroupMember member = consumerGroup.getOrMaybeCreateMember("member", true);
+ consumerGroup.updateMember(member);
assertTrue(consumerGroup.hasMember("member"));
consumerGroup.removeMember("member");
@@ -150,16 +154,13 @@
public void testRemoveStaticMember() {
ConsumerGroup consumerGroup = createConsumerGroup("foo");
- ConsumerGroupMember member;
- member = consumerGroup.getOrMaybeCreateMember("member", true);
- assertTrue(consumerGroup.hasMember("member"));
-
- member = new ConsumerGroupMember.Builder(member)
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setInstanceId("instance")
.build();
consumerGroup.updateMember(member);
+ assertTrue(consumerGroup.hasMember("member"));
consumerGroup.removeMember("member");
assertFalse(consumerGroup.hasMember("member"));
@@ -801,7 +802,7 @@
group.validateOffsetCommit("member-id", null, 0, isTransactional));
// Create a member.
- group.getOrMaybeCreateMember("member-id", true);
+ group.updateMember(new ConsumerGroupMember.Builder("member-id").build());
// A call from the admin client should fail as the group is not empty.
assertThrows(UnknownMemberIdException.class, () ->
@@ -852,7 +853,7 @@
// Create a member.
snapshotRegistry.getOrCreateSnapshot(0);
- group.getOrMaybeCreateMember("member-id", true);
+ group.updateMember(new ConsumerGroupMember.Builder("member-id").build());
// The member does not exist at last committed offset 0.
assertThrows(UnknownMemberIdException.class, () ->
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
index 2a73e6f..9b6a1e8 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
@@ -143,17 +143,7 @@
public void removeMemberSubscription(
String memberId
) {
- removeMemberSubscription(memberId, null);
- }
-
- public void removeMemberSubscription(
- String memberId,
- String instanceId
- ) {
this.updatedMembers.put(memberId, null);
- if (instanceId != null) {
- this.staticMembers.remove(instanceId);
- }
}
public void prepareMemberAssignment(
@@ -182,14 +172,16 @@
if (updatedMemberOrNull == null) {
memberSpecs.remove(memberId);
} else {
- ConsumerGroupMember member = members.get(memberId);
- Assignment assignment;
+ Assignment assignment = targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+
// A new static member joins and needs to replace an existing departed one.
- if (member == null && staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
- assignment = targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()), Assignment.EMPTY);
- } else {
- assignment = targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+ if (updatedMemberOrNull.instanceId() != null) {
+ String previousMemberId = staticMembers.get(updatedMemberOrNull.instanceId());
+ if (previousMemberId != null && !previousMemberId.equals(memberId)) {
+ assignment = targetAssignment.getOrDefault(previousMemberId, Assignment.EMPTY);
+ }
}
+
memberSpecs.put(memberId, createAssignmentMemberSpec(
updatedMemberOrNull,
assignment,
@@ -214,7 +206,6 @@
when(assignor.assign(any(), any()))
.thenReturn(new GroupAssignment(memberAssignments));
-
// Create and populate the assignment builder.
TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, groupEpoch, assignor)
.withMembers(members)
@@ -722,7 +713,7 @@
}
@Test
- public void testStaticMemberReplace() {
+ public void testReplaceStaticMember() {
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
"my-group",
20
@@ -731,26 +722,26 @@
Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
- context.addGroupMember("member-1", "member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+ context.addGroupMember("member-1", "instance-member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2)
));
- context.addGroupMember("member-2", "member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+ context.addGroupMember("member-2", "instance-member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 3, 4)
));
- context.addGroupMember("member-3", "member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+ context.addGroupMember("member-3", "instance-member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6),
mkTopicAssignment(barTopicId, 5, 6)
));
// Static member 3 leaves
- context.removeMemberSubscription("member-3", "member-3");
+ context.removeMemberSubscription("member-3");
// Another static member joins with the same instance id as the departed one
- context.updateMemberSubscription("member-3-a", Arrays.asList("foo", "bar", "zar"), Optional.of("member-3"), Optional.empty());
+ context.updateMemberSubscription("member-3-a", Arrays.asList("foo", "bar", "zar"), Optional.of("instance-member-3"), Optional.empty());
context.prepareMemberAssignment("member-1", mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),