MINOR: Remove metrics attribute from ConsumerGroup (#20542)

The `metrics` attribute in `ConsumerGroup` is not used anymore. This
patch removes it.

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai
 <chia7712@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Dongnuo Lyu
 <dlyu@confluent.io>
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index f87af48..477e049 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -813,10 +813,10 @@
         }
 
         if (group == null) {
-            return new ConsumerGroup(snapshotRegistry, groupId, metrics);
+            return new ConsumerGroup(snapshotRegistry, groupId);
         } else if (createIfNotExists && maybeDeleteEmptyClassicGroup(group, records)) {
             log.info("[GroupId {}] Converted the empty classic group to a consumer group.", groupId);
-            return new ConsumerGroup(snapshotRegistry, groupId, metrics);
+            return new ConsumerGroup(snapshotRegistry, groupId);
         } else {
             if (group.type() == CONSUMER) {
                 return (ConsumerGroup) group;
@@ -980,7 +980,7 @@
         }
 
         if (group == null) {
-            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
             groups.put(groupId, consumerGroup);
             return consumerGroup;
         } else if (group.type() == CONSUMER) {
@@ -990,7 +990,7 @@
             // offsets if no group existed. Simple classic groups are not backed by any records
             // in the __consumer_offsets topic hence we can safely replace it here. Without this,
             // replaying consumer group records after offset commit records would not work.
-            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
             groups.put(groupId, consumerGroup);
             return consumerGroup;
         } else {
@@ -1364,7 +1364,6 @@
         try {
             consumerGroup = ConsumerGroup.fromClassicGroup(
                 snapshotRegistry,
-                metrics,
                 classicGroup,
                 topicHashCache,
                 metadataImage
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 19b103c..2db94cb 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -38,7 +38,6 @@
 import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
 import org.apache.kafka.coordinator.group.classic.ClassicGroup;
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberState;
 import org.apache.kafka.coordinator.group.modern.ModernGroup;
@@ -120,11 +119,6 @@
     private final TimelineHashMap<String, Integer> serverAssignors;
 
     /**
-     * The coordinator metrics.
-     */
-    private final GroupCoordinatorMetricsShard metrics;
-
-    /**
      * The number of members that use the classic protocol.
      */
     private final TimelineInteger numClassicProtocolMembers;
@@ -155,14 +149,12 @@
 
     public ConsumerGroup(
         SnapshotRegistry snapshotRegistry,
-        String groupId,
-        GroupCoordinatorMetricsShard metrics
+        String groupId
     ) {
         super(snapshotRegistry, groupId);
         this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
         this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
         this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.metrics = Objects.requireNonNull(metrics);
         this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
         this.classicProtocolMembersSupportedProtocols = new TimelineHashMap<>(snapshotRegistry, 0);
         this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -1130,7 +1122,6 @@
      * Create a new consumer group according to the given classic group.
      *
      * @param snapshotRegistry  The SnapshotRegistry.
-     * @param metrics           The GroupCoordinatorMetricsShard.
      * @param classicGroup      The converted classic group.
      * @param topicHashCache    The cache for topic hashes.
      * @param metadataImage     The current metadata image for the Kafka cluster.
@@ -1141,13 +1132,12 @@
      */
     public static ConsumerGroup fromClassicGroup(
         SnapshotRegistry snapshotRegistry,
-        GroupCoordinatorMetricsShard metrics,
         ClassicGroup classicGroup,
         Map<String, Long> topicHashCache,
         CoordinatorMetadataImage metadataImage
     ) {
         String groupId = classicGroup.groupId();
-        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
         consumerGroup.setGroupEpoch(classicGroup.generationId());
         consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
 
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 9a7d382..145b761 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -84,7 +84,6 @@
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
@@ -1384,10 +1383,9 @@
         ArgumentCaptor<List<CoordinatorRecord>> recordsCapture = ArgumentCaptor.forClass(List.class);
 
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
 
-        ConsumerGroup group1 = new ConsumerGroup(snapshotRegistry, "group-id", metricsShard);
-        ConsumerGroup group2 = new ConsumerGroup(snapshotRegistry, "other-group-id", metricsShard);
+        ConsumerGroup group1 = new ConsumerGroup(snapshotRegistry, "group-id");
+        ConsumerGroup group2 = new ConsumerGroup(snapshotRegistry, "other-group-id");
 
         when(groupMetadataManager.groupIds()).thenReturn(Set.of("group-id", "other-group-id"));
         when(groupMetadataManager.group("group-id")).thenReturn(group1);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index 2e6cf00..24212d1 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -45,7 +45,6 @@
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberState;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
@@ -81,7 +80,6 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
 
 public class ClassicGroupTest {
     private final String protocolType = "consumer";
@@ -1383,8 +1381,7 @@
 
         ConsumerGroup consumerGroup = new ConsumerGroup(
             new SnapshotRegistry(logContext),
-            groupId,
-            mock(GroupCoordinatorMetricsShard.class)
+            groupId
         );
         consumerGroup.setGroupEpoch(10);
         consumerGroup.setTargetAssignmentEpoch(10);
@@ -1536,8 +1533,7 @@
 
         ConsumerGroup consumerGroup = new ConsumerGroup(
             new SnapshotRegistry(logContext),
-            groupId,
-            mock(GroupCoordinatorMetricsShard.class)
+            groupId
         );
         consumerGroup.setGroupEpoch(10);
         consumerGroup.setTargetAssignmentEpoch(10);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 202b91a..5956e51 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -25,7 +25,6 @@
 import org.apache.kafka.common.errors.StaleMemberEpochException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -45,7 +44,6 @@
 import org.apache.kafka.coordinator.group.classic.ClassicGroup;
 import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberState;
 import org.apache.kafka.coordinator.group.modern.ModernGroup;
@@ -83,7 +81,6 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
 
 public class ConsumerGroupTest {
 
@@ -91,8 +88,7 @@
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         return new ConsumerGroup(
             snapshotRegistry,
-            groupId,
-            mock(GroupCoordinatorMetricsShard.class)
+            groupId
         );
     }
 
@@ -700,8 +696,7 @@
     @Test
     public void testUpdateInvertedAssignment() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
-        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group");
         Uuid topicId = Uuid.randomUuid();
         String memberId1 = "member1";
         String memberId2 = "member2";
@@ -916,12 +911,7 @@
     @Test
     public void testAsListedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard(
-            snapshotRegistry,
-            Map.of(),
-            new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
-        );
-        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard);
+        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
         snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
         group.updateMember(new ConsumerGroupMember.Builder("member1")
@@ -937,8 +927,7 @@
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ConsumerGroup group = new ConsumerGroup(
             snapshotRegistry,
-            "group-foo",
-            mock(GroupCoordinatorMetricsShard.class)
+            "group-foo"
         );
 
         // Simulate a call from the admin client without member id and member epoch.
@@ -997,7 +986,7 @@
         long commitTimestamp = 20000L;
         long offsetsRetentionMs = 10000L;
         OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
-        ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class));
+        ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id");
 
         Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
         assertTrue(offsetExpirationCondition.isPresent());
@@ -1034,7 +1023,7 @@
     @Test
     public void testAsDescribedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class));
+        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-id-1");
         snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
 
@@ -1071,12 +1060,7 @@
     @Test
     public void testIsInStatesCaseInsensitive() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard(
-            snapshotRegistry,
-            Map.of(),
-            new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
-        );
-        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard);
+        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
         snapshotRegistry.idempotentCreateSnapshot(0);
         assertTrue(group.isInStates(Set.of("empty"), 0));
         assertFalse(group.isInStates(Set.of("Empty"), 0));
@@ -1307,7 +1291,6 @@
 
         ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup(
             new SnapshotRegistry(logContext),
-            mock(GroupCoordinatorMetricsShard.class),
             classicGroup,
             new HashMap<>(),
             metadataImage
@@ -1315,8 +1298,7 @@
 
         ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
             new SnapshotRegistry(logContext),
-            groupId,
-            mock(GroupCoordinatorMetricsShard.class)
+            groupId
         );
         expectedConsumerGroup.setGroupEpoch(10);
         expectedConsumerGroup.setTargetAssignmentEpoch(10);