blob: 479fc61b8f0b31b3f920011ebb1a766f263f86e6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator.group
import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, Optional, OptionalInt}
import com.yammer.metrics.core.Gauge
import javax.management.ObjectName
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
import kafka.log.UnifiedLog
import kafka.server.{HostedPartition, KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, Metrics => kMetrics}
import org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection
import org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, Struct, Type}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, Errors, MessageUtil}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitValue}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, LogAppendInfo, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort}
import org.mockito.Mockito.{mock, reset, times, verify, when}
import scala.jdk.CollectionConverters._
import scala.collection._
class GroupMetadataManagerTest {
// Mutable fixtures; setUp() (re)assigns the ones it needs before each test.
var time: MockTime = _
var replicaManager: ReplicaManager = _
var groupMetadataManager: GroupMetadataManager = _
var scheduler: KafkaScheduler = _
var partition: Partition = _
var metrics: kMetrics = _
var defaultOffsetRetentionMs: Long = Long.MaxValue

// Identity of the single group the tests operate on and of the offsets-topic partition used for it.
val groupId: String = "foo"
val groupInstanceId: String = "bar"
val groupPartitionId: Int = 0
val groupTopicPartition: TopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)

// Group protocol parameters shared by the tests.
val protocolType: String = "protocolType"
val rebalanceTimeout: Int = 60000
val sessionTimeout: Int = 10000
val defaultRequireStable: Boolean = false

// Partition count handed to GroupMetadataManager.startup as the size of the offsets topic.
val numOffsetsPartitions: Int = 2
// OffsetConfig populated from a broker configuration created for node 0 with an empty zkConnect string.
private val offsetConfig: OffsetConfig = {
  val brokerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
  // The broker setting is expressed in minutes while OffsetConfig takes milliseconds.
  val retentionMs = brokerConfig.offsetsRetentionMinutes * 60 * 1000L
  OffsetConfig(
    maxMetadataSize = brokerConfig.offsetMetadataMaxSize,
    loadBufferSize = brokerConfig.offsetsLoadBufferSize,
    offsetsRetentionMs = retentionMs,
    offsetsRetentionCheckIntervalMs = brokerConfig.offsetsRetentionCheckIntervalMs,
    offsetsTopicNumPartitions = brokerConfig.offsetsTopicPartitions,
    offsetsTopicSegmentBytes = brokerConfig.offsetsTopicSegmentBytes,
    offsetsTopicReplicationFactor = brokerConfig.offsetsTopicReplicationFactor,
    offsetsTopicCompressionType = brokerConfig.offsetsTopicCompressionType,
    offsetCommitTimeoutMs = brokerConfig.offsetCommitTimeoutMs,
    offsetCommitRequiredAcks = brokerConfig.offsetCommitRequiredAcks
  )
}
@BeforeEach
def setUp(): Unit = {
  // Fresh mocks and helpers for every test.
  replicaManager = mock(classOf[ReplicaManager])
  partition = mock(classOf[Partition])
  time = new MockTime
  metrics = new kMetrics()
  defaultOffsetRetentionMs = offsetConfig.offsetsRetentionMs

  // The manager under test is wired to the mocked ReplicaManager and started right away.
  groupMetadataManager = new GroupMetadataManager(0, MetadataVersion.latest, offsetConfig, replicaManager,
    time, metrics)
  groupMetadataManager.startup(() => numOffsetsPartitions, false)
}
// Shuts down the manager started in setUp() once the test has finished.
@AfterEach
def tearDown(): Unit = groupMetadataManager.shutdown()
@Test
def testLogInfoFromCleanupGroupMetadata(): Unit = {
  // Value returned by the stubbed three-argument cleanup below; changed between the two calls.
  var stubbedExpiredCount = 0
  // Number of times info(...) has been invoked on the manager.
  var infoCalls = 0

  val manager = new GroupMetadataManager(0, MetadataVersion.latest, offsetConfig, replicaManager, time, metrics) {
    override def cleanupGroupMetadata(groups: Iterable[GroupMetadata], requestLocal: RequestLocal,
                                      selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int =
      stubbedExpiredCount

    override def info(msg: => String): Unit = {
      infoCalls += 1
    }
  }
  manager.startup(() => numOffsetsPartitions, false)

  try {
    // Nothing expired: the no-argument cleanup must not log at info level.
    manager.cleanupGroupMetadata()
    assertEquals(0, infoCalls)

    // Some offsets expired: exactly one info message is expected.
    stubbedExpiredCount = 100
    manager.cleanupGroupMetadata()
    assertEquals(1, infoCalls)
  } finally manager.shutdown()
}
@Test
def testLoadOffsetsWithoutGroup(): Unit = {
  val baseOffset = 15L
  val coordinatorEpoch = 2
  val expectedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  // The log holds offset commit records only; no group metadata record is written.
  val logRecords = MemoryRecords.withRecords(baseOffset, CompressionType.NONE,
    createCommittedOffsetRecords(expectedOffsets).toArray: _*)
  expectGroupMetadataLoad(groupTopicPartition, baseOffset, logRecords)

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  val loaded = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, loaded.groupId)
  assertEquals(Empty, loaded.currentState)
  assertEquals(expectedOffsets.size, loaded.allOffsets.size)
  for ((tp, committed) <- expectedOffsets)
    assertEquals(Some(committed), loaded.offset(tp).map(_.offset))
}
@Test
def testLoadEmptyGroupWithOffsets(): Unit = {
  val expectedGeneration = 15
  val consumerProtocolType = "consumer"
  val baseOffset = 15L
  val coordinatorEpoch = 2
  val expectedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  // Offset commits followed by a metadata record describing an empty group.
  val emptyGroupRecord = buildEmptyGroupRecord(expectedGeneration, consumerProtocolType)
  val logRecords = MemoryRecords.withRecords(baseOffset, CompressionType.NONE,
    (createCommittedOffsetRecords(expectedOffsets) :+ emptyGroupRecord).toArray: _*)
  expectGroupMetadataLoad(groupTopicPartition, baseOffset, logRecords)

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  val loaded = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, loaded.groupId)
  assertEquals(Empty, loaded.currentState)
  assertEquals(expectedGeneration, loaded.generationId)
  assertEquals(Some(consumerProtocolType), loaded.protocolType)
  // The loaded group is expected to have neither a leader nor a selected protocol.
  assertNull(loaded.leaderOrNull)
  assertNull(loaded.protocolName.orNull)
  for ((tp, committed) <- expectedOffsets)
    assertEquals(Some(committed), loaded.offset(tp).map(_.offset))
}
@Test
def testLoadTransactionalOffsetsWithoutGroup(): Unit = {
  val producerId = 1000L
  val producerEpoch: Short = 2
  val coordinatorEpoch = 2
  val expectedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  // One transactional batch at offset 0, immediately followed by its COMMIT marker.
  val buffer = ByteBuffer.allocate(1024)
  val markerOffset = appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, 0, expectedOffsets)
  completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, markerOffset, isCommit = true)
  buffer.flip()
  expectGroupMetadataLoad(groupTopicPartition, 0, MemoryRecords.readableRecords(buffer))

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  val loaded = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, loaded.groupId)
  assertEquals(Empty, loaded.currentState)
  assertEquals(expectedOffsets.size, loaded.allOffsets.size)
  for ((tp, committed) <- expectedOffsets)
    assertEquals(Some(committed), loaded.offset(tp).map(_.offset))
}
@Test
def testDoNotLoadAbortedTransactionalOffsetCommits(): Unit = {
  val producerId = 1000L
  val producerEpoch: Short = 2
  val coordinatorEpoch = 2
  val abortedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  // One transactional batch at offset 0, immediately followed by an ABORT marker.
  val buffer = ByteBuffer.allocate(1024)
  val markerOffset = appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, 0, abortedOffsets)
  completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, markerOffset, isCommit = false)
  buffer.flip()
  expectGroupMetadataLoad(groupTopicPartition, 0, MemoryRecords.readableRecords(buffer))

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  // With no committed offsets and no group metadata in the log, the group must not appear in the cache.
  assertEquals(None, groupMetadataManager.getGroup(groupId))
}
@Test
def testGroupLoadedWithPendingCommits(): Unit = {
  val producerId = 1000L
  val producerEpoch: Short = 2
  val coordinatorEpoch = 2
  val foo0 = new TopicPartition("foo", 0)
  val foo1 = new TopicPartition("foo", 1)
  val bar0 = new TopicPartition("bar", 0)
  val pendingOffsets = Map(
    foo0 -> 23L,
    foo1 -> 455L,
    bar0 -> 8992L
  )

  // A single transactional batch with no end-transaction marker: the transaction stays open.
  val buffer = ByteBuffer.allocate(1024)
  appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, 0, pendingOffsets)
  buffer.flip()
  expectGroupMetadataLoad(groupTopicPartition, 0, MemoryRecords.readableRecords(buffer))

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  // The group must exist even though all it has are pending offsets.
  val loaded = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, loaded.groupId)
  assertEquals(Empty, loaded.currentState)

  // Nothing is materialized yet, but the pending commits are tracked per producer and per partition.
  assertEquals(0, loaded.allOffsets.size)
  assertTrue(loaded.hasOffsets)
  assertTrue(loaded.hasPendingOffsetCommitsFromProducer(producerId))
  Seq(foo0, foo1, bar0).foreach { tp =>
    assertTrue(loaded.hasPendingOffsetCommitsForTopicPartition(tp))
  }
}
@Test
def testLoadWithCommittedAndAbortedTransactionalOffsetCommits(): Unit = {
  // Loads a log in which the same producer first aborts one transaction and then commits another.
  val producerId = 1000L
  val producerEpoch: Short = 2
  val coordinatorEpoch = 2
  val committedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )
  val abortedOffsets = Map(
    new TopicPartition("foo", 2) -> 231L,
    new TopicPartition("foo", 3) -> 4551L,
    new TopicPartition("bar", 1) -> 89921L
  )

  val buffer = ByteBuffer.allocate(1024)
  var logEndOffset = 0
  // Aborted transaction.
  logEndOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, logEndOffset, abortedOffsets)
  logEndOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, logEndOffset, isCommit = false)
  // Committed transaction.
  logEndOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, logEndOffset, committedOffsets)
  logEndOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, logEndOffset, isCommit = true)
  buffer.flip()
  expectGroupMetadataLoad(groupTopicPartition, 0, MemoryRecords.readableRecords(buffer))

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  val loaded = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, loaded.groupId)
  assertEquals(Empty, loaded.currentState)

  // Only the committed offsets may be materialized and the producer must have nothing pending;
  // together these show that the aborted commits were dropped.
  assertEquals(committedOffsets.size, loaded.allOffsets.size)
  for ((tp, committed) <- committedOffsets)
    assertEquals(Some(committed), loaded.offset(tp).map(_.offset))
  assertFalse(loaded.hasPendingOffsetCommitsFromProducer(producerId))
}
@Test
def testLoadWithCommittedAndAbortedAndPendingTransactionalOffsetCommits(): Unit = {
  val producerId = 1000L
  val producerEpoch: Short = 2
  val coordinatorEpoch = 2
  val foo3 = new TopicPartition("foo", 3)
  val committedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )
  val abortedOffsets = Map(
    new TopicPartition("foo", 2) -> 231L,
    foo3 -> 4551L,
    new TopicPartition("bar", 1) -> 89921L
  )
  val pendingOffsets = Map(
    foo3 -> 2312L,
    new TopicPartition("foo", 4) -> 45512L,
    new TopicPartition("bar", 2) -> 899212L
  )

  val buffer = ByteBuffer.allocate(1024)
  var logEndOffset = 0
  // Offset at which the committed transactional batch is appended.
  val committedBatchOffset = logEndOffset
  // Committed transaction, then an aborted one, then a batch left without any marker.
  logEndOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, logEndOffset, committedOffsets)
  logEndOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, logEndOffset, isCommit = true)
  logEndOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, logEndOffset, abortedOffsets)
  logEndOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, logEndOffset, isCommit = false)
  logEndOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, logEndOffset, pendingOffsets)
  buffer.flip()
  expectGroupMetadataLoad(groupTopicPartition, 0, MemoryRecords.readableRecords(buffer))

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  val loaded = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, loaded.groupId)
  assertEquals(Empty, loaded.currentState)

  // Right after loading, only the committed offsets are materialized: the aborted ones are gone
  // and the trailing batch has not been applied yet.
  assertEquals(committedOffsets.size, loaded.allOffsets.size)
  committedOffsets.foreach { case (tp, committed) =>
    assertEquals(Some(committed), loaded.offset(tp).map(_.offset))
    assertEquals(Some(committedBatchOffset), loaded.offsetWithRecordMetadata(tp).head.appendedBatchOffset)
  }

  // The trailing batch is still tracked as pending for the producer.
  assertTrue(loaded.hasPendingOffsetCommitsFromProducer(producerId))
  assertTrue(loaded.hasPendingOffsetCommitsForTopicPartition(foo3))

  // Completing the transaction with a commit materializes the pending offsets.
  groupMetadataManager.handleTxnCompletion(producerId, List(groupTopicPartition.partition).toSet, isCommit = true)
  assertFalse(loaded.hasPendingOffsetCommitsFromProducer(producerId))
  pendingOffsets.foreach { case (tp, pending) =>
    assertEquals(Some(pending), loaded.offset(tp).map(_.offset))
  }
}
@Test
def testLoadTransactionalOffsetCommitsFromMultipleProducers(): Unit = {
  val firstProducerId = 1000L
  val firstProducerEpoch: Short = 2
  val secondProducerId = 1001L
  val secondProducerEpoch: Short = 3
  val coordinatorEpoch = 2
  val firstProducerOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )
  val secondProducerOffsets = Map(
    new TopicPartition("foo", 2) -> 231L,
    new TopicPartition("foo", 3) -> 4551L,
    new TopicPartition("bar", 1) -> 89921L
  )

  val buffer = ByteBuffer.allocate(1024)
  var logEndOffset = 0L

  // First producer: batch plus COMMIT marker.
  val firstBatchOffset = logEndOffset
  logEndOffset += appendTransactionalOffsetCommits(buffer, firstProducerId, firstProducerEpoch, logEndOffset, firstProducerOffsets)
  logEndOffset += completeTransactionalOffsetCommit(buffer, firstProducerId, firstProducerEpoch, logEndOffset, isCommit = true)

  // Second producer: batch plus COMMIT marker.
  val secondBatchOffset = logEndOffset
  logEndOffset += appendTransactionalOffsetCommits(buffer, secondProducerId, secondProducerEpoch, logEndOffset, secondProducerOffsets)
  logEndOffset += completeTransactionalOffsetCommit(buffer, secondProducerId, secondProducerEpoch, logEndOffset, isCommit = true)

  buffer.flip()
  expectGroupMetadataLoad(groupTopicPartition, 0, MemoryRecords.readableRecords(buffer))

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  val loaded = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, loaded.groupId)
  assertEquals(Empty, loaded.currentState)

  // Both transactions were committed, so the offsets of both producers must be materialized, and each
  // offset must report the log offset at which its producer's batch was appended.
  assertEquals(firstProducerOffsets.size + secondProducerOffsets.size, loaded.allOffsets.size)
  for ((tp, committed) <- firstProducerOffsets) {
    assertEquals(Some(committed), loaded.offset(tp).map(_.offset))
    assertEquals(Some(firstBatchOffset), loaded.offsetWithRecordMetadata(tp).head.appendedBatchOffset)
  }
  for ((tp, committed) <- secondProducerOffsets) {
    assertEquals(Some(committed), loaded.offset(tp).map(_.offset))
    assertEquals(Some(secondBatchOffset), loaded.offsetWithRecordMetadata(tp).head.appendedBatchOffset)
  }
}
@Test
def testGroupLoadWithConsumerAndTransactionalOffsetCommitsConsumerWins(): Unit = {
  val producerId = 1000L
  val producerEpoch: Short = 2
  val coordinatorEpoch = 2
  val transactionalOffsetCommits = Map(
    new TopicPartition("foo", 0) -> 23L
  )
  val consumerOffsetCommits = Map(
    new TopicPartition("foo", 0) -> 24L
  )

  // Log layout: transactional batch, then a plain consumer commit for the same partition, then the
  // COMMIT marker of the transaction.
  val buffer = ByteBuffer.allocate(1024)
  val consumerRecordOffset =
    appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, 0, transactionalOffsetCommits)
  val markerOffset =
    consumerRecordOffset + appendConsumerOffsetCommit(buffer, consumerRecordOffset, consumerOffsetCommits)
  completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, markerOffset, isCommit = true)
  buffer.flip()
  expectGroupMetadataLoad(groupTopicPartition, 0, MemoryRecords.readableRecords(buffer))

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  // The transaction is completed in the log, so exactly one materialized offset and no pending
  // commits are expected.
  val loaded = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, loaded.groupId)
  assertEquals(Empty, loaded.currentState)
  assertEquals(1, loaded.allOffsets.size)
  assertTrue(loaded.hasOffsets)
  assertFalse(loaded.hasPendingOffsetCommitsFromProducer(producerId))

  // The consumer commit, written after the transactional batch, is the value that must be visible.
  assertEquals(consumerOffsetCommits.size, loaded.allOffsets.size)
  for ((tp, committed) <- consumerOffsetCommits) {
    assertEquals(Some(committed), loaded.offset(tp).map(_.offset))
    assertEquals(Some(consumerRecordOffset), loaded.offsetWithRecordMetadata(tp).head.appendedBatchOffset)
  }
}
@Test
def testGroupLoadWithConsumerAndTransactionalOffsetCommitsTransactionWins(): Unit = {
  val groupMetadataTopicPartition = groupTopicPartition
  val producerId = 1000L
  val producerEpoch: Short = 2
  val groupEpoch = 2
  val transactionalOffsetCommits = Map(
    new TopicPartition("foo", 0) -> 23L
  )
  val consumerOffsetCommits = Map(
    new TopicPartition("foo", 0) -> 24L
  )

  // Log layout: a plain consumer commit first, then a transactional batch for the same partition,
  // then the COMMIT marker of that transaction.
  val buffer = ByteBuffer.allocate(1024)
  var nextOffset = 0
  nextOffset += appendConsumerOffsetCommit(buffer, nextOffset, consumerOffsetCommits)
  nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, transactionalOffsetCommits)
  nextOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, isCommit = true)
  buffer.flip()
  val records = MemoryRecords.readableRecords(buffer)
  expectGroupMetadataLoad(groupMetadataTopicPartition, 0, records)

  groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L)

  // The transaction is completed in the log, so the group must be loaded with a single materialized
  // offset and without any pending commits from the producer.
  val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, group.groupId)
  assertEquals(Empty, group.currentState)
  assertEquals(1, group.allOffsets.size)
  assertTrue(group.hasOffsets)
  assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))

  // The transactional commit was written after the consumer commit, so it is the one that must be
  // visible; the size is therefore checked against the transactional map.
  assertEquals(transactionalOffsetCommits.size, group.allOffsets.size)
  transactionalOffsetCommits.foreach { case (topicPartition, offset) =>
    assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
  }
}
@Test
def testGroupNotExists(): Unit = {
  // Before the partition is owned, the manager must not report the group as non-existent.
  assertFalse(groupMetadataManager.groupNotExists(groupId))

  // Once the partition is owned and nothing has been added, the group does not exist.
  groupMetadataManager.addOwnedPartition(groupPartitionId)
  assertTrue(groupMetadataManager.groupNotExists(groupId))

  // A registered group that is not Dead exists.
  val ownedGroup = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(ownedGroup)
  assertFalse(groupMetadataManager.groupNotExists(groupId))

  // After the transition to Dead, the group counts as non-existent again.
  ownedGroup.transitionTo(Dead)
  assertTrue(groupMetadataManager.groupNotExists(groupId))
}
/**
 * Writes one non-transactional batch of offset commit records into `buffer`, starting at `baseOffset`.
 *
 * @param buffer     destination buffer the batch is built into
 * @param baseOffset log offset assigned to the first record of the batch
 * @param offsets    committed offset per topic partition; one record is written per entry
 * @return the number of entries in `offsets`, which callers add to their running log offset
 */
private def appendConsumerOffsetCommit(buffer: ByteBuffer, baseOffset: Long, offsets: Map[TopicPartition, Long]): Int = {
  val builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, baseOffset)
  val commitRecords = createCommittedOffsetRecords(offsets)
  commitRecords.foreach(builder.append)
  // build() finalizes the batch inside `buffer`; the returned MemoryRecords is not needed here.
  builder.build()
  offsets.size
}
/**
 * Writes one transactional batch of offset commit records for the given producer into `buffer`,
 * starting at `baseOffset`.
 *
 * @return the number of entries in `offsets`, which callers add to their running log offset
 */
private def appendTransactionalOffsetCommits(buffer: ByteBuffer, producerId: Long, producerEpoch: Short,
                                             baseOffset: Long, offsets: Map[TopicPartition, Long]): Int = {
  val batchBuilder = MemoryRecords.builder(buffer, CompressionType.NONE, baseOffset, producerId, producerEpoch, 0, true)
  createCommittedOffsetRecords(offsets).foreach(record => batchBuilder.append(record))
  // build() finalizes the batch inside `buffer`; the returned MemoryRecords is not needed here.
  batchBuilder.build()
  offsets.size
}
/**
 * Writes a single end-transaction marker for the given producer into `buffer` at `baseOffset`.
 *
 * @param isCommit true writes a COMMIT marker, false an ABORT marker
 * @return always 1, the number of log offsets consumed by the marker
 */
private def completeTransactionalOffsetCommit(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, baseOffset: Long,
                                              isCommit: Boolean): Int = {
  val markerType =
    if (isCommit) ControlRecordType.COMMIT
    else ControlRecordType.ABORT

  val markerBuilder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
    TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), producerId, producerEpoch, 0, true, true,
    RecordBatch.NO_PARTITION_LEADER_EPOCH)
  markerBuilder.appendEndTxnMarker(time.milliseconds(), new EndTransactionMarker(markerType, 0))
  markerBuilder.build()
  1
}
@Test
def testLoadOffsetsWithTombstones(): Unit = {
  val baseOffset = 15L
  val coordinatorEpoch = 2
  val tombstonePartition = new TopicPartition("foo", 1)
  val committedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    tombstonePartition -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  // All three commits are written, followed by a null-valued record for one of the partitions.
  val tombstone = new SimpleRecord(GroupMetadataManager.offsetCommitKey(groupId, tombstonePartition), null)
  val logRecords = MemoryRecords.withRecords(baseOffset, CompressionType.NONE,
    (createCommittedOffsetRecords(committedOffsets) :+ tombstone).toArray: _*)
  expectGroupMetadataLoad(groupTopicPartition, baseOffset, logRecords)

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  val loaded = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, loaded.groupId)
  assertEquals(Empty, loaded.currentState)

  // The tombstoned partition must be absent; every other commit must be present.
  assertEquals(committedOffsets.size - 1, loaded.allOffsets.size)
  committedOffsets.foreach {
    case (tp, _) if tp == tombstonePartition =>
      assertEquals(None, loaded.offset(tp))
    case (tp, committed) =>
      assertEquals(Some(committed), loaded.offset(tp).map(_.offset))
  }
}
// All assertions for this scenario live in the shared loadOffsetsAndGroup helper.
@Test
def testLoadOffsetsAndGroup(): Unit = {
  loadOffsetsAndGroup(groupTopicPartition, groupEpoch = 2)
}
/**
 * Loads a log containing three offset commits and a stable, single-member group record, verifies the
 * resulting cached group and returns it.
 *
 * @param groupMetadataTopicPartition offsets-topic partition to load from
 * @param groupEpoch                  epoch passed to loadGroupsAndOffsets
 * @return the group found in the cache after loading
 */
def loadOffsetsAndGroup(groupMetadataTopicPartition: TopicPartition, groupEpoch: Int): GroupMetadata = {
  val expectedGeneration = 935
  val consumerProtocolType = "consumer"
  val assignor = "range"
  val baseOffset = 15L
  val memberId = "98098230493"
  val expectedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  val commitRecords = createCommittedOffsetRecords(expectedOffsets)
  val stableGroupRecord = buildStableGroupRecordWithMember(expectedGeneration, consumerProtocolType, assignor, memberId)
  val logRecords = MemoryRecords.withRecords(baseOffset, CompressionType.NONE,
    (commitRecords :+ stableGroupRecord).toArray: _*)
  expectGroupMetadataLoad(groupMetadataTopicPartition, baseOffset, logRecords)

  groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L)

  val loaded = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))

  // Group-level state.
  assertEquals(groupId, loaded.groupId)
  assertEquals(Stable, loaded.currentState)
  assertEquals(memberId, loaded.leaderOrNull)
  assertEquals(expectedGeneration, loaded.generationId)
  assertEquals(Some(consumerProtocolType), loaded.protocolType)
  assertEquals(assignor, loaded.protocolName.orNull)
  assertEquals(Set(memberId), loaded.allMembers)

  // Offsets: every commit is present and carries no expire timestamp.
  assertEquals(expectedOffsets.size, loaded.allOffsets.size)
  for ((tp, committed) <- expectedOffsets) {
    assertEquals(Some(committed), loaded.offset(tp).map(_.offset))
    assertTrue(loaded.offset(tp).map(_.expireTimestamp).contains(None))
  }
  loaded
}
@Test
def testLoadOffsetsAndGroupIgnored(): Unit = {
  val coordinatorEpoch = 2
  val partitionId = groupTopicPartition.partition()

  // Load at the current epoch: the epoch is recorded for the partition.
  loadOffsetsAndGroup(groupTopicPartition, coordinatorEpoch)
  assertEquals(coordinatorEpoch, groupMetadataManager.epochForPartitionId.get(partitionId))

  // Unload at the same epoch: the group disappears while the recorded epoch stays.
  groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, OptionalInt.of(coordinatorEpoch), _ => ())
  assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
    "Removed group remained in cache")
  assertEquals(coordinatorEpoch, groupMetadataManager.epochForPartitionId.get(partitionId))

  // A load request carrying an older epoch must change neither the cache nor the recorded epoch.
  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch - 1, _ => (), 0L)
  assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
    "Removed group remained in cache")
  assertEquals(coordinatorEpoch, groupMetadataManager.epochForPartitionId.get(partitionId))
}
@Test
def testUnloadOffsetsAndGroup(): Unit = {
  val coordinatorEpoch = 2
  val partitionId = groupTopicPartition.partition()
  loadOffsetsAndGroup(groupTopicPartition, coordinatorEpoch)

  // Unloading at the epoch used for loading removes the group but keeps the recorded epoch.
  groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, OptionalInt.of(coordinatorEpoch), _ => ())
  assertEquals(coordinatorEpoch, groupMetadataManager.epochForPartitionId.get(partitionId))
  assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
    "Removed group remained in cache")
}
@Test
def testUnloadOffsetsAndGroupIgnored(): Unit = {
  val coordinatorEpoch = 2
  val expected = loadOffsetsAndGroup(groupTopicPartition, coordinatorEpoch)

  // An unload request carrying an older epoch must be ignored.
  groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, OptionalInt.of(coordinatorEpoch - 1), _ => ())
  assertEquals(coordinatorEpoch, groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))

  // The cached group must still match what was loaded initially.
  val cached = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(expected.groupId, cached.groupId)
  assertEquals(expected.currentState, cached.currentState)
  assertEquals(expected.leaderOrNull, cached.leaderOrNull)
  assertEquals(expected.generationId, cached.generationId)
  assertEquals(expected.protocolType, cached.protocolType)
  assertEquals(expected.protocolName.orNull, cached.protocolName.orNull)
  assertEquals(expected.allMembers, cached.allMembers)
  assertEquals(expected.allOffsets.size, cached.allOffsets.size)
  for ((tp, offsetAndMetadata) <- expected.allOffsets) {
    assertEquals(Some(offsetAndMetadata), cached.offset(tp))
    assertTrue(cached.offset(tp).map(_.expireTimestamp).contains(None))
  }
}
@Test
def testUnloadOffsetsAndGroupIgnoredAfterStopReplica(): Unit = {
  val coordinatorEpoch = 2
  val expected = loadOffsetsAndGroup(groupTopicPartition, coordinatorEpoch)

  // Unload without an epoch: the group is dropped while the recorded epoch is left untouched.
  groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, OptionalInt.empty, _ => ())
  assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
    "Removed group remained in cache")
  assertEquals(coordinatorEpoch, groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()),
    "Replica which was stopped still in epochForPartitionId")

  // Loading again at a newer epoch must bring back a group equivalent to the initial one.
  loadOffsetsAndGroup(groupTopicPartition, coordinatorEpoch + 1)
  val reloaded = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(expected.groupId, reloaded.groupId)
  assertEquals(expected.currentState, reloaded.currentState)
  assertEquals(expected.leaderOrNull, reloaded.leaderOrNull)
  assertEquals(expected.generationId, reloaded.generationId)
  assertEquals(expected.protocolType, reloaded.protocolType)
  assertEquals(expected.protocolName.orNull, reloaded.protocolName.orNull)
  assertEquals(expected.allMembers, reloaded.allMembers)
  assertEquals(expected.allOffsets.size, reloaded.allOffsets.size)
  for ((tp, offsetAndMetadata) <- expected.allOffsets) {
    assertEquals(Some(offsetAndMetadata), reloaded.offset(tp))
    assertTrue(reloaded.offset(tp).map(_.expireTimestamp).contains(None))
  }
}
@Test
def testLoadGroupWithTombstone(): Unit = {
  val baseOffset = 15L
  val coordinatorEpoch = 2
  val memberId = "98098230493"

  // A stable group record immediately followed by a null-valued record under the same group key.
  val stableGroupRecord = buildStableGroupRecordWithMember(generation = 15,
    protocolType = "consumer", protocol = "range", memberId)
  val groupTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
  val logRecords = MemoryRecords.withRecords(baseOffset, CompressionType.NONE,
    Array(stableGroupRecord, groupTombstone): _*)
  expectGroupMetadataLoad(groupTopicPartition, baseOffset, logRecords)

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  // The tombstone wins: no group may be cached.
  assertEquals(None, groupMetadataManager.getGroup(groupId))
}
@Test
def testLoadGroupWithLargeGroupMetadataRecord(): Unit = {
  val baseOffset = 15L
  val coordinatorEpoch = 2
  val memberId = "98098230493"
  val expectedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  // Build a group metadata record larger than offsets.load.buffer.size: the member assignment alone
  // exceeds the default load buffer size by 16 bytes.
  val oversizedAssignment = new Array[Byte](OffsetConfig.DefaultLoadBufferSize + 16)
  val commitRecords = createCommittedOffsetRecords(expectedOffsets)
  val largeGroupRecord = buildStableGroupRecordWithMember(generation = 15,
    protocolType = "consumer", protocol = "range", memberId, oversizedAssignment)
  val logRecords = MemoryRecords.withRecords(baseOffset, CompressionType.NONE,
    (commitRecords :+ largeGroupRecord).toArray: _*)
  expectGroupMetadataLoad(groupTopicPartition, baseOffset, logRecords)

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  val loaded = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  for ((tp, committed) <- expectedOffsets)
    assertEquals(Some(committed), loaded.offset(tp).map(_.offset))
}
// Simulates a log whose start offset is below its end offset although it holds no records.
// This could theoretically happen when all the records are expired and the active segment
// is truncated, or when the partition is accidentally corrupted. The load must still finish:
// the partition may not be reported as loading afterwards.
@Test
def testLoadGroupAndOffsetsWithCorruptedLog(): Unit = {
  val logStart = 0L
  val logEnd = 10L
  val coordinatorEpoch = 2

  val emptyLog: UnifiedLog = mock(classOf[UnifiedLog])
  when(replicaManager.getLog(groupTopicPartition)).thenReturn(Some(emptyLog))
  expectGroupMetadataLoad(emptyLog, logStart, MemoryRecords.EMPTY)
  when(replicaManager.getLogEndOffset(groupTopicPartition)).thenReturn(Some(logEnd))

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, coordinatorEpoch, _ => (), 0L)

  // interactions with the mocked log
  verify(emptyLog).logStartOffset
  verify(emptyLog).read(ArgumentMatchers.eq(logStart),
    maxLength = anyInt(),
    isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
    minOneMessage = ArgumentMatchers.eq(true))

  // interactions with the replica manager
  verify(replicaManager).getLog(groupTopicPartition)
  verify(replicaManager, times(2)).getLogEndOffset(groupTopicPartition)

  assertFalse(groupMetadataManager.isPartitionLoading(groupTopicPartition.partition()))
}
// This test case checks the following scenario:
//  1. the group exists at some point in time, but is later removed (because all members left)
//  2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets
// The log therefore holds a group record, its tombstone, and then the offset commits; the loaded
// group is expected to be Empty while still exposing all of the committed offsets.
@Test
def testOffsetWriteAfterGroupRemoved(): Unit = {
  val partitionToLoad = groupTopicPartition
  val firstOffset = 15L
  val coordinatorEpoch = 2
  val expectedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  val offsetRecords = createCommittedOffsetRecords(expectedOffsets)
  val removedGroupRecord = buildStableGroupRecordWithMember(generation = 293,
    protocolType = "consumer", protocol = "range", "98098230493")
  val tombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
  val logContents = MemoryRecords.withRecords(firstOffset, CompressionType.NONE,
    (removedGroupRecord +: tombstone +: offsetRecords).toArray: _*)

  expectGroupMetadataLoad(partitionToLoad, firstOffset, logContents)
  groupMetadataManager.loadGroupsAndOffsets(partitionToLoad, coordinatorEpoch, _ => (), 0L)

  val loadedGroup = groupMetadataManager.getGroup(groupId) match {
    case Some(found) => found
    case None => throw new AssertionError("Group was not loaded into the cache")
  }
  assertEquals(groupId, loadedGroup.groupId)
  assertEquals(Empty, loadedGroup.currentState)
  assertEquals(expectedOffsets.size, loadedGroup.allOffsets.size)
  for ((partition, expected) <- expectedOffsets)
    assertEquals(Some(expected), loadedGroup.offset(partition).map(_.offset))
}
// Feeds the loader two batches of records ("segments") through a mocked UnifiedLog / FileRecords
// pair. Each batch carries offset commits plus a stable group record with a different member.
// The test expects the group state to come from the second batch and the offsets to be the
// first batch's offsets overridden by the second batch's for shared partitions.
@Test
def testLoadGroupAndOffsetsFromDifferentSegments(): Unit = {
val generation = 293
val protocolType = "consumer"
val protocol = "range"
val startOffset = 15L
val groupEpoch = 2
val tp0 = new TopicPartition("foo", 0)
val tp1 = new TopicPartition("foo", 1)
val tp2 = new TopicPartition("bar", 0)
val tp3 = new TopicPartition("xxx", 0)
val fileRecordsMock: FileRecords = mock(classOf[FileRecords])
val logMock: UnifiedLog = mock(classOf[UnifiedLog])
when(replicaManager.getLog(groupTopicPartition)).thenReturn(Some(logMock))
// first batch: offsets for tp0, tp1, tp3 and a group record whose only member is "a"
val segment1MemberId = "a"
val segment1Offsets = Map(tp0 -> 23L, tp1 -> 455L, tp3 -> 42L)
val segment1Records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
(createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(
generation, protocolType, protocol, segment1MemberId))).toArray: _*)
val segment1End = startOffset + segment1Records.records.asScala.size
// second batch starts where the first ends: offsets for tp0, tp2, tp3 and a group record
// whose only member is "b"
val segment2MemberId = "b"
val segment2Offsets = Map(tp0 -> 33L, tp2 -> 8992L, tp3 -> 10L)
val segment2Records = MemoryRecords.withRecords(segment1End, CompressionType.NONE,
(createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(
generation, protocolType, protocol, segment2MemberId))).toArray: _*)
val segment2End = segment1End + segment2Records.records.asScala.size
// consecutive stubbing: the first logStartOffset call yields segment1End, later calls segment2End
when(logMock.logStartOffset)
.thenReturn(segment1End)
.thenReturn(segment2End)
// reads at either offset hand back the same FileRecords mock; which bytes it serves is
// decided by the call-order-based stubs on sizeInBytes/readInto below
when(logMock.read(ArgumentMatchers.eq(segment1End),
maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
minOneMessage = ArgumentMatchers.eq(true)))
.thenReturn(new FetchDataInfo(new LogOffsetMetadata(segment1End), fileRecordsMock))
when(logMock.read(ArgumentMatchers.eq(segment2End),
maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
minOneMessage = ArgumentMatchers.eq(true)))
.thenReturn(new FetchDataInfo(new LogOffsetMetadata(segment2End), fileRecordsMock))
when(fileRecordsMock.sizeInBytes())
.thenReturn(segment1Records.sizeInBytes)
.thenReturn(segment2Records.sizeInBytes)
// readInto copies the batch bytes into the buffer captured from the call itself:
// first invocation writes segment1Records, subsequent invocations write segment2Records
val bufferCapture: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer])
when(fileRecordsMock.readInto(bufferCapture.capture(), anyInt()))
.thenAnswer(_ => {
val buffer = bufferCapture.getValue
buffer.put(segment1Records.buffer.duplicate)
buffer.flip()
}).thenAnswer(_ => {
val buffer = bufferCapture.getValue
buffer.put(segment2Records.buffer.duplicate)
buffer.flip()
})
when(replicaManager.getLogEndOffset(groupTopicPartition)).thenReturn(Some(segment2End))
groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
assertEquals(Stable, group.currentState)
assertEquals(segment2MemberId, group.leaderOrNull, "segment2 group record member should be elected")
assertEquals(Set(segment2MemberId), group.allMembers, "segment2 group record member should be only member")
// offsets of segment1 should be overridden by segment2 offsets of the same topic partitions
// (Map ++ keeps the right-hand value for keys present in both maps)
val committedOffsets = segment1Offsets ++ segment2Offsets
assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
}
}
// Adding a group returns that group; adding a second group with the same id returns the
// group that was registered first.
@Test
def testAddGroup(): Unit = {
  val registered = new GroupMetadata("foo", Empty, time)
  assertEquals(registered, groupMetadataManager.addGroup(registered))

  val sameIdGroup = new GroupMetadata("foo", Empty, time)
  assertEquals(registered, groupMetadataManager.addGroup(sameIdGroup))
}
// Builds a group via GroupMetadata.loadGroup from one member with a group instance id and one
// without, then checks that both members are present and the static member is resolvable
// through its instance id.
@Test
def testloadGroupWithStaticMember(): Unit = {
  val generation = 27
  val protocolType = "consumer"
  val staticMemberId = "staticMemberId"
  val dynamicMemberId = "dynamicMemberId"

  // identical settings for both members; only the member id and instance id differ
  def memberWith(id: String, instanceId: Option[String]): MemberMetadata =
    new MemberMetadata(id, instanceId, "", "", rebalanceTimeout, sessionTimeout,
      protocolType, List(("protocol", Array[Byte]())))

  val loadedMembers = Seq(
    memberWith(staticMemberId, Some(groupInstanceId)),
    memberWith(dynamicMemberId, None))

  val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, loadedMembers, time)

  assertTrue(group.is(Empty))
  assertEquals(generation, group.generationId)
  assertEquals(Some(protocolType), group.protocolType)
  Seq(staticMemberId, dynamicMemberId).foreach(id => assertTrue(group.has(id)))
  assertTrue(group.hasStaticMember(groupInstanceId))
  assertEquals(Some(staticMemberId), group.currentStaticMemberId(groupInstanceId))
}
// Loads a Stable consumer group whose single member carries a serialized consumer-protocol
// subscription for one topic, and checks that the subscribed topics are derived from it.
@Test
def testLoadConsumerGroup(): Unit = {
  val generation = 27
  val protocolType = "consumer"
  val protocol = "protocol"
  val memberId = "member1"
  val topic = "foo"

  val serializedSubscription =
    ConsumerProtocol.serializeSubscription(new Subscription(List(topic).asJava)).array()
  val soleMember = new MemberMetadata(memberId, Some(groupInstanceId), "", "", rebalanceTimeout,
    sessionTimeout, protocolType, List(("protocol", serializedSubscription)))

  val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, null, None,
    Seq(soleMember), time)

  assertTrue(group.is(Stable))
  assertEquals(generation, group.generationId)
  assertEquals(Some(protocolType), group.protocolType)
  assertEquals(protocol, group.protocolName.orNull)
  assertEquals(Some(Set(topic)), group.getSubscribedTopics)
  assertTrue(group.has(memberId))
}
// Loads an Empty consumer group with no members: no protocol name is expected and the
// subscribed-topic set is expected to be defined but empty.
@Test
def testLoadEmptyConsumerGroup(): Unit = {
  val generation = 27
  val protocolType = "consumer"
  val noMembers = Seq()

  val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None,
    noMembers, time)

  assertTrue(group.is(Empty))
  assertEquals(generation, group.generationId)
  assertEquals(Some(protocolType), group.protocolType)
  assertNull(group.protocolName.orNull)
  assertEquals(Some(Set.empty), group.getSubscribedTopics)
}
// Loads a Stable consumer group whose member metadata is an empty byte array rather than a
// serialized subscription. The group must still load, with no subscribed topics reported.
@Test
def testLoadConsumerGroupWithFaultyConsumerProtocol(): Unit = {
  val generation = 27
  val protocolType = "consumer"
  val protocol = "protocol"
  val memberId = "member1"

  // zero-length payload in place of a serialized subscription
  val emptyMetadata = Array[Byte]()
  val soleMember = new MemberMetadata(memberId, Some(groupInstanceId), "", "", rebalanceTimeout,
    sessionTimeout, protocolType, List(("protocol", emptyMetadata)))

  val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, null, None,
    Seq(soleMember), time)

  assertTrue(group.is(Stable))
  assertEquals(generation, group.generationId)
  assertEquals(Some(protocolType), group.protocolType)
  assertEquals(protocol, group.protocolName.orNull)
  assertEquals(None, group.getSubscribedTopics)
  assertTrue(group.has(memberId))
}
// Overwrites the version of a serialized group record with an unsupported value and expects
// readGroupMessageValue to reject it with an IllegalStateException naming that version.
@Test
def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
  val generation = 1
  val protocol = "range"
  val memberId = "memberId"
  val unsupportedVersion = Short.MinValue

  val recordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId).value()
  // put the unsupported version as the version value
  recordValue.putShort(unsupportedVersion)
  // rewind to position 0 so the value is read back from its beginning
  recordValue.position(0)

  val thrown = assertThrows(classOf[IllegalStateException],
    () => GroupMetadataManager.readGroupMessageValue(groupId, recordValue, time))
  assertEquals(s"Unknown group metadata message version: $unsupportedVersion", thrown.getMessage)
}
// Round-trips a stable group record through every known metadata version and checks whether
// the deserialized group carries a current-state timestamp: present from IBP_2_1_IV0 on,
// absent for anything older.
@Test
def testCurrentStateTimestampForAllGroupMetadataVersions(): Unit = {
  val generation = 1
  val protocol = "range"
  val memberId = "memberId"

  MetadataVersion.VERSIONS.foreach { metadataVersion =>
    val serialized = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, metadataVersion = metadataVersion)
    val restored = GroupMetadataManager.readGroupMessageValue(groupId, serialized.value(), time)

    // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
    if (metadataVersion.isAtLeast(IBP_2_1_IV0)) {
      assertEquals(Some(time.milliseconds()), restored.currentStateTimestamp,
        s"the metadataVersion $metadataVersion doesn't set the currentStateTimestamp correctly.")
    } else {
      assertTrue(restored.currentStateTimestamp.isEmpty,
        s"the metadataVersion $metadataVersion should not set the currentStateTimestamp.")
    }
  }
}
// Serializes a single-member stable group with each of three older metadata versions and
// checks that it deserializes with the same identity and exactly one dynamic (non-static) member.
@Test
def testReadFromOldGroupMetadata(): Unit = {
  val generation = 1
  val protocol = "range"
  val memberId = "memberId"

  Seq(IBP_0_9_0, IBP_0_10_1_IV0, IBP_2_1_IV0).foreach { oldVersion =>
    val serialized = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, metadataVersion = oldVersion)
    val restored = GroupMetadataManager.readGroupMessageValue(groupId, serialized.value(), time)

    // identity of the group
    assertEquals(groupId, restored.groupId)
    assertEquals(generation, restored.generationId)
    assertEquals(protocolType, restored.protocolType.get)
    assertEquals(protocol, restored.protocolName.orNull)

    // membership: one member, dynamic only
    assertEquals(1, restored.allMembers.size)
    assertEquals(restored.allMembers, restored.allDynamicMembers)
    assertTrue(restored.allMembers.contains(memberId))
    assertTrue(restored.allStaticMembers.isEmpty)
  }
}
// Stores an Empty group that has a generation and protocol type, then reads back the single
// appended record and checks that state, generation and protocol type survived.
@Test
def testStoreEmptyGroup(): Unit = {
  val generation = 27
  val protocolType = "consumer"
  val emptyGroup = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, Seq.empty, time)
  groupMetadataManager.addGroup(emptyGroup)

  val appended = expectAppendMessage(Errors.NONE)

  var storeResult: Option[Errors] = None
  val onStored: Errors => Unit = { err => storeResult = Some(err) }
  groupMetadataManager.storeGroup(emptyGroup, Map.empty, onStored)
  assertEquals(Some(Errors.NONE), storeResult)

  val metadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
  val writtenRecords = appended.getValue.apply(metadataPartition).records.asScala.toList
  assertEquals(1, writtenRecords.size)

  val restored = GroupMetadataManager.readGroupMessageValue(groupId, writtenRecords.head.value, time)
  assertTrue(restored.is(Empty))
  assertEquals(generation, restored.generationId)
  assertEquals(Some(protocolType), restored.protocolType)
}
// Stores a freshly constructed Empty group (no protocol type, generation 0) and checks the
// single appended record deserializes back to the same minimal state.
@Test
def testStoreEmptySimpleGroup(): Unit = {
  val simpleGroup = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(simpleGroup)

  val appended = expectAppendMessage(Errors.NONE)

  var storeResult: Option[Errors] = None
  val onStored: Errors => Unit = { err => storeResult = Some(err) }
  groupMetadataManager.storeGroup(simpleGroup, Map.empty, onStored)
  assertEquals(Some(Errors.NONE), storeResult)

  val metadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
  val writtenRecords = appended.getValue.apply(metadataPartition).records.asScala.toList
  assertEquals(1, writtenRecords.size)

  val restored = GroupMetadataManager.readGroupMessageValue(groupId, writtenRecords.head.value, time)
  assertTrue(restored.is(Empty))
  assertEquals(0, restored.generationId)
  assertEquals(None, restored.protocolType)
}
// Table of (append error -> error expected by the storeGroup callback), checked row by row
// in the listed order.
@Test
def testStoreGroupErrorMapping(): Unit = {
  val expectedMappings = Seq(
    Errors.NONE -> Errors.NONE,
    Errors.UNKNOWN_TOPIC_OR_PARTITION -> Errors.COORDINATOR_NOT_AVAILABLE,
    Errors.NOT_ENOUGH_REPLICAS -> Errors.COORDINATOR_NOT_AVAILABLE,
    Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND -> Errors.COORDINATOR_NOT_AVAILABLE,
    Errors.NOT_LEADER_OR_FOLLOWER -> Errors.NOT_COORDINATOR,
    Errors.MESSAGE_TOO_LARGE -> Errors.UNKNOWN_SERVER_ERROR,
    Errors.RECORD_LIST_TOO_LARGE -> Errors.UNKNOWN_SERVER_ERROR,
    Errors.INVALID_FETCH_SIZE -> Errors.UNKNOWN_SERVER_ERROR,
    Errors.CORRUPT_MESSAGE -> Errors.CORRUPT_MESSAGE
  )
  expectedMappings.foreach { case (appendError, expectedError) =>
    assertStoreGroupErrorMapping(appendError, expectedError)
  }
}
// Stores an empty group while the append is set up to respond with `appendError`, then checks
// that the store callback received `expectedError`. The replicaManager mock is reset first so
// the single-invocation verifications hold on every call of this helper.
private def assertStoreGroupErrorMapping(appendError: Errors, expectedError: Errors): Unit = {
  reset(replicaManager)

  val emptyGroup = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(emptyGroup)
  expectAppendMessage(appendError)

  var storeResult: Option[Errors] = None
  val onStored: Errors => Unit = { err => storeResult = Some(err) }
  groupMetadataManager.storeGroup(emptyGroup, Map.empty, onStored)

  assertEquals(Some(expectedError), storeResult)
  verify(replicaManager).appendRecords(anyLong(),
    anyShort(),
    internalTopicsAllowed = ArgumentMatchers.eq(true),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    any(),
    any(),
    any[Option[ReentrantLock]],
    any(),
    any(),
    any(),
    any(),
    any())
  verify(replicaManager).getMagic(any())
}
// Stores a group that has one member and has been moved into its next generation, and checks
// that the store succeeds with exactly one append to the replica manager.
@Test
def testStoreNonEmptyGroup(): Unit = {
  val memberId = "memberId"
  val clientId = "clientId"
  val clientHost = "localhost"

  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  val soleMember = new MemberMetadata(memberId, Some(groupInstanceId), clientId, clientHost, rebalanceTimeout, sessionTimeout,
    protocolType, List(("protocol", Array[Byte]())))
  group.add(soleMember, _ => ())
  group.transitionTo(PreparingRebalance)
  group.initNextGeneration()

  expectAppendMessage(Errors.NONE)

  var storeResult: Option[Errors] = None
  val onStored: Errors => Unit = { err => storeResult = Some(err) }
  groupMetadataManager.storeGroup(group, Map(memberId -> Array[Byte]()), onStored)

  assertEquals(Some(Errors.NONE), storeResult)
  verify(replicaManager).appendRecords(anyLong(),
    anyShort(),
    internalTopicsAllowed = ArgumentMatchers.eq(true),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    any(),
    any(),
    any[Option[ReentrantLock]],
    any(),
    any(),
    any(),
    any(),
    any())
  verify(replicaManager).getMagic(any())
}
// With getMagic stubbed to return None, storing a non-empty group is expected to report
// NOT_COORDINATOR through the callback.
@Test
def testStoreNonEmptyGroupWhenCoordinatorHasMoved(): Unit = {
  when(replicaManager.getMagic(any())).thenReturn(None)

  val memberId = "memberId"
  val clientId = "clientId"
  val clientHost = "localhost"

  val group = new GroupMetadata(groupId, Empty, time)
  val soleMember = new MemberMetadata(memberId, Some(groupInstanceId), clientId, clientHost, rebalanceTimeout, sessionTimeout,
    protocolType, List(("protocol", Array[Byte]())))
  group.add(soleMember, _ => ())
  group.transitionTo(PreparingRebalance)
  group.initNextGeneration()

  var storeResult: Option[Errors] = None
  val onStored: Errors => Unit = { err => storeResult = Some(err) }
  groupMetadataManager.storeGroup(group, Map(memberId -> Array[Byte]()), onStored)

  assertEquals(Some(Errors.NOT_COORDINATOR), storeResult)
  verify(replicaManager).getMagic(any())
}
// Commits one offset for a simple (member-less) group with a successful append, then checks the
// callback result, the cached offset returned by getOffsets, the append interaction, and that
// the "offset-commit-count" metric moved from 0 to 1.
@Test
def testCommitOffset(): Unit = {
  val memberId = ""
  val committedPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
  val offset = 37

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  val toCommit = immutable.Map(committedPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
  expectAppendMessage(Errors.NONE)

  var commitResult: Option[immutable.Map[TopicIdPartition, Errors]] = None
  val onCommit: immutable.Map[TopicIdPartition, Errors] => Unit = { result => commitResult = Some(result) }

  assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
  groupMetadataManager.storeOffsets(group, memberId, toCommit, onCommit)
  assertTrue(group.hasOffsets)

  // result delivered to the commit callback
  assertTrue(commitResult.isDefined)
  assertEquals(Some(Errors.NONE), commitResult.flatMap(_.get(committedPartition)))
  assertTrue(group.hasOffsets)

  // offset as served from the cache
  val fetched = groupMetadataManager.getOffsets(groupId, defaultRequireStable, Some(Seq(committedPartition.topicPartition)))
  val fetchedForPartition = fetched.get(committedPartition.topicPartition)
  assertTrue(fetchedForPartition.isDefined)
  val partitionResponse = fetchedForPartition.get
  assertEquals(Errors.NONE, partitionResponse.error)
  assertEquals(offset, partitionResponse.offset)

  verify(replicaManager).appendRecords(anyLong(),
    anyShort(),
    internalTopicsAllowed = ArgumentMatchers.eq(true),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    any(),
    any(),
    any[Option[ReentrantLock]],
    any(),
    any(),
    any(),
    any(),
    any())
  // Will update sensor after commit
  assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
}
// Transactional offset commit, happy path. The offset stays out of allOffsets after the store
// call and after a successful append response; it only shows up once the pending transactional
// commit is completed with isCommit = true.
@Test
def testTransactionalCommitOffsetCommitted(): Unit = {
  val memberId = ""
  val committedPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
  val offset = 37
  val producerId = 232L
  val producerEpoch = 0.toShort

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  val pendingOffset = OffsetAndMetadata(offset, "", time.milliseconds())
  val toCommit = immutable.Map(committedPartition -> pendingOffset)
  val appendCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
  when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))

  var commitResult: Option[immutable.Map[TopicIdPartition, Errors]] = None
  val onCommit: immutable.Map[TopicIdPartition, Errors] => Unit = { result => commitResult = Some(result) }
  groupMetadataManager.storeOffsets(group, memberId, toCommit, onCommit, producerId, producerEpoch)

  // stored but append not yet acknowledged
  assertTrue(group.hasOffsets)
  assertTrue(group.allOffsets.isEmpty)

  verify(replicaManager).appendRecords(anyLong(),
    anyShort(),
    internalTopicsAllowed = ArgumentMatchers.eq(true),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    any[Map[TopicPartition, MemoryRecords]],
    appendCallback.capture(),
    any[Option[ReentrantLock]],
    any(),
    any(),
    any(),
    any(),
    any())
  verify(replicaManager).getMagic(any())

  // append acknowledged without error, transaction still open
  val successfulAppend = new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
  appendCallback.getValue.apply(Map(groupTopicPartition -> successfulAppend))
  assertTrue(group.hasOffsets)
  assertTrue(group.allOffsets.isEmpty)

  // transaction committed
  group.completePendingTxnOffsetCommit(producerId, isCommit = true)
  assertTrue(group.hasOffsets)
  assertFalse(group.allOffsets.isEmpty)
  assertEquals(Some(pendingOffset), group.offset(committedPartition.topicPartition))
}
// Transactional offset commit where the append response carries NOT_ENOUGH_REPLICAS: the group
// must end up without offsets, both right after the failed response and after the pending
// transactional commit is completed with isCommit = false.
@Test
def testTransactionalCommitOffsetAppendFailure(): Unit = {
  val memberId = ""
  val committedPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
  val offset = 37
  val producerId = 232L
  val producerEpoch = 0.toShort

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  val toCommit = immutable.Map(committedPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
  when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))

  var commitResult: Option[immutable.Map[TopicIdPartition, Errors]] = None
  val onCommit: immutable.Map[TopicIdPartition, Errors] => Unit = { result => commitResult = Some(result) }
  groupMetadataManager.storeOffsets(group, memberId, toCommit, onCommit, producerId, producerEpoch)

  // stored but append not yet acknowledged
  assertTrue(group.hasOffsets)
  assertTrue(group.allOffsets.isEmpty)

  // append fails
  val appendCallback = verifyAppendAndCaptureCallback()
  val failedAppend = new PartitionResponse(Errors.NOT_ENOUGH_REPLICAS, 0L, RecordBatch.NO_TIMESTAMP, 0L)
  appendCallback.getValue.apply(Map(groupTopicPartition -> failedAppend))
  assertFalse(group.hasOffsets)
  assertTrue(group.allOffsets.isEmpty)

  // completing the transaction as aborted changes nothing
  group.completePendingTxnOffsetCommit(producerId, isCommit = false)
  assertFalse(group.hasOffsets)
  assertTrue(group.allOffsets.isEmpty)

  verify(replicaManager).appendRecords(anyLong(),
    anyShort(),
    internalTopicsAllowed = ArgumentMatchers.eq(true),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    any[Map[TopicPartition, MemoryRecords]],
    any(),
    any[Option[ReentrantLock]],
    any(),
    any(),
    any(),
    any(),
    any())
  verify(replicaManager).getMagic(any())
}
// Transactional offset commit whose append succeeds but whose transaction is then aborted:
// the offset remains pending (never in allOffsets) and is dropped once the pending commit is
// completed with isCommit = false.
@Test
def testTransactionalCommitOffsetAborted(): Unit = {
  val memberId = ""
  val committedPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
  val offset = 37
  val producerId = 232L
  val producerEpoch = 0.toShort

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  val toCommit = immutable.Map(committedPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
  when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))

  var commitResult: Option[immutable.Map[TopicIdPartition, Errors]] = None
  val onCommit: immutable.Map[TopicIdPartition, Errors] => Unit = { result => commitResult = Some(result) }
  groupMetadataManager.storeOffsets(group, memberId, toCommit, onCommit, producerId, producerEpoch)

  // stored but append not yet acknowledged
  assertTrue(group.hasOffsets)
  assertTrue(group.allOffsets.isEmpty)

  // append acknowledged without error, transaction still open
  val appendCallback = verifyAppendAndCaptureCallback()
  val successfulAppend = new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
  appendCallback.getValue.apply(Map(groupTopicPartition -> successfulAppend))
  assertTrue(group.hasOffsets)
  assertTrue(group.allOffsets.isEmpty)

  // transaction aborted
  group.completePendingTxnOffsetCommit(producerId, isCommit = false)
  assertFalse(group.hasOffsets)
  assertTrue(group.allOffsets.isEmpty)

  verify(replicaManager).appendRecords(anyLong(),
    anyShort(),
    internalTopicsAllowed = ArgumentMatchers.eq(true),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    any[Map[TopicPartition, MemoryRecords]],
    any(),
    any[Option[ReentrantLock]],
    any(),
    any(),
    any(),
    any(),
    any())
  verify(replicaManager).getMagic(any())
}
// With getMagic stubbed to return None, an offset commit is expected to report
// NOT_COORDINATOR for the committed partition.
@Test
def testCommitOffsetWhenCoordinatorHasMoved(): Unit = {
  when(replicaManager.getMagic(any())).thenReturn(None)

  val memberId = ""
  val committedPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
  val offset = 37

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  val toCommit = immutable.Map(committedPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))

  var commitResult: Option[immutable.Map[TopicIdPartition, Errors]] = None
  val onCommit: immutable.Map[TopicIdPartition, Errors] => Unit = { result => commitResult = Some(result) }
  groupMetadataManager.storeOffsets(group, memberId, toCommit, onCommit)

  assertTrue(commitResult.isDefined)
  assertEquals(Some(Errors.NOT_COORDINATOR), commitResult.flatMap(_.get(committedPartition)))
  verify(replicaManager).getMagic(any())
}
// Table of (append error -> error expected by the offset-commit callback), checked row by row
// in the listed order.
@Test
def testCommitOffsetFailure(): Unit = {
  val expectedMappings = Seq(
    Errors.UNKNOWN_TOPIC_OR_PARTITION -> Errors.COORDINATOR_NOT_AVAILABLE,
    Errors.NOT_ENOUGH_REPLICAS -> Errors.COORDINATOR_NOT_AVAILABLE,
    Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND -> Errors.COORDINATOR_NOT_AVAILABLE,
    Errors.NOT_LEADER_OR_FOLLOWER -> Errors.NOT_COORDINATOR,
    Errors.MESSAGE_TOO_LARGE -> Errors.INVALID_COMMIT_OFFSET_SIZE,
    Errors.RECORD_LIST_TOO_LARGE -> Errors.INVALID_COMMIT_OFFSET_SIZE,
    Errors.INVALID_FETCH_SIZE -> Errors.INVALID_COMMIT_OFFSET_SIZE,
    Errors.CORRUPT_MESSAGE -> Errors.CORRUPT_MESSAGE
  )
  expectedMappings.foreach { case (appendError, expectedError) =>
    assertCommitOffsetErrorMapping(appendError, expectedError)
  }
}
// Commits one offset, answers the captured append callback with `appendError`, and checks that
// the commit callback received `expectedError`, that the group holds no offsets afterwards,
// that the cache serves INVALID_OFFSET for the partition, and that the "offset-commit-count"
// metric stays at 0. The replicaManager mock is reset first so single-invocation
// verifications hold on every call of this helper.
private def assertCommitOffsetErrorMapping(appendError: Errors, expectedError: Errors): Unit = {
  reset(replicaManager)

  val memberId = ""
  val committedPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
  val offset = 37

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  val toCommit = immutable.Map(committedPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
  when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))

  var commitResult: Option[immutable.Map[TopicIdPartition, Errors]] = None
  val onCommit: immutable.Map[TopicIdPartition, Errors] => Unit = { result => commitResult = Some(result) }

  assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
  groupMetadataManager.storeOffsets(group, memberId, toCommit, onCommit)
  assertTrue(group.hasOffsets)

  // deliver the failing append response
  val appendCallback = verifyAppendAndCaptureCallback()
  val appendResponse = new PartitionResponse(appendError, 0L, RecordBatch.NO_TIMESTAMP, 0L)
  appendCallback.getValue.apply(Map(groupTopicPartition -> appendResponse))

  assertTrue(commitResult.isDefined)
  assertEquals(Some(expectedError), commitResult.flatMap(_.get(committedPartition)))
  assertFalse(group.hasOffsets)

  val fetched = groupMetadataManager.getOffsets(
    groupId,
    defaultRequireStable,
    Some(Seq(committedPartition.topicPartition))
  )
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    fetched.get(committedPartition.topicPartition).map(_.offset)
  )

  verify(replicaManager).getMagic(any())
  // Will not update sensor if failed
  assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
}
// Commits two offsets in one request where one carries metadata longer than the configured
// maximum. The valid one must be committed and served from the cache; the oversized one must
// be rejected with OFFSET_METADATA_TOO_LARGE and served as INVALID_OFFSET. The
// "offset-commit-count" metric is expected to go from 0 to 1.
@Test
def testCommitOffsetPartialFailure(): Unit = {
  val memberId = ""
  val acceptedPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
  val rejectedPartition = new TopicIdPartition(Uuid.randomUuid(), 1, "foo")
  val offset = 37

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  val toCommit = immutable.Map(
    acceptedPartition -> OffsetAndMetadata(offset, "", time.milliseconds()),
    // This will fail: the metadata is one character longer than the configured maximum
    rejectedPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds())
  )
  when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))

  var commitResult: Option[immutable.Map[TopicIdPartition, Errors]] = None
  val onCommit: immutable.Map[TopicIdPartition, Errors] => Unit = { result => commitResult = Some(result) }

  assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
  groupMetadataManager.storeOffsets(group, memberId, toCommit, onCommit)
  assertTrue(group.hasOffsets)

  // acknowledge the append without error
  val appendCallback = verifyAppendAndCaptureCallback()
  val successfulAppend = new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
  appendCallback.getValue.apply(Map(groupTopicPartition -> successfulAppend))

  // per-partition results delivered to the commit callback
  assertTrue(commitResult.isDefined)
  assertEquals(Some(Errors.NONE), commitResult.get.get(acceptedPartition))
  assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), commitResult.get.get(rejectedPartition))
  assertTrue(group.hasOffsets)

  // offsets as served from the cache
  val fetched = groupMetadataManager.getOffsets(
    groupId,
    defaultRequireStable,
    Some(Seq(acceptedPartition.topicPartition, rejectedPartition.topicPartition))
  )
  assertEquals(
    Some(offset),
    fetched.get(acceptedPartition.topicPartition).map(_.offset)
  )
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    fetched.get(rejectedPartition.topicPartition).map(_.offset)
  )

  verify(replicaManager).appendRecords(anyLong(),
    anyShort(),
    internalTopicsAllowed = ArgumentMatchers.eq(true),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    any[Map[TopicPartition, MemoryRecords]],
    any(),
    any[Option[ReentrantLock]],
    any(),
    any(),
    any(),
    any(),
    any())
  verify(replicaManager).getMagic(any())
  assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
}
// Commits a single offset whose metadata is one character longer than the configured maximum.
// The commit must be rejected with OFFSET_METADATA_TOO_LARGE, leave the group without offsets,
// be served as INVALID_OFFSET from the cache, and leave "offset-commit-count" at 0.
@Test
def testOffsetMetadataTooLarge(): Unit = {
  val memberId = ""
  val rejectedPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
  val offset = 37

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  val toCommit = immutable.Map(
    rejectedPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds())
  )

  var commitResult: Option[immutable.Map[TopicIdPartition, Errors]] = None
  val onCommit: immutable.Map[TopicIdPartition, Errors] => Unit = { result => commitResult = Some(result) }

  assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
  groupMetadataManager.storeOffsets(group, memberId, toCommit, onCommit)
  assertFalse(group.hasOffsets)

  assertTrue(commitResult.isDefined)
  assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), commitResult.flatMap(_.get(rejectedPartition)))
  assertFalse(group.hasOffsets)

  val fetched = groupMetadataManager.getOffsets(
    groupId,
    defaultRequireStable,
    Some(Seq(rejectedPartition.topicPartition))
  )
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    fetched.get(rejectedPartition.topicPartition).map(_.offset)
  )

  assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
}
@Test
def testExpireOffset(): Unit = {
  // Two offsets are committed with explicit expire timestamps (commit time + 1 ms and + 3 ms).
  // A cleanup pass run 2 ms after the commit must drop the first and keep the second.
  val memberId = ""
  val topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
  val topicIdPartition2 = new TopicIdPartition(topicIdPartition1.topicId, 1, "foo")
  val offset = 37

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  val startMs = time.milliseconds
  val expiresFirst = OffsetAndMetadata(offset, "", startMs, startMs + 1)
  val expiresLater = OffsetAndMetadata(offset, "", startMs, startMs + 3)
  val offsets = immutable.Map(
    topicIdPartition1 -> expiresFirst,
    topicIdPartition2 -> expiresLater)

  mockGetPartition()
  expectAppendMessage(Errors.NONE)

  var commitResult: Option[immutable.Map[TopicIdPartition, Errors]] = None
  val callback: immutable.Map[TopicIdPartition, Errors] => Unit = { errors =>
    commitResult = Some(errors)
  }

  groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
  assertTrue(group.hasOffsets)
  assertTrue(commitResult.isDefined)
  assertEquals(Some(Errors.NONE), commitResult.flatMap(_.get(topicIdPartition1)))

  // Move the clock past the first expire timestamp only.
  time.sleep(2)

  when(partition.appendRecordsToLeader(any[MemoryRecords],
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
    any())).thenReturn(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO)
  groupMetadataManager.cleanupGroupMetadata()

  // The group survives; only partition 0 lost its offset.
  assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
  assertEquals(None, group.offset(topicIdPartition1.topicPartition))
  assertEquals(Some(offset), group.offset(topicIdPartition2.topicPartition).map(_.offset))

  val fetched = groupMetadataManager.getOffsets(
    groupId,
    defaultRequireStable,
    Some(Seq(topicIdPartition1.topicPartition, topicIdPartition2.topicPartition))
  )
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    fetched.get(topicIdPartition1.topicPartition).map(_.offset)
  )
  assertEquals(
    Some(offset),
    fetched.get(topicIdPartition2.topicPartition).map(_.offset)
  )

  verify(replicaManager).appendRecords(anyLong(),
    anyShort(),
    internalTopicsAllowed = ArgumentMatchers.eq(true),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    any(),
    any(),
    any[Option[ReentrantLock]],
    any(),
    any(),
    any(),
    any(),
    any())
  verify(replicaManager, times(2)).getMagic(any())
}
@Test
def testGroupMetadataRemoval(): Unit = {
  // Cleaning up an Empty group that has no committed offsets must append a single group
  // metadata tombstone (key only, no value) and remove the group from the cache.
  val topicPartition1 = new TopicPartition("foo", 0)
  val topicPartition2 = new TopicPartition("foo", 1)

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)
  group.generationId = 5

  // Capture whatever cleanup appends to the partition leader.
  val appended: ArgumentCaptor[MemoryRecords] = ArgumentCaptor.forClass(classOf[MemoryRecords])
  when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
  mockGetPartition()
  when(partition.appendRecordsToLeader(appended.capture(),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
    any())).thenReturn(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO)

  groupMetadataManager.cleanupGroupMetadata()

  val tombstones = appended.getValue.records.asScala.toList
  for (batch <- appended.getValue.batches.asScala) {
    assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, batch.magic)
    assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
  }
  assertEquals(1, tombstones.size)

  val groupTombstone = tombstones.head
  assertTrue(groupTombstone.hasKey)
  assertFalse(groupTombstone.hasValue)
  assertTrue(groupTombstone.timestamp > 0)

  val decodedKey = GroupMetadataManager.readMessageKey(groupTombstone.key).asInstanceOf[GroupMetadataKey]
  assertEquals(groupId, decodedKey.key)

  // The group is no longer cached and fetches return the "no offset" sentinel.
  assertEquals(None, groupMetadataManager.getGroup(groupId))
  val fetched = groupMetadataManager.getOffsets(
    groupId,
    defaultRequireStable,
    Some(Seq(topicPartition1, topicPartition2))
  )
  assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), fetched.get(topicPartition1).map(_.offset))
  assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), fetched.get(topicPartition2).map(_.offset))
}
@Test
def testGroupMetadataRemovalWithLogAppendTime(): Unit = {
  // Same scenario as the plain group-removal test: the tombstone batch written by cleanup is
  // expected to carry CREATE_TIME, like a producer would; per the original note, any conversion
  // to LOG_APPEND_TIME happens automatically elsewhere.
  val firstPartition = new TopicPartition("foo", 0)
  val secondPartition = new TopicPartition("foo", 1)

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)
  group.generationId = 5

  // Stub the append path and keep hold of the records that get written.
  val captor: ArgumentCaptor[MemoryRecords] = ArgumentCaptor.forClass(classOf[MemoryRecords])
  when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
  mockGetPartition()
  when(partition.appendRecordsToLeader(captor.capture(),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
    any())).thenReturn(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO)

  groupMetadataManager.cleanupGroupMetadata()

  val written = captor.getValue.records.asScala.toList
  captor.getValue.batches.asScala.foreach { batch =>
    assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, batch.magic)
    // Use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
    assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
  }
  assertEquals(1, written.size)

  // The single record is a tombstone keyed by the group id.
  val tombstone = written.head
  assertTrue(tombstone.hasKey)
  assertFalse(tombstone.hasValue)
  assertTrue(tombstone.timestamp > 0)
  val key = GroupMetadataManager.readMessageKey(tombstone.key).asInstanceOf[GroupMetadataKey]
  assertEquals(groupId, key.key)

  // The group has been dropped from the cache.
  assertEquals(None, groupMetadataManager.getGroup(groupId))
  val offsetsAfterCleanup =
    groupMetadataManager.getOffsets(groupId, defaultRequireStable, Some(Seq(firstPartition, secondPartition)))
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    offsetsAfterCleanup.get(firstPartition).map(_.offset)
  )
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    offsetsAfterCleanup.get(secondPartition).map(_.offset)
  )
}
@Test
def testExpireGroupWithOffsetsOnly(): Unit = {
  // For a group that only uses Kafka for offset storage, expiring every offset must remove
  // the group from the cache while the appended records are offset tombstones only.
  val memberId = ""
  val topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
  val topicIdPartition2 = new TopicIdPartition(topicIdPartition1.topicId, 1, "foo")
  val offset = 37

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  // Explicit expire timestamps: 1 ms and 3 ms after the commit.
  val startMs = time.milliseconds
  val firstCommit = OffsetAndMetadata(offset, Optional.empty(), "", startMs, Some(startMs + 1))
  val secondCommit = OffsetAndMetadata(offset, "", startMs, startMs + 3)
  val offsets = immutable.Map(
    topicIdPartition1 -> firstCommit,
    topicIdPartition2 -> secondCommit)

  mockGetPartition()
  expectAppendMessage(Errors.NONE)

  var commitResult: Option[immutable.Map[TopicIdPartition, Errors]] = None
  val callback: immutable.Map[TopicIdPartition, Errors] => Unit = { errors =>
    commitResult = Some(errors)
  }

  groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
  assertTrue(group.hasOffsets)
  assertTrue(commitResult.isDefined)
  assertEquals(Some(Errors.NONE), commitResult.flatMap(_.get(topicIdPartition1)))

  // Advance beyond both expire timestamps.
  time.sleep(4)

  // Capture the tombstones appended by the cleanup pass.
  val captor: ArgumentCaptor[MemoryRecords] = ArgumentCaptor.forClass(classOf[MemoryRecords])
  when(partition.appendRecordsToLeader(captor.capture(),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
    any())).thenReturn(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO)
  groupMetadataManager.cleanupGroupMetadata()

  // Exactly two records, each a value-less record keyed by an offset key for topic "foo".
  val tombstones = captor.getValue.records.asScala.toList
  assertEquals(2, tombstones.size)
  for (tombstone <- tombstones) {
    assertTrue(tombstone.hasKey)
    assertFalse(tombstone.hasValue)
    val decodedKey = GroupMetadataManager.readMessageKey(tombstone.key).asInstanceOf[OffsetKey]
    assertEquals(groupId, decodedKey.key.group)
    assertEquals("foo", decodedKey.key.topicPartition.topic)
  }

  // With all offsets gone, the group itself is gone as well.
  assertEquals(None, groupMetadataManager.getGroup(groupId))
  val fetched = groupMetadataManager.getOffsets(
    groupId,
    defaultRequireStable,
    Some(Seq(topicIdPartition1.topicPartition, topicIdPartition2.topicPartition))
  )
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    fetched.get(topicIdPartition1.topicPartition).map(_.offset)
  )
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    fetched.get(topicIdPartition2.topicPartition).map(_.offset)
  )

  verify(replicaManager).onlinePartition(groupTopicPartition)
}
@Test
def testOffsetExpirationSemantics(): Unit = {
  // Drives one group through five cleanup passes and checks which offsets survive each pass:
  //   1. group has an active generation, first explicit expiry reached -> nothing expires
  //   2. group transitioned to Empty                                    -> partition 0 expires
  //   3. second explicit expiry reached                                  -> partition 1 expires
  //   4. 1 ms before the retention deadline of the remaining offset     -> nothing expires
  //   5. past the retention deadline                                     -> partition 2 expires, group is Dead
  val memberId = "memberId"
  val clientId = "clientId"
  val clientHost = "localhost"
  val topic = "foo"
  val topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, topic)
  val topicIdPartition2 = new TopicIdPartition(topicIdPartition1.topicId, 1, topic)
  val topicIdPartition3 = new TopicIdPartition(topicIdPartition1.topicId, 2, topic)
  val offset = 37

  // Fetches the cached offsets of all three partitions in one call.
  def fetchCachedOffsets() = groupMetadataManager.getOffsets(
    groupId,
    defaultRequireStable,
    Some(Seq(topicIdPartition1.topicPartition, topicIdPartition2.topicPartition, topicIdPartition3.topicPartition))
  )

  // Stubs the leader append used when cleanup writes offset tombstones.
  def expectTombstoneAppend() =
    when(partition.appendRecordsToLeader(any[MemoryRecords],
      origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
      any())).thenReturn(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO)

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  val subscription = new Subscription(List(topic).asJava)
  val member = new MemberMetadata(memberId, Some(groupInstanceId), clientId, clientHost, rebalanceTimeout, sessionTimeout,
    protocolType, List(("protocol", ConsumerProtocol.serializeSubscription(subscription).array())))
  group.add(member, _ => ())
  group.transitionTo(PreparingRebalance)
  group.initNextGeneration()

  val startMs = time.milliseconds
  // old clients, expiry timestamp is explicitly set
  val tp1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs, startMs + 1)
  val tp2OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs, startMs + 3)
  // new clients, no per-partition expiry timestamp, offsets of group expire together
  val tp3OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
  val offsets = immutable.Map(
    topicIdPartition1 -> tp1OffsetAndMetadata,
    topicIdPartition2 -> tp2OffsetAndMetadata,
    topicIdPartition3 -> tp3OffsetAndMetadata)

  mockGetPartition()
  expectAppendMessage(Errors.NONE)

  var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
  def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
    commitErrors = Some(errors)
  }

  groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
  assertTrue(group.hasOffsets)
  assertFalse(commitErrors.isEmpty)
  assertEquals(Some(Errors.NONE), commitErrors.get.get(topicIdPartition1))

  // Pass 1: do not expire any offset even though expiration timestamp is reached for one
  // (due to group still being active)
  time.sleep(2)
  groupMetadataManager.cleanupGroupMetadata()

  // group and offsets should still be there
  assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
  assertEquals(Some(tp1OffsetAndMetadata), group.offset(topicIdPartition1.topicPartition))
  assertEquals(Some(tp2OffsetAndMetadata), group.offset(topicIdPartition2.topicPartition))
  assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicIdPartition3.topicPartition))

  val cachedAfterPass1 = fetchCachedOffsets()
  assertEquals(Some(offset), cachedAfterPass1.get(topicIdPartition1.topicPartition).map(_.offset))
  assertEquals(Some(offset), cachedAfterPass1.get(topicIdPartition2.topicPartition).map(_.offset))
  assertEquals(Some(offset), cachedAfterPass1.get(topicIdPartition3.topicPartition).map(_.offset))

  verify(replicaManager).onlinePartition(groupTopicPartition)

  // Pass 2: once the group is Empty, the offset whose expiry has already passed can go.
  group.transitionTo(PreparingRebalance)
  group.transitionTo(Empty)

  expectTombstoneAppend()
  groupMetadataManager.cleanupGroupMetadata()

  // group is empty now, only one offset should expire
  assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
  assertEquals(None, group.offset(topicIdPartition1.topicPartition))
  assertEquals(Some(tp2OffsetAndMetadata), group.offset(topicIdPartition2.topicPartition))
  assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicIdPartition3.topicPartition))

  val cachedAfterPass2 = fetchCachedOffsets()
  assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedAfterPass2.get(topicIdPartition1.topicPartition).map(_.offset))
  assertEquals(Some(offset), cachedAfterPass2.get(topicIdPartition2.topicPartition).map(_.offset))
  assertEquals(Some(offset), cachedAfterPass2.get(topicIdPartition3.topicPartition).map(_.offset))

  verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)

  // Pass 3: move past the second explicit expire timestamp.
  time.sleep(2)

  expectTombstoneAppend()
  groupMetadataManager.cleanupGroupMetadata()

  // one more offset should expire
  assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
  assertEquals(None, group.offset(topicIdPartition1.topicPartition))
  assertEquals(None, group.offset(topicIdPartition2.topicPartition))
  assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicIdPartition3.topicPartition))

  val cachedAfterPass3 = fetchCachedOffsets()
  assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedAfterPass3.get(topicIdPartition1.topicPartition).map(_.offset))
  assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedAfterPass3.get(topicIdPartition2.topicPartition).map(_.offset))
  assertEquals(Some(offset), cachedAfterPass3.get(topicIdPartition3.topicPartition).map(_.offset))

  verify(replicaManager, times(3)).onlinePartition(groupTopicPartition)

  // Pass 4: advance time to just before the offset of last partition is to be expired,
  // no offset should expire
  time.sleep(group.currentStateTimestamp.get + defaultOffsetRetentionMs - time.milliseconds() - 1)
  groupMetadataManager.cleanupGroupMetadata()

  // nothing changes: the last offset is still inside its retention window
  assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
  assertEquals(None, group.offset(topicIdPartition1.topicPartition))
  assertEquals(None, group.offset(topicIdPartition2.topicPartition))
  assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicIdPartition3.topicPartition))

  val cachedAfterPass4 = fetchCachedOffsets()
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    cachedAfterPass4.get(topicIdPartition1.topicPartition).map(_.offset)
  )
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    cachedAfterPass4.get(topicIdPartition2.topicPartition).map(_.offset)
  )
  assertEquals(
    Some(offset),
    cachedAfterPass4.get(topicIdPartition3.topicPartition).map(_.offset)
  )

  verify(replicaManager, times(4)).onlinePartition(groupTopicPartition)

  // Pass 5: advance time enough for that last offset to expire
  time.sleep(2)

  expectTombstoneAppend()
  groupMetadataManager.cleanupGroupMetadata()

  // group and all its offsets should be gone now
  assertEquals(None, groupMetadataManager.getGroup(groupId))
  assertEquals(None, group.offset(topicIdPartition1.topicPartition))
  assertEquals(None, group.offset(topicIdPartition2.topicPartition))
  assertEquals(None, group.offset(topicIdPartition3.topicPartition))

  val cachedAfterPass5 = fetchCachedOffsets()
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    cachedAfterPass5.get(topicIdPartition1.topicPartition).map(_.offset)
  )
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    cachedAfterPass5.get(topicIdPartition2.topicPartition).map(_.offset)
  )
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    cachedAfterPass5.get(topicIdPartition3.topicPartition).map(_.offset)
  )

  verify(replicaManager, times(5)).onlinePartition(groupTopicPartition)

  // JUnit assertion (rather than Predef.assert) so the failure is reported like every other check here.
  assertTrue(group.is(Dead))
}
@Test
def testOffsetExpirationOfSimpleConsumer(): Unit = {
  // A group that never has members (it stays Empty) commits a single offset without an explicit
  // expire timestamp. The offset must survive until defaultOffsetRetentionMs after its commit
  // timestamp and be removed, together with the group, once that deadline has passed.
  val memberId = "memberId"
  val topic = "foo"
  val topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, topic)
  val offset = 37

  // Fetches the cached offset of the single partition under test.
  def fetchCachedOffsets() = groupMetadataManager.getOffsets(
    groupId,
    defaultRequireStable,
    Some(Seq(topicIdPartition1.topicPartition))
  )

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  val startMs = time.milliseconds
  // no per-partition expiry timestamp: expiration is driven by the commit timestamp plus
  // the default retention, as computed for expiryTimestamp below
  val tp1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
  val offsets = immutable.Map(
    topicIdPartition1 -> tp1OffsetAndMetadata)

  mockGetPartition()
  expectAppendMessage(Errors.NONE)

  var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
  def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
    commitErrors = Some(errors)
  }

  groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
  assertTrue(group.hasOffsets)
  assertFalse(commitErrors.isEmpty)
  assertEquals(Some(Errors.NONE), commitErrors.get.get(topicIdPartition1))

  // do not expire offsets while within retention period since commit timestamp
  val expiryTimestamp = offsets(topicIdPartition1).commitTimestamp + defaultOffsetRetentionMs
  time.sleep(expiryTimestamp - time.milliseconds() - 1)
  groupMetadataManager.cleanupGroupMetadata()

  // group and offsets should still be there
  assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
  assertEquals(Some(tp1OffsetAndMetadata), group.offset(topicIdPartition1.topicPartition))

  val cachedBeforeExpiry = fetchCachedOffsets()
  assertEquals(Some(offset), cachedBeforeExpiry.get(topicIdPartition1.topicPartition).map(_.offset))

  verify(replicaManager).onlinePartition(groupTopicPartition)

  // advance time to enough for offsets to expire
  time.sleep(2)

  // expect the offset tombstone
  when(partition.appendRecordsToLeader(any[MemoryRecords],
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
    any())).thenReturn(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO)
  groupMetadataManager.cleanupGroupMetadata()

  // group and all its offsets should be gone now
  assertEquals(None, groupMetadataManager.getGroup(groupId))
  assertEquals(None, group.offset(topicIdPartition1.topicPartition))

  val cachedAfterExpiry = fetchCachedOffsets()
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    cachedAfterExpiry.get(topicIdPartition1.topicPartition).map(_.offset)
  )

  verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)

  // JUnit assertion (rather than Predef.assert) for consistency with the rest of the test.
  assertTrue(group.is(Dead))
}
@Test
def testOffsetExpirationOfActiveGroupSemantics(): Unit = {
  // A Stable group subscribed to "foo" and "bar" commits offsets for both topics. After the
  // retention period has elapsed nothing may expire while both topics are subscribed; once the
  // member's subscription shrinks to "foo" only, the next cleanup must drop the "bar" offsets.
  val memberId = "memberId"
  val clientId = "clientId"
  val clientHost = "localhost"

  val topic1 = "foo"
  val topic1IdPartition0 = new TopicIdPartition(Uuid.randomUuid(), 0, topic1)
  val topic1IdPartition1 = new TopicIdPartition(topic1IdPartition0.topicId, 1, topic1)

  val topic2 = "bar"
  val topic2IdPartition0 = new TopicIdPartition(Uuid.randomUuid(), 0, topic2)
  val topic2IdPartition1 = new TopicIdPartition(topic2IdPartition0.topicId, 1, topic2)

  val offset = 37

  // Fetches the cached offsets of all four partitions in one call.
  def fetchCachedOffsets() = groupMetadataManager.getOffsets(
    groupId,
    defaultRequireStable,
    Some(Seq(
      topic1IdPartition0.topicPartition,
      topic1IdPartition1.topicPartition,
      topic2IdPartition0.topicPartition,
      topic2IdPartition1.topicPartition)
    )
  )

  groupMetadataManager.addOwnedPartition(groupPartitionId)
  val group = new GroupMetadata(groupId, Empty, time)
  groupMetadataManager.addGroup(group)

  // Subscribe to topic1 and topic2
  val subscriptionTopic1AndTopic2 = new Subscription(List(topic1, topic2).asJava)
  val member = new MemberMetadata(
    memberId,
    Some(groupInstanceId),
    clientId,
    clientHost,
    rebalanceTimeout,
    sessionTimeout,
    ConsumerProtocol.PROTOCOL_TYPE,
    List(("protocol", ConsumerProtocol.serializeSubscription(subscriptionTopic1AndTopic2).array()))
  )
  group.add(member, _ => ())
  group.transitionTo(PreparingRebalance)
  group.initNextGeneration()
  group.transitionTo(Stable)

  val startMs = time.milliseconds
  val t1p0OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
  val t1p1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
  val t2p0OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
  val t2p1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
  val offsets = immutable.Map(
    topic1IdPartition0 -> t1p0OffsetAndMetadata,
    topic1IdPartition1 -> t1p1OffsetAndMetadata,
    topic2IdPartition0 -> t2p0OffsetAndMetadata,
    topic2IdPartition1 -> t2p1OffsetAndMetadata)

  mockGetPartition()
  expectAppendMessage(Errors.NONE)

  var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
  def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
    commitErrors = Some(errors)
  }

  groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
  assertTrue(group.hasOffsets)
  assertFalse(commitErrors.isEmpty)
  assertEquals(Some(Errors.NONE), commitErrors.get.get(topic1IdPartition0))

  // advance time to just after the offset of last partition is to be expired
  time.sleep(defaultOffsetRetentionMs + 2)

  // no offset should expire because all topics are actively consumed
  groupMetadataManager.cleanupGroupMetadata()

  assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
  // JUnit assertion (rather than Predef.assert) for consistency with the rest of the test.
  assertTrue(group.is(Stable))

  assertEquals(Some(t1p0OffsetAndMetadata), group.offset(topic1IdPartition0.topicPartition))
  assertEquals(Some(t1p1OffsetAndMetadata), group.offset(topic1IdPartition1.topicPartition))
  assertEquals(Some(t2p0OffsetAndMetadata), group.offset(topic2IdPartition0.topicPartition))
  assertEquals(Some(t2p1OffsetAndMetadata), group.offset(topic2IdPartition1.topicPartition))

  val cachedWhileSubscribedToBoth = fetchCachedOffsets()
  assertEquals(
    Some(offset),
    cachedWhileSubscribedToBoth.get(topic1IdPartition0.topicPartition).map(_.offset)
  )
  assertEquals(
    Some(offset),
    cachedWhileSubscribedToBoth.get(topic1IdPartition1.topicPartition).map(_.offset)
  )
  assertEquals(
    Some(offset),
    cachedWhileSubscribedToBoth.get(topic2IdPartition0.topicPartition).map(_.offset)
  )
  assertEquals(
    Some(offset),
    cachedWhileSubscribedToBoth.get(topic2IdPartition1.topicPartition).map(_.offset)
  )

  verify(replicaManager).onlinePartition(groupTopicPartition)

  group.transitionTo(PreparingRebalance)

  // Subscribe to topic1, offsets of topic2 should be removed
  val subscriptionTopic1 = new Subscription(List(topic1).asJava)
  group.updateMember(
    member,
    List(("protocol", ConsumerProtocol.serializeSubscription(subscriptionTopic1).array())),
    member.rebalanceTimeoutMs,
    member.sessionTimeoutMs,
    null
  )
  group.initNextGeneration()
  group.transitionTo(Stable)

  // expect the offset tombstone
  when(partition.appendRecordsToLeader(any[MemoryRecords],
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
    any())).thenReturn(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO)
  groupMetadataManager.cleanupGroupMetadata()

  verify(partition).appendRecordsToLeader(any[MemoryRecords],
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
    any())
  verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)

  assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
  assertTrue(group.is(Stable))

  // topic1 offsets are kept, topic2 offsets are gone
  assertEquals(Some(t1p0OffsetAndMetadata), group.offset(topic1IdPartition0.topicPartition))
  assertEquals(Some(t1p1OffsetAndMetadata), group.offset(topic1IdPartition1.topicPartition))
  assertEquals(None, group.offset(topic2IdPartition0.topicPartition))
  assertEquals(None, group.offset(topic2IdPartition1.topicPartition))

  val cachedAfterUnsubscribe = fetchCachedOffsets()
  assertEquals(Some(offset), cachedAfterUnsubscribe.get(topic1IdPartition0.topicPartition).map(_.offset))
  assertEquals(Some(offset), cachedAfterUnsubscribe.get(topic1IdPartition1.topicPartition).map(_.offset))
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    cachedAfterUnsubscribe.get(topic2IdPartition0.topicPartition).map(_.offset)
  )
  assertEquals(
    Some(OffsetFetchResponse.INVALID_OFFSET),
    cachedAfterUnsubscribe.get(topic2IdPartition1.topicPartition).map(_.offset)
  )
}
@Test
def testLoadOffsetFromOldCommit(): Unit = {
  // Loads a log containing offset commits and a stable-group record that were both written with
  // metadata version IBP_1_1_IV0 and an explicit retention time, then checks that the group and
  // every offset (including an expire timestamp) are present in the cache.
  val groupMetadataTopicPartition = groupTopicPartition
  val generation = 935
  val protocolType = "consumer"
  val protocol = "range"
  val startOffset = 15L
  val groupEpoch = 2
  val committedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  val metadataVersion = IBP_1_1_IV0
  val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, metadataVersion = metadataVersion, retentionTimeOpt = Some(100))
  val memberId = "98098230493"
  val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, metadataVersion = metadataVersion)
  val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
    (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)

  expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)

  groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L)

  val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, group.groupId)
  assertEquals(Stable, group.currentState)
  assertEquals(memberId, group.leaderOrNull)
  assertEquals(generation, group.generationId)
  assertEquals(Some(protocolType), group.protocolType)
  assertEquals(protocol, group.protocolName.orNull)
  assertEquals(Set(memberId), group.allMembers)
  assertEquals(committedOffsets.size, group.allOffsets.size)
  committedOffsets.foreach { case (topicPartition, offset) =>
    assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
    // `exists` turns a missing offset into an assertion failure instead of a NoSuchElementException.
    assertTrue(
      group.offset(topicPartition).exists(_.expireTimestamp.nonEmpty),
      s"Expected an expire timestamp on the loaded offset for $topicPartition"
    )
  }
}
@Test
def testLoadOffsetWithExplicitRetention(): Unit = {
  // Loads a log containing offset commits created with an explicit retention time plus a
  // stable-group record, then checks that the group and every offset (including an expire
  // timestamp) are present in the cache.
  val groupMetadataTopicPartition = groupTopicPartition
  val generation = 935
  val protocolType = "consumer"
  val protocol = "range"
  val startOffset = 15L
  val groupEpoch = 2
  val committedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, retentionTimeOpt = Some(100))
  val memberId = "98098230493"
  val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
  val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
    (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)

  expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)

  groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L)

  val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, group.groupId)
  assertEquals(Stable, group.currentState)
  assertEquals(memberId, group.leaderOrNull)
  assertEquals(generation, group.generationId)
  assertEquals(Some(protocolType), group.protocolType)
  assertEquals(protocol, group.protocolName.orNull)
  assertEquals(Set(memberId), group.allMembers)
  assertEquals(committedOffsets.size, group.allOffsets.size)
  committedOffsets.foreach { case (topicPartition, offset) =>
    assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
    // `exists` turns a missing offset into an assertion failure instead of a NoSuchElementException.
    assertTrue(
      group.offset(topicPartition).exists(_.expireTimestamp.nonEmpty),
      s"Expected an expire timestamp on the loaded offset for $topicPartition"
    )
  }
}
@Test
def testSerdeOffsetCommitValue(): Unit = {
  // Round-trips an offset commit value (with a leader epoch, without an expire timestamp)
  // through every MetadataVersion and checks the schema version chosen for each.
  val offsetAndMetadata = OffsetAndMetadata(
    offset = 537L,
    leaderEpoch = Optional.of(15),
    metadata = "metadata",
    commitTimestamp = time.milliseconds(),
    expireTimestamp = None)

  // Serializes with the given metadata version, checks the version prefix, then deserializes
  // and compares field by field.
  def verifySerde(metadataVersion: MetadataVersion, expectedOffsetCommitValueVersion: Int): Unit = {
    val serialized = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, metadataVersion)
    val buffer = ByteBuffer.wrap(serialized)
    assertEquals(expectedOffsetCommitValueVersion, buffer.getShort(0).toInt)

    val roundTripped = GroupMetadataManager.readOffsetMessageValue(buffer)
    assertEquals(offsetAndMetadata.offset, roundTripped.offset)
    assertEquals(offsetAndMetadata.metadata, roundTripped.metadata)
    assertEquals(offsetAndMetadata.commitTimestamp, roundTripped.commitTimestamp)

    // Serialization drops the leader epoch silently if an older inter-broker protocol is in use
    val expectedLeaderEpoch = if (expectedOffsetCommitValueVersion >= 3)
      offsetAndMetadata.leaderEpoch
    else
      Optional.empty()
    assertEquals(expectedLeaderEpoch, roundTripped.leaderEpoch)
  }

  MetadataVersion.VERSIONS.foreach { version =>
    // Schema version boundaries: below IBP_2_1_IV0 -> 1, below IBP_2_1_IV1 -> 2, otherwise 3.
    val expectedSchemaVersion =
      if (version.isLessThan(IBP_2_1_IV0)) 1
      else if (version.isLessThan(IBP_2_1_IV1)) 2
      else 3
    verifySerde(version, expectedSchemaVersion)
  }
}
@Test
def testSerdeOffsetCommitValueWithExpireTimestamp(): Unit = {
  // If expire timestamp is set, we should always use version 1 of the offset commit
  // value schema since later versions do not support it
  val commitTime = time.milliseconds()
  val offsetAndMetadata = OffsetAndMetadata(
    offset = 537L,
    leaderEpoch = Optional.empty(),
    metadata = "metadata",
    commitTimestamp = commitTime,
    expireTimestamp = Some(time.milliseconds() + 1000))

  // Expects schema version 1 and a lossless round trip for the given metadata version.
  def verifySerde(metadataVersion: MetadataVersion): Unit = {
    val buffer = ByteBuffer.wrap(GroupMetadataManager.offsetCommitValue(offsetAndMetadata, metadataVersion))
    assertEquals(1, buffer.getShort(0).toInt)
    val roundTripped = GroupMetadataManager.readOffsetMessageValue(buffer)
    assertEquals(offsetAndMetadata, roundTripped)
  }

  MetadataVersion.VERSIONS.foreach { version =>
    verifySerde(version)
  }
}
@Test
def testSerdeOffsetCommitValueWithNoneExpireTimestamp(): Unit = {
  // Without an expire timestamp (and without a leader epoch) the value must round-trip
  // unchanged for every MetadataVersion, using schema version 1, 2 or 3 depending on it.
  val offsetAndMetadata = OffsetAndMetadata(
    offset = 537L,
    leaderEpoch = Optional.empty(),
    metadata = "metadata",
    commitTimestamp = time.milliseconds(),
    expireTimestamp = None)

  // Checks the version prefix for the given metadata version, then the round trip.
  def verifySerde(metadataVersion: MetadataVersion): Unit = {
    val serialized = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, metadataVersion)
    val buffer = ByteBuffer.wrap(serialized)
    val actualVersion = buffer.getShort(0).toInt

    val expectedVersion =
      if (metadataVersion.isLessThan(IBP_2_1_IV0)) 1
      else if (metadataVersion.isLessThan(IBP_2_1_IV1)) 2
      else 3
    assertEquals(expectedVersion, actualVersion)

    val roundTripped = GroupMetadataManager.readOffsetMessageValue(buffer)
    assertEquals(offsetAndMetadata, roundTripped)
  }

  MetadataVersion.VERSIONS.foreach { version =>
    verifySerde(version)
  }
}
@Test
def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
  // A stable-group record built with the default metadata version must carry 3 as the
  // version prefix of its value.
  val generation = 935
  val protocolType = "consumer"
  val protocol = "range"
  val memberId = "98098230493"

  val assignment = new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
  val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(assignment))

  val groupRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
  val record = TestUtils.records(Seq(groupRecord)).records.asScala.head

  // The first two bytes of the value are the schema version.
  assertEquals(3, record.value.getShort)
}
@Test
def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
  val offsetsByPartition = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L
  )
  // Every serialized offset commit value must start with schema version 3.
  for (written <- createCommittedOffsetRecords(offsetsByPartition))
    assertEquals(3, written.value.getShort)
}
@Test
def testDeserializeHighestSupportedGroupMetadataValueVersion(): Unit = {
  val expectedGeneration = 935
  val expectedProtocolType = "consumer"
  val expectedProtocol = "range"
  val expectedLeader = "leader"
  val expectedMember = new GroupMetadataValue.MemberMetadata()
    .setMemberId("member")
    .setClientId("client")
    .setClientHost("host")

  // Build a value with the generated message class and serialize it with version 4.
  val value = new GroupMetadataValue()
    .setProtocolType(expectedProtocolType)
    .setGeneration(expectedGeneration)
    .setProtocol(expectedProtocol)
    .setLeader(expectedLeader)
    .setMembers(Collections.singletonList(expectedMember))
  val serialized = MessageUtil.toVersionPrefixedByteBuffer(4, value)

  val loadedGroup = GroupMetadataManager.readGroupMessageValue("groupId", serialized, time)

  assertEquals(expectedGeneration, loadedGroup.generationId)
  assertEquals(expectedProtocolType, loadedGroup.protocolType.get)
  assertEquals(expectedProtocol, loadedGroup.protocolName.get)
  assertEquals(expectedLeader, loadedGroup.leaderOrNull)

  val loadedMember = loadedGroup.allMemberMetadata.head
  assertEquals(expectedMember.memberId, loadedMember.memberId)
  assertEquals(expectedMember.clientId, loadedMember.clientId)
  assertEquals(expectedMember.clientHost, loadedMember.clientHost)
}
@Test
def testDeserializeHighestSupportedOffsetCommitValueVersion(): Unit = {
  // Serialize a generated OffsetCommitValue with version 4 and read it back
  // through the coordinator's deserializer.
  val value = new OffsetCommitValue()
    .setOffset(1000L)
    .setMetadata("metadata")
    .setCommitTimestamp(1500L)
    .setLeaderEpoch(1)
  val loaded = GroupMetadataManager.readOffsetMessageValue(
    MessageUtil.toVersionPrefixedByteBuffer(4, value))

  assertEquals(1000L, loaded.offset)
  assertEquals("metadata", loaded.metadata)
  assertEquals(1500L, loaded.commitTimestamp)
  assertEquals(1, loaded.leaderEpoch.get)
}
@Test
def testDeserializeFutureOffsetCommitValue(): Unit = {
  // Mirrors OffsetCommitValue.SCHEMA_4 and appends two tagged fields
  // that the real schema does not declare.
  val futureSchema = new Schema(
    new Field("offset", Type.INT64, ""),
    new Field("leader_epoch", Type.INT32, ""),
    new Field("metadata", Type.COMPACT_STRING, ""),
    new Field("commit_timestamp", Type.INT64, ""),
    TaggedFieldsSection.of(
      Int.box(0), new Field("offset_foo", Type.STRING, ""),
      Int.box(1), new Field("offset_bar", Type.INT32, "")
    )
  )

  // Values for the two extra tagged fields, keyed by tag.
  val taggedValues = new java.util.TreeMap[Integer, Any]()
  taggedValues.put(0, "foo")
  taggedValues.put(1, 4000)

  // Populate a struct that follows the future schema.
  val futureValue = new Struct(futureSchema)
    .set("offset", 1000L)
    .set("leader_epoch", 100)
    .set("metadata", "metadata")
    .set("commit_timestamp", 2000L)
    .set("_tagged_fields", taggedValues)

  // Lay out the bytes as <version: 2 bytes><struct>.
  val serialized = ByteBuffer.allocate(futureValue.sizeOf() + 2)
  serialized.putShort(4.toShort) // Version 4, written big-endian as the bytes 0x00 0x04.
  futureValue.writeTo(serialized)
  serialized.flip()

  // Parse with the real schema: the extra tagged fields must surface
  // as unknown tagged fields instead of failing the read.
  serialized.getShort() // Skip version.
  val parsed = new OffsetCommitValue(new ByteBufferAccessor(serialized), 4.toShort)
  assertEquals(Seq(0, 1), parsed.unknownTaggedFields().asScala.map(_.tag))

  // Parse again from the start through readOffsetMessageValue.
  serialized.rewind()
  val loaded = GroupMetadataManager.readOffsetMessageValue(serialized)
  assertEquals(1000L, loaded.offset)
  assertEquals(100, loaded.leaderEpoch.get)
  assertEquals("metadata", loaded.metadata)
  assertEquals(2000L, loaded.commitTimestamp)
}
@Test
def testDeserializeFutureGroupMetadataValue(): Unit = {
  // Copy of GroupMetadataValue.MemberMetadata.SCHEMA_4 with a few
  // additional tagged fields. The two tagged fields carry distinct names
  // ("member_foo" / "member_bar"), matching the naming used for the group
  // level tagged fields below.
  val futureMemberSchema = new Schema(
    new Field("member_id", Type.COMPACT_STRING, ""),
    new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""),
    new Field("client_id", Type.COMPACT_STRING, ""),
    new Field("client_host", Type.COMPACT_STRING, ""),
    new Field("rebalance_timeout", Type.INT32, ""),
    new Field("session_timeout", Type.INT32, ""),
    new Field("subscription", Type.COMPACT_BYTES, ""),
    new Field("assignment", Type.COMPACT_BYTES, ""),
    TaggedFieldsSection.of(
      Int.box(0), new Field("member_foo", Type.STRING, ""),
      Int.box(1), new Field("member_bar", Type.INT32, "")
    )
  )

  // Copy of GroupMetadataValue.SCHEMA_4 with a few
  // additional tagged fields.
  val futureGroupSchema = new Schema(
    new Field("protocol_type", Type.COMPACT_STRING, ""),
    new Field("generation", Type.INT32, ""),
    new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""),
    new Field("leader", Type.COMPACT_NULLABLE_STRING, ""),
    new Field("current_state_timestamp", Type.INT64, ""),
    new Field("members", new CompactArrayOf(futureMemberSchema), ""),
    TaggedFieldsSection.of(
      Int.box(0), new Field("group_foo", Type.STRING, ""),
      Int.box(1), new Field("group_bar", Type.INT32, "")
    )
  )

  // Create a member with tagged fields.
  val member = new Struct(futureMemberSchema)
  member.set("member_id", "member_id")
  member.set("group_instance_id", "group_instance_id")
  member.set("client_id", "client_id")
  member.set("client_host", "client_host")
  member.set("rebalance_timeout", 1)
  member.set("session_timeout", 2)
  member.set("subscription", ByteBuffer.allocate(0))
  member.set("assignment", ByteBuffer.allocate(0))

  // Tagged field values are supplied as a map keyed by tag.
  val memberTaggedFields = new java.util.TreeMap[Integer, Any]()
  memberTaggedFields.put(0, "foo")
  memberTaggedFields.put(1, 4000)
  member.set("_tagged_fields", memberTaggedFields)

  // Create a group with tagged fields.
  val group = new Struct(futureGroupSchema)
  group.set("protocol_type", "consumer")
  group.set("generation", 10)
  group.set("protocol", "range")
  group.set("leader", "leader")
  group.set("current_state_timestamp", 1000L)
  group.set("members", Array(member))

  val groupTaggedFields = new java.util.TreeMap[Integer, Any]()
  groupTaggedFields.put(0, "foo")
  groupTaggedFields.put(1, 4000)
  group.set("_tagged_fields", groupTaggedFields)

  // Prepare the buffer: two bytes of version followed by the struct.
  val buffer = ByteBuffer.allocate(group.sizeOf() + 2)
  buffer.put(0.toByte)
  buffer.put(4.toByte) // Add 4 as version.
  group.writeTo(buffer)
  buffer.flip()

  // Read the buffer with the real schema and verify that tagged
  // fields were read but ignored.
  buffer.getShort() // Skip version.
  val value = new GroupMetadataValue(new ByteBufferAccessor(buffer), 4.toShort)
  assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))
  assertEquals(Seq(0, 1), value.members().get(0).unknownTaggedFields().asScala.map(_.tag))

  // Read the buffer with readGroupMessageValue.
  buffer.rewind()
  val groupMetadata = GroupMetadataManager.readGroupMessageValue("group", buffer, time)
  assertEquals("consumer", groupMetadata.protocolType.get)
  assertEquals("leader", groupMetadata.leaderOrNull)
  assertTrue(groupMetadata.allMembers.contains("member_id"))
}
@Test
def testLoadOffsetsWithEmptyControlBatch(): Unit = {
  val offsetsPartition = groupTopicPartition
  val logStartOffset = 15L
  val expectedGeneration = 15
  val groupEpoch = 2
  val committedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  // Real records: the offset commits followed by an empty-group metadata record.
  val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
  val emptyGroupRecord = buildEmptyGroupRecord(expectedGeneration, protocolType)
  val validRecords = MemoryRecords.withRecords(logStartOffset, CompressionType.NONE,
    (offsetCommitRecords :+ emptyGroupRecord).toArray: _*)

  // A mocked transactional control batch that yields no records at all.
  val emptyControlBatch: MutableRecordBatch = mock(classOf[MutableRecordBatch])
  when(emptyControlBatch.iterator).thenReturn(Collections.emptyIterator[Record])
  when(emptyControlBatch.isControlBatch).thenReturn(true)
  when(emptyControlBatch.isTransactional).thenReturn(true)
  when(emptyControlBatch.nextOffset).thenReturn(16L)

  // Mocked records that expose the empty control batch ahead of the real batches.
  val sizeWithControlBatch = DefaultRecordBatch.RECORD_BATCH_OVERHEAD + validRecords.sizeInBytes()
  val recordsWithControlBatch: MemoryRecords = mock(classOf[MemoryRecords])
  when(recordsWithControlBatch.batches).thenReturn((Iterable[MutableRecordBatch](emptyControlBatch) ++ validRecords.batches.asScala).asJava)
  when(recordsWithControlBatch.records).thenReturn(validRecords.records())
  when(recordsWithControlBatch.sizeInBytes()).thenReturn(sizeWithControlBatch)

  // Mocked log that serves those records when read from the start offset.
  val mockedLog: UnifiedLog = mock(classOf[UnifiedLog])
  when(mockedLog.logStartOffset).thenReturn(logStartOffset)
  when(mockedLog.read(ArgumentMatchers.eq(logStartOffset),
    maxLength = anyInt(),
    isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
    minOneMessage = ArgumentMatchers.eq(true)))
    .thenReturn(new FetchDataInfo(new LogOffsetMetadata(logStartOffset), recordsWithControlBatch))
  when(replicaManager.getLog(offsetsPartition)).thenReturn(Some(mockedLog))
  when(replicaManager.getLogEndOffset(offsetsPartition)).thenReturn(Some[Long](18))

  groupMetadataManager.loadGroupsAndOffsets(offsetsPartition, groupEpoch, _ => (), 0L)

  // Empty control batch should not have caused the load to fail
  val loadedGroup = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, loadedGroup.groupId)
  assertEquals(Empty, loadedGroup.currentState)
  assertEquals(expectedGeneration, loadedGroup.generationId)
  assertEquals(Some(protocolType), loadedGroup.protocolType)
  assertNull(loadedGroup.leaderOrNull)
  assertNull(loadedGroup.protocolName.orNull)
  for ((topicPartition, offset) <- committedOffsets)
    assertEquals(Some(offset), loadedGroup.offset(topicPartition).map(_.offset))
}
@Test
def testCommittedOffsetParsing(): Unit = {
  val groupId = "group"
  val topicPartition = new TopicPartition("topic", 0)

  // A single offset commit record written with the latest metadata version.
  val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
  val commitValue = GroupMetadataManager.offsetCommitValue(
    OffsetAndMetadata(35L, "", time.milliseconds()), MetadataVersion.latest)
  val record = TestUtils.records(Seq(new SimpleRecord(commitKey, commitValue))).records.asScala.head

  val (formattedKey, formattedValue) = GroupMetadataManager.formatRecordKeyAndValue(record)
  assertEquals(Some(s"offset_commit::group=$groupId,partition=$topicPartition"), formattedKey)
  assertEquals(Some("offset=35"), formattedValue)
}
@Test
def testCommittedOffsetTombstoneParsing(): Unit = {
  val groupId = "group"
  val topicPartition = new TopicPartition("topic", 0)

  // An offset commit key with a null value, i.e. a tombstone.
  val tombstone = new SimpleRecord(GroupMetadataManager.offsetCommitKey(groupId, topicPartition), null)
  val record = TestUtils.records(Seq(tombstone)).records.asScala.head

  val (formattedKey, formattedValue) = GroupMetadataManager.formatRecordKeyAndValue(record)
  assertEquals(Some(s"offset_commit::group=$groupId,partition=$topicPartition"), formattedKey)
  assertEquals(Some("<DELETE>"), formattedValue)
}
@Test
def testGroupMetadataParsingWithNullUserData(): Unit = {
  // Assignment of topic-0 whose user data is null.
  val assignment = new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
  val serializedAssignment = Utils.toArray(ConsumerProtocol.serializeAssignment(assignment))
  val stableGroupRecord = buildStableGroupRecordWithMember(
    generation = 935,
    protocolType = "consumer",
    protocol = "range",
    memberId = "98098230493",
    assignmentBytes = serializedAssignment)
  val record = TestUtils.records(Seq(stableGroupRecord)).records.asScala.head

  val expectedValue = "{\"protocolType\":\"consumer\",\"protocol\":\"range\"," +
    "\"generationId\":935,\"assignment\":\"{98098230493=[topic-0]}\"}"

  val (formattedKey, formattedValue) = GroupMetadataManager.formatRecordKeyAndValue(record)
  assertEquals(Some(s"group_metadata::group=$groupId"), formattedKey)
  assertEquals(Some(expectedValue), formattedValue)
}
@Test
def testGroupMetadataTombstoneParsing(): Unit = {
  val groupId = "group"

  // A group metadata key with a null value, i.e. a tombstone.
  val tombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
  val record = TestUtils.records(Seq(tombstone)).records.asScala.head

  val (formattedKey, formattedValue) = GroupMetadataManager.formatRecordKeyAndValue(record)
  assertEquals(Some(s"group_metadata::group=$groupId"), formattedKey)
  assertEquals(Some("<DELETE>"), formattedValue)
}
// Verifies that `replicaManager.appendRecords` was invoked with internal topics
// allowed and the COORDINATOR append origin, and returns a captor holding the
// response callback that was passed to that invocation.
private def verifyAppendAndCaptureCallback(): ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = {
  type ResponseCallback = Map[TopicPartition, PartitionResponse] => Unit
  val callbackCaptor: ArgumentCaptor[ResponseCallback] = ArgumentCaptor.forClass(classOf[ResponseCallback])

  verify(replicaManager).appendRecords(anyLong(),
    anyShort(),
    internalTopicsAllowed = ArgumentMatchers.eq(true),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    any[Map[TopicPartition, MemoryRecords]],
    callbackCaptor.capture(),
    any[Option[ReentrantLock]],
    any(),
    any(),
    any(),
    any(),
    any())

  callbackCaptor
}
// Stubs `replicaManager.appendRecords` so that every matching append immediately
// invokes the captured response callback with a response for `groupTopicPartition`
// carrying `error`, and stubs `getMagic` to report the current magic value.
// Returns the captor holding the records passed to the append.
private def expectAppendMessage(error: Errors): ArgumentCaptor[Map[TopicPartition, MemoryRecords]] = {
  type ResponseCallback = Map[TopicPartition, PartitionResponse] => Unit
  val callbackCaptor: ArgumentCaptor[ResponseCallback] = ArgumentCaptor.forClass(classOf[ResponseCallback])
  val recordsCaptor: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
    ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])

  when(replicaManager.appendRecords(anyLong(),
    anyShort(),
    internalTopicsAllowed = ArgumentMatchers.eq(true),
    origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
    recordsCaptor.capture(),
    callbackCaptor.capture(),
    any[Option[ReentrantLock]],
    any(),
    any(),
    any(),
    any(),
    any())
  ).thenAnswer { _ =>
    val response = new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
    callbackCaptor.getValue.apply(Map(groupTopicPartition -> response))
  }
  when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))

  recordsCaptor
}
// Builds a group metadata record for `groupId` in the Stable state with a single
// member, which is also passed to `loadGroup` as the leader. The state timestamp is
// only set when `metadataVersion` is at least IBP_2_1_IV0.
private def buildStableGroupRecordWithMember(generation: Int,
                                             protocolType: String,
                                             protocol: String,
                                             memberId: String,
                                             assignmentBytes: Array[Byte] = Array.emptyByteArray,
                                             metadataVersion: MetadataVersion = MetadataVersion.latest): SimpleRecord = {
  val supportedProtocols = List((protocol, Array.emptyByteArray))
  val soleMember = new MemberMetadata(memberId, Some(groupInstanceId), "clientId", "clientHost",
    30000, 10000, protocolType, supportedProtocols)
  val stateTimestamp =
    if (metadataVersion.isAtLeast(IBP_2_1_IV0)) Some(time.milliseconds())
    else None
  val stableGroup = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, memberId,
    stateTimestamp, Seq(soleMember), time)

  new SimpleRecord(
    GroupMetadataManager.groupMetadataKey(groupId),
    GroupMetadataManager.groupMetadataValue(stableGroup, Map(memberId -> assignmentBytes), metadataVersion))
}
// Builds a group metadata record for `groupId` in the Empty state: no protocol,
// no leader, no state timestamp and no members, serialized with the latest
// metadata version.
private def buildEmptyGroupRecord(generation: Int, protocolType: String): SimpleRecord = {
  val emptyGroup = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, Seq.empty, time)
  new SimpleRecord(
    GroupMetadataManager.groupMetadataKey(groupId),
    GroupMetadataManager.groupMetadataValue(emptyGroup, Map.empty, MetadataVersion.latest))
}
// Makes `replicaManager` return a mocked log for the given partition that serves
// `records` from `startOffset`, and stubs the partition's log end offset with the
// end offset computed for those records.
private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
                                    startOffset: Long,
                                    records: MemoryRecords): Unit = {
  val mockedLog: UnifiedLog = mock(classOf[UnifiedLog])
  when(replicaManager.getLog(groupMetadataTopicPartition)).thenReturn(Some(mockedLog))

  val logEndOffset = expectGroupMetadataLoad(mockedLog, startOffset, records)
  when(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).thenReturn(Some(logEndOffset))
}
/**
 * Stubs the mocked log so that a read at `startOffset` returns a mocked
 * [[FileRecords]] whose `readInto` copies the bytes of `records` into the
 * buffer supplied by the caller.
 *
 * @param logMock     the mocked log to stub
 * @param startOffset the offset reported as the log start offset and expected by the read
 * @param records     the records to serve
 * @return the calculated end offset to be mocked into [[ReplicaManager.getLogEndOffset]]
 */
private def expectGroupMetadataLoad(logMock: UnifiedLog,
                                    startOffset: Long,
                                    records: MemoryRecords): Long = {
  // The end offset is the start offset plus the number of records served.
  val computedEndOffset = startOffset + records.records.asScala.size

  val mockedFileRecords: FileRecords = mock(classOf[FileRecords])
  when(logMock.logStartOffset).thenReturn(startOffset)
  when(logMock.read(ArgumentMatchers.eq(startOffset),
    maxLength = anyInt(),
    isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
    minOneMessage = ArgumentMatchers.eq(true)))
    .thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), mockedFileRecords))
  when(mockedFileRecords.sizeInBytes()).thenReturn(records.sizeInBytes)

  val targetBufferCaptor: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer])
  when(mockedFileRecords.readInto(targetBufferCaptor.capture(), anyInt())).thenAnswer { _ =>
    // Fill the caller's buffer from a duplicate of the records buffer, then flip it.
    val target = targetBufferCaptor.getValue
    target.put(records.buffer.duplicate)
    target.flip()
  }

  computedEndOffset
}
// Creates one offset commit record per entry of `committedOffsets`. Each commit is
// stamped with the current time; when `retentionTimeOpt` is defined the commit also
// gets an expiration timestamp of commit time plus the retention time.
private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
                                         groupId: String = groupId,
                                         metadataVersion: MetadataVersion = MetadataVersion.latest,
                                         retentionTimeOpt: Option[Long] = None): Seq[SimpleRecord] = {
  committedOffsets.toSeq.map { case (topicPartition, offset) =>
    val commitTimestamp = time.milliseconds()
    val offsetAndMetadata = retentionTimeOpt.fold(OffsetAndMetadata(offset, "", commitTimestamp)) { retentionTimeMs =>
      OffsetAndMetadata(offset, "", commitTimestamp, commitTimestamp + retentionTimeMs)
    }
    new SimpleRecord(
      GroupMetadataManager.offsetCommitKey(groupId, topicPartition),
      GroupMetadataManager.offsetCommitValue(offsetAndMetadata, metadataVersion))
  }
}
// Stubs `replicaManager` so that both partition lookups for `groupTopicPartition`
// resolve to the `partition` fixture.
private def mockGetPartition(): Unit = {
  val hosted = HostedPartition.Online(partition)
  when(replicaManager.getPartition(groupTopicPartition)).thenReturn(hosted)
  when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
}
// Looks up the metric called `name` (with no tags) in the default Yammer registry
// using the manager's metrics group and casts it to an Int gauge.
private def getGauge(manager: GroupMetadataManager, name: String): Gauge[Int] = {
  val metricName = manager.metricsGroup.metricName(name, Collections.emptyMap())
  val registered = KafkaYammerMetrics.defaultRegistry().allMetrics().get(metricName)
  registered.asInstanceOf[Gauge[Int]]
}
// Asserts the current values of the three group-count gauges of `manager`.
private def expectMetrics(manager: GroupMetadataManager,
                          expectedNumGroups: Int,
                          expectedNumGroupsPreparingRebalance: Int,
                          expectedNumGroupsCompletingRebalance: Int): Unit = {
  val expectedByGauge = Seq(
    "NumGroups" -> expectedNumGroups,
    "NumGroupsPreparingRebalance" -> expectedNumGroupsPreparingRebalance,
    "NumGroupsCompletingRebalance" -> expectedNumGroupsCompletingRebalance
  )
  expectedByGauge.foreach { case (gaugeName, expected) =>
    assertEquals(expected, getGauge(manager, gaugeName).value)
  }
}
@Test
def testMetrics(): Unit = {
  // After cleanup, all gauges are expected to read zero.
  groupMetadataManager.cleanupGroupMetadata()
  expectMetrics(groupMetadataManager, 0, 0, 0)

  // Adding a Stable group only bumps the total group count.
  val stableGroup = new GroupMetadata("foo2", Stable, time)
  groupMetadataManager.addGroup(stableGroup)
  expectMetrics(groupMetadataManager, 1, 0, 0)

  // Each state transition is expected to be reflected by the matching gauge.
  stableGroup.transitionTo(PreparingRebalance)
  expectMetrics(groupMetadataManager, 1, 1, 0)

  stableGroup.transitionTo(CompletingRebalance)
  expectMetrics(groupMetadataManager, 1, 0, 1)
}
@Test
def testPartitionLoadMetric(): Unit = {
  val mBeanName = "kafka.server:type=group-coordinator-metrics"
  val mBeanServer = ManagementFactory.getPlatformMBeanServer

  // Register a JMX reporter so that the metrics become visible as MBean attributes.
  val jmxReporter = new JmxReporter
  jmxReporter.contextChange(new KafkaMetricsContext("kafka.server"))
  metrics.addReporter(jmxReporter)

  // Reads one attribute of the group coordinator MBean as a Double.
  def partitionLoadTime(attribute: String): Double =
    mBeanServer.getAttribute(new ObjectName(mBeanName), attribute).asInstanceOf[Double]

  // Before any load has happened, both metrics are expected to read NaN.
  assertTrue(mBeanServer.isRegistered(new ObjectName(mBeanName)))
  assertEquals(Double.NaN, partitionLoadTime("partition-load-time-max"), 0)
  assertEquals(Double.NaN, partitionLoadTime("partition-load-time-avg"), 0)
  assertTrue(jmxReporter.containsMbean(mBeanName))

  val offsetsPartition = groupTopicPartition
  val logStartOffset = 15L
  val committedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
  val stableGroupRecord = buildStableGroupRecordWithMember(
    generation = 15,
    protocolType = "consumer",
    protocol = "range",
    memberId = "98098230493")
  val records = MemoryRecords.withRecords(logStartOffset, CompressionType.NONE,
    (offsetCommitRecords :+ stableGroupRecord).toArray: _*)

  expectGroupMetadataLoad(offsetsPartition, logStartOffset, records)

  // When passed a specific start offset, assert that the measured values are in excess of that.
  val groupEpoch = 2
  val diff = 1000
  val now = time.milliseconds()
  groupMetadataManager.loadGroupsAndOffsets(offsetsPartition, groupEpoch, _ => (), now - diff)

  assertTrue(partitionLoadTime("partition-load-time-max") >= diff)
  assertTrue(partitionLoadTime("partition-load-time-avg") >= diff)
}
@Test
def testReadMessageKeyCanReadUnknownMessage(): Unit = {
  // Prefix a generated key with a version the reader is not expected to know.
  val unknownVersion = Short.MaxValue
  val serializedKey = MessageUtil.toVersionPrefixedBytes(
    unknownVersion, new org.apache.kafka.coordinator.group.generated.GroupMetadataKey())

  val parsedKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(serializedKey))
  assertEquals(UnknownKey(unknownVersion), parsedKey)
}
@Test
def testLoadGroupsAndOffsetsWillIgnoreUnknownMessage(): Unit = {
  val expectedGeneration = 935
  val expectedProtocolType = "consumer"
  val expectedProtocol = "range"
  val expectedMemberId = "98098230493"
  val logStartOffset = 15L
  val committedOffsets = Map(
    new TopicPartition("foo", 0) -> 23L,
    new TopicPartition("foo", 1) -> 455L,
    new TopicPartition("bar", 0) -> 8992L
  )

  val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
  val stableGroupRecord = buildStableGroupRecordWithMember(
    expectedGeneration, expectedProtocolType, expectedProtocol, expectedMemberId)

  // Should ignore unknown record: one with the maximum short as its version and
  // one with the first version above the highest supported key version.
  val unknownKey = new org.apache.kafka.coordinator.group.generated.GroupMetadataKey()
  val lowestUnsupportedVersion = (org.apache.kafka.coordinator.group.generated.GroupMetadataKey
    .HIGHEST_SUPPORTED_VERSION + 1).toShort
  val unknownRecords = Seq(Short.MaxValue, lowestUnsupportedVersion).map { version =>
    val serialized = MessageUtil.toVersionPrefixedBytes(version, unknownKey)
    new SimpleRecord(serialized, serialized)
  }

  val records = MemoryRecords.withRecords(logStartOffset, CompressionType.NONE,
    (offsetCommitRecords ++ unknownRecords :+ stableGroupRecord).toArray: _*)

  expectGroupMetadataLoad(groupTopicPartition, logStartOffset, records)

  groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, 1, _ => (), 0L)

  // The group and all of its offsets must have been loaded despite the unknown records.
  val loadedGroup = groupMetadataManager.getGroup(groupId)
    .getOrElse(throw new AssertionError("Group was not loaded into the cache"))
  assertEquals(groupId, loadedGroup.groupId)
  assertEquals(Stable, loadedGroup.currentState)
  assertEquals(expectedMemberId, loadedGroup.leaderOrNull)
  assertEquals(expectedGeneration, loadedGroup.generationId)
  assertEquals(Some(expectedProtocolType), loadedGroup.protocolType)
  assertEquals(expectedProtocol, loadedGroup.protocolName.orNull)
  assertEquals(Set(expectedMemberId), loadedGroup.allMembers)
  assertEquals(committedOffsets.size, loadedGroup.allOffsets.size)
  for ((topicPartition, offset) <- committedOffsets) {
    assertEquals(Some(offset), loadedGroup.offset(topicPartition).map(_.offset))
    assertTrue(loadedGroup.offset(topicPartition).map(_.expireTimestamp).contains(None))
  }
}
}