MINOR: Remove metrics attribute from ConsumerGroup (#20542)
The `metrics` attribute in `ConsumerGroup` is not used anymore. This
patch removes it.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Dongnuo Lyu
<dlyu@confluent.io>
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index f87af48..477e049 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -813,10 +813,10 @@
}
if (group == null) {
- return new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ return new ConsumerGroup(snapshotRegistry, groupId);
} else if (createIfNotExists && maybeDeleteEmptyClassicGroup(group, records)) {
log.info("[GroupId {}] Converted the empty classic group to a consumer group.", groupId);
- return new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ return new ConsumerGroup(snapshotRegistry, groupId);
} else {
if (group.type() == CONSUMER) {
return (ConsumerGroup) group;
@@ -980,7 +980,7 @@
}
if (group == null) {
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
groups.put(groupId, consumerGroup);
return consumerGroup;
} else if (group.type() == CONSUMER) {
@@ -990,7 +990,7 @@
// offsets if no group existed. Simple classic groups are not backed by any records
// in the __consumer_offsets topic hence we can safely replace it here. Without this,
// replaying consumer group records after offset commit records would not work.
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
groups.put(groupId, consumerGroup);
return consumerGroup;
} else {
@@ -1364,7 +1364,6 @@
try {
consumerGroup = ConsumerGroup.fromClassicGroup(
snapshotRegistry,
- metrics,
classicGroup,
topicHashCache,
metadataImage
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 19b103c..2db94cb 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -38,7 +38,6 @@
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.ModernGroup;
@@ -120,11 +119,6 @@
private final TimelineHashMap<String, Integer> serverAssignors;
/**
- * The coordinator metrics.
- */
- private final GroupCoordinatorMetricsShard metrics;
-
- /**
* The number of members that use the classic protocol.
*/
private final TimelineInteger numClassicProtocolMembers;
@@ -155,14 +149,12 @@
public ConsumerGroup(
SnapshotRegistry snapshotRegistry,
- String groupId,
- GroupCoordinatorMetricsShard metrics
+ String groupId
) {
super(snapshotRegistry, groupId);
this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
- this.metrics = Objects.requireNonNull(metrics);
this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
this.classicProtocolMembersSupportedProtocols = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -1130,7 +1122,6 @@
* Create a new consumer group according to the given classic group.
*
* @param snapshotRegistry The SnapshotRegistry.
- * @param metrics The GroupCoordinatorMetricsShard.
* @param classicGroup The converted classic group.
* @param topicHashCache The cache for topic hashes.
* @param metadataImage The current metadata image for the Kafka cluster.
@@ -1141,13 +1132,12 @@
*/
public static ConsumerGroup fromClassicGroup(
SnapshotRegistry snapshotRegistry,
- GroupCoordinatorMetricsShard metrics,
ClassicGroup classicGroup,
Map<String, Long> topicHashCache,
CoordinatorMetadataImage metadataImage
) {
String groupId = classicGroup.groupId();
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
consumerGroup.setGroupEpoch(classicGroup.generationId());
consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 9a7d382..145b761 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -84,7 +84,6 @@
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
@@ -1384,10 +1383,9 @@
ArgumentCaptor<List<CoordinatorRecord>> recordsCapture = ArgumentCaptor.forClass(List.class);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
- GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
- ConsumerGroup group1 = new ConsumerGroup(snapshotRegistry, "group-id", metricsShard);
- ConsumerGroup group2 = new ConsumerGroup(snapshotRegistry, "other-group-id", metricsShard);
+ ConsumerGroup group1 = new ConsumerGroup(snapshotRegistry, "group-id");
+ ConsumerGroup group2 = new ConsumerGroup(snapshotRegistry, "other-group-id");
when(groupMetadataManager.groupIds()).thenReturn(Set.of("group-id", "other-group-id"));
when(groupMetadataManager.group("group-id")).thenReturn(group1);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index 2e6cf00..24212d1 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -45,7 +45,6 @@
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
@@ -81,7 +80,6 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
public class ClassicGroupTest {
private final String protocolType = "consumer";
@@ -1383,8 +1381,7 @@
ConsumerGroup consumerGroup = new ConsumerGroup(
new SnapshotRegistry(logContext),
- groupId,
- mock(GroupCoordinatorMetricsShard.class)
+ groupId
);
consumerGroup.setGroupEpoch(10);
consumerGroup.setTargetAssignmentEpoch(10);
@@ -1536,8 +1533,7 @@
ConsumerGroup consumerGroup = new ConsumerGroup(
new SnapshotRegistry(logContext),
- groupId,
- mock(GroupCoordinatorMetricsShard.class)
+ groupId
);
consumerGroup.setGroupEpoch(10);
consumerGroup.setTargetAssignmentEpoch(10);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 202b91a..5956e51 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -25,7 +25,6 @@
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -45,7 +44,6 @@
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.ModernGroup;
@@ -83,7 +81,6 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
public class ConsumerGroupTest {
@@ -91,8 +88,7 @@
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
return new ConsumerGroup(
snapshotRegistry,
- groupId,
- mock(GroupCoordinatorMetricsShard.class)
+ groupId
);
}
@@ -700,8 +696,7 @@
@Test
public void testUpdateInvertedAssignment() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
- GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard);
+ ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group");
Uuid topicId = Uuid.randomUuid();
String memberId1 = "member1";
String memberId2 = "member2";
@@ -916,12 +911,7 @@
@Test
public void testAsListedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
- GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard(
- snapshotRegistry,
- Map.of(),
- new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
- );
- ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard);
+ ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
@@ -937,8 +927,7 @@
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConsumerGroup group = new ConsumerGroup(
snapshotRegistry,
- "group-foo",
- mock(GroupCoordinatorMetricsShard.class)
+ "group-foo"
);
// Simulate a call from the admin client without member id and member epoch.
@@ -997,7 +986,7 @@
long commitTimestamp = 20000L;
long offsetsRetentionMs = 10000L;
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
- ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class));
+ ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id");
Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
assertTrue(offsetExpirationCondition.isPresent());
@@ -1034,7 +1023,7 @@
@Test
public void testAsDescribedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
- ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class));
+ ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-id-1");
snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
@@ -1071,12 +1060,7 @@
@Test
public void testIsInStatesCaseInsensitive() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
- GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard(
- snapshotRegistry,
- Map.of(),
- new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
- );
- ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard);
+ ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
snapshotRegistry.idempotentCreateSnapshot(0);
assertTrue(group.isInStates(Set.of("empty"), 0));
assertFalse(group.isInStates(Set.of("Empty"), 0));
@@ -1307,7 +1291,6 @@
ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup(
new SnapshotRegistry(logContext),
- mock(GroupCoordinatorMetricsShard.class),
classicGroup,
new HashMap<>(),
metadataImage
@@ -1315,8 +1298,7 @@
ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
new SnapshotRegistry(logContext),
- groupId,
- mock(GroupCoordinatorMetricsShard.class)
+ groupId
);
expectedConsumerGroup.setGroupEpoch(10);
expectedConsumerGroup.setTargetAssignmentEpoch(10);