/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api

import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Optional, Properties}
import scala.annotation.nowarn
import scala.collection.{Seq, mutable}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
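
/**
 * Integration tests for transactional clients: commit/abort visibility, read_committed
 * isolation, transactional offset commits, producer fencing, API timeouts, and epoch bumping.
 */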
@Disabled
class TransactionsTest extends IntegrationTestHarness {
override def brokerCount = 3
val transactionalProducerCount = 2
val transactionalConsumerCount = 1
val nonTransactionalConsumerCount = 1
val topic1 = "topic1"
val topic2 = "topic2"
val numPartitions = 4
val transactionalProducers = mutable.Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
val transactionalConsumers = mutable.Buffer[Consumer[Array[Byte], Array[Byte]]]()
val nonTransactionalConsumers = mutable.Buffer[Consumer[Array[Byte], Array[Byte]]]()
def overridingProps(): Properties = {
val props = new Properties()
props.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString)
    // Set a smaller value for the number of partitions of the __consumer_offsets topic
    // so that creating that topic and assigning its partition leaders doesn't take too long.
props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString)
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString)
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString)
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString)
props.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
props.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, false.toString)
props.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, false.toString)
props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0")
props.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200")
// The new group coordinator does not support verifying transactions yet.
if (isNewGroupCoordinatorEnabled()) {
props.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false")
}
props
}
override protected def modifyConfigs(props: Seq[Properties]): Unit = {
props.foreach(p => p.putAll(overridingProps()))
}
override protected def kraftControllerConfigs(): Seq[Properties] = {
Seq(overridingProps())
}
def topicConfig(): Properties = {
val topicConfig = new Properties()
topicConfig.put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, 2.toString)
topicConfig
}
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
createTopic(topic1, numPartitions, brokerCount, topicConfig())
createTopic(topic2, numPartitions, brokerCount, topicConfig())
for (_ <- 0 until transactionalProducerCount)
createTransactionalProducer("transactional-producer")
for (_ <- 0 until transactionalConsumerCount)
createReadCommittedConsumer("transactional-group")
for (_ <- 0 until nonTransactionalConsumerCount)
createReadUncommittedConsumer("non-transactional-group")
}
@AfterEach
override def tearDown(): Unit = {
transactionalProducers.foreach(_.close())
transactionalConsumers.foreach(_.close())
nonTransactionalConsumers.foreach(_.close())
super.tearDown()
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testBasicTransactions(quorum: String): Unit = {
val producer = transactionalProducers.head
val consumer = transactionalConsumers.head
val unCommittedConsumer = nonTransactionalConsumers.head
val tp11 = new TopicPartition(topic1, 1)
val tp22 = new TopicPartition(topic2, 2)
producer.initTransactions()
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 2, "2", "2", willBeCommitted = false))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, "4", "4", willBeCommitted = false))
producer.flush()
    // Since we haven't committed or aborted any records, the last stable offset is still 0,
    // so no segments should be offloaded to remote storage.
verifyLogStartOffsets(Map((tp11, 0), (tp22, 0)))
maybeVerifyLocalLogStartOffsets(Map((tp11, 0), (tp22, 0)))
producer.abortTransaction()
maybeWaitForAtLeastOneSegmentUpload(Seq(tp11, tp22))
    // We've sent 1 record + 1 abort marker = 2 (segments) to each topic partition,
    // so 1 segment should be offloaded and the local log start offset should be 1,
    // while the log start offset remains 0.
verifyLogStartOffsets(Map((tp11, 0), (tp22, 0)))
maybeVerifyLocalLogStartOffsets(Map((tp11, 1L), (tp22, 1L)))
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, "1", "1", willBeCommitted = true))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 2, "3", "3", willBeCommitted = true))
// Before records are committed, these records won't be offloaded.
verifyLogStartOffsets(Map((tp11, 0), (tp22, 0)))
maybeVerifyLocalLogStartOffsets(Map((tp11, 1L), (tp22, 1L)))
producer.commitTransaction()
    // We've sent 2 records + 1 abort marker + 1 commit marker = 4 (segments) to each topic partition,
    // so 3 segments should be offloaded and the local log start offset should be 3,
    // while the log start offset remains 0.
verifyLogStartOffsets(Map((tp11, 0), (tp22, 0)))
maybeVerifyLocalLogStartOffsets(Map((tp11, 3L), (tp22, 3L)))
consumer.subscribe(List(topic1, topic2).asJava)
unCommittedConsumer.subscribe(List(topic1, topic2).asJava)
val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
val allRecords = consumeRecords(unCommittedConsumer, 4)
val expectedValues = List("1", "2", "3", "4").toSet
allRecords.foreach { record =>
assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record)))
}
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testReadCommittedConsumerShouldNotSeeUndecidedData(quorum: String): Unit = {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("other")
val readCommittedConsumer = transactionalConsumers.head
val readUncommittedConsumer = nonTransactionalConsumers.head
producer1.initTransactions()
producer2.initTransactions()
producer1.beginTransaction()
producer2.beginTransaction()
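    // producer1's transaction will stay open (undecided) while producer2's commits, so a
    // read_committed consumer must not advance past producer1's first in-flight record.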
val latestVisibleTimestamp = System.currentTimeMillis()
producer2.send(new ProducerRecord(topic1, 0, latestVisibleTimestamp, "x".getBytes, "1".getBytes))
producer2.send(new ProducerRecord(topic2, 0, latestVisibleTimestamp, "x".getBytes, "1".getBytes))
producer2.flush()
val latestWrittenTimestamp = latestVisibleTimestamp + 1
producer1.send(new ProducerRecord(topic1, 0, latestWrittenTimestamp, "a".getBytes, "1".getBytes))
producer1.send(new ProducerRecord(topic1, 0, latestWrittenTimestamp, "b".getBytes, "2".getBytes))
producer1.send(new ProducerRecord(topic2, 0, latestWrittenTimestamp, "c".getBytes, "3".getBytes))
producer1.send(new ProducerRecord(topic2, 0, latestWrittenTimestamp, "d".getBytes, "4".getBytes))
producer1.flush()
producer2.send(new ProducerRecord(topic1, 0, latestWrittenTimestamp, "x".getBytes, "2".getBytes))
producer2.send(new ProducerRecord(topic2, 0, latestWrittenTimestamp, "x".getBytes, "2".getBytes))
producer2.commitTransaction()
// ensure the records are visible to the read uncommitted consumer
val tp1 = new TopicPartition(topic1, 0)
val tp2 = new TopicPartition(topic2, 0)
readUncommittedConsumer.assign(Set(tp1, tp2).asJava)
consumeRecords(readUncommittedConsumer, 8)
val readUncommittedOffsetsForTimes = readUncommittedConsumer.offsetsForTimes(Map(
tp1 -> (latestWrittenTimestamp: JLong),
tp2 -> (latestWrittenTimestamp: JLong)
).asJava)
assertEquals(2, readUncommittedOffsetsForTimes.size)
assertEquals(latestWrittenTimestamp, readUncommittedOffsetsForTimes.get(tp1).timestamp)
assertEquals(latestWrittenTimestamp, readUncommittedOffsetsForTimes.get(tp2).timestamp)
readUncommittedConsumer.unsubscribe()
// we should only see the first two records which come before the undecided second transaction
readCommittedConsumer.assign(Set(tp1, tp2).asJava)
val records = consumeRecords(readCommittedConsumer, 2)
records.foreach { record =>
assertEquals("x", new String(record.key))
assertEquals("1", new String(record.value))
}
// even if we seek to the end, we should not be able to see the undecided data
assertEquals(2, readCommittedConsumer.assignment.size)
readCommittedConsumer.seekToEnd(readCommittedConsumer.assignment)
readCommittedConsumer.assignment.forEach { tp =>
assertEquals(1L, readCommittedConsumer.position(tp))
}
// undecided timestamps should not be searchable either
val readCommittedOffsetsForTimes = readCommittedConsumer.offsetsForTimes(Map(
tp1 -> (latestWrittenTimestamp: JLong),
tp2 -> (latestWrittenTimestamp: JLong)
).asJava)
assertNull(readCommittedOffsetsForTimes.get(tp1))
assertNull(readCommittedOffsetsForTimes.get(tp2))
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testDelayedFetchIncludesAbortedTransaction(quorum: String): Unit = {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("other")
val tp10 = new TopicPartition(topic1, 0)
producer1.initTransactions()
producer2.initTransactions()
producer1.beginTransaction()
producer2.beginTransaction()
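    // Interleave the two producers' writes: producer2's committed records land at offsets 0 and 3,
    // with producer1's soon-to-be-aborted records at offsets 1 and 2 in between.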
producer2.send(new ProducerRecord(topic1, 0, "x".getBytes, "1".getBytes))
producer2.flush()
producer1.send(new ProducerRecord(topic1, 0, "y".getBytes, "1".getBytes))
producer1.send(new ProducerRecord(topic1, 0, "y".getBytes, "2".getBytes))
producer1.flush()
producer2.send(new ProducerRecord(topic1, 0, "x".getBytes, "2".getBytes))
producer2.flush()
    // Since we haven't committed or aborted any records, the last stable offset is still 0,
    // so no segments should be offloaded to remote storage.
verifyLogStartOffsets(Map((tp10, 0)))
maybeVerifyLocalLogStartOffsets(Map((tp10, 0)))
producer1.abortTransaction()
producer2.commitTransaction()
maybeWaitForAtLeastOneSegmentUpload(Seq(tp10))
    // We've sent 4 records + 1 abort marker + 1 commit marker = 6 (segments),
    // so 5 segments should be offloaded and the local log start offset should be 5,
    // while the log start offset remains 0.
verifyLogStartOffsets(Map((tp10, 0)))
maybeVerifyLocalLogStartOffsets(Map((tp10, 5)))
// ensure that the consumer's fetch will sit in purgatory
val consumerProps = new Properties()
consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "100000")
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100")
val readCommittedConsumer = createReadCommittedConsumer(props = consumerProps)
readCommittedConsumer.assign(Set(tp10).asJava)
val records = consumeRecords(readCommittedConsumer, numRecords = 2)
assertEquals(2, records.size)
val first = records.head
assertEquals("x", new String(first.key))
assertEquals("1", new String(first.value))
assertEquals(0L, first.offset)
val second = records.last
assertEquals("x", new String(second.key))
assertEquals("2", new String(second.value))
assertEquals(3L, second.offset)
}
@nowarn("cat=deprecation")
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testSendOffsetsWithGroupId(quorum: String): Unit = {
sendOffset((producer, groupId, consumer) =>
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, groupId))
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testSendOffsetsWithGroupMetadata(quorum: String): Unit = {
sendOffset((producer, _, consumer) =>
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, consumer.groupMetadata()))
}
private def sendOffset(commit: (KafkaProducer[Array[Byte], Array[Byte]],
String, Consumer[Array[Byte], Array[Byte]]) => Unit): Unit = {
    // The basic plan for the test is as follows:
    //  1. Seed topic1 with 500 unique, numbered messages.
    //  2. Run a consume/process/produce loop to transactionally copy messages from topic1 to topic2, committing
    //     offsets as part of the transaction.
    //  3. Randomly abort transactions in step 2.
    //  4. Validate that we have exactly 500 unique committed messages in topic2. If the offsets were committed
    //     properly with the transactions, we should have no duplicate or missing messages, since the input
    //     messages are processed exactly once.
val consumerGroupId = "foobar-consumer-group"
val numSeedMessages = 500
TestUtils.seedTopicWithNumberedRecords(topic1, numSeedMessages, brokers)
val producer = transactionalProducers.head
val consumer = createReadCommittedConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
consumer.subscribe(List(topic1).asJava)
producer.initTransactions()
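    // Alternate between committing and aborting on each pass through the loop below.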
var shouldCommit = false
var recordsProcessed = 0
try {
while (recordsProcessed < numSeedMessages) {
val records = TestUtils.pollUntilAtLeastNumRecords(consumer, Math.min(10, numSeedMessages - recordsProcessed))
producer.beginTransaction()
shouldCommit = !shouldCommit
records.foreach { record =>
val key = new String(record.key(), StandardCharsets.UTF_8)
val value = new String(record.value(), StandardCharsets.UTF_8)
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, key, value, willBeCommitted = shouldCommit))
}
commit(producer, consumerGroupId, consumer)
if (shouldCommit) {
producer.commitTransaction()
recordsProcessed += records.size
debug(s"committed transaction.. Last committed record: ${new String(records.last.value(), StandardCharsets.UTF_8)}. Num " +
s"records written to $topic2: $recordsProcessed")
} else {
producer.abortTransaction()
debug(s"aborted transaction Last committed record: ${new String(records.last.value(), StandardCharsets.UTF_8)}. Num " +
s"records written to $topic2: $recordsProcessed")
TestUtils.resetToCommittedPositions(consumer)
}
}
} finally {
consumer.close()
}
val partitions = ListBuffer.empty[TopicPartition]
for (partition <- 0 until numPartitions) {
partitions += new TopicPartition(topic2, partition)
}
maybeWaitForAtLeastOneSegmentUpload(partitions.toSeq)
    // In spite of random aborts, we should still have exactly 500 messages in topic2. That is, we should not
    // re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally.
val verifyingConsumer = transactionalConsumers(0)
verifyingConsumer.subscribe(List(topic2).asJava)
val valueSeq = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numSeedMessages).map { record =>
TestUtils.assertCommittedAndGetValue(record).toInt
}
val valueSet = valueSeq.toSet
assertEquals(numSeedMessages, valueSeq.size, s"Expected $numSeedMessages values in $topic2.")
assertEquals(valueSeq.size, valueSet.size, s"Expected ${valueSeq.size} unique messages in $topic2.")
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testFencingOnCommit(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
val consumer = transactionalConsumers(0)
consumer.subscribe(List(topic1, topic2).asJava)
producer1.initTransactions()
producer1.beginTransaction()
producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = false))
producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "3", "3", willBeCommitted = false))
producer2.initTransactions() // ok, will abort the open transaction.
producer2.beginTransaction()
producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "4", willBeCommitted = true))
producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "4", willBeCommitted = true))
assertThrows(classOf[ProducerFencedException], () => producer1.commitTransaction())
producer2.commitTransaction() // ok
val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testFencingOnSendOffsets(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
val consumer = transactionalConsumers(0)
consumer.subscribe(List(topic1, topic2).asJava)
producer1.initTransactions()
producer1.beginTransaction()
producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = false))
producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "3", "3", willBeCommitted = false))
producer1.flush()
producer2.initTransactions() // ok, will abort the open transaction.
producer2.beginTransaction()
producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "4", willBeCommitted = true))
producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "4", willBeCommitted = true))
assertThrows(classOf[ProducerFencedException], () => producer1.sendOffsetsToTransaction(Map(new TopicPartition("foobartopic", 0)
-> new OffsetAndMetadata(110L)).asJava, new ConsumerGroupMetadata("foobarGroup")))
producer2.commitTransaction() // ok
val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testOffsetMetadataInSendOffsetsToTransaction(quorum: String): Unit = {
val tp = new TopicPartition(topic1, 0)
val groupId = "group"
val producer = transactionalProducers.head
val consumer = createReadCommittedConsumer(groupId)
consumer.subscribe(List(topic1).asJava)
producer.initTransactions()
producer.beginTransaction()
val offsetAndMetadata = new OffsetAndMetadata(110L, Optional.of(15), "some metadata")
producer.sendOffsetsToTransaction(Map(tp -> offsetAndMetadata).asJava, new ConsumerGroupMetadata(groupId))
producer.commitTransaction() // ok
// The call to commit the transaction may return before all markers are visible, so we initialize a second
// producer to ensure the transaction completes and the committed offsets are visible.
val producer2 = transactionalProducers(1)
producer2.initTransactions()
TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset")
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testInitTransactionsTimeout(quorum: String): Unit = {
testTimeout(needInitAndSendMsg = false, producer => producer.initTransactions())
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testSendOffsetsToTransactionTimeout(quorum: String): Unit = {
testTimeout(needInitAndSendMsg = true, producer => producer.sendOffsetsToTransaction(
Map(new TopicPartition(topic1, 0) -> new OffsetAndMetadata(0)).asJava, new ConsumerGroupMetadata("test-group")))
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testCommitTransactionTimeout(quorum: String): Unit = {
testTimeout(needInitAndSendMsg = true, producer => producer.commitTransaction())
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testAbortTransactionTimeout(quorum: String): Unit = {
testTimeout(needInitAndSendMsg = true, producer => producer.abortTransaction())
}
private def testTimeout(needInitAndSendMsg: Boolean,
timeoutProcess: KafkaProducer[Array[Byte], Array[Byte]] => Unit): Unit = {
val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 3000)
if (needInitAndSendMsg) {
producer.initTransactions()
producer.beginTransaction()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes))
}
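    // Kill every broker so the transactional call below cannot complete and times out.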
for (i <- brokers.indices) killBroker(i)
assertThrows(classOf[TimeoutException], () => timeoutProcess(producer))
producer.close(Duration.ZERO)
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testFencingOnSend(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
val consumer = transactionalConsumers(0)
consumer.subscribe(List(topic1, topic2).asJava)
producer1.initTransactions()
producer1.beginTransaction()
producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = false))
producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "3", "3", willBeCommitted = false))
producer2.initTransactions() // ok, will abort the open transaction.
producer2.beginTransaction()
producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "4", willBeCommitted = true)).get()
producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "4", willBeCommitted = true)).get()
try {
val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "5", willBeCommitted = false))
val recordMetadata = result.get()
error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
brokers.foreach { broker =>
error(s"log dirs: ${broker.logManager.liveLogDirs.map(_.getAbsolutePath).head}")
}
fail("Should not be able to send messages from a fenced producer.")
} catch {
case _: ProducerFencedException =>
producer1.close()
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
case e: Exception =>
throw new AssertionError("Got an unexpected exception from a fenced producer.", e)
}
producer2.commitTransaction() // ok
val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testFencingOnAddPartitions(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
val consumer = transactionalConsumers(0)
consumer.subscribe(List(topic1, topic2).asJava)
producer1.initTransactions()
producer1.beginTransaction()
producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = false))
producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "3", "3", willBeCommitted = false))
producer1.abortTransaction()
producer2.initTransactions() // ok, will abort the open transaction.
producer2.beginTransaction()
producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "4", willBeCommitted = true))
.get(20, TimeUnit.SECONDS)
producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "4", willBeCommitted = true))
.get(20, TimeUnit.SECONDS)
try {
producer1.beginTransaction()
val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "5", willBeCommitted = false))
val recordMetadata = result.get()
error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
brokers.foreach { broker =>
error(s"log dirs: ${broker.logManager.liveLogDirs.map(_.getAbsolutePath).head}")
}
fail("Should not be able to send messages from a fenced producer.")
} catch {
case _: ProducerFencedException =>
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
case e: Exception =>
throw new AssertionError("Got an unexpected exception from a fenced producer.", e)
}
producer2.commitTransaction() // ok
val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testFencingOnTransactionExpiration(quorum: String): Unit = {
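    // Use a very short transaction timeout; the coordinator's timed-out-transaction cleanup
    // (configured to run every 200 ms in overridingProps) will abort the transaction and fence the producer.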
val producer = createTransactionalProducer("expiringProducer", transactionTimeoutMs = 100)
producer.initTransactions()
producer.beginTransaction()
// The first message and hence the first AddPartitions request should be successfully sent.
val firstMessageResult = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = false)).get()
assertTrue(firstMessageResult.hasOffset)
// Wait for the expiration cycle to kick in.
Thread.sleep(600)
try {
// Now that the transaction has expired, the second send should fail with a ProducerFencedException.
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
fail("should have raised a ProducerFencedException since the transaction has expired")
} catch {
case _: ProducerFencedException =>
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
}
// Verify that the first message was aborted and the second one was never written at all.
val nonTransactionalConsumer = nonTransactionalConsumers.head
nonTransactionalConsumer.subscribe(List(topic1).asJava)
// Attempt to consume the one written record. We should not see the second. The
// assertion does not strictly guarantee that the record wasn't written, but the
// data is small enough that had it been written, it would have been in the first fetch.
val records = TestUtils.consumeRecords(nonTransactionalConsumer, numRecords = 1)
assertEquals(1, records.size)
assertEquals("1", TestUtils.recordValueAsString(records.head))
val transactionalConsumer = transactionalConsumers.head
transactionalConsumer.subscribe(List(topic1).asJava)
val transactionalRecords = consumeRecordsFor(transactionalConsumer)
assertTrue(transactionalRecords.isEmpty)
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testMultipleMarkersOneLeader(quorum: String): Unit = {
val firstProducer = transactionalProducers.head
val consumer = transactionalConsumers.head
val unCommittedConsumer = nonTransactionalConsumers.head
val topicWith10Partitions = "largeTopic"
val topicWith10PartitionsAndOneReplica = "largeTopicOneReplica"
createTopic(topicWith10Partitions, 10, brokerCount, topicConfig())
createTopic(topicWith10PartitionsAndOneReplica, 10, 1, new Properties())
firstProducer.initTransactions()
firstProducer.beginTransaction()
sendTransactionalMessagesWithValueRange(firstProducer, topicWith10Partitions, 0, 5000, willBeCommitted = false)
sendTransactionalMessagesWithValueRange(firstProducer, topicWith10PartitionsAndOneReplica, 5000, 10000, willBeCommitted = false)
firstProducer.abortTransaction()
firstProducer.beginTransaction()
sendTransactionalMessagesWithValueRange(firstProducer, topicWith10Partitions, 10000, 11000, willBeCommitted = true)
firstProducer.commitTransaction()
consumer.subscribe(List(topicWith10PartitionsAndOneReplica, topicWith10Partitions).asJava)
unCommittedConsumer.subscribe(List(topicWith10PartitionsAndOneReplica, topicWith10Partitions).asJava)
val records = consumeRecords(consumer, 1000)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
val allRecords = consumeRecords(unCommittedConsumer, 11000)
val expectedValues = Range(0, 11000).map(_.toString).toSet
allRecords.foreach { record =>
assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record)))
}
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testConsecutivelyRunInitTransactions(quorum: String): Unit = {
val producer = createTransactionalProducer(transactionalId = "normalProducer")
producer.initTransactions()
assertThrows(classOf[IllegalStateException], () => producer.initTransactions())
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testBumpTransactionalEpoch(quorum: String): Unit = {
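    // Short delivery/request timeouts let the send to the downed partition leader fail quickly below.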
val producer = createTransactionalProducer("transactionalProducer",
deliveryTimeoutMs = 5000, requestTimeoutMs = 5000)
val consumer = transactionalConsumers.head
try {
// Create a topic with RF=1 so that a single broker failure will render it unavailable
val testTopic = "test-topic"
createTopic(testTopic, numPartitions, 1, new Properties)
val partitionLeader = TestUtils.waitUntilLeaderIsKnown(brokers, new TopicPartition(testTopic, 0))
producer.initTransactions()
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(testTopic, 0, "4", "4", willBeCommitted = true))
producer.commitTransaction()
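      // Capture the producer id and initial epoch from the partition leader's producer state
      // so we can verify later that the epoch was bumped.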
val activeProducersIter = brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get
.producerStateManager.activeProducers.entrySet().iterator()
assertTrue(activeProducersIter.hasNext)
var producerStateEntry = activeProducersIter.next().getValue
val producerId = producerStateEntry.producerId
val initialProducerEpoch = producerStateEntry.producerEpoch
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false))
killBroker(partitionLeader) // kill the partition leader to prevent the batch from being submitted
val failedFuture = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(testTopic, 0, "3", "3", willBeCommitted = false))
Thread.sleep(6000) // Wait for the record to time out
restartDeadBrokers()
org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, classOf[TimeoutException])
producer.abortTransaction()
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "2", willBeCommitted = true))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "4", "4", willBeCommitted = true))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(testTopic, 0, "1", "1", willBeCommitted = true))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(testTopic, 0, "3", "3", willBeCommitted = true))
producer.commitTransaction()
consumer.subscribe(List(topic1, topic2, testTopic).asJava)
val records = consumeRecords(consumer, 5)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
      // Producers can safely abort and continue after the last record of a transaction times out, so it's possible to
      // get here without having bumped the epoch. If bumping the epoch is possible, the producer will attempt to, so
      // check here that the epoch has actually increased.
producerStateEntry =
brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.get(producerId)
assertNotNull(producerStateEntry)
assertTrue(producerStateEntry.producerEpoch > initialProducerEpoch)
} finally {
producer.close(Duration.ZERO)
}
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testFailureToFenceEpoch(quorum: String): Unit = {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("transactional-producer", maxBlockMs = 1000)
producer1.initTransactions()
producer1.beginTransaction()
producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", willBeCommitted = true))
producer1.commitTransaction()
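    // Record the initial producer epoch so we can assert at the end that it increased by exactly one.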
val partitionLeader = TestUtils.waitUntilLeaderIsKnown(brokers, new TopicPartition(topic1, 0))
val activeProducersIter = brokers(partitionLeader).logManager.getLog(new TopicPartition(topic1, 0)).get.producerStateManager
.activeProducers.entrySet().iterator()
assertTrue(activeProducersIter.hasNext)
var producerStateEntry = activeProducersIter.next().getValue
val producerId = producerStateEntry.producerId
val initialProducerEpoch = producerStateEntry.producerEpoch
// Kill two brokers to bring the transaction log under min-ISR
killBroker(0)
killBroker(1)
try {
producer2.initTransactions()
} catch {
case _: TimeoutException =>
// good!
case e: Exception =>
throw new AssertionError("Got an unexpected exception from initTransactions", e)
} finally {
producer2.close()
}
restartDeadBrokers()
// Because the epoch was bumped in memory, attempting to begin a transaction with producer 1 should fail
try {
producer1.beginTransaction()
} catch {
case _: ProducerFencedException =>
// good!
case e: Exception =>
throw new AssertionError("Got an unexpected exception from commitTransaction", e)
} finally {
producer1.close()
}
val producer3 = createTransactionalProducer("transactional-producer", maxBlockMs = 5000)
producer3.initTransactions()
producer3.beginTransaction()
producer3.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", willBeCommitted = true))
producer3.commitTransaction()
// Check that the epoch only increased by 1
producerStateEntry =
brokers(partitionLeader).logManager.getLog(new TopicPartition(topic1, 0)).get.producerStateManager.activeProducers.get(producerId)
assertNotNull(producerStateEntry)
assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
}
private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String,
start: Int, end: Int, willBeCommitted: Boolean): Unit = {
for (i <- start until end) {
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic, null, value = i.toString, willBeCommitted = willBeCommitted, key = i.toString))
}
producer.flush()
}
private def createReadCommittedConsumer(group: String = "group",
maxPollRecords: Int = 500,
props: Properties = new Properties) = {
val consumer = TestUtils.createConsumer(bootstrapServers(),
groupId = group,
enableAutoCommit = false,
readCommitted = true,
maxPollRecords = maxPollRecords)
transactionalConsumers += consumer
consumer
}
private def createReadUncommittedConsumer(group: String) = {
val consumer = TestUtils.createConsumer(bootstrapServers(),
groupId = group,
enableAutoCommit = false)
nonTransactionalConsumers += consumer
consumer
}
private def createTransactionalProducer(transactionalId: String,
transactionTimeoutMs: Long = 60000,
maxBlockMs: Long = 60000,
deliveryTimeoutMs: Int = 120000,
requestTimeoutMs: Int = 30000): KafkaProducer[Array[Byte], Array[Byte]] = {
val producer = TestUtils.createTransactionalProducer(
transactionalId,
brokers,
transactionTimeoutMs = transactionTimeoutMs,
maxBlockMs = maxBlockMs,
deliveryTimeoutMs = deliveryTimeoutMs,
requestTimeoutMs = requestTimeoutMs
)
transactionalProducers += producer
producer
}
def maybeWaitForAtLeastOneSegmentUpload(topicPartitions: Seq[TopicPartition]): Unit = {
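    // Intentionally a no-op here; a tiered-storage variant of this test can override this hook
    // to wait until at least one segment has been uploaded to remote storage.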
}
def verifyLogStartOffsets(partitionStartOffsets: Map[TopicPartition, Int]): Unit = {
val offsets = new util.HashMap[Integer, JLong]()
waitUntilTrue(() => {
brokers.forall(broker => {
partitionStartOffsets.forall {
case (partition, offset) => {
val lso = broker.replicaManager.localLog(partition).get.logStartOffset
offsets.put(broker.config.brokerId, lso)
offset == lso
}
}
})
}, s"log start offset doesn't change to the expected position: $partitionStartOffsets, current position: $offsets")
}
  /**
   * Consumes records with the given consumer for the specified duration. If you want to drain all the
   * remaining messages in the partitions the consumer is subscribed to, the duration should be set high enough
   * that the consumer has time to poll everything. This depends on the number of messages expected to be left
   * in the topic, and should not be too large (i.e., more than a second) in our tests.
   *
   * @return All the records consumed by the consumer within the specified duration.
   */
private def consumeRecordsFor[K, V](consumer: Consumer[K, V]): Seq[ConsumerRecord[K, V]] = {
val duration = 1000
val startTime = System.currentTimeMillis()
val records = new ArrayBuffer[ConsumerRecord[K, V]]()
waitUntilTrue(() => {
records ++= consumer.poll(Duration.ofMillis(50)).asScala
System.currentTimeMillis() - startTime > duration
}, s"The timeout $duration was greater than the maximum wait time.")
records
}
@throws(classOf[InterruptedException])
def maybeVerifyLocalLogStartOffsets(partitionStartOffsets: Map[TopicPartition, JLong]): Unit = {
// Non-tiered storage topic partition doesn't have local log start offset
}
}