KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig (#16458)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
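
This patch moves the group coordinator, consumer group, and offset management
accessors out of KafkaConfig and into GroupCoordinatorConfig, which now wraps
the broker's AbstractConfig and resolves each value through an accessor method.
Call sites change from config.offsetsTopicPartitions to
config.groupCoordinatorConfig.offsetsTopicPartitions, and BrokerServer no
longer hand-builds a GroupCoordinatorConfig from eighteen positional
constructor arguments.

A minimal standalone sketch of the wrapper pattern applied here (illustrative
names, not Kafka's actual definitions):

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;

    import java.util.Collections;

    public class WrapperConfigSketch {
        // One ConfigDef describing the keys this wrapper understands.
        static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("offsets.topic.partitions", ConfigDef.Type.INT, 50,
                    ConfigDef.Importance.HIGH, "Partitions of the offsets topic.");

        private final AbstractConfig config;

        // Keep a reference to the parsed config instead of copying values out.
        public WrapperConfigSketch(AbstractConfig config) {
            this.config = config;
        }

        // An accessor method replaces what used to be a `public final int` field.
        public int offsetsTopicPartitions() {
            return config.getInt("offsets.topic.partitions");
        }

        public static void main(String[] args) {
            AbstractConfig parsed = new AbstractConfig(
                CONFIG_DEF, Collections.singletonMap("offsets.topic.partitions", 3));
            System.out.println(new WrapperConfigSketch(parsed).offsetsTopicPartitions()); // 3
        }
    }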
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 8f187d9..58a37e3 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -1770,16 +1770,16 @@
 
   @nowarn("cat=deprecation")
   private[group] def offsetConfig(config: KafkaConfig) = new OffsetConfig(
-    config.offsetMetadataMaxSize,
-    config.offsetsLoadBufferSize,
-    config.offsetsRetentionMinutes * 60L * 1000L,
-    config.offsetsRetentionCheckIntervalMs,
-    config.offsetsTopicPartitions,
-    config.offsetsTopicSegmentBytes,
-    config.offsetsTopicReplicationFactor,
-    config.offsetsTopicCompressionType,
-    config.offsetCommitTimeoutMs,
-    config.offsetCommitRequiredAcks
+    config.groupCoordinatorConfig.offsetMetadataMaxSize,
+    config.groupCoordinatorConfig.offsetsLoadBufferSize,
+    config.groupCoordinatorConfig.offsetsRetentionMs,
+    config.groupCoordinatorConfig.offsetsRetentionCheckIntervalMs,
+    config.groupCoordinatorConfig.offsetsTopicPartitions,
+    config.groupCoordinatorConfig.offsetsTopicSegmentBytes,
+    config.groupCoordinatorConfig.offsetsTopicReplicationFactor,
+    config.groupCoordinatorConfig.offsetTopicCompressionType,
+    config.groupCoordinatorConfig.offsetCommitTimeoutMs,
+    config.groupCoordinatorConfig.offsetCommitRequiredAcks
   )
 
   private[group] def apply(
@@ -1791,10 +1791,10 @@
     metrics: Metrics
   ): GroupCoordinator = {
     val offsetConfig = this.offsetConfig(config)
-    val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
-      groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs,
-      groupMaxSize = config.groupMaxSize,
-      groupInitialRebalanceDelayMs = config.groupInitialRebalanceDelay)
+    val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupCoordinatorConfig.classicGroupMinSessionTimeoutMs,
+      groupMaxSessionTimeoutMs = config.groupCoordinatorConfig.classicGroupMaxSessionTimeoutMs,
+      groupMaxSize = config.groupCoordinatorConfig.classicGroupMaxSize,
+      groupInitialRebalanceDelayMs = config.groupCoordinatorConfig.classicGroupInitialRebalanceDelayMs)
 
     val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
       offsetConfig, replicaManager, time, metrics)
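
A detail worth calling out in this hunk: call sites previously converted
offsets.retention.minutes to milliseconds themselves
(config.offsetsRetentionMinutes * 60L * 1000L); the conversion now lives inside
the offsetsRetentionMs accessor, so every caller sees the same derived value. A
tiny sketch of the conversion, assuming the usual 10080-minute (7-day) default:

    // Sketch: the minutes-to-milliseconds conversion that moved into the accessor.
    public class RetentionSketch {
        public static long offsetsRetentionMs(int offsetsRetentionMinutes) {
            return offsetsRetentionMinutes * 60L * 1000L; // minutes -> ms
        }

        public static void main(String[] args) {
            System.out.println(offsetsRetentionMs(10080)); // 604800000 ms = 7 days
        }
    }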
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index a84e053..6d8126c 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -234,8 +234,8 @@
       case GROUP_METADATA_TOPIC_NAME =>
         new CreatableTopic()
           .setName(topic)
-          .setNumPartitions(config.offsetsTopicPartitions)
-          .setReplicationFactor(config.offsetsTopicReplicationFactor)
+          .setNumPartitions(config.groupCoordinatorConfig.offsetsTopicPartitions)
+          .setReplicationFactor(config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
           .setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs))
       case TRANSACTION_STATE_TOPIC_NAME =>
         new CreatableTopic()
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index b962d49..074c9b1 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -36,7 +36,7 @@
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
-import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, CoordinatorRecordSerde}
+import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorService, CoordinatorRecordSerde}
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
 import org.apache.kafka.security.CredentialProvider
@@ -568,26 +568,6 @@
     if (config.isNewGroupCoordinatorEnabled) {
       val time = Time.SYSTEM
       val serde = new CoordinatorRecordSerde
-      val groupCoordinatorConfig = new GroupCoordinatorConfig(
-        config.groupCoordinatorNumThreads,
-        config.groupCoordinatorAppendLingerMs,
-        config.consumerGroupSessionTimeoutMs,
-        config.consumerGroupHeartbeatIntervalMs,
-        config.consumerGroupMaxSize,
-        config.consumerGroupAssignors,
-        config.offsetsTopicSegmentBytes,
-        config.offsetMetadataMaxSize,
-        config.groupMaxSize,
-        config.groupInitialRebalanceDelay,
-        GroupCoordinatorConfig.CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS,
-        config.groupMinSessionTimeoutMs,
-        config.groupMaxSessionTimeoutMs,
-        config.offsetsRetentionCheckIntervalMs,
-        config.offsetsRetentionMinutes * 60 * 1000L,
-        config.offsetCommitTimeoutMs,
-        config.consumerGroupMigrationPolicy,
-        config.offsetsTopicCompressionType
-      )
       val timer = new SystemTimerReaper(
         "group-coordinator-reaper",
         new SystemTimer("group-coordinator")
@@ -596,12 +576,12 @@
         time,
         replicaManager,
         serde,
-        config.offsetsLoadBufferSize
+        config.groupCoordinatorConfig.offsetsLoadBufferSize
       )
       val writer = new CoordinatorPartitionWriter(
         replicaManager
       )
-      new GroupCoordinatorService.Builder(config.brokerId, groupCoordinatorConfig)
+      new GroupCoordinatorService.Builder(config.brokerId, config.groupCoordinatorConfig)
         .withTime(time)
         .withTimer(timer)
         .withLoader(loader)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f52271f..f6a7b6f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -530,7 +530,7 @@
     authorizedTopicsRequest.foreach { topic =>
       topic.partitions.forEach { partition =>
         val error = try {
-          if (partition.committedMetadata != null && partition.committedMetadata.length > config.offsetMetadataMaxSize) {
+          if (partition.committedMetadata != null && partition.committedMetadata.length > config.groupCoordinatorConfig.offsetMetadataMaxSize) {
             Errors.OFFSET_METADATA_TOO_LARGE
           } else {
             zkSupport.zkClient.setOrCreateConsumerOffset(
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index f230140..1f37b0b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -33,10 +33,8 @@
 import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
 import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
@@ -234,6 +232,9 @@
   private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
   def remoteLogManagerConfig = _remoteLogManagerConfig
 
+  private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this)
+  def groupCoordinatorConfig: GroupCoordinatorConfig = _groupCoordinatorConfig
+
   private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = {
     // Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
     // Need to translate any system property value from true/false (String) to true/false (Boolean)
@@ -439,8 +440,6 @@
   val logCleanupIntervalMs = getLong(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG)
   def logCleanupPolicy = getList(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG)
 
-  val offsetsRetentionMinutes = getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG)
-  val offsetsRetentionCheckIntervalMs = getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG)
   def logRetentionBytes = getLong(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG)
   val logCleanerDedupeBufferSize = getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP)
   val logCleanerDedupeBufferLoadFactor = getDouble(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP)
@@ -562,12 +561,6 @@
   /** ********* Feature configuration ***********/
   def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported
 
-  /** ********* Group coordinator configuration ***********/
-  val groupMinSessionTimeoutMs = getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)
-  val groupMaxSessionTimeoutMs = getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)
-  val groupInitialRebalanceDelay = getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)
-  val groupMaxSize = getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG)
-
   /** New group coordinator configs */
   val groupCoordinatorRebalanceProtocols = {
     val protocols = getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
@@ -588,19 +581,6 @@
   // it is explicitly set; or 2) the consumer rebalance protocol is enabled.
   val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) ||
     groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER)
-  val groupCoordinatorNumThreads = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG)
-  val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG)
-
-  /** Consumer group configs */
-  val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)
-  val consumerGroupMinSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)
-  val consumerGroupMaxSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)
-  val consumerGroupHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)
-  val consumerGroupMinHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)
-  val consumerGroupMaxHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)
-  val consumerGroupMaxSize = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG)
-  val consumerGroupAssignors = getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, classOf[ConsumerGroupPartitionAssignor])
-  val consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG))
 
   /** Share group configuration **/
   val isShareGroupEnabled = getBoolean(ShareGroupConfigs.SHARE_GROUP_ENABLE_CONFIG)
@@ -618,17 +598,6 @@
   val shareGroupMaxRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG)
   val shareGroupMinRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG)
 
-  /** ********* Offset management configuration ***********/
-  val offsetMetadataMaxSize = getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG)
-  val offsetsLoadBufferSize = getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG)
-  val offsetsTopicReplicationFactor = getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG)
-  val offsetsTopicPartitions = getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG)
-  val offsetCommitTimeoutMs = getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG)
-  @deprecated("3.8")
-  val offsetCommitRequiredAcks = getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)
-  val offsetsTopicSegmentBytes = getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG)
-  val offsetsTopicCompressionType = Option(getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)).map(value => CompressionType.forId(value)).orNull
-
   /** ********* Transaction management configuration ***********/
   val transactionalIdExpirationMs = getInt(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG)
   val transactionMaxTimeoutMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG)
@@ -923,7 +892,7 @@
       " to prevent unnecessary socket timeouts")
     require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" +
       " to prevent frequent changes in ISR")
-    require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
+    require(groupCoordinatorConfig.offsetCommitRequiredAcks >= -1 && groupCoordinatorConfig.offsetCommitRequiredAcks <= groupCoordinatorConfig.offsetsTopicReplicationFactor,
       "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
     val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
 
@@ -1076,7 +1045,7 @@
       s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
       s"is set to version ${MetadataVersion.minSupportedFor(recordVersion).shortVersion} or higher")
 
-    if (offsetsTopicCompressionType == CompressionType.ZSTD)
+    if (groupCoordinatorConfig.offsetTopicCompressionType == CompressionType.ZSTD)
       require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= IBP_2_1_IV0.highestSupportedRecordVersion().value,
         "offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " +
         s"is set to version ${IBP_2_1_IV0.shortVersion} or higher")
@@ -1109,23 +1078,23 @@
       s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
 
     // New group coordinator configs validation.
-    require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
+    require(groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs,
       s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
       s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
-    require(consumerGroupHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
+    require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs,
       s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
       s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
-    require(consumerGroupHeartbeatIntervalMs <= consumerGroupMaxHeartbeatIntervalMs,
+    require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs <= groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs,
       s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " +
       s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}")
 
-    require(consumerGroupMaxSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs,
+    require(groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs,
       s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
       s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
-    require(consumerGroupSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs,
+    require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs,
       s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
       s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
-    require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs,
+    require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs <= groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs,
       s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " +
       s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")
 
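
The wiring in KafkaConfig is plain composition: KafkaConfig is itself an
AbstractConfig, so it can pass `this` to the wrapper once and expose it via the
groupCoordinatorConfig getter; validateValues then reads through the same
accessors. A sketch of that shape (illustrative names and keys):

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;

    import java.util.Map;

    public class BrokerConfigSketch extends AbstractConfig {
        static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("offsets.commit.required.acks", ConfigDef.Type.SHORT, (short) -1,
                    ConfigDef.Importance.HIGH, "Required acks for offset commits.")
            .define("offsets.topic.replication.factor", ConfigDef.Type.SHORT, (short) 3,
                    ConfigDef.Importance.HIGH, "Replication factor of the offsets topic.");

        // Built once per broker config. Field initializers run after super(...),
        // and the sub-config only stores the reference, so passing `this` is safe.
        private final SubConfigSketch groupCoordinatorConfig = new SubConfigSketch(this);

        public BrokerConfigSketch(Map<?, ?> props) {
            super(CONFIG_DEF, props);
        }

        public SubConfigSketch groupCoordinatorConfig() {
            return groupCoordinatorConfig;
        }

        // Cross-field validation goes through the accessors, mirroring the
        // rewritten require(...) calls above.
        void validateValues() {
            short acks = groupCoordinatorConfig().offsetCommitRequiredAcks();
            if (acks < -1 || acks > groupCoordinatorConfig().offsetsTopicReplicationFactor())
                throw new IllegalArgumentException("offsets.commit.required.acks must be " +
                    "greater or equal to -1 and less or equal to offsets.topic.replication.factor");
        }

        static class SubConfigSketch {
            private final AbstractConfig config;

            SubConfigSketch(AbstractConfig config) {
                this.config = config;
            }

            short offsetCommitRequiredAcks() {
                return config.getShort("offsets.commit.required.acks");
            }

            short offsetsTopicReplicationFactor() {
                return config.getShort("offsets.topic.replication.factor");
            }
        }
    }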
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 477774f..a67d8c9 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -501,7 +501,7 @@
           Time.SYSTEM,
           metrics
         )
-        groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))
+        groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions))
 
         /* create producer ids manager */
         val producerIdManager = if (config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 04a063f..680b883 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -301,7 +301,7 @@
     try {
       // Start the group coordinator.
       groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
-        .getOrElse(config.offsetsTopicPartitions))
+        .getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions))
     } catch {
       case t: Throwable => fatalFaultHandler.handleFault("Error starting GroupCoordinator", t)
     }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index df39d23..a9d7fb1 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -163,7 +163,7 @@
 
     TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, controllerServers, numPartitions, replicationFactor = numServers)
     TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers, controllerServers,
-      numPartitions = servers.head.config.offsetsTopicPartitions,
+      numPartitions = servers.head.config.groupCoordinatorConfig.offsetsTopicPartitions,
       replicationFactor = numServers,
       topicConfig = servers.head.groupCoordinator.groupMetadataTopicConfigs)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 924b839..6bdf6ad 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -84,7 +84,7 @@
 
     metrics = new Metrics
     groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, metrics)
-    groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
+    groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions),
       enableMetadataExpiration = false)
 
     // Transactional appends attempt to schedule to the request handler thread using
@@ -155,7 +155,7 @@
       groupCoordinator.shutdown()
     groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory,
       rebalancePurgatory, timer.time, new Metrics())
-    groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
+    groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions),
       enableMetadataExpiration = false)
 
     val members = new Group(s"group", nMembersPerGroup, groupCoordinator, replicaManager)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 58c6523..de4b7e3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -121,7 +121,7 @@
     val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, config.brokerId, reaperEnabled = false)
 
     groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, new Metrics())
-    groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
+    groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions),
       enableMetadataExpiration = false)
 
     // add the partition into the owned partition list
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index f184b62..97c7c2e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -83,16 +83,16 @@
   @nowarn("cat=deprecation")
   private val offsetConfig = {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
-    new OffsetConfig(config.offsetMetadataMaxSize,
-      config.offsetsLoadBufferSize,
-      config.offsetsRetentionMinutes * 60 * 1000L,
-      config.offsetsRetentionCheckIntervalMs,
-      config.offsetsTopicPartitions,
-      config.offsetsTopicSegmentBytes,
-      config.offsetsTopicReplicationFactor,
-      config.offsetsTopicCompressionType,
-      config.offsetCommitTimeoutMs,
-      config.offsetCommitRequiredAcks)
+    new OffsetConfig(config.groupCoordinatorConfig.offsetMetadataMaxSize,
+      config.groupCoordinatorConfig.offsetsLoadBufferSize,
+      config.groupCoordinatorConfig.offsetsRetentionMs,
+      config.groupCoordinatorConfig.offsetsRetentionCheckIntervalMs,
+      config.groupCoordinatorConfig.offsetsTopicPartitions,
+      config.groupCoordinatorConfig.offsetsTopicSegmentBytes,
+      config.groupCoordinatorConfig.offsetsTopicReplicationFactor,
+      config.groupCoordinatorConfig.offsetTopicCompressionType,
+      config.groupCoordinatorConfig.offsetCommitTimeoutMs,
+      config.groupCoordinatorConfig.offsetCommitRequiredAcks)
   }
 
   @BeforeEach
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index d07bcc1..6b4badb 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -449,8 +449,8 @@
    */
   private def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
     val server = servers.head
-    val numPartitions = server.config.offsetsTopicPartitions
-    val replicationFactor = server.config.offsetsTopicReplicationFactor.toInt
+    val numPartitions = server.config.groupCoordinatorConfig.offsetsTopicPartitions
+    val replicationFactor = server.config.groupCoordinatorConfig.offsetsTopicReplicationFactor.toInt
 
     try {
        TestUtils.createTopic(
diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index d2bb8ea..9732b0c 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -356,7 +356,7 @@
     val newTopic = if (isInternal) {
       topicName match {
         case Topic.GROUP_METADATA_TOPIC_NAME => getNewTopic(topicName,
-          numPartitions = config.offsetsTopicPartitions, replicationFactor = config.offsetsTopicReplicationFactor)
+          numPartitions = config.groupCoordinatorConfig.offsetsTopicPartitions, replicationFactor = config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
         case Topic.TRANSACTION_STATE_TOPIC_NAME => getNewTopic(topicName,
           numPartitions = config.transactionTopicPartitions, replicationFactor = config.transactionTopicReplicationFactor)
       }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index d0f0b6e..c63fa40 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1280,7 +1280,7 @@
     assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
     assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
     assertEquals(123L, config.logFlushIntervalMs)
-    assertEquals(CompressionType.SNAPPY, config.offsetsTopicCompressionType)
+    assertEquals(CompressionType.SNAPPY, config.groupCoordinatorConfig.offsetTopicCompressionType)
     assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)
     assertEquals(false, config.tokenAuthEnabled)
     assertEquals(7 * 24 * 60L * 60L * 1000L, config.delegationTokenMaxLifeMs)
@@ -1969,14 +1969,14 @@
     ConsumerGroupMigrationPolicy.values.foreach { policy =>
       props.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, policy.toString)
       val config = KafkaConfig.fromProps(props)
-      assertEquals(policy, config.consumerGroupMigrationPolicy)
+      assertEquals(policy, config.groupCoordinatorConfig.consumerGroupMigrationPolicy)
     }
 
     // The config is case-insensitive.
     ConsumerGroupMigrationPolicy.values.foreach { policy =>
       props.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, policy.toString.toUpperCase())
       val config = KafkaConfig.fromProps(props)
-      assertEquals(policy, config.consumerGroupMigrationPolicy)
+      assertEquals(policy, config.groupCoordinatorConfig.consumerGroupMigrationPolicy)
     }
   }
 
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 9e2bbb4..4740b4e 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.utils.Utils;
@@ -26,6 +27,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
@@ -211,77 +213,111 @@
      */
     public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 * 1000;
 
+    private final AbstractConfig config;
+
+    public GroupCoordinatorConfig(AbstractConfig config) {
+        this.config = config;
+    }
+
     /**
      * The number of threads or event loops running.
      */
-    public final int numThreads;
+    public int numThreads() {
+        return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
+    }
 
     /**
      * The duration in milliseconds that the coordinator will wait for writes to
      * accumulate before flushing them to disk.
      */
-    public final int appendLingerMs;
+    public int appendLingerMs() {
+        return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG);
+    }
 
     /**
      * The consumer group session timeout in milliseconds.
      */
-    public final int consumerGroupSessionTimeoutMs;
+    public int consumerGroupSessionTimeoutMs() {
+        return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG);
+    }
 
     /**
      * The consumer group heartbeat interval in milliseconds.
      */
-    public final int consumerGroupHeartbeatIntervalMs;
+    public int consumerGroupHeartbeatIntervalMs() {
+        return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
+    }
 
     /**
      * The consumer group maximum size.
      */
-    public final int consumerGroupMaxSize;
+    public int consumerGroupMaxSize() {
+        return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG);
+    }
 
     /**
      * The consumer group assignors.
      */
-    public final List<ConsumerGroupPartitionAssignor> consumerGroupAssignors;
+    public List<ConsumerGroupPartitionAssignor> consumerGroupAssignors() {
+        return config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class);
+    }
 
     /**
      * The offsets topic segment bytes should be kept relatively small to facilitate faster
      * log compaction and faster offset loads.
      */
-    public final int offsetsTopicSegmentBytes;
+    public int offsetsTopicSegmentBytes() {
+        return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG);
+    }
 
     /**
      * The maximum size for a metadata entry associated with an offset commit.
      */
-    public final int offsetMetadataMaxSize;
+    public int offsetMetadataMaxSize() {
+        return config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG);
+    }
 
     /**
      * The classic group maximum size.
      */
-    public final int classicGroupMaxSize;
+    public int classicGroupMaxSize() {
+        return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG);
+    }
 
     /**
      * The delay in milliseconds introduced for the first rebalance of a classic group.
      */
-    public final int classicGroupInitialRebalanceDelayMs;
+    public int classicGroupInitialRebalanceDelayMs() {
+        return config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+    }
 
     /**
      * The timeout used to wait for a new member in milliseconds.
      */
-    public final int classicGroupNewMemberJoinTimeoutMs;
+    public int classicGroupNewMemberJoinTimeoutMs() {
+        return CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS;
+    }
 
     /**
      * The classic group minimum session timeout.
      */
-    public final int classicGroupMinSessionTimeoutMs;
+    public int classicGroupMinSessionTimeoutMs() {
+        return config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+    }
 
     /**
      * The classic group maximum session timeout.
      */
-    public final int classicGroupMaxSessionTimeoutMs;
+    public int classicGroupMaxSessionTimeoutMs() {
+        return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+    }
 
     /**
      * Frequency at which to check for expired offsets.
      */
-    public final long offsetsRetentionCheckIntervalMs;
+    public long offsetsRetentionCheckIntervalMs() {
+        return config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG);
+    }
 
     /**
      * For subscribed consumers, committed offset of a specific partition will be expired and discarded when:
@@ -297,61 +333,92 @@
      * Also, when a topic is deleted via the delete-topic request, upon propagated metadata update any group's
      *     committed offsets for that topic will also be deleted without extra retention period.
      */
-    public final long offsetsRetentionMs;
+    public long offsetsRetentionMs() {
+        return config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L;
+    }
 
     /**
      * Offset commit will be delayed until all replicas for the offsets topic receive the commit
      * or this timeout is reached
      */
-    public final int offsetCommitTimeoutMs;
+    public int offsetCommitTimeoutMs() {
+        return config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+    }
 
     /**
      * The config indicating whether group protocol upgrade/downgrade are allowed.
      */
-    public final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;
+    public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() {
+        return ConsumerGroupMigrationPolicy.parse(
+                config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG));
+    }
 
     /**
      * The compression type used to compress records in batches.
      */
-    public final CompressionType compressionType;
+    public CompressionType offsetTopicCompressionType() {
+        return Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG))
+                .map(CompressionType::forId)
+                .orElse(null);
+    }
 
-    public GroupCoordinatorConfig(
-        int numThreads,
-        int appendLingerMs,
-        int consumerGroupSessionTimeoutMs,
-        int consumerGroupHeartbeatIntervalMs,
-        int consumerGroupMaxSize,
-        List<ConsumerGroupPartitionAssignor> consumerGroupAssignors,
-        int offsetsTopicSegmentBytes,
-        int offsetMetadataMaxSize,
-        int classicGroupMaxSize,
-        int classicGroupInitialRebalanceDelayMs,
-        int classicGroupNewMemberJoinTimeoutMs,
-        int classicGroupMinSessionTimeoutMs,
-        int classicGroupMaxSessionTimeoutMs,
-        long offsetsRetentionCheckIntervalMs,
-        long offsetsRetentionMs,
-        int offsetCommitTimeoutMs,
-        ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy,
-        CompressionType compressionType
-    ) {
-        this.numThreads = numThreads;
-        this.appendLingerMs = appendLingerMs;
-        this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
-        this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
-        this.consumerGroupMaxSize = consumerGroupMaxSize;
-        this.consumerGroupAssignors = consumerGroupAssignors;
-        this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes;
-        this.offsetMetadataMaxSize = offsetMetadataMaxSize;
-        this.classicGroupMaxSize = classicGroupMaxSize;
-        this.classicGroupInitialRebalanceDelayMs = classicGroupInitialRebalanceDelayMs;
-        this.classicGroupNewMemberJoinTimeoutMs = classicGroupNewMemberJoinTimeoutMs;
-        this.classicGroupMinSessionTimeoutMs = classicGroupMinSessionTimeoutMs;
-        this.classicGroupMaxSessionTimeoutMs = classicGroupMaxSessionTimeoutMs;
-        this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs;
-        this.offsetsRetentionMs = offsetsRetentionMs;
-        this.offsetCommitTimeoutMs = offsetCommitTimeoutMs;
-        this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy;
-        this.compressionType = compressionType;
+    /**
+     * Batch size for reading from the offsets segments when loading offsets into
+     * the cache (soft-limit, overridden if records are too large).
+     */
+    public int offsetsLoadBufferSize() {
+        return config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG);
+    }
+
+    /**
+     * The number of partitions for the offset commit topic (should not change after deployment).
+     */
+    public int offsetsTopicPartitions() {
+        return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG);
+    }
+
+    /**
+     * The replication factor for the offsets topic (set higher to ensure availability).
+     * Internal topic creation will fail until the cluster size meets this replication factor requirement.
+     */
+    public short offsetsTopicReplicationFactor() {
+        return config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG);
+    }
+
+    /**
+     * DEPRECATED: The required acks before the commit can be accepted.
+     * In general, the default (-1) should not be overridden.
+     */
+    @Deprecated // since 3.8
+    public short offsetCommitRequiredAcks() {
+        return config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG);
+    }
+
+    /**
+     * The minimum allowed session timeout for registered consumers.
+     */
+    public int consumerGroupMinSessionTimeoutMs() {
+        return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+    }
+
+    /**
+     * The maximum allowed session timeout for registered consumers.
+     */
+    public int consumerGroupMaxSessionTimeoutMs() {
+        return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+    }
+
+    /**
+     * The minimum heartbeat interval for registered consumers.
+     */
+    public int consumerGroupMinHeartbeatIntervalMs() {
+        return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
+    }
+
+    /**
+     * The maximum heartbeat interval for registered consumers.
+     */
+    public int consumerGroupMaxHeartbeatIntervalMs() {
+        return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
     }
 }
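
With the wrapper in place, every accessor resolves its value from the
underlying AbstractConfig at call time instead of capturing it once in a
constructor. One consequence worth noting: consumerGroupAssignors() goes
through getConfiguredInstances, so each invocation instantiates and configures
fresh assignor objects. A usage sketch, mirroring how the unit test at the end
of this patch merges the four ConfigDef groups (placed in the same package,
since the ConfigDef constants may not be public):

    package org.apache.kafka.coordinator.group;

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.utils.Utils;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class GroupCoordinatorConfigUsageSketch {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 3);

            // Parse the raw properties once against the merged definitions,
            // then wrap them; unset keys fall back to the ConfigDef defaults.
            GroupCoordinatorConfig config = new GroupCoordinatorConfig(new AbstractConfig(
                Utils.mergeConfigs(Arrays.asList(
                    GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
                    GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
                    GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
                    GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF)),
                props, false));

            System.out.println(config.offsetsTopicPartitions()); // 3
        }
    }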
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index e0528f5..8d4845b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -168,7 +168,7 @@
             CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
                 logContext,
                 "group-coordinator-event-processor-",
-                config.numThreads,
+                config.numThreads(),
                 time,
                 coordinatorRuntimeMetrics
             );
@@ -183,12 +183,12 @@
                     .withPartitionWriter(writer)
                     .withLoader(loader)
                     .withCoordinatorShardBuilderSupplier(supplier)
-                    .withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs))
+                    .withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs()))
                     .withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
                     .withCoordinatorMetrics(groupCoordinatorMetrics)
                     .withSerializer(new CoordinatorRecordSerde())
-                    .withCompression(Compression.of(config.compressionType).build())
-                    .withAppendLingerMs(config.appendLingerMs)
+                    .withCompression(Compression.of(config.offsetTopicCompressionType()).build())
+                    .withAppendLingerMs(config.appendLingerMs())
                     .build();
 
             return new GroupCoordinatorService(
@@ -296,7 +296,7 @@
         return runtime.scheduleWriteOperation(
             "consumer-group-heartbeat",
             topicPartitionFor(request.groupId()),
-            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.consumerGroupHeartbeat(context, request)
         ).exceptionally(exception -> handleOperationException(
             "consumer-group-heartbeat",
@@ -331,8 +331,8 @@
             );
         }
 
-        if (request.sessionTimeoutMs() < config.classicGroupMinSessionTimeoutMs ||
-            request.sessionTimeoutMs() > config.classicGroupMaxSessionTimeoutMs) {
+        if (request.sessionTimeoutMs() < config.classicGroupMinSessionTimeoutMs() ||
+            request.sessionTimeoutMs() > config.classicGroupMaxSessionTimeoutMs()) {
             return CompletableFuture.completedFuture(new JoinGroupResponseData()
                 .setMemberId(request.memberId())
                 .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
@@ -344,7 +344,7 @@
         runtime.scheduleWriteOperation(
             "classic-group-join",
             topicPartitionFor(request.groupId()),
-            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.classicGroupJoin(context, request, responseFuture)
         ).exceptionally(exception -> {
             if (!responseFuture.isDone()) {
@@ -387,7 +387,7 @@
         runtime.scheduleWriteOperation(
             "classic-group-sync",
             topicPartitionFor(request.groupId()),
-            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.classicGroupSync(context, request, responseFuture)
         ).exceptionally(exception -> {
             if (!responseFuture.isDone()) {
@@ -427,7 +427,7 @@
         return runtime.scheduleWriteOperation(
             "classic-group-heartbeat",
             topicPartitionFor(request.groupId()),
-            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.classicGroupHeartbeat(context, request)
         ).exceptionally(exception -> handleOperationException(
             "classic-group-heartbeat",
@@ -469,7 +469,7 @@
         return runtime.scheduleWriteOperation(
             "classic-group-leave",
             topicPartitionFor(request.groupId()),
-            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.classicGroupLeave(context, request)
         ).exceptionally(exception -> handleOperationException(
             "classic-group-leave",
@@ -682,7 +682,7 @@
                 runtime.scheduleWriteOperation(
                     "delete-groups",
                     topicPartition,
-                    Duration.ofMillis(config.offsetCommitTimeoutMs),
+                    Duration.ofMillis(config.offsetCommitTimeoutMs()),
                     coordinator -> coordinator.deleteGroups(context, groupList)
                 ).exceptionally(exception -> handleOperationException(
                     "delete-groups",
@@ -736,7 +736,7 @@
             return runtime.scheduleWriteOperation(
                 "fetch-offsets",
                 topicPartitionFor(request.groupId()),
-                Duration.ofMillis(config.offsetCommitTimeoutMs),
+                Duration.ofMillis(config.offsetCommitTimeoutMs()),
                 coordinator -> new CoordinatorResult<>(
                     Collections.emptyList(),
                     coordinator.fetchOffsets(request, Long.MAX_VALUE)
@@ -787,7 +787,7 @@
             return runtime.scheduleWriteOperation(
                 "fetch-all-offsets",
                 topicPartitionFor(request.groupId()),
-                Duration.ofMillis(config.offsetCommitTimeoutMs),
+                Duration.ofMillis(config.offsetCommitTimeoutMs()),
                 coordinator -> new CoordinatorResult<>(
                     Collections.emptyList(),
                     coordinator.fetchAllOffsets(request, Long.MAX_VALUE)
@@ -829,7 +829,7 @@
         return runtime.scheduleWriteOperation(
             "commit-offset",
             topicPartitionFor(request.groupId()),
-            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.commitOffset(context, request)
         ).exceptionally(exception -> handleOperationException(
             "commit-offset",
@@ -868,7 +868,7 @@
             request.transactionalId(),
             request.producerId(),
             request.producerEpoch(),
-            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.commitTransactionalOffset(context, request),
             context.apiVersion()
         ).exceptionally(exception -> handleOperationException(
@@ -903,7 +903,7 @@
         return runtime.scheduleWriteOperation(
             "delete-offsets",
             topicPartitionFor(request.groupId()),
-            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.deleteOffsets(context, request)
         ).exceptionally(exception -> handleOperationException(
             "delete-offsets",
@@ -973,7 +973,7 @@
             FutureUtils.mapExceptionally(
                 runtime.scheduleWriteAllOperation(
                     "on-partition-deleted",
-                    Duration.ofMillis(config.offsetCommitTimeoutMs),
+                    Duration.ofMillis(config.offsetCommitTimeoutMs()),
                     coordinator -> coordinator.onPartitionsDeleted(topicPartitions)
                 ),
                 exception -> {
@@ -1036,7 +1036,7 @@
         Properties properties = new Properties();
         properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
         properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name);
-        properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(config.offsetsTopicSegmentBytes));
+        properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(config.offsetsTopicSegmentBytes()));
         return properties;
     }
 
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 329feca..56ea193 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -181,16 +181,16 @@
                 .withSnapshotRegistry(snapshotRegistry)
                 .withTime(time)
                 .withTimer(timer)
-                .withConsumerGroupAssignors(config.consumerGroupAssignors)
-                .withConsumerGroupMaxSize(config.consumerGroupMaxSize)
-                .withConsumerGroupSessionTimeout(config.consumerGroupSessionTimeoutMs)
-                .withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs)
-                .withClassicGroupMaxSize(config.classicGroupMaxSize)
-                .withClassicGroupInitialRebalanceDelayMs(config.classicGroupInitialRebalanceDelayMs)
-                .withClassicGroupNewMemberJoinTimeoutMs(config.classicGroupNewMemberJoinTimeoutMs)
-                .withClassicGroupMinSessionTimeoutMs(config.classicGroupMinSessionTimeoutMs)
-                .withClassicGroupMaxSessionTimeoutMs(config.classicGroupMaxSessionTimeoutMs)
-                .withConsumerGroupMigrationPolicy(config.consumerGroupMigrationPolicy)
+                .withConsumerGroupAssignors(config.consumerGroupAssignors())
+                .withConsumerGroupMaxSize(config.consumerGroupMaxSize())
+                .withConsumerGroupSessionTimeout(config.consumerGroupSessionTimeoutMs())
+                .withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs())
+                .withClassicGroupMaxSize(config.classicGroupMaxSize())
+                .withClassicGroupInitialRebalanceDelayMs(config.classicGroupInitialRebalanceDelayMs())
+                .withClassicGroupNewMemberJoinTimeoutMs(config.classicGroupNewMemberJoinTimeoutMs())
+                .withClassicGroupMinSessionTimeoutMs(config.classicGroupMinSessionTimeoutMs())
+                .withClassicGroupMaxSessionTimeoutMs(config.classicGroupMaxSessionTimeoutMs())
+                .withConsumerGroupMigrationPolicy(config.consumerGroupMigrationPolicy())
                 .withGroupCoordinatorMetricsShard(metricsShard)
                 .build();
 
@@ -595,10 +595,10 @@
     private void scheduleGroupMetadataExpiration() {
         timer.schedule(
             GROUP_EXPIRATION_KEY,
-            config.offsetsRetentionCheckIntervalMs,
+            config.offsetsRetentionCheckIntervalMs(),
             TimeUnit.MILLISECONDS,
             true,
-            config.offsetsRetentionCheckIntervalMs,
+            config.offsetsRetentionCheckIntervalMs(),
             this::cleanupGroupMetadata
         );
     }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 600ed16..dad69db 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -412,7 +412,7 @@
      * @return True if the committed metadata is invalid; False otherwise.
      */
     private boolean isMetadataInvalid(String metadata) {
-        return metadata != null && metadata.length() > config.offsetMetadataMaxSize;
+        return metadata != null && metadata.length() > config.offsetMetadataMaxSize();
     }
 
     /**
@@ -881,7 +881,7 @@
             if (!group.isSubscribedToTopic(topic)) {
                 partitions.forEach((partition, offsetAndMetadata) -> {
                     // We don't expire the offset yet if there is a pending transactional offset for the partition.
-                    if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs) &&
+                    if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs()) &&
                         !hasPendingTransactionalOffsets(groupId, topic, partition)) {
                         expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records).toString());
                         log.debug("[GroupId {}] Expired offset for partition={}-{}", groupId, topic, partition);
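
The expiration sweep above now reads config.offsetsRetentionMs() on each pass
rather than using a value captured at construction. A simplified sketch of the
predicate's shape; the real path, as the hunk shows, additionally checks topic
subscription and pending transactional offsets:

    // Illustrative only: an offset counts as expired once its commit
    // timestamp is older than now - offsetsRetentionMs.
    public class OffsetExpirySketch {
        static boolean isOffsetExpired(long commitTimestampMs, long currentTimestampMs,
                                       long offsetsRetentionMs) {
            return currentTimestampMs - commitTimestampMs >= offsetsRetentionMs;
        }

        public static void main(String[] args) {
            long retentionMs = 7L * 24 * 60 * 60 * 1000; // 7 days
            System.out.println(isOffsetExpired(0L, retentionMs + 1, retentionMs)); // true
        }
    }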
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 8a9947e..ba3b357 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -16,84 +16,116 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@SuppressWarnings("deprecation")
 public class GroupCoordinatorConfigTest {
+    private static final List<ConfigDef> GROUP_COORDINATOR_CONFIG_DEFS = Arrays.asList(
+            GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
+            GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
+            GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
+            GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF);
+
     @Test
     public void testConfigs() {
-        ConsumerGroupPartitionAssignor assignor = new RangeAssignor();
-        GroupCoordinatorConfig config = new GroupCoordinatorConfig(
-            10,
-            10,
-            30,
-            10,
-            55,
-            Collections.singletonList(assignor),
-            2222,
-            3333,
-            60,
-            3000,
-            5 * 60 * 1000,
-            120,
-            10 * 60 * 1000,
-            600000L,
-            24 * 60 * 60 * 1000L,
-            5000,
-            ConsumerGroupMigrationPolicy.DISABLED,
-            CompressionType.GZIP
-        );
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 10);
+        configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 10);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 30);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 10);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, 55);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(RangeAssignor.class));
+        configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, 2222);
+        configs.put(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG, 3333);
+        configs.put(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, 60);
+        configs.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 3000);
+        configs.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 120);
+        configs.put(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 10 * 60 * 1000);
+        configs.put(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, 600000);
+        configs.put(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG, 24 * 60 * 60 * 1000);
+        configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 5000);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name());
+        configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.GZIP.id);
+        configs.put(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG, 555);
+        configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 111);
+        configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 11);
+        configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, (short) 0);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 333);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 666);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222);
 
-        assertEquals(10, config.numThreads);
-        assertEquals(30, config.consumerGroupSessionTimeoutMs);
-        assertEquals(10, config.consumerGroupHeartbeatIntervalMs);
-        assertEquals(55, config.consumerGroupMaxSize);
-        assertEquals(Collections.singletonList(assignor), config.consumerGroupAssignors);
-        assertEquals(2222, config.offsetsTopicSegmentBytes);
-        assertEquals(3333, config.offsetMetadataMaxSize);
-        assertEquals(60, config.classicGroupMaxSize);
-        assertEquals(3000, config.classicGroupInitialRebalanceDelayMs);
-        assertEquals(5 * 60 * 1000, config.classicGroupNewMemberJoinTimeoutMs);
-        assertEquals(120, config.classicGroupMinSessionTimeoutMs);
-        assertEquals(10 * 60 * 1000, config.classicGroupMaxSessionTimeoutMs);
-        assertEquals(10 * 60 * 1000, config.offsetsRetentionCheckIntervalMs);
-        assertEquals(24 * 60 * 60 * 1000L, config.offsetsRetentionMs);
-        assertEquals(5000, config.offsetCommitTimeoutMs);
-        assertEquals(CompressionType.GZIP, config.compressionType);
-        assertEquals(10, config.appendLingerMs);
+        GroupCoordinatorConfig config = new GroupCoordinatorConfig(
+                new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false));
+
+        assertEquals(10, config.numThreads());
+        assertEquals(30, config.consumerGroupSessionTimeoutMs());
+        assertEquals(10, config.consumerGroupHeartbeatIntervalMs());
+        assertEquals(55, config.consumerGroupMaxSize());
+        assertEquals(1, config.consumerGroupAssignors().size());
+        assertEquals(RangeAssignor.RANGE_ASSIGNOR_NAME, config.consumerGroupAssignors().get(0).name());
+        assertEquals(2222, config.offsetsTopicSegmentBytes());
+        assertEquals(3333, config.offsetMetadataMaxSize());
+        assertEquals(60, config.classicGroupMaxSize());
+        assertEquals(3000, config.classicGroupInitialRebalanceDelayMs());
+        assertEquals(5 * 60 * 1000, config.classicGroupNewMemberJoinTimeoutMs());
+        assertEquals(120, config.classicGroupMinSessionTimeoutMs());
+        assertEquals(10 * 60 * 1000, config.classicGroupMaxSessionTimeoutMs());
+        assertEquals(10 * 60 * 1000, config.offsetsRetentionCheckIntervalMs());
+        assertEquals(Duration.ofMinutes(24 * 60 * 60 * 1000L).toMillis(), config.offsetsRetentionMs());
+        assertEquals(5000, config.offsetCommitTimeoutMs());
+        assertEquals(CompressionType.GZIP, config.offsetTopicCompressionType());
+        assertEquals(10, config.appendLingerMs());
+        assertEquals(555, config.offsetsLoadBufferSize());
+        assertEquals(111, config.offsetsTopicPartitions());
+        assertEquals(11, config.offsetsTopicReplicationFactor());
+        assertEquals(0, config.offsetCommitRequiredAcks());
+        assertEquals(333, config.consumerGroupMinSessionTimeoutMs());
+        assertEquals(666, config.consumerGroupMaxSessionTimeoutMs());
+        assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs());
+        assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs());
     }
 
     public static GroupCoordinatorConfig createGroupCoordinatorConfig(
         int offsetMetadataMaxSize,
         long offsetsRetentionCheckIntervalMs,
-        long offsetsRetentionMs
+        int offsetsRetentionMinutes
     ) {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 1);
+        configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 10);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 45);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 5);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, Integer.MAX_VALUE);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(RangeAssignor.class));
+        configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, 1000);
+        configs.put(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG, offsetMetadataMaxSize);
+        configs.put(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, Integer.MAX_VALUE);
+        configs.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 3000);
+        configs.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 120);
+        configs.put(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 10 * 5 * 1000);
+        configs.put(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, offsetsRetentionCheckIntervalMs);
+        configs.put(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG, offsetsRetentionMinutes);
+        configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 5000);
+        configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name());
+        configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id);
+
         return new GroupCoordinatorConfig(
-            1,
-            10,
-            45,
-            5,
-            Integer.MAX_VALUE,
-            Collections.singletonList(new RangeAssignor()),
-            1000,
-            offsetMetadataMaxSize,
-            Integer.MAX_VALUE,
-            3000,
-            5 * 60 * 1000,
-            120,
-            10 * 5 * 1000,
-            offsetsRetentionCheckIntervalMs,
-            offsetsRetentionMs,
-            5000,
-            ConsumerGroupMigrationPolicy.DISABLED,
-            CompressionType.NONE
-        );
+                new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false));
     }
 }
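
Note on the pattern above: the positional constructor is gone; tests now populate a plain Map and hand it to GroupCoordinatorConfig through AbstractConfig. A minimal sketch of that construction path, assuming the four CONFIG_DEF constants merged at the top of the test, as the test itself does (the two property values below are arbitrary illustrations; any key left out of the map falls back to its ConfigDef default):

    Map<String, Object> props = new HashMap<>();
    props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 4);
    props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 50);

    // Merge the four ConfigDefs into a single definition, then let
    // AbstractConfig parse and validate the supplied properties.
    GroupCoordinatorConfig config = new GroupCoordinatorConfig(
        new AbstractConfig(
            Utils.mergeConfigs(Arrays.asList(
                GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
                GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
                GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
                GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF)),
            props,
            false)); // false: do not log the parsed values

    assertEquals(4, config.numThreads());               // accessor methods now,
    assertEquals(50, config.offsetsTopicPartitions());  // not public fields

This is also why every assertion in the hunk above gained parentheses: the refactor replaces direct field reads with typed accessor methods on GroupCoordinatorConfig.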
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 7ad0ecd..24da10e 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -57,7 +57,6 @@
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.TransactionResult;
@@ -66,7 +65,6 @@
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
 import org.apache.kafka.server.record.BrokerCompressionType;
@@ -117,26 +115,7 @@
     }
 
     private GroupCoordinatorConfig createConfig() {
-        return new GroupCoordinatorConfig(
-            1,
-            10,
-            45,
-            5,
-            Integer.MAX_VALUE,
-            Collections.singletonList(new RangeAssignor()),
-            1000,
-            4096,
-            Integer.MAX_VALUE,
-            3000,
-            5 * 60 * 1000,
-            120,
-            10 * 5 * 1000,
-            600000L,
-            24 * 60 * 1000L,
-            5000,
-            ConsumerGroupMigrationPolicy.DISABLED,
-            CompressionType.NONE
-        );
+        return GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 600000L, 24);
     }
 
     @Test
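
Worth flagging for reviewers of the hunk above: the deleted inline constructor passed offsets retention as 24 * 60 * 1000L milliseconds, while the shared helper takes its third argument in minutes, so the equivalent value is 24. A labelled sketch of the call, with the unit change called out:

    // Same effective configuration as the removed constructor, via the helper.
    GroupCoordinatorConfig config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(
        4096,     // offsetMetadataMaxSize (bytes)
        600000L,  // offsetsRetentionCheckIntervalMs
        24        // offsetsRetentionMinutes -- was 24 * 60 * 1000L ms before
    );
    assertEquals(Duration.ofMinutes(24).toMillis(), config.offsetsRetentionMs());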
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 61fcca2..bd997d7 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -68,6 +68,7 @@
 import org.junit.jupiter.params.provider.EnumSource;
 
 import java.net.InetAddress;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -105,12 +106,12 @@
             private GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class);
 
             Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
-                config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60 * 1000);
+                config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60);
                 return this;
             }
 
-            Builder withOffsetsRetentionMs(long offsetsRetentionMs) {
-                config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, offsetsRetentionMs);
+            Builder withOffsetsRetentionMinutes(int offsetsRetentionMinutes) {
+                config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, offsetsRetentionMinutes);
                 return this;
             }
 
@@ -122,7 +123,7 @@
             OffsetMetadataManagerTestContext build() {
                 if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
                 if (config == null) {
-                    config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24 * 60 * 1000);
+                    config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24);
                 }
 
                 if (groupMetadataManager == null) {
@@ -2515,7 +2516,7 @@
 
         OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
             .withGroupMetadataManager(groupMetadataManager)
-            .withOffsetsRetentionMs(1000)
+            .withOffsetsRetentionMinutes(1)
             .build();
 
         long commitTimestamp = context.time.milliseconds();
@@ -2524,7 +2525,7 @@
         context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp);
         context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500);
 
-        context.time.sleep(1000);
+        context.time.sleep(Duration.ofMinutes(1).toMillis());
 
         // firstTopic-0: group is still subscribed to firstTopic. Do not expire.
         // secondTopic-0: should expire as offset retention has passed.
@@ -2576,7 +2577,7 @@
 
         OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
             .withGroupMetadataManager(groupMetadataManager)
-            .withOffsetsRetentionMs(1000)
+            .withOffsetsRetentionMinutes(1)
             .build();
 
         long commitTimestamp = context.time.milliseconds();
@@ -2584,7 +2585,7 @@
         context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
         context.commitOffset(10L, "group-id", "foo", 0, 101L, 0, commitTimestamp + 500);
 
-        context.time.sleep(1000);
+        context.time.sleep(Duration.ofMinutes(1).toMillis());
 
         when(groupMetadataManager.group("group-id")).thenReturn(group);
         when(group.offsetExpirationCondition()).thenReturn(Optional.of(
@@ -3001,7 +3002,7 @@
 
         OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
             .withGroupMetadataManager(groupMetadataManager)
-            .withOffsetsRetentionMs(1000)
+            .withOffsetsRetentionMinutes(1)
             .build();
 
         long commitTimestamp = context.time.milliseconds();
@@ -3010,7 +3011,7 @@
         context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp);
         context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500);
 
-        context.time.sleep(1000);
+        context.time.sleep(Duration.ofMinutes(1).toMillis());
 
         // firstTopic-0: group is still subscribed to firstTopic. Do not expire.
         // secondTopic-0: should expire as offset retention has passed.
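
A closing observation on the retention hunks: because offsets retention is now configured in whole minutes, the smallest window these tests can express is one minute, and the mock clock is advanced by Duration.ofMinutes(1).toMillis() rather than a bare 1000. An illustrative sketch of the expiry arithmetic, not part of the patch (MockTime here stands in for the test harness clock):

    // With offsets.retention.minutes = 1, an offset committed at t0 becomes
    // eligible for expiry once now - t0 >= 60,000 ms.
    MockTime time = new MockTime();
    long retentionMs = Duration.ofMinutes(1).toMillis(); // 60,000 ms
    long commitTimestamp = time.milliseconds();
    time.sleep(retentionMs); // advance past the retention window
    assertTrue(time.milliseconds() - commitTimestamp >= retentionMs);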