KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig (#16458)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 8f187d9..58a37e3 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -1770,16 +1770,16 @@
@nowarn("cat=deprecation")
private[group] def offsetConfig(config: KafkaConfig) = new OffsetConfig(
- config.offsetMetadataMaxSize,
- config.offsetsLoadBufferSize,
- config.offsetsRetentionMinutes * 60L * 1000L,
- config.offsetsRetentionCheckIntervalMs,
- config.offsetsTopicPartitions,
- config.offsetsTopicSegmentBytes,
- config.offsetsTopicReplicationFactor,
- config.offsetsTopicCompressionType,
- config.offsetCommitTimeoutMs,
- config.offsetCommitRequiredAcks
+ config.groupCoordinatorConfig.offsetMetadataMaxSize,
+ config.groupCoordinatorConfig.offsetsLoadBufferSize,
+ config.groupCoordinatorConfig.offsetsRetentionMs,
+ config.groupCoordinatorConfig.offsetsRetentionCheckIntervalMs,
+ config.groupCoordinatorConfig.offsetsTopicPartitions,
+ config.groupCoordinatorConfig.offsetsTopicSegmentBytes,
+ config.groupCoordinatorConfig.offsetsTopicReplicationFactor,
+ config.groupCoordinatorConfig.offsetTopicCompressionType,
+ config.groupCoordinatorConfig.offsetCommitTimeoutMs,
+ config.groupCoordinatorConfig.offsetCommitRequiredAcks
)
private[group] def apply(
@@ -1791,10 +1791,10 @@
metrics: Metrics
): GroupCoordinator = {
val offsetConfig = this.offsetConfig(config)
- val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
- groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs,
- groupMaxSize = config.groupMaxSize,
- groupInitialRebalanceDelayMs = config.groupInitialRebalanceDelay)
+ val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupCoordinatorConfig.classicGroupMinSessionTimeoutMs,
+ groupMaxSessionTimeoutMs = config.groupCoordinatorConfig.classicGroupMaxSessionTimeoutMs,
+ groupMaxSize = config.groupCoordinatorConfig.classicGroupMaxSize,
+ groupInitialRebalanceDelayMs = config.groupCoordinatorConfig.classicGroupInitialRebalanceDelayMs)
val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
offsetConfig, replicaManager, time, metrics)
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index a84e053..6d8126c 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -234,8 +234,8 @@
case GROUP_METADATA_TOPIC_NAME =>
new CreatableTopic()
.setName(topic)
- .setNumPartitions(config.offsetsTopicPartitions)
- .setReplicationFactor(config.offsetsTopicReplicationFactor)
+ .setNumPartitions(config.groupCoordinatorConfig.offsetsTopicPartitions)
+ .setReplicationFactor(config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
.setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs))
case TRANSACTION_STATE_TOPIC_NAME =>
new CreatableTopic()
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index b962d49..074c9b1 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -36,7 +36,7 @@
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
-import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, CoordinatorRecordSerde}
+import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorService, CoordinatorRecordSerde}
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
import org.apache.kafka.security.CredentialProvider
@@ -568,26 +568,6 @@
if (config.isNewGroupCoordinatorEnabled) {
val time = Time.SYSTEM
val serde = new CoordinatorRecordSerde
- val groupCoordinatorConfig = new GroupCoordinatorConfig(
- config.groupCoordinatorNumThreads,
- config.groupCoordinatorAppendLingerMs,
- config.consumerGroupSessionTimeoutMs,
- config.consumerGroupHeartbeatIntervalMs,
- config.consumerGroupMaxSize,
- config.consumerGroupAssignors,
- config.offsetsTopicSegmentBytes,
- config.offsetMetadataMaxSize,
- config.groupMaxSize,
- config.groupInitialRebalanceDelay,
- GroupCoordinatorConfig.CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS,
- config.groupMinSessionTimeoutMs,
- config.groupMaxSessionTimeoutMs,
- config.offsetsRetentionCheckIntervalMs,
- config.offsetsRetentionMinutes * 60 * 1000L,
- config.offsetCommitTimeoutMs,
- config.consumerGroupMigrationPolicy,
- config.offsetsTopicCompressionType
- )
val timer = new SystemTimerReaper(
"group-coordinator-reaper",
new SystemTimer("group-coordinator")
@@ -596,12 +576,12 @@
time,
replicaManager,
serde,
- config.offsetsLoadBufferSize
+ config.groupCoordinatorConfig.offsetsLoadBufferSize
)
val writer = new CoordinatorPartitionWriter(
replicaManager
)
- new GroupCoordinatorService.Builder(config.brokerId, groupCoordinatorConfig)
+ new GroupCoordinatorService.Builder(config.brokerId, config.groupCoordinatorConfig)
.withTime(time)
.withTimer(timer)
.withLoader(loader)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f52271f..f6a7b6f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -530,7 +530,7 @@
authorizedTopicsRequest.foreach { topic =>
topic.partitions.forEach { partition =>
val error = try {
- if (partition.committedMetadata != null && partition.committedMetadata.length > config.offsetMetadataMaxSize) {
+ if (partition.committedMetadata != null && partition.committedMetadata.length > config.groupCoordinatorConfig.offsetMetadataMaxSize) {
Errors.OFFSET_METADATA_TOO_LARGE
} else {
zkSupport.zkClient.setOrCreateConsumerOffset(
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index f230140..1f37b0b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -33,10 +33,8 @@
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
@@ -234,6 +232,9 @@
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig
+ private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this)
+ def groupCoordinatorConfig: GroupCoordinatorConfig = _groupCoordinatorConfig
+
private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = {
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
// Need to translate any system property value from true/false (String) to true/false (Boolean)
@@ -439,8 +440,6 @@
val logCleanupIntervalMs = getLong(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG)
def logCleanupPolicy = getList(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG)
- val offsetsRetentionMinutes = getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG)
- val offsetsRetentionCheckIntervalMs = getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG)
def logRetentionBytes = getLong(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG)
val logCleanerDedupeBufferSize = getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP)
val logCleanerDedupeBufferLoadFactor = getDouble(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP)
@@ -562,12 +561,6 @@
/** ********* Feature configuration ***********/
def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported
- /** ********* Group coordinator configuration ***********/
- val groupMinSessionTimeoutMs = getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)
- val groupMaxSessionTimeoutMs = getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)
- val groupInitialRebalanceDelay = getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)
- val groupMaxSize = getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG)
-
/** New group coordinator configs */
val groupCoordinatorRebalanceProtocols = {
val protocols = getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
@@ -588,19 +581,6 @@
// it is explicitly set; or 2) the consumer rebalance protocol is enabled.
val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) ||
groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER)
- val groupCoordinatorNumThreads = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG)
- val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG)
-
- /** Consumer group configs */
- val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)
- val consumerGroupMinSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)
- val consumerGroupMaxSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)
- val consumerGroupHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)
- val consumerGroupMinHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)
- val consumerGroupMaxHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)
- val consumerGroupMaxSize = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG)
- val consumerGroupAssignors = getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, classOf[ConsumerGroupPartitionAssignor])
- val consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG))
/** Share group configuration **/
val isShareGroupEnabled = getBoolean(ShareGroupConfigs.SHARE_GROUP_ENABLE_CONFIG)
@@ -618,17 +598,6 @@
val shareGroupMaxRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG)
val shareGroupMinRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG)
- /** ********* Offset management configuration ***********/
- val offsetMetadataMaxSize = getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG)
- val offsetsLoadBufferSize = getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG)
- val offsetsTopicReplicationFactor = getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG)
- val offsetsTopicPartitions = getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG)
- val offsetCommitTimeoutMs = getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG)
- @deprecated("3.8")
- val offsetCommitRequiredAcks = getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)
- val offsetsTopicSegmentBytes = getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG)
- val offsetsTopicCompressionType = Option(getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)).map(value => CompressionType.forId(value)).orNull
-
/** ********* Transaction management configuration ***********/
val transactionalIdExpirationMs = getInt(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG)
val transactionMaxTimeoutMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG)
@@ -923,7 +892,7 @@
" to prevent unnecessary socket timeouts")
require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" +
" to prevent frequent changes in ISR")
- require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
+ require(groupCoordinatorConfig.offsetCommitRequiredAcks >= -1 && groupCoordinatorConfig.offsetCommitRequiredAcks <= groupCoordinatorConfig.offsetsTopicReplicationFactor,
"offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
@@ -1076,7 +1045,7 @@
s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
s"is set to version ${MetadataVersion.minSupportedFor(recordVersion).shortVersion} or higher")
- if (offsetsTopicCompressionType == CompressionType.ZSTD)
+ if (groupCoordinatorConfig.offsetTopicCompressionType == CompressionType.ZSTD)
require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= IBP_2_1_IV0.highestSupportedRecordVersion().value,
"offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " +
s"is set to version ${IBP_2_1_IV0.shortVersion} or higher")
@@ -1109,23 +1078,23 @@
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
// New group coordinator configs validation.
- require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
+ require(groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
- require(consumerGroupHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
+ require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
- require(consumerGroupHeartbeatIntervalMs <= consumerGroupMaxHeartbeatIntervalMs,
+ require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs <= groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}")
- require(consumerGroupMaxSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs,
+ require(groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
- require(consumerGroupSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs,
+ require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
- require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs,
+ require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs <= groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 477774f..a67d8c9 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -501,7 +501,7 @@
Time.SYSTEM,
metrics
)
- groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))
+ groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions))
/* create producer ids manager */
val producerIdManager = if (config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 04a063f..680b883 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -301,7 +301,7 @@
try {
// Start the group coordinator.
groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
- .getOrElse(config.offsetsTopicPartitions))
+ .getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions))
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting GroupCoordinator", t)
}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index df39d23..a9d7fb1 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -163,7 +163,7 @@
TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, controllerServers, numPartitions, replicationFactor = numServers)
TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers, controllerServers,
- numPartitions = servers.head.config.offsetsTopicPartitions,
+ numPartitions = servers.head.config.groupCoordinatorConfig.offsetsTopicPartitions,
replicationFactor = numServers,
topicConfig = servers.head.groupCoordinator.groupMetadataTopicConfigs)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 924b839..6bdf6ad 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -84,7 +84,7 @@
metrics = new Metrics
groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, metrics)
- groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
+ groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions),
enableMetadataExpiration = false)
// Transactional appends attempt to schedule to the request handler thread using
@@ -155,7 +155,7 @@
groupCoordinator.shutdown()
groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory,
rebalancePurgatory, timer.time, new Metrics())
- groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
+ groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions),
enableMetadataExpiration = false)
val members = new Group(s"group", nMembersPerGroup, groupCoordinator, replicaManager)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 58c6523..de4b7e3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -121,7 +121,7 @@
val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, config.brokerId, reaperEnabled = false)
groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, new Metrics())
- groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
+ groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions),
enableMetadataExpiration = false)
// add the partition into the owned partition list
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index f184b62..97c7c2e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -83,16 +83,16 @@
@nowarn("cat=deprecation")
private val offsetConfig = {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
- new OffsetConfig(config.offsetMetadataMaxSize,
- config.offsetsLoadBufferSize,
- config.offsetsRetentionMinutes * 60 * 1000L,
- config.offsetsRetentionCheckIntervalMs,
- config.offsetsTopicPartitions,
- config.offsetsTopicSegmentBytes,
- config.offsetsTopicReplicationFactor,
- config.offsetsTopicCompressionType,
- config.offsetCommitTimeoutMs,
- config.offsetCommitRequiredAcks)
+ new OffsetConfig(config.groupCoordinatorConfig.offsetMetadataMaxSize,
+ config.groupCoordinatorConfig.offsetsLoadBufferSize,
+ config.groupCoordinatorConfig.offsetsRetentionMs,
+ config.groupCoordinatorConfig.offsetsRetentionCheckIntervalMs,
+ config.groupCoordinatorConfig.offsetsTopicPartitions,
+ config.groupCoordinatorConfig.offsetsTopicSegmentBytes,
+ config.groupCoordinatorConfig.offsetsTopicReplicationFactor,
+ config.groupCoordinatorConfig.offsetTopicCompressionType,
+ config.groupCoordinatorConfig.offsetCommitTimeoutMs,
+ config.groupCoordinatorConfig.offsetCommitRequiredAcks)
}
@BeforeEach
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index d07bcc1..6b4badb 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -449,8 +449,8 @@
*/
private def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
val server = servers.head
- val numPartitions = server.config.offsetsTopicPartitions
- val replicationFactor = server.config.offsetsTopicReplicationFactor.toInt
+ val numPartitions = server.config.groupCoordinatorConfig.offsetsTopicPartitions
+ val replicationFactor = server.config.groupCoordinatorConfig.offsetsTopicReplicationFactor.toInt
try {
TestUtils.createTopic(
diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index d2bb8ea..9732b0c 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -356,7 +356,7 @@
val newTopic = if (isInternal) {
topicName match {
case Topic.GROUP_METADATA_TOPIC_NAME => getNewTopic(topicName,
- numPartitions = config.offsetsTopicPartitions, replicationFactor = config.offsetsTopicReplicationFactor)
+ numPartitions = config.groupCoordinatorConfig.offsetsTopicPartitions, replicationFactor = config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
case Topic.TRANSACTION_STATE_TOPIC_NAME => getNewTopic(topicName,
numPartitions = config.transactionTopicPartitions, replicationFactor = config.transactionTopicReplicationFactor)
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index d0f0b6e..c63fa40 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1280,7 +1280,7 @@
assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
assertEquals(123L, config.logFlushIntervalMs)
- assertEquals(CompressionType.SNAPPY, config.offsetsTopicCompressionType)
+ assertEquals(CompressionType.SNAPPY, config.groupCoordinatorConfig.offsetTopicCompressionType)
assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)
assertEquals(false, config.tokenAuthEnabled)
assertEquals(7 * 24 * 60L * 60L * 1000L, config.delegationTokenMaxLifeMs)
@@ -1969,14 +1969,14 @@
ConsumerGroupMigrationPolicy.values.foreach { policy =>
props.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, policy.toString)
val config = KafkaConfig.fromProps(props)
- assertEquals(policy, config.consumerGroupMigrationPolicy)
+ assertEquals(policy, config.groupCoordinatorConfig.consumerGroupMigrationPolicy)
}
// The config is case-insensitive.
ConsumerGroupMigrationPolicy.values.foreach { policy =>
props.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, policy.toString.toUpperCase())
val config = KafkaConfig.fromProps(props)
- assertEquals(policy, config.consumerGroupMigrationPolicy)
+ assertEquals(policy, config.groupCoordinatorConfig.consumerGroupMigrationPolicy)
}
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 9e2bbb4..4740b4e 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Utils;
@@ -26,6 +27,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
@@ -211,77 +213,111 @@
*/
public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 * 1000;
+ private final AbstractConfig config;
+
+ public GroupCoordinatorConfig(AbstractConfig config) {
+ this.config = config;
+ }
+
/**
* The number of threads or event loops running.
*/
- public final int numThreads;
+ public int numThreads() {
+ return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
+ }
/**
* The duration in milliseconds that the coordinator will wait for writes to
* accumulate before flushing them to disk.
*/
- public final int appendLingerMs;
+ public int appendLingerMs() {
+ return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG);
+ }
/**
* The consumer group session timeout in milliseconds.
*/
- public final int consumerGroupSessionTimeoutMs;
+ public int consumerGroupSessionTimeoutMs() {
+ return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG);
+ }
/**
* The consumer group heartbeat interval in milliseconds.
*/
- public final int consumerGroupHeartbeatIntervalMs;
+ public int consumerGroupHeartbeatIntervalMs() {
+ return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
+ }
/**
* The consumer group maximum size.
*/
- public final int consumerGroupMaxSize;
+ public int consumerGroupMaxSize() {
+ return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG);
+ }
/**
* The consumer group assignors.
*/
- public final List<ConsumerGroupPartitionAssignor> consumerGroupAssignors;
+ public List<ConsumerGroupPartitionAssignor> consumerGroupAssignors() {
+ return config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class);
+ }
/**
* The offsets topic segment bytes should be kept relatively small to facilitate faster
* log compaction and faster offset loads.
*/
- public final int offsetsTopicSegmentBytes;
+ public int offsetsTopicSegmentBytes() {
+ return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG);
+ }
/**
* The maximum size for a metadata entry associated with an offset commit.
*/
- public final int offsetMetadataMaxSize;
+ public int offsetMetadataMaxSize() {
+ return config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG);
+ }
/**
* The classic group maximum size.
*/
- public final int classicGroupMaxSize;
+ public int classicGroupMaxSize() {
+ return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG);
+ }
/**
* The delay in milliseconds introduced for the first rebalance of a classic group.
*/
- public final int classicGroupInitialRebalanceDelayMs;
+ public int classicGroupInitialRebalanceDelayMs() {
+ return config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+ }
/**
* The timeout used to wait for a new member in milliseconds.
*/
- public final int classicGroupNewMemberJoinTimeoutMs;
+ public int classicGroupNewMemberJoinTimeoutMs() {
+ return CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS;
+ }
/**
* The classic group minimum session timeout.
*/
- public final int classicGroupMinSessionTimeoutMs;
+ public int classicGroupMinSessionTimeoutMs() {
+ return config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+ }
/**
* The classic group maximum session timeout.
*/
- public final int classicGroupMaxSessionTimeoutMs;
+ public int classicGroupMaxSessionTimeoutMs() {
+ return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+ }
/**
* Frequency at which to check for expired offsets.
*/
- public final long offsetsRetentionCheckIntervalMs;
+ public long offsetsRetentionCheckIntervalMs() {
+ return config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG);
+ }
/**
* For subscribed consumers, committed offset of a specific partition will be expired and discarded when:
@@ -297,61 +333,92 @@
* Also, when a topic is deleted via the delete-topic request, upon propagated metadata update any group's
* committed offsets for that topic will also be deleted without extra retention period.
*/
- public final long offsetsRetentionMs;
+ public long offsetsRetentionMs() {
+ return config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L;
+ }
/**
* Offset commit will be delayed until all replicas for the offsets topic receive the commit
* or this timeout is reached
*/
- public final int offsetCommitTimeoutMs;
+ public int offsetCommitTimeoutMs() {
+ return config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+ }
/**
* The config indicating whether group protocol upgrade/downgrade are allowed.
*/
- public final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;
+ public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() {
+ return ConsumerGroupMigrationPolicy.parse(
+ config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG));
+ }
/**
* The compression type used to compress records in batches.
*/
- public final CompressionType compressionType;
+ public CompressionType offsetTopicCompressionType() {
+ return Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG))
+ .map(CompressionType::forId)
+ .orElse(null);
+ }
- public GroupCoordinatorConfig(
- int numThreads,
- int appendLingerMs,
- int consumerGroupSessionTimeoutMs,
- int consumerGroupHeartbeatIntervalMs,
- int consumerGroupMaxSize,
- List<ConsumerGroupPartitionAssignor> consumerGroupAssignors,
- int offsetsTopicSegmentBytes,
- int offsetMetadataMaxSize,
- int classicGroupMaxSize,
- int classicGroupInitialRebalanceDelayMs,
- int classicGroupNewMemberJoinTimeoutMs,
- int classicGroupMinSessionTimeoutMs,
- int classicGroupMaxSessionTimeoutMs,
- long offsetsRetentionCheckIntervalMs,
- long offsetsRetentionMs,
- int offsetCommitTimeoutMs,
- ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy,
- CompressionType compressionType
- ) {
- this.numThreads = numThreads;
- this.appendLingerMs = appendLingerMs;
- this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
- this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
- this.consumerGroupMaxSize = consumerGroupMaxSize;
- this.consumerGroupAssignors = consumerGroupAssignors;
- this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes;
- this.offsetMetadataMaxSize = offsetMetadataMaxSize;
- this.classicGroupMaxSize = classicGroupMaxSize;
- this.classicGroupInitialRebalanceDelayMs = classicGroupInitialRebalanceDelayMs;
- this.classicGroupNewMemberJoinTimeoutMs = classicGroupNewMemberJoinTimeoutMs;
- this.classicGroupMinSessionTimeoutMs = classicGroupMinSessionTimeoutMs;
- this.classicGroupMaxSessionTimeoutMs = classicGroupMaxSessionTimeoutMs;
- this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs;
- this.offsetsRetentionMs = offsetsRetentionMs;
- this.offsetCommitTimeoutMs = offsetCommitTimeoutMs;
- this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy;
- this.compressionType = compressionType;
+ /**
+ * Batch size for reading from the offsets segments when loading offsets into
+ * the cache (soft-limit, overridden if records are too large).
+ */
+ public int offsetsLoadBufferSize() {
+ return config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG);
+ }
+
+ /**
+ * The number of partitions for the offset commit topic (should not change after deployment).
+ */
+ public int offsetsTopicPartitions() {
+ return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG);
+ }
+
+ /**
+ * The replication factor for the offsets topic (set higher to ensure availability).
+ * Internal topic creation will fail until the cluster size meets this replication factor requirement.
+ */
+ public short offsetsTopicReplicationFactor() {
+ return config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG);
+ }
+
+ /**
+ * DEPRECATED: The required acks before the commit can be accepted.
+ * In general, the default (-1) should not be overridden.
+ */
+ @Deprecated // since 3.8
+ public short offsetCommitRequiredAcks() {
+ return config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG);
+ }
+
+ /**
+ * The minimum allowed session timeout for registered consumers.
+ */
+ public int consumerGroupMinSessionTimeoutMs() {
+ return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+ }
+
+ /**
+ * The maximum allowed session timeout for registered consumers.
+ */
+ public int consumerGroupMaxSessionTimeoutMs() {
+ return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+ }
+
+ /**
+ * The minimum heartbeat interval for registered consumers.
+ */
+ public int consumerGroupMinHeartbeatIntervalMs() {
+ return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
+ }
+
+ /**
+ * The maximum heartbeat interval for registered consumers.
+ */
+ public int consumerGroupMaxHeartbeatIntervalMs() {
+ return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
}
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index e0528f5..8d4845b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -168,7 +168,7 @@
CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
logContext,
"group-coordinator-event-processor-",
- config.numThreads,
+ config.numThreads(),
time,
coordinatorRuntimeMetrics
);
@@ -183,12 +183,12 @@
.withPartitionWriter(writer)
.withLoader(loader)
.withCoordinatorShardBuilderSupplier(supplier)
- .withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs))
+ .withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs()))
.withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
.withCoordinatorMetrics(groupCoordinatorMetrics)
.withSerializer(new CoordinatorRecordSerde())
- .withCompression(Compression.of(config.compressionType).build())
- .withAppendLingerMs(config.appendLingerMs)
+ .withCompression(Compression.of(config.offsetTopicCompressionType()).build())
+ .withAppendLingerMs(config.appendLingerMs())
.build();
return new GroupCoordinatorService(
@@ -296,7 +296,7 @@
return runtime.scheduleWriteOperation(
"consumer-group-heartbeat",
topicPartitionFor(request.groupId()),
- Duration.ofMillis(config.offsetCommitTimeoutMs),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.consumerGroupHeartbeat(context, request)
).exceptionally(exception -> handleOperationException(
"consumer-group-heartbeat",
@@ -331,8 +331,8 @@
);
}
- if (request.sessionTimeoutMs() < config.classicGroupMinSessionTimeoutMs ||
- request.sessionTimeoutMs() > config.classicGroupMaxSessionTimeoutMs) {
+ if (request.sessionTimeoutMs() < config.classicGroupMinSessionTimeoutMs() ||
+ request.sessionTimeoutMs() > config.classicGroupMaxSessionTimeoutMs()) {
return CompletableFuture.completedFuture(new JoinGroupResponseData()
.setMemberId(request.memberId())
.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
@@ -344,7 +344,7 @@
runtime.scheduleWriteOperation(
"classic-group-join",
topicPartitionFor(request.groupId()),
- Duration.ofMillis(config.offsetCommitTimeoutMs),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.classicGroupJoin(context, request, responseFuture)
).exceptionally(exception -> {
if (!responseFuture.isDone()) {
@@ -387,7 +387,7 @@
runtime.scheduleWriteOperation(
"classic-group-sync",
topicPartitionFor(request.groupId()),
- Duration.ofMillis(config.offsetCommitTimeoutMs),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.classicGroupSync(context, request, responseFuture)
).exceptionally(exception -> {
if (!responseFuture.isDone()) {
@@ -427,7 +427,7 @@
return runtime.scheduleWriteOperation(
"classic-group-heartbeat",
topicPartitionFor(request.groupId()),
- Duration.ofMillis(config.offsetCommitTimeoutMs),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.classicGroupHeartbeat(context, request)
).exceptionally(exception -> handleOperationException(
"classic-group-heartbeat",
@@ -469,7 +469,7 @@
return runtime.scheduleWriteOperation(
"classic-group-leave",
topicPartitionFor(request.groupId()),
- Duration.ofMillis(config.offsetCommitTimeoutMs),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.classicGroupLeave(context, request)
).exceptionally(exception -> handleOperationException(
"classic-group-leave",
@@ -682,7 +682,7 @@
runtime.scheduleWriteOperation(
"delete-groups",
topicPartition,
- Duration.ofMillis(config.offsetCommitTimeoutMs),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.deleteGroups(context, groupList)
).exceptionally(exception -> handleOperationException(
"delete-groups",
@@ -736,7 +736,7 @@
return runtime.scheduleWriteOperation(
"fetch-offsets",
topicPartitionFor(request.groupId()),
- Duration.ofMillis(config.offsetCommitTimeoutMs),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> new CoordinatorResult<>(
Collections.emptyList(),
coordinator.fetchOffsets(request, Long.MAX_VALUE)
@@ -787,7 +787,7 @@
return runtime.scheduleWriteOperation(
"fetch-all-offsets",
topicPartitionFor(request.groupId()),
- Duration.ofMillis(config.offsetCommitTimeoutMs),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> new CoordinatorResult<>(
Collections.emptyList(),
coordinator.fetchAllOffsets(request, Long.MAX_VALUE)
@@ -829,7 +829,7 @@
return runtime.scheduleWriteOperation(
"commit-offset",
topicPartitionFor(request.groupId()),
- Duration.ofMillis(config.offsetCommitTimeoutMs),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.commitOffset(context, request)
).exceptionally(exception -> handleOperationException(
"commit-offset",
@@ -868,7 +868,7 @@
request.transactionalId(),
request.producerId(),
request.producerEpoch(),
- Duration.ofMillis(config.offsetCommitTimeoutMs),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.commitTransactionalOffset(context, request),
context.apiVersion()
).exceptionally(exception -> handleOperationException(
@@ -903,7 +903,7 @@
return runtime.scheduleWriteOperation(
"delete-offsets",
topicPartitionFor(request.groupId()),
- Duration.ofMillis(config.offsetCommitTimeoutMs),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.deleteOffsets(context, request)
).exceptionally(exception -> handleOperationException(
"delete-offsets",
@@ -973,7 +973,7 @@
FutureUtils.mapExceptionally(
runtime.scheduleWriteAllOperation(
"on-partition-deleted",
- Duration.ofMillis(config.offsetCommitTimeoutMs),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.onPartitionsDeleted(topicPartitions)
),
exception -> {
@@ -1036,7 +1036,7 @@
Properties properties = new Properties();
properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name);
- properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(config.offsetsTopicSegmentBytes));
+ properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(config.offsetsTopicSegmentBytes()));
return properties;
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 329feca..56ea193 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -181,16 +181,16 @@
.withSnapshotRegistry(snapshotRegistry)
.withTime(time)
.withTimer(timer)
- .withConsumerGroupAssignors(config.consumerGroupAssignors)
- .withConsumerGroupMaxSize(config.consumerGroupMaxSize)
- .withConsumerGroupSessionTimeout(config.consumerGroupSessionTimeoutMs)
- .withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs)
- .withClassicGroupMaxSize(config.classicGroupMaxSize)
- .withClassicGroupInitialRebalanceDelayMs(config.classicGroupInitialRebalanceDelayMs)
- .withClassicGroupNewMemberJoinTimeoutMs(config.classicGroupNewMemberJoinTimeoutMs)
- .withClassicGroupMinSessionTimeoutMs(config.classicGroupMinSessionTimeoutMs)
- .withClassicGroupMaxSessionTimeoutMs(config.classicGroupMaxSessionTimeoutMs)
- .withConsumerGroupMigrationPolicy(config.consumerGroupMigrationPolicy)
+ .withConsumerGroupAssignors(config.consumerGroupAssignors())
+ .withConsumerGroupMaxSize(config.consumerGroupMaxSize())
+ .withConsumerGroupSessionTimeout(config.consumerGroupSessionTimeoutMs())
+ .withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs())
+ .withClassicGroupMaxSize(config.classicGroupMaxSize())
+ .withClassicGroupInitialRebalanceDelayMs(config.classicGroupInitialRebalanceDelayMs())
+ .withClassicGroupNewMemberJoinTimeoutMs(config.classicGroupNewMemberJoinTimeoutMs())
+ .withClassicGroupMinSessionTimeoutMs(config.classicGroupMinSessionTimeoutMs())
+ .withClassicGroupMaxSessionTimeoutMs(config.classicGroupMaxSessionTimeoutMs())
+ .withConsumerGroupMigrationPolicy(config.consumerGroupMigrationPolicy())
.withGroupCoordinatorMetricsShard(metricsShard)
.build();
@@ -595,10 +595,10 @@
private void scheduleGroupMetadataExpiration() {
timer.schedule(
GROUP_EXPIRATION_KEY,
- config.offsetsRetentionCheckIntervalMs,
+ config.offsetsRetentionCheckIntervalMs(),
TimeUnit.MILLISECONDS,
true,
- config.offsetsRetentionCheckIntervalMs,
+ config.offsetsRetentionCheckIntervalMs(),
this::cleanupGroupMetadata
);
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 600ed16..dad69db 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -412,7 +412,7 @@
* @return True if the committed metadata is invalid; False otherwise.
*/
private boolean isMetadataInvalid(String metadata) {
- return metadata != null && metadata.length() > config.offsetMetadataMaxSize;
+ return metadata != null && metadata.length() > config.offsetMetadataMaxSize();
}
/**
@@ -881,7 +881,7 @@
if (!group.isSubscribedToTopic(topic)) {
partitions.forEach((partition, offsetAndMetadata) -> {
// We don't expire the offset yet if there is a pending transactional offset for the partition.
- if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs) &&
+ if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs()) &&
!hasPendingTransactionalOffsets(groupId, topic, partition)) {
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records).toString());
log.debug("[GroupId {}] Expired offset for partition={}-{}", groupId, topic, partition);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 8a9947e..ba3b357 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -16,84 +16,116 @@
*/
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
+@SuppressWarnings("deprecation")
public class GroupCoordinatorConfigTest {
+ private static final List<ConfigDef> GROUP_COORDINATOR_CONFIG_DEFS = Arrays.asList(
+ GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
+ GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
+ GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
+ GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF);
+
@Test
public void testConfigs() {
- ConsumerGroupPartitionAssignor assignor = new RangeAssignor();
- GroupCoordinatorConfig config = new GroupCoordinatorConfig(
- 10,
- 10,
- 30,
- 10,
- 55,
- Collections.singletonList(assignor),
- 2222,
- 3333,
- 60,
- 3000,
- 5 * 60 * 1000,
- 120,
- 10 * 60 * 1000,
- 600000L,
- 24 * 60 * 60 * 1000L,
- 5000,
- ConsumerGroupMigrationPolicy.DISABLED,
- CompressionType.GZIP
- );
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 10);
+ configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 10);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 30);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 10);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, 55);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(RangeAssignor.class));
+ configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, 2222);
+ configs.put(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG, 3333);
+ configs.put(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, 60);
+ configs.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 3000);
+ configs.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 120);
+ configs.put(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 10 * 60 * 1000);
+ configs.put(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, 600000);
+ configs.put(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG, 24 * 60 * 60 * 1000);
+ configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 5000);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name());
+ configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.GZIP.id);
+ configs.put(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG, 555);
+ configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 111);
+ configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 11);
+ configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, (short) 0);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 333);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 666);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222);
- assertEquals(10, config.numThreads);
- assertEquals(30, config.consumerGroupSessionTimeoutMs);
- assertEquals(10, config.consumerGroupHeartbeatIntervalMs);
- assertEquals(55, config.consumerGroupMaxSize);
- assertEquals(Collections.singletonList(assignor), config.consumerGroupAssignors);
- assertEquals(2222, config.offsetsTopicSegmentBytes);
- assertEquals(3333, config.offsetMetadataMaxSize);
- assertEquals(60, config.classicGroupMaxSize);
- assertEquals(3000, config.classicGroupInitialRebalanceDelayMs);
- assertEquals(5 * 60 * 1000, config.classicGroupNewMemberJoinTimeoutMs);
- assertEquals(120, config.classicGroupMinSessionTimeoutMs);
- assertEquals(10 * 60 * 1000, config.classicGroupMaxSessionTimeoutMs);
- assertEquals(10 * 60 * 1000, config.offsetsRetentionCheckIntervalMs);
- assertEquals(24 * 60 * 60 * 1000L, config.offsetsRetentionMs);
- assertEquals(5000, config.offsetCommitTimeoutMs);
- assertEquals(CompressionType.GZIP, config.compressionType);
- assertEquals(10, config.appendLingerMs);
+ GroupCoordinatorConfig config = new GroupCoordinatorConfig(
+ new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false));
+
+ assertEquals(10, config.numThreads());
+ assertEquals(30, config.consumerGroupSessionTimeoutMs());
+ assertEquals(10, config.consumerGroupHeartbeatIntervalMs());
+ assertEquals(55, config.consumerGroupMaxSize());
+ assertEquals(1, config.consumerGroupAssignors().size());
+ assertEquals(RangeAssignor.RANGE_ASSIGNOR_NAME, config.consumerGroupAssignors().get(0).name());
+ assertEquals(2222, config.offsetsTopicSegmentBytes());
+ assertEquals(3333, config.offsetMetadataMaxSize());
+ assertEquals(60, config.classicGroupMaxSize());
+ assertEquals(3000, config.classicGroupInitialRebalanceDelayMs());
+ assertEquals(5 * 60 * 1000, config.classicGroupNewMemberJoinTimeoutMs());
+ assertEquals(120, config.classicGroupMinSessionTimeoutMs());
+ assertEquals(10 * 60 * 1000, config.classicGroupMaxSessionTimeoutMs());
+ assertEquals(10 * 60 * 1000, config.offsetsRetentionCheckIntervalMs());
+ assertEquals(Duration.ofMinutes(24 * 60 * 60 * 1000L).toMillis(), config.offsetsRetentionMs());
+ assertEquals(5000, config.offsetCommitTimeoutMs());
+ assertEquals(CompressionType.GZIP, config.offsetTopicCompressionType());
+ assertEquals(10, config.appendLingerMs());
+ assertEquals(555, config.offsetsLoadBufferSize());
+ assertEquals(111, config.offsetsTopicPartitions());
+ assertEquals(11, config.offsetsTopicReplicationFactor());
+ assertEquals(0, config.offsetCommitRequiredAcks());
+ assertEquals(333, config.consumerGroupMinSessionTimeoutMs());
+ assertEquals(666, config.consumerGroupMaxSessionTimeoutMs());
+ assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs());
+ assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs());
}
public static GroupCoordinatorConfig createGroupCoordinatorConfig(
int offsetMetadataMaxSize,
long offsetsRetentionCheckIntervalMs,
- long offsetsRetentionMs
+ int offsetsRetentionMinutes
) {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 1);
+ configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 10);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 45);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 5);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, Integer.MAX_VALUE);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(RangeAssignor.class));
+ configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, 1000);
+ configs.put(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG, offsetMetadataMaxSize);
+ configs.put(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, Integer.MAX_VALUE);
+ configs.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 3000);
+ configs.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 120);
+ configs.put(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 10 * 5 * 1000);
+ configs.put(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, offsetsRetentionCheckIntervalMs);
+ configs.put(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG, offsetsRetentionMinutes);
+ configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 5000);
+ configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name());
+ configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id);
+
return new GroupCoordinatorConfig(
- 1,
- 10,
- 45,
- 5,
- Integer.MAX_VALUE,
- Collections.singletonList(new RangeAssignor()),
- 1000,
- offsetMetadataMaxSize,
- Integer.MAX_VALUE,
- 3000,
- 5 * 60 * 1000,
- 120,
- 10 * 5 * 1000,
- offsetsRetentionCheckIntervalMs,
- offsetsRetentionMs,
- 5000,
- ConsumerGroupMigrationPolicy.DISABLED,
- CompressionType.NONE
- );
+ new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false));
}
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 7ad0ecd..24da10e 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -57,7 +57,6 @@
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.TransactionResult;
@@ -66,7 +65,6 @@
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.server.record.BrokerCompressionType;
@@ -117,26 +115,7 @@
}
private GroupCoordinatorConfig createConfig() {
- return new GroupCoordinatorConfig(
- 1,
- 10,
- 45,
- 5,
- Integer.MAX_VALUE,
- Collections.singletonList(new RangeAssignor()),
- 1000,
- 4096,
- Integer.MAX_VALUE,
- 3000,
- 5 * 60 * 1000,
- 120,
- 10 * 5 * 1000,
- 600000L,
- 24 * 60 * 1000L,
- 5000,
- ConsumerGroupMigrationPolicy.DISABLED,
- CompressionType.NONE
- );
+ return GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 600000L, 24);
}
@Test
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 61fcca2..bd997d7 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -68,6 +68,7 @@
import org.junit.jupiter.params.provider.EnumSource;
import java.net.InetAddress;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -105,12 +106,12 @@
private GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class);
Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
- config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60 * 1000);
+ config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60);
return this;
}
- Builder withOffsetsRetentionMs(long offsetsRetentionMs) {
- config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, offsetsRetentionMs);
+ Builder withOffsetsRetentionMinutes(int offsetsRetentionMinutes) {
+ config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, offsetsRetentionMinutes);
return this;
}
@@ -122,7 +123,7 @@
OffsetMetadataManagerTestContext build() {
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (config == null) {
- config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24 * 60 * 1000);
+ config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24);
}
if (groupMetadataManager == null) {
@@ -2515,7 +2516,7 @@
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
.withGroupMetadataManager(groupMetadataManager)
- .withOffsetsRetentionMs(1000)
+ .withOffsetsRetentionMinutes(1)
.build();
long commitTimestamp = context.time.milliseconds();
@@ -2524,7 +2525,7 @@
context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp);
context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500);
- context.time.sleep(1000);
+ context.time.sleep(Duration.ofMinutes(1).toMillis());
// firstTopic-0: group is still subscribed to firstTopic. Do not expire.
// secondTopic-0: should expire as offset retention has passed.
@@ -2576,7 +2577,7 @@
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
.withGroupMetadataManager(groupMetadataManager)
- .withOffsetsRetentionMs(1000)
+ .withOffsetsRetentionMinutes(1)
.build();
long commitTimestamp = context.time.milliseconds();
@@ -2584,7 +2585,7 @@
context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
context.commitOffset(10L, "group-id", "foo", 0, 101L, 0, commitTimestamp + 500);
- context.time.sleep(1000);
+ context.time.sleep(Duration.ofMinutes(1).toMillis());
when(groupMetadataManager.group("group-id")).thenReturn(group);
when(group.offsetExpirationCondition()).thenReturn(Optional.of(
@@ -3001,7 +3002,7 @@
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
.withGroupMetadataManager(groupMetadataManager)
- .withOffsetsRetentionMs(1000)
+ .withOffsetsRetentionMinutes(1)
.build();
long commitTimestamp = context.time.milliseconds();
@@ -3010,7 +3011,7 @@
context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp);
context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500);
- context.time.sleep(1000);
+ context.time.sleep(Duration.ofMinutes(1).toMillis());
// firstTopic-0: group is still subscribed to firstTopic. Do not expire.
// secondTopic-0: should expire as offset retention has passed.