MINOR: ConsumerGroup#getOrMaybeCreateMember should not add the member to the group (#15847)

While reviewing https://github.com/apache/kafka/pull/15785, I noticed that the member is added to the group directly in `ConsumerGroup#getOrMaybeCreateMember`. This does not hurt but confuses people because the state must not be mutated at this point. It should only be mutated when records are replayed. I think that it is better to remove it in order to make it clear.

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 952dd43..20b4428 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -328,7 +328,8 @@
     }
 
     /**
-     * Gets or creates a member.
+     * Gets or creates a new member but without adding it to the group. Adding a member
+     * is done via the {@link ConsumerGroup#updateMember(ConsumerGroupMember)} method.
      *
      * @param memberId          The member id.
      * @param createIfNotExists Booleans indicating whether the member must be
@@ -341,16 +342,15 @@
         boolean createIfNotExists
     ) {
         ConsumerGroupMember member = members.get(memberId);
-        if (member == null) {
-            if (!createIfNotExists) {
-                throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
-                    memberId, groupId));
-            }
-            member = new ConsumerGroupMember.Builder(memberId).build();
-            members.put(memberId, member);
+        if (member != null) return member;
+
+        if (!createIfNotExists) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s is not a member of group %s.", memberId, groupId)
+            );
         }
 
-        return member;
+        return new ConsumerGroupMember.Builder(memberId).build();
     }
 
     /**
@@ -366,7 +366,7 @@
     }
 
     /**
-     * Updates the member.
+     * Adds or updates the member.
      *
      * @param newMember The new member state.
      */
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
index b7d95fe..d85ab46 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
@@ -252,14 +252,16 @@
             if (updatedMemberOrNull == null) {
                 memberSpecs.remove(memberId);
             } else {
-                ConsumerGroupMember member = members.get(memberId);
-                Assignment assignment;
+                Assignment assignment = targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+
                 // A new static member joins and needs to replace an existing departed one.
-                if (member == null && staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
-                    assignment = targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()), Assignment.EMPTY);
-                } else {
-                    assignment = targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+                if (updatedMemberOrNull.instanceId() != null) {
+                    String previousMemberId = staticMembers.get(updatedMemberOrNull.instanceId());
+                    if (previousMemberId != null && !previousMemberId.equals(memberId)) {
+                        assignment = targetAssignment.getOrDefault(previousMemberId, Assignment.EMPTY);
+                    }
                 }
+
                 memberSpecs.put(memberId, createAssignmentMemberSpec(
                     updatedMemberOrNull,
                     assignment,
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 4b3d76b..cd5e0a0 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -2125,7 +2125,6 @@
         // the transaction is committed.
         context.commitOffset(10L, "group", "bar", 1, 211L, 1, context.time.milliseconds());
 
-
         // Fetching offsets with "require stable" (Long.MAX_VALUE) should return the committed offset for
         // foo-0 and the UNSTABLE_OFFSET_COMMIT error for foo-1 and bar-0.
         assertEquals(Arrays.asList(
@@ -2184,7 +2183,7 @@
         // Create consumer group.
         ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
         // Create member.
-        group.getOrMaybeCreateMember("member", true);
+        group.updateMember(new ConsumerGroupMember.Builder("member").build());
         // Commit offset.
         context.commitOffset("group", "foo", 0, 100L, 1);
 
@@ -2277,7 +2276,7 @@
     public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
         OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
         ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
-        group.getOrMaybeCreateMember("member", true);
+        group.updateMember(new ConsumerGroupMember.Builder("member").build());
 
         // Fetch offsets case.
         List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index e265d15..d0a75d9 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -78,11 +78,14 @@
         ConsumerGroup consumerGroup = createConsumerGroup("foo");
         ConsumerGroupMember member;
 
-        // Create a group.
+        // Create a member.
         member = consumerGroup.getOrMaybeCreateMember("member-id", true);
         assertEquals("member-id", member.memberId());
 
-        // Get that group back.
+        // Add member to the group.
+        consumerGroup.updateMember(member);
+
+        // Get that member back.
         member = consumerGroup.getOrMaybeCreateMember("member-id", false);
         assertEquals("member-id", member.memberId());
 
@@ -138,7 +141,8 @@
     public void testRemoveMember() {
         ConsumerGroup consumerGroup = createConsumerGroup("foo");
 
-        consumerGroup.getOrMaybeCreateMember("member", true);
+        ConsumerGroupMember member = consumerGroup.getOrMaybeCreateMember("member", true);
+        consumerGroup.updateMember(member);
         assertTrue(consumerGroup.hasMember("member"));
 
         consumerGroup.removeMember("member");
@@ -150,16 +154,13 @@
     public void testRemoveStaticMember() {
         ConsumerGroup consumerGroup = createConsumerGroup("foo");
 
-        ConsumerGroupMember member;
-        member = consumerGroup.getOrMaybeCreateMember("member", true);
-        assertTrue(consumerGroup.hasMember("member"));
-
-        member = new ConsumerGroupMember.Builder(member)
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
             .setInstanceId("instance")
             .build();
 
         consumerGroup.updateMember(member);
+        assertTrue(consumerGroup.hasMember("member"));
 
         consumerGroup.removeMember("member");
         assertFalse(consumerGroup.hasMember("member"));
@@ -801,7 +802,7 @@
             group.validateOffsetCommit("member-id", null, 0, isTransactional));
 
         // Create a member.
-        group.getOrMaybeCreateMember("member-id", true);
+        group.updateMember(new ConsumerGroupMember.Builder("member-id").build());
 
         // A call from the admin client should fail as the group is not empty.
         assertThrows(UnknownMemberIdException.class, () ->
@@ -852,7 +853,7 @@
 
         // Create a member.
         snapshotRegistry.getOrCreateSnapshot(0);
-        group.getOrMaybeCreateMember("member-id", true);
+        group.updateMember(new ConsumerGroupMember.Builder("member-id").build());
 
         // The member does not exist at last committed offset 0.
         assertThrows(UnknownMemberIdException.class, () ->
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
index 2a73e6f..9b6a1e8 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
@@ -143,17 +143,7 @@
         public void removeMemberSubscription(
             String memberId
         ) {
-            removeMemberSubscription(memberId, null);
-        }
-
-        public void removeMemberSubscription(
-            String memberId,
-            String instanceId
-        ) {
             this.updatedMembers.put(memberId, null);
-            if (instanceId != null) {
-                this.staticMembers.remove(instanceId);
-            }
         }
 
         public void prepareMemberAssignment(
@@ -182,14 +172,16 @@
                 if (updatedMemberOrNull == null) {
                     memberSpecs.remove(memberId);
                 } else {
-                    ConsumerGroupMember member = members.get(memberId);
-                    Assignment assignment;
+                    Assignment assignment = targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+
                     // A new static member joins and needs to replace an existing departed one.
-                    if (member == null && staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
-                        assignment = targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()), Assignment.EMPTY);
-                    } else {
-                        assignment = targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+                    if (updatedMemberOrNull.instanceId() != null) {
+                        String previousMemberId = staticMembers.get(updatedMemberOrNull.instanceId());
+                        if (previousMemberId != null && !previousMemberId.equals(memberId)) {
+                            assignment = targetAssignment.getOrDefault(previousMemberId, Assignment.EMPTY);
+                        }
                     }
+
                     memberSpecs.put(memberId, createAssignmentMemberSpec(
                         updatedMemberOrNull,
                         assignment,
@@ -214,7 +206,6 @@
             when(assignor.assign(any(), any()))
                 .thenReturn(new GroupAssignment(memberAssignments));
 
-
             // Create and populate the assignment builder.
             TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, groupEpoch, assignor)
                 .withMembers(members)
@@ -722,7 +713,7 @@
     }
 
     @Test
-    public void testStaticMemberReplace() {
+    public void testReplaceStaticMember() {
         TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
             "my-group",
             20
@@ -731,26 +722,26 @@
         Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
         Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
 
-        context.addGroupMember("member-1", "member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+        context.addGroupMember("member-1", "instance-member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
             mkTopicAssignment(fooTopicId, 1, 2),
             mkTopicAssignment(barTopicId, 1, 2)
         ));
 
-        context.addGroupMember("member-2", "member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+        context.addGroupMember("member-2", "instance-member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
             mkTopicAssignment(fooTopicId, 3, 4),
             mkTopicAssignment(barTopicId, 3, 4)
         ));
 
-        context.addGroupMember("member-3", "member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+        context.addGroupMember("member-3", "instance-member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
             mkTopicAssignment(fooTopicId, 5, 6),
             mkTopicAssignment(barTopicId, 5, 6)
         ));
 
         // Static member 3 leaves
-        context.removeMemberSubscription("member-3", "member-3");
+        context.removeMemberSubscription("member-3");
 
         // Another static member joins with the same instance id as the departed one
-        context.updateMemberSubscription("member-3-a", Arrays.asList("foo", "bar", "zar"), Optional.of("member-3"), Optional.empty());
+        context.updateMemberSubscription("member-3-a", Arrays.asList("foo", "bar", "zar"), Optional.of("instance-member-3"), Optional.empty());
 
         context.prepareMemberAssignment("member-1", mkAssignment(
             mkTopicAssignment(fooTopicId, 1, 2),