KAFKA-15853: Move OffsetConfig to group-coordinator module (#15161)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, David Jacot <djacot@confluent.io>, Nikolay <nizhikov@apache.org>
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index aba2053..cff3482 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -32,6 +32,7 @@
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.VerificationGuard
@@ -1751,17 +1752,17 @@
GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
}
- private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
- maxMetadataSize = config.offsetMetadataMaxSize,
- loadBufferSize = config.offsetsLoadBufferSize,
- offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
- offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
- offsetsTopicNumPartitions = config.offsetsTopicPartitions,
- offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
- offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
- offsetsTopicCompressionType = config.offsetsTopicCompressionType,
- offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
- offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
+ private[group] def offsetConfig(config: KafkaConfig) = new OffsetConfig(
+ config.offsetMetadataMaxSize,
+ config.offsetsLoadBufferSize,
+ config.offsetsRetentionMinutes * 60L * 1000L,
+ config.offsetsRetentionCheckIntervalMs,
+ config.offsetsTopicPartitions,
+ config.offsetsTopicSegmentBytes,
+ config.offsetsTopicReplicationFactor,
+ config.offsetsTopicCompressionType,
+ config.offsetCommitTimeoutMs,
+ config.offsetCommitRequiredAcks
)
private[group] def apply(
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 31af1d8..3177576 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -43,6 +43,7 @@
import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicIdPartition, TopicPartition}
+import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0}
diff --git a/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala b/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala
deleted file mode 100644
index 189e8e6..0000000
--- a/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator.group
-
-import org.apache.kafka.common.record.CompressionType
-
-/**
- * Configuration settings for in-built offset management
- * @param maxMetadataSize The maximum allowed metadata for any offset commit.
- * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache.
- * @param offsetsRetentionMs For subscribed consumers, committed offset of a specific partition will be expired and discarded when
- * 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty);
- * 2) this retention period has elapsed since the last time an offset is committed for the partition AND the group is no longer subscribed to the corresponding topic.
- * For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit.
- * Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted immediately;
- * Also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period
- * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets.
- * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment).
- * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster
- * log compaction and faster offset loads
- * @param offsetsTopicReplicationFactor The replication factor for the offset commit topic (set higher to ensure availability).
- * @param offsetsTopicCompressionCodec Compression codec for the offsets topic - compression should be turned on in
- * order to achieve "atomic" commits.
- * @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the
- * commit or this timeout is reached. (Similar to the producer request timeout.)
- * @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1)
- * should not be overridden.
- */
-case class OffsetConfig(maxMetadataSize: Int = OffsetConfig.DefaultMaxMetadataSize,
- loadBufferSize: Int = OffsetConfig.DefaultLoadBufferSize,
- offsetsRetentionMs: Long = OffsetConfig.DefaultOffsetRetentionMs,
- offsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs,
- offsetsTopicNumPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions,
- offsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes,
- offsetsTopicReplicationFactor: Short = OffsetConfig.DefaultOffsetsTopicReplicationFactor,
- offsetsTopicCompressionType: CompressionType = OffsetConfig.DefaultOffsetsTopicCompressionType,
- offsetCommitTimeoutMs: Int = OffsetConfig.DefaultOffsetCommitTimeoutMs,
- offsetCommitRequiredAcks: Short = OffsetConfig.DefaultOffsetCommitRequiredAcks)
-
-object OffsetConfig {
- val DefaultMaxMetadataSize = 4096
- val DefaultLoadBufferSize = 5*1024*1024
- val DefaultOffsetRetentionMs = 24*60*60*1000L
- val DefaultOffsetsRetentionCheckIntervalMs = 600000L
- val DefaultOffsetsTopicNumPartitions = 50
- val DefaultOffsetsTopicSegmentBytes = 100*1024*1024
- val DefaultOffsetsTopicReplicationFactor = 3.toShort
- val DefaultOffsetsTopicCompressionType = CompressionType.NONE
- val DefaultOffsetCommitTimeoutMs = 5000
- val DefaultOffsetCommitRequiredAcks = (-1).toShort
-}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2afb1d6..210722b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -21,7 +21,6 @@
import java.util.concurrent.TimeUnit
import java.util.{Collections, Locale, Properties}
import kafka.cluster.EndPoint
-import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
@@ -42,6 +41,7 @@
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.Group.GroupType
+import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.coordinator.group.assignor.{PartitionAssignor, RangeAssignor, UniformAssignor}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.ProcessRole
@@ -181,16 +181,16 @@
val ConsumerGroupAssignors = List(classOf[UniformAssignor].getName, classOf[RangeAssignor].getName).asJava
/** ********* Offset management configuration ***********/
- val OffsetMetadataMaxSize = OffsetConfig.DefaultMaxMetadataSize
- val OffsetsLoadBufferSize = OffsetConfig.DefaultLoadBufferSize
- val OffsetsTopicReplicationFactor = OffsetConfig.DefaultOffsetsTopicReplicationFactor
- val OffsetsTopicPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions
- val OffsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes
- val OffsetsTopicCompressionCodec: Int = OffsetConfig.DefaultOffsetsTopicCompressionType.id
+ val OffsetMetadataMaxSize = OffsetConfig.DEFAULT_MAX_METADATA_SIZE
+ val OffsetsLoadBufferSize = OffsetConfig.DEFAULT_LOAD_BUFFER_SIZE
+ val OffsetsTopicReplicationFactor = OffsetConfig.DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR
+ val OffsetsTopicPartitions: Int = OffsetConfig.DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS
+ val OffsetsTopicSegmentBytes: Int = OffsetConfig.DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES
+ val OffsetsTopicCompressionCodec: Int = OffsetConfig.DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE.id
val OffsetsRetentionMinutes: Int = 7 * 24 * 60
- val OffsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs
- val OffsetCommitTimeoutMs = OffsetConfig.DefaultOffsetCommitTimeoutMs
- val OffsetCommitRequiredAcks = OffsetConfig.DefaultOffsetCommitRequiredAcks
+ val OffsetsRetentionCheckIntervalMs: Long = OffsetConfig.DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS
+ val OffsetCommitTimeoutMs = OffsetConfig.DEFAULT_OFFSET_COMMIT_TIMEOUT_MS
+ val OffsetCommitRequiredAcks = OffsetConfig.DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS
/** ********* Transaction management configuration ***********/
val TransactionalIdExpirationMs = TransactionStateManager.DefaultTransactionalIdExpirationMs
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 58c3d85..4d7ddc1 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -20,9 +20,7 @@
import java.util.{Collections, Objects, Properties}
import java.util.concurrent.TimeUnit
-
import kafka.api.SaslSetup
-import kafka.coordinator.group.OffsetConfig
import kafka.utils.JaasTestUtils.JaasSection
import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.utils.Implicits._
@@ -31,6 +29,7 @@
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.coordinator.group.OffsetConfig
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
@@ -108,7 +107,7 @@
s"Unexpected ${KafkaConfig.InterBrokerListenerNameProp} for broker ${config.brokerId}")
}
- TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
+ TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS,
replicationFactor = 2, servers, servers.head.groupCoordinator.groupMetadataTopicConfigs)
createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 2b6e226..4ffb5e6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -36,6 +36,7 @@
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
+import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.VerificationGuard
@@ -3816,7 +3817,7 @@
val producerEpoch: Short = 3
val offsets = Map(
- tip -> OffsetAndMetadata(offset, "s" * (OffsetConfig.DefaultMaxMetadataSize + 1), 0)
+ tip -> OffsetAndMetadata(offset, "s" * (OffsetConfig.DEFAULT_MAX_METADATA_SIZE + 1), 0)
)
val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, offsets)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index d43a122..b2c2ece 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -42,6 +42,7 @@
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitValue}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion._
@@ -79,16 +80,16 @@
private val offsetConfig = {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
- OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
- loadBufferSize = config.offsetsLoadBufferSize,
- offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
- offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
- offsetsTopicNumPartitions = config.offsetsTopicPartitions,
- offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
- offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
- offsetsTopicCompressionType = config.offsetsTopicCompressionType,
- offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
- offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+ new OffsetConfig(config.offsetMetadataMaxSize,
+ config.offsetsLoadBufferSize,
+ config.offsetsRetentionMinutes * 60 * 1000L,
+ config.offsetsRetentionCheckIntervalMs,
+ config.offsetsTopicPartitions,
+ config.offsetsTopicSegmentBytes,
+ config.offsetsTopicReplicationFactor,
+ config.offsetsTopicCompressionType,
+ config.offsetCommitTimeoutMs,
+ config.offsetCommitRequiredAcks)
}
@BeforeEach
@@ -775,7 +776,7 @@
)
// create a GroupMetadata record larger then offsets.load.buffer.size (here at least 16 bytes larger)
- val assignmentSize = OffsetConfig.DefaultLoadBufferSize + 16
+ val assignmentSize = OffsetConfig.DEFAULT_LOAD_BUFFER_SIZE + 16
val memberId = "98098230493"
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java
new file mode 100644
index 0000000..e15ce64
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.record.CompressionType;
+
+public class OffsetConfig {
+ public static final int DEFAULT_MAX_METADATA_SIZE = 4096;
+ public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+ public static final long DEFAULT_OFFSET_RETENTION_MS = 24 * 60 * 60 * 1000L;
+ public static final long DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS = 600000L;
+ public static final int DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS = 50;
+ public static final int DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES = 100 * 1024 * 1024;
+ public static final short DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR = 3;
+ public static final CompressionType DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE = CompressionType.NONE;
+ public static final int DEFAULT_OFFSET_COMMIT_TIMEOUT_MS = 5000;
+ public static final short DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS = -1;
+
+ public final int maxMetadataSize;
+ public final int loadBufferSize;
+ public final long offsetsRetentionMs;
+ public final long offsetsRetentionCheckIntervalMs;
+ public final int offsetsTopicNumPartitions;
+ public final int offsetsTopicSegmentBytes;
+ public final short offsetsTopicReplicationFactor;
+ public final CompressionType offsetsTopicCompressionType;
+ public final int offsetCommitTimeoutMs;
+ public final short offsetCommitRequiredAcks;
+
+ /**
+ * Configuration settings for in-built offset management
+ * @param maxMetadataSize The maximum allowed metadata for any offset commit.
+ * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache.
+ * @param offsetsRetentionMs For subscribed consumers, committed offset of a specific partition will be expired and discarded when
+ * 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty);
+ * 2) this retention period has elapsed since the last time an offset is committed for the partition AND the group is no longer subscribed to the corresponding topic.
+ * For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit.
+ * Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted immediately;
+ * Also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period
+ * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets.
+ * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment).
+ * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster
+ * log compaction and faster offset loads
+ * @param offsetsTopicReplicationFactor The replication factor for the offset commit topic (set higher to ensure availability).
+ * @param offsetsTopicCompressionType Compression type for the offsets topic - compression should be turned on in
+ * order to achieve "atomic" commits.
+ * @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the
+ * commit or this timeout is reached. (Similar to the producer request timeout.)
+ * @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1)
+ * should not be overridden.
+ */
+ public OffsetConfig(int maxMetadataSize,
+ int loadBufferSize,
+ long offsetsRetentionMs,
+ long offsetsRetentionCheckIntervalMs,
+ int offsetsTopicNumPartitions,
+ int offsetsTopicSegmentBytes,
+ short offsetsTopicReplicationFactor,
+ CompressionType offsetsTopicCompressionType,
+ int offsetCommitTimeoutMs,
+ short offsetCommitRequiredAcks
+ ) {
+ this.maxMetadataSize = maxMetadataSize;
+ this.loadBufferSize = loadBufferSize;
+ this.offsetsRetentionMs = offsetsRetentionMs;
+ this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs;
+ this.offsetsTopicNumPartitions = offsetsTopicNumPartitions;
+ this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes;
+ this.offsetsTopicReplicationFactor = offsetsTopicReplicationFactor;
+ this.offsetsTopicCompressionType = offsetsTopicCompressionType;
+ this.offsetCommitTimeoutMs = offsetCommitTimeoutMs;
+ this.offsetCommitRequiredAcks = offsetCommitRequiredAcks;
+ }
+
+ public OffsetConfig() {
+ this(DEFAULT_MAX_METADATA_SIZE, DEFAULT_LOAD_BUFFER_SIZE, DEFAULT_OFFSET_RETENTION_MS,
+ DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS, DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS,
+ DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES, DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR,
+ DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE, DEFAULT_OFFSET_COMMIT_TIMEOUT_MS,
+ DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS);
+ }
+}
\ No newline at end of file