KAFKA-14505; [3/N] Wire WriteTxnMarkers API (#14985)
This patch wires up the handling of markers written by the transaction coordinator via the WriteTxnMarkers API. In the old group coordinator, the markers are written to the logs and, as a second step, the group coordinator is asked to materialize the changes if the writes were successful. This approach does not work well with the new group coordinator, for two main reasons: 1) the second step would fail while the coordinator is loading, and there is no guarantee whether the loading has already picked up the write; 2) it does not fit the new memory model, where the state is snapshotted by offset. In both cases, having a single writer to the `__consumer_offsets` partitions is more robust and preferable.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
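For context on what these markers look like on disk: the end marker appended by `CoordinatorPartitionWriter.appendEndTransactionMarker` and replayed by `CoordinatorLoaderImpl` below is an ordinary control batch built from the public record classes. A minimal standalone sketch (illustrative only, not part of the patch) that builds a COMMIT marker and parses it back the same way the loader does:

```scala
import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords}

object EndTxnMarkerExample {
  def main(args: Array[String]): Unit = {
    // Build a COMMIT marker batch, as CoordinatorPartitionWriter.appendEndTransactionMarker does.
    val records = MemoryRecords.withEndTransactionMarker(
      100L,                                                   // producer id
      5.toShort,                                              // producer epoch
      new EndTransactionMarker(ControlRecordType.COMMIT, 1)   // coordinator epoch 1
    )

    // Parse it back, as CoordinatorLoaderImpl does while loading the partition.
    records.batches.forEach { batch =>
      if (batch.isControlBatch) {
        batch.forEach { record =>
          val controlType = ControlRecordType.parse(record.key)
          println(s"producerId=${batch.producerId} epoch=${batch.producerEpoch} control=$controlType")
        }
      }
    }
  }
}
```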
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
index c583e69..7666bf9 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -20,7 +20,8 @@
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
-import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import org.apache.kafka.common.record.{ControlRecordType, FileRecords, MemoryRecords}
+import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, LoadSummary, UnknownRecordTypeException}
import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback}
@@ -135,7 +136,22 @@
memoryRecords.batches.forEach { batch =>
if (batch.isControlBatch) {
- throw new IllegalStateException("Control batches are not supported yet.")
+ batch.asScala.foreach { record =>
+ val controlRecord = ControlRecordType.parse(record.key)
+ if (controlRecord == ControlRecordType.COMMIT) {
+ coordinator.replayEndTransactionMarker(
+ batch.producerId,
+ batch.producerEpoch,
+ TransactionResult.COMMIT
+ )
+ } else if (controlRecord == ControlRecordType.ABORT) {
+ coordinator.replayEndTransactionMarker(
+ batch.producerId,
+ batch.producerEpoch,
+ TransactionResult.ABORT
+ )
+ }
+ }
} else {
batch.asScala.foreach { record =>
numRecords = numRecords + 1
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 2c12a30..c8c8625 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -21,9 +21,10 @@
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, TimestampType}
+import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch, TimestampType}
import org.apache.kafka.common.record.Record.EMPTY_HEADERS
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.coordinator.group.runtime.PartitionWriter
import org.apache.kafka.storage.internals.log.AppendOrigin
@@ -160,28 +161,7 @@
s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.")
}
- var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
- replicaManager.appendRecords(
- timeout = 0L,
- requiredAcks = 1,
- internalTopicsAllowed = true,
- origin = AppendOrigin.COORDINATOR,
- entriesPerPartition = Map(tp -> recordsBuilder.build()),
- responseCallback = results => appendResults = results,
- // We can directly complete the purgatories here because we don't hold
- // any conflicting locks.
- actionQueue = directActionQueue
- )
-
- val partitionResult = appendResults.getOrElse(tp,
- throw new IllegalStateException(s"Append status $appendResults should have partition $tp."))
-
- if (partitionResult.error != Errors.NONE) {
- throw partitionResult.error.exception()
- }
-
- // Required offset.
- partitionResult.lastOffset + 1
+ internalAppend(tp, recordsBuilder.build())
} finally {
bufferSupplier.release(buffer)
}
@@ -190,4 +170,62 @@
throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
}
}
+
+ /**
+ * Write the transaction end marker.
+ *
+ * @param tp The partition to write records to.
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param coordinatorEpoch The epoch of the transaction coordinator.
+ * @param result The transaction result.
+ * @return The log end offset right after the written records.
+ * @throws KafkaException Any KafkaException caught during the write operation.
+ */
+ override def appendEndTransactionMarker(
+ tp: TopicPartition,
+ producerId: Long,
+ producerEpoch: Short,
+ coordinatorEpoch: Int,
+ result: TransactionResult
+ ): Long = {
+ val controlRecordType = result match {
+ case TransactionResult.COMMIT => ControlRecordType.COMMIT
+ case TransactionResult.ABORT => ControlRecordType.ABORT
+ }
+
+ internalAppend(tp, MemoryRecords.withEndTransactionMarker(
+ producerId,
+ producerEpoch,
+ new EndTransactionMarker(controlRecordType, coordinatorEpoch)
+ ))
+ }
+
+ private def internalAppend(
+ tp: TopicPartition,
+ memoryRecords: MemoryRecords
+ ): Long = {
+ var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
+ replicaManager.appendRecords(
+ timeout = 0L,
+ requiredAcks = 1,
+ internalTopicsAllowed = true,
+ origin = AppendOrigin.COORDINATOR,
+ entriesPerPartition = Map(tp -> memoryRecords),
+ responseCallback = results => appendResults = results,
+ // We can directly complete the purgatories here because we don't hold
+ // any conflicting locks.
+ actionQueue = directActionQueue
+ )
+
+ val partitionResult = appendResults.getOrElse(tp,
+ throw new IllegalStateException(s"Append status $appendResults should have partition $tp."))
+
+ if (partitionResult.error != Errors.NONE) {
+ throw partitionResult.error.exception()
+ }
+
+ // Required offset.
+ partitionResult.lastOffset + 1
+ }
}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 678844c..67cada0 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -29,6 +29,7 @@
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.util.FutureUtils
+import java.time.Duration
import java.util
import java.util.{Optional, OptionalInt, Properties}
import java.util.concurrent.CompletableFuture
@@ -546,6 +547,19 @@
future
}
+ override def completeTransaction(
+ tp: TopicPartition,
+ producerId: Long,
+ producerEpoch: Short,
+ coordinatorEpoch: Int,
+ result: TransactionResult,
+ timeout: Duration
+ ): CompletableFuture[Void] = {
+ FutureUtils.failedFuture(new IllegalStateException(
+ s"The old group coordinator does not support `completeTransaction` API."
+ ))
+ }
+
override def partitionFor(groupId: String): Int = {
coordinator.partitionFor(groupId)
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bb4dce8..872d526 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -77,6 +77,7 @@
import java.lang.{Long => JLong}
import java.nio.ByteBuffer
+import java.time.Duration
import java.util
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger
@@ -2361,25 +2362,27 @@
* request, so there could be multiple appends of markers to the log. The final response will be sent only
* after all appends have returned.
*/
- def maybeSendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
- trace(s"End transaction marker append for producer id $producerId completed with status: $responseStatus")
- val currentErrors = new ConcurrentHashMap[TopicPartition, Errors](responseStatus.map { case (k, v) => k -> v.error }.asJava)
+ def maybeSendResponseCallback(producerId: Long, result: TransactionResult, currentErrors: ConcurrentHashMap[TopicPartition, Errors]): Unit = {
+ trace(s"End transaction marker append for producer id $producerId completed with status: $currentErrors")
updateErrors(producerId, currentErrors)
- val successfulOffsetsPartitions = responseStatus.filter { case (topicPartition, partitionResponse) =>
- topicPartition.topic == GROUP_METADATA_TOPIC_NAME && partitionResponse.error == Errors.NONE
- }.keys
- if (successfulOffsetsPartitions.nonEmpty) {
- // as soon as the end transaction marker has been written for a transactional offset commit,
- // call to the group coordinator to materialize the offsets into the cache
- try {
- groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result)
- } catch {
- case e: Exception =>
- error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
- val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
- successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
- updateErrors(producerId, updatedErrors)
+ if (!config.isNewGroupCoordinatorEnabled) {
+ val successfulOffsetsPartitions = currentErrors.asScala.filter { case (topicPartition, error) =>
+ topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == Errors.NONE
+ }.keys
+
+ if (successfulOffsetsPartitions.nonEmpty) {
+ // as soon as the end transaction marker has been written for a transactional offset commit,
+ // call to the group coordinator to materialize the offsets into the cache
+ try {
+ groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result)
+ } catch {
+ case e: Exception =>
+ error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
+ val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
+ successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
+ updateErrors(producerId, updatedErrors)
+ }
}
}
@@ -2416,14 +2419,56 @@
numAppends.decrementAndGet()
skippedMarkers += 1
} else {
- val controlRecords = partitionsWithCompatibleMessageFormat.map { partition =>
- val controlRecordType = marker.transactionResult match {
- case TransactionResult.COMMIT => ControlRecordType.COMMIT
- case TransactionResult.ABORT => ControlRecordType.ABORT
+ val controlRecordType = marker.transactionResult match {
+ case TransactionResult.COMMIT => ControlRecordType.COMMIT
+ case TransactionResult.ABORT => ControlRecordType.ABORT
+ }
+
+ val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()
+ def maybeComplete(): Unit = {
+ if (partitionsWithCompatibleMessageFormat.size == markerResults.size) {
+ maybeSendResponseCallback(producerId, marker.transactionResult, markerResults)
}
- val endTxnMarker = new EndTransactionMarker(controlRecordType, marker.coordinatorEpoch)
- partition -> MemoryRecords.withEndTransactionMarker(producerId, marker.producerEpoch, endTxnMarker)
- }.toMap
+ }
+
+ val controlRecords = mutable.Map.empty[TopicPartition, MemoryRecords]
+ partitionsWithCompatibleMessageFormat.foreach { partition =>
+ if (config.isNewGroupCoordinatorEnabled && partition.topic == GROUP_METADATA_TOPIC_NAME) {
+ // When the new group coordinator is used, writing the end marker is fully delegated
+ // to the group coordinator.
+ groupCoordinator.completeTransaction(
+ partition,
+ marker.producerId,
+ marker.producerEpoch,
+ marker.coordinatorEpoch,
+ marker.transactionResult,
+ Duration.ofMillis(config.requestTimeoutMs.toLong)
+ ).whenComplete { (_, exception) =>
+ val error = if (exception == null) {
+ Errors.NONE
+ } else {
+ Errors.forException(exception) match {
+ case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR =>
+ // The transaction coordinator does not expect those errors so we translate them
+ // to NOT_LEADER_OR_FOLLOWER to signal to it that the coordinator is not ready yet.
+ Errors.NOT_LEADER_OR_FOLLOWER
+ case error =>
+ error
+ }
+ }
+ markerResults.put(partition, error)
+ maybeComplete()
+ }
+ } else {
+ // Otherwise, the regular appendRecords path is used for all the non __consumer_offsets
+ // partitions or for all partitions when the new group coordinator is disabled.
+ controlRecords += partition -> MemoryRecords.withEndTransactionMarker(
+ producerId,
+ marker.producerEpoch,
+ new EndTransactionMarker(controlRecordType, marker.coordinatorEpoch)
+ )
+ }
+ }
replicaManager.appendRecords(
timeout = config.requestTimeoutMs.toLong,
@@ -2432,7 +2477,13 @@
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = controlRecords,
requestLocal = requestLocal,
- responseCallback = maybeSendResponseCallback(producerId, marker.transactionResult))
+ responseCallback = errors => {
+ errors.forKeyValue { (tp, partitionResponse) =>
+ markerResults.put(tp, partitionResponse.error)
+ }
+ maybeComplete()
+ }
+ )
}
}
@@ -2446,6 +2497,7 @@
if (config.interBrokerProtocolVersion.isLessThan(version))
throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}")
}
+
def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
ensureInterBrokerVersion(IBP_0_11_0_IV0)
val addPartitionsToTxnRequest =
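The error translation performed in `handleWriteTxnMarkersRequest` above boils down to a small pure mapping: coordinator-level errors are folded into NOT_LEADER_OR_FOLLOWER because the transaction coordinator does not expect them and reads NOT_LEADER_OR_FOLLOWER as "the coordinator is not ready yet"; anything else is passed through unchanged. The sketch below shows the same mapping in isolation; the object and method names are illustrative and not part of the patch.

```scala
import org.apache.kafka.common.protocol.Errors

object MarkerErrorTranslation {
  // Map a completeTransaction failure to the error returned to the transaction coordinator.
  def translate(exception: Throwable): Errors = {
    if (exception == null) Errors.NONE
    else Errors.forException(exception) match {
      case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR =>
        Errors.NOT_LEADER_OR_FOLLOWER
      case other => other
    }
  }

  def main(args: Array[String]): Unit = {
    println(translate(Errors.NOT_COORDINATOR.exception()))   // NOT_LEADER_OR_FOLLOWER
    println(translate(Errors.REQUEST_TIMED_OUT.exception())) // REQUEST_TIMED_OUT
    println(translate(null))                                 // NONE
  }
}
```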
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 10e1e23..1c5064d 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -69,8 +69,13 @@
props.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
props.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200")
- props
+ // The new group coordinator does not support verifying transactions yet.
+ if (isNewGroupCoordinatorEnabled()) {
+ props.put(KafkaConfig.TransactionPartitionVerificationEnableProp, "false")
+ }
+
+ props
}
override protected def modifyConfigs(props: Seq[Properties]): Unit = {
@@ -111,7 +116,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testBasicTransactions(quorum: String): Unit = {
val producer = transactionalProducers.head
val consumer = transactionalConsumers.head
@@ -172,7 +177,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testReadCommittedConsumerShouldNotSeeUndecidedData(quorum: String): Unit = {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("other")
@@ -240,7 +245,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testDelayedFetchIncludesAbortedTransaction(quorum: String): Unit = {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("other")
@@ -299,14 +304,14 @@
@nowarn("cat=deprecation")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testSendOffsetsWithGroupId(quorum: String): Unit = {
sendOffset((producer, groupId, consumer) =>
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, groupId))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testSendOffsetsWithGroupMetadata(quorum: String): Unit = {
sendOffset((producer, _, consumer) =>
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, consumer.groupMetadata()))
@@ -386,7 +391,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testFencingOnCommit(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@@ -416,7 +421,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testFencingOnSendOffsets(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@@ -448,7 +453,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testOffsetMetadataInSendOffsetsToTransaction(quorum: String): Unit = {
val tp = new TopicPartition(topic1, 0)
val groupId = "group"
@@ -474,26 +479,26 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testInitTransactionsTimeout(quorum: String): Unit = {
testTimeout(false, producer => producer.initTransactions())
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testSendOffsetsToTransactionTimeout(quorum: String): Unit = {
testTimeout(true, producer => producer.sendOffsetsToTransaction(
Map(new TopicPartition(topic1, 0) -> new OffsetAndMetadata(0)).asJava, new ConsumerGroupMetadata("test-group")))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testCommitTransactionTimeout(quorum: String): Unit = {
testTimeout(true, producer => producer.commitTransaction())
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testAbortTransactionTimeout(quorum: String): Unit = {
testTimeout(true, producer => producer.abortTransaction())
}
@@ -514,7 +519,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testFencingOnSend(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@@ -559,7 +564,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testFencingOnAddPartitions(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@@ -606,7 +611,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testFencingOnTransactionExpiration(quorum: String): Unit = {
val producer = createTransactionalProducer("expiringProducer", transactionTimeoutMs = 100)
@@ -649,7 +654,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testMultipleMarkersOneLeader(quorum: String): Unit = {
val firstProducer = transactionalProducers.head
val consumer = transactionalConsumers.head
@@ -687,7 +692,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testConsecutivelyRunInitTransactions(quorum: String): Unit = {
val producer = createTransactionalProducer(transactionalId = "normalProducer")
@@ -696,7 +701,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testBumpTransactionalEpoch(quorum: String): Unit = {
val producer = createTransactionalProducer("transactionalProducer",
deliveryTimeoutMs = 5000, requestTimeoutMs = 5000)
@@ -758,7 +763,7 @@
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testFailureToFenceEpoch(quorum: String): Unit = {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("transactional-producer", maxBlockMs = 1000)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index 9fc1029..c583455 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -21,7 +21,8 @@
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
-import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
+import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
+import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback}
@@ -105,7 +106,7 @@
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
- when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(9L))
when(log.highWatermark).thenReturn(0L)
val readResult1 = logReadResult(startOffset = 0, records = Seq(
@@ -145,6 +146,34 @@
minOneMessage = true
)).thenReturn(readResult3)
+ val readResult4 = logReadResult(
+ startOffset = 7,
+ producerId = 100L,
+ producerEpoch = 5,
+ controlRecordType = ControlRecordType.COMMIT
+ )
+
+ when(log.read(
+ startOffset = 7L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult4)
+
+ val readResult5 = logReadResult(
+ startOffset = 8,
+ producerId = 500L,
+ producerEpoch = 10,
+ controlRecordType = ControlRecordType.ABORT
+ )
+
+ when(log.read(
+ startOffset = 8L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult5)
+
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
@@ -154,8 +183,12 @@
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
verify(coordinator).replay(100L, 5.toShort, ("k6", "v6"))
verify(coordinator).replay(100L, 5.toShort, ("k7", "v7"))
+ verify(coordinator).replayEndTransactionMarker(100L, 5, TransactionResult.COMMIT)
+ verify(coordinator).replayEndTransactionMarker(500L, 10, TransactionResult.ABORT)
verify(coordinator).updateLastWrittenOffset(2)
verify(coordinator).updateLastWrittenOffset(5)
+ verify(coordinator).updateLastWrittenOffset(7)
+ verify(coordinator).updateLastWrittenOffset(8)
verify(coordinator).updateLastCommittedOffset(0)
}
}
@@ -635,4 +668,35 @@
new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
}
+
+ private def logReadResult(
+ startOffset: Long,
+ producerId: Long,
+ producerEpoch: Short,
+ controlRecordType: ControlRecordType
+ ): FetchDataInfo = {
+ val fileRecords = mock(classOf[FileRecords])
+ val memoryRecords = MemoryRecords.withEndTransactionMarker(
+ startOffset,
+ 0L,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ producerId,
+ producerEpoch,
+ new EndTransactionMarker(controlRecordType, 0)
+ )
+
+ when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
+
+ val bufferCapture: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer])
+ when(fileRecords.readInto(
+ bufferCapture.capture(),
+ ArgumentMatchers.anyInt())
+ ).thenAnswer { _ =>
+ val buffer = bufferCapture.getValue
+ buffer.put(memoryRecords.buffer.duplicate)
+ buffer.flip()
+ }
+
+ new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
+ }
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index 121a1f1..0cfbb8f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -22,13 +22,16 @@
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{NotLeaderOrFollowerException, RecordTooLargeException}
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch}
+import org.apache.kafka.common.record.{CompressionType, ControlRecordType, MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.coordinator.group.runtime.PartitionWriter
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify, when}
@@ -234,6 +237,80 @@
assertEquals(records, receivedRecords)
}
+ @ParameterizedTest
+ @EnumSource(value = classOf[ControlRecordType], names = Array("COMMIT", "ABORT"))
+ def testWriteEndTransactionMarker(controlRecordType: ControlRecordType): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val time = new MockTime()
+ val partitionRecordWriter = new CoordinatorPartitionWriter(
+ replicaManager,
+ new StringKeyValueSerializer(),
+ CompressionType.NONE,
+ time
+ )
+
+ when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps(
+ Collections.emptyMap(),
+ new Properties()
+ )))
+
+ val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
+ ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
+ val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
+ ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
+
+ when(replicaManager.appendRecords(
+ ArgumentMatchers.eq(0L),
+ ArgumentMatchers.eq(1.toShort),
+ ArgumentMatchers.eq(true),
+ ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
+ recordsCapture.capture(),
+ callbackCapture.capture(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenAnswer(_ => {
+ callbackCapture.getValue.apply(Map(
+ tp -> new PartitionResponse(
+ Errors.NONE,
+ 5,
+ 10,
+ RecordBatch.NO_TIMESTAMP,
+ -1,
+ Collections.emptyList(),
+ ""
+ )
+ ))
+ })
+
+ assertEquals(11, partitionRecordWriter.appendEndTransactionMarker(
+ tp,
+ 100L,
+ 50.toShort,
+ 10,
+ if (controlRecordType == ControlRecordType.COMMIT) TransactionResult.COMMIT else TransactionResult.ABORT
+ ))
+
+ val batch = recordsCapture.getValue.getOrElse(tp,
+ throw new AssertionError(s"No records for $tp"))
+ assertEquals(1, batch.batches.asScala.toList.size)
+
+ val firstBatch = batch.batches.asScala.head
+ assertEquals(100L, firstBatch.producerId)
+ assertEquals(50.toShort, firstBatch.producerEpoch)
+ assertTrue(firstBatch.isTransactional)
+ assertTrue(firstBatch.isControlBatch)
+
+ val receivedRecords = batch.records.asScala.map { record =>
+ ControlRecordType.parse(record.key)
+ }.toList
+
+ assertEquals(List(controlRecordType), receivedRecords)
+ }
+
@Test
def testWriteRecordsWithFailure(): Unit = {
val tp = new TopicPartition("foo", 0)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 702fd1d..26e3670 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -22,7 +22,7 @@
import java.util
import java.util.Arrays.asList
import java.util.concurrent.{CompletableFuture, TimeUnit}
-import java.util.{Collections, Optional, OptionalInt, OptionalLong, Properties}
+import java.util.{Collections, Comparator, Optional, OptionalInt, OptionalLong, Properties}
import kafka.api.LeaderAndIsr
import kafka.cluster.{Broker, Partition}
import kafka.controller.{ControllerContext, KafkaController}
@@ -84,7 +84,7 @@
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyDouble, anyInt, anyLong, anyShort, anyString, argThat, isNotNull}
import org.mockito.Mockito.{mock, reset, times, verify, when}
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
@@ -100,9 +100,11 @@
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
-import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.util.{FutureUtils, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig}
+import java.time.Duration
+
class KafkaApisTest {
private val requestChannel: RequestChannel = mock(classOf[RequestChannel])
private val requestChannelMetrics: RequestChannel.Metrics = mock(classOf[RequestChannel.Metrics])
@@ -3019,6 +3021,223 @@
}
@Test
+ def testHandleWriteTxnMarkersRequestWithNewGroupCoordinator(): Unit = {
+ val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
+ val offset1 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)
+ val foo0 = new TopicPartition("foo", 0)
+ val foo1 = new TopicPartition("foo", 1)
+
+ val allPartitions = List(
+ offset0,
+ offset1,
+ foo0,
+ foo1
+ )
+
+ val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
+ ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
+ List(
+ new TxnMarkerEntry(
+ 1L,
+ 1.toShort,
+ 0,
+ TransactionResult.COMMIT,
+ List(offset0, foo0).asJava
+ ),
+ new TxnMarkerEntry(
+ 2L,
+ 1.toShort,
+ 0,
+ TransactionResult.ABORT,
+ List(offset1, foo1).asJava
+ )
+ ).asJava
+ ).build()
+
+ val requestChannelRequest = buildRequest(writeTxnMarkersRequest)
+
+ allPartitions.foreach { tp =>
+ when(replicaManager.getMagic(tp))
+ .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+ }
+
+ when(groupCoordinator.completeTransaction(
+ ArgumentMatchers.eq(offset0),
+ ArgumentMatchers.eq(1L),
+ ArgumentMatchers.eq(1.toShort),
+ ArgumentMatchers.eq(0),
+ ArgumentMatchers.eq(TransactionResult.COMMIT),
+ ArgumentMatchers.eq(Duration.ofMillis(Defaults.RequestTimeoutMs))
+ )).thenReturn(CompletableFuture.completedFuture[Void](null))
+
+ when(groupCoordinator.completeTransaction(
+ ArgumentMatchers.eq(offset1),
+ ArgumentMatchers.eq(2L),
+ ArgumentMatchers.eq(1.toShort),
+ ArgumentMatchers.eq(0),
+ ArgumentMatchers.eq(TransactionResult.ABORT),
+ ArgumentMatchers.eq(Duration.ofMillis(Defaults.RequestTimeoutMs))
+ )).thenReturn(CompletableFuture.completedFuture[Void](null))
+
+ val entriesPerPartition: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
+ ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
+ val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
+ ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
+
+ when(replicaManager.appendRecords(
+ ArgumentMatchers.eq(Defaults.RequestTimeoutMs.toLong),
+ ArgumentMatchers.eq(-1),
+ ArgumentMatchers.eq(true),
+ ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
+ entriesPerPartition.capture(),
+ responseCallback.capture(),
+ any(),
+ any(),
+ ArgumentMatchers.eq(RequestLocal.NoCaching),
+ any(),
+ any()
+ )).thenAnswer { _ =>
+ responseCallback.getValue.apply(
+ entriesPerPartition.getValue.keySet.map { tp =>
+ tp -> new PartitionResponse(Errors.NONE)
+ }.toMap
+ )
+ }
+
+ createKafkaApis(overrideProperties = Map(
+ KafkaConfig.NewGroupCoordinatorEnableProp -> "true"
+ )).handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.NoCaching)
+
+ val expectedResponse = new WriteTxnMarkersResponseData()
+ .setMarkers(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
+ .setProducerId(1L)
+ .setTopics(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+ .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+ .setPartitions(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code)
+ ).asJava),
+ new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+ .setName("foo")
+ .setPartitions(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code)
+ ).asJava)
+ ).asJava),
+ new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
+ .setProducerId(2L)
+ .setTopics(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+ .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+ .setPartitions(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.NONE.code)
+ ).asJava),
+ new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+ .setName("foo")
+ .setPartitions(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.NONE.code)
+ ).asJava)
+ ).asJava)
+ ).asJava)
+
+ val response = verifyNoThrottling[WriteTxnMarkersResponse](requestChannelRequest)
+ assertEquals(normalize(expectedResponse), normalize(response.data))
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = classOf[Errors], names = Array(
+ "COORDINATOR_NOT_AVAILABLE",
+ "COORDINATOR_LOAD_IN_PROGRESS",
+ "NOT_COORDINATOR",
+ "REQUEST_TIMED_OUT"
+ ))
+ def testHandleWriteTxnMarkersRequestWithNewGroupCoordinatorErrorTranslation(error: Errors): Unit = {
+ val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
+
+ val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
+ ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
+ List(
+ new TxnMarkerEntry(
+ 1L,
+ 1.toShort,
+ 0,
+ TransactionResult.COMMIT,
+ List(offset0).asJava
+ )
+ ).asJava
+ ).build()
+
+ val requestChannelRequest = buildRequest(writeTxnMarkersRequest)
+
+ when(replicaManager.getMagic(offset0))
+ .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+
+ when(groupCoordinator.completeTransaction(
+ ArgumentMatchers.eq(offset0),
+ ArgumentMatchers.eq(1L),
+ ArgumentMatchers.eq(1.toShort),
+ ArgumentMatchers.eq(0),
+ ArgumentMatchers.eq(TransactionResult.COMMIT),
+ ArgumentMatchers.eq(Duration.ofMillis(Defaults.RequestTimeoutMs))
+ )).thenReturn(FutureUtils.failedFuture[Void](error.exception()))
+
+ createKafkaApis(overrideProperties = Map(
+ KafkaConfig.NewGroupCoordinatorEnableProp -> "true"
+ )).handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.NoCaching)
+
+ val expectedError = error match {
+ case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR =>
+ Errors.NOT_LEADER_OR_FOLLOWER
+ case error =>
+ error
+ }
+
+ val expectedResponse = new WriteTxnMarkersResponseData()
+ .setMarkers(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
+ .setProducerId(1L)
+ .setTopics(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+ .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+ .setPartitions(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+ .setPartitionIndex(0)
+ .setErrorCode(expectedError.code)
+ ).asJava)
+ ).asJava)
+ ).asJava)
+
+ val response = verifyNoThrottling[WriteTxnMarkersResponse](requestChannelRequest)
+ assertEquals(normalize(expectedResponse), normalize(response.data))
+ }
+
+ private def normalize(
+ response: WriteTxnMarkersResponseData
+ ): WriteTxnMarkersResponseData = {
+ val copy = response.duplicate()
+ copy.markers.sort(
+ Comparator.comparingLong[WriteTxnMarkersResponseData.WritableTxnMarkerResult](_.producerId)
+ )
+ copy.markers.forEach { marker =>
+ marker.topics.sort((t1, t2) => t1.name.compareTo(t2.name))
+ marker.topics.forEach { topic =>
+ topic.partitions.sort(
+ Comparator.comparingInt[WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult](_.partitionIndex)
+ )
+ }
+ }
+ copy
+ }
+
+ @Test
def testLeaderReplicaIfLocalRaisesFencedLeaderEpoch(): Unit = {
testListOffsetFailedGetLeaderReplica(Errors.FENCED_LEADER_EPOCH)
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 948a585..33fef01 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -46,6 +46,7 @@
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import java.time.Duration;
import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
@@ -267,6 +268,29 @@
);
/**
+ * Complete a transaction. This is called when the WriteTxnMarkers API is called
+ * by the Transaction Coordinator in order to write the markers to the
+ * __consumer_offsets partitions.
+ *
+ * @param tp The topic-partition.
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param coordinatorEpoch The epoch of the transaction coordinator.
+ * @param result The transaction result.
+ * @param timeout The operation timeout.
+ *
+ * @return A future yielding the result.
+ */
+ CompletableFuture<Void> completeTransaction(
+ TopicPartition tp,
+ long producerId,
+ short producerEpoch,
+ int coordinatorEpoch,
+ TransactionResult result,
+ Duration timeout
+ );
+
+ /**
* Return the partition index for the given Group.
*
* @param groupId The group id.
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index feb4a92..3e007b7 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -78,6 +78,7 @@
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
import org.slf4j.Logger;
@@ -902,9 +903,12 @@
));
}
- return runtime.scheduleWriteOperation(
+ return runtime.scheduleTransactionalWriteOperation(
"txn-commit-offset",
topicPartitionFor(request.groupId()),
+ request.transactionalId(),
+ request.producerId(),
+ request.producerEpoch(),
Duration.ofMillis(config.offsetCommitTimeoutMs),
coordinator -> coordinator.commitTransactionalOffset(context, request)
).exceptionally(exception ->
@@ -945,6 +949,39 @@
}
/**
+ * See {@link GroupCoordinator#completeTransaction(TopicPartition, long, short, int, TransactionResult, Duration)}.
+ */
+ @Override
+ public CompletableFuture<Void> completeTransaction(
+ TopicPartition tp,
+ long producerId,
+ short producerEpoch,
+ int coordinatorEpoch,
+ TransactionResult result,
+ Duration timeout
+ ) {
+ if (!isActive.get()) {
+ return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ }
+
+ if (!tp.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
+ return FutureUtils.failedFuture(new IllegalStateException(
+ "Completing a transaction for " + tp + " is not expected"
+ ));
+ }
+
+ return runtime.scheduleTransactionCompletion(
+ "write-txn-marker",
+ tp,
+ producerId,
+ producerEpoch,
+ coordinatorEpoch,
+ result,
+ timeout
+ );
+ }
+
+ /**
* See {@link GroupCoordinator#onTransactionCompleted(long, Iterable, TransactionResult)}.
*/
@Override
@@ -954,6 +991,7 @@
TransactionResult transactionResult
) {
throwIfNotActive();
+ throw new IllegalStateException("onTransactionCompleted is not supported.");
}
/**
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index b68549e..66694af 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -43,6 +43,7 @@
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
@@ -716,4 +717,21 @@
+ " in " + record);
}
}
+
+ /**
+ * Applies the given transaction marker.
+ *
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param result The result of the transaction.
+ * @throws RuntimeException if the transaction can not be completed.
+ */
+ @Override
+ public void replayEndTransactionMarker(
+ long producerId,
+ short producerEpoch,
+ TransactionResult result
+ ) throws RuntimeException {
+ offsetMetadataManager.replayEndTransactionMarker(producerId, result);
+ }
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 0de78b3..b4f90bd 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -36,6 +36,7 @@
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
@@ -861,6 +862,7 @@
}
if (producerId == RecordBatch.NO_PRODUCER_ID) {
+ log.debug("Replaying offset commit with key {}, value {}", key, value);
// If the offset is not part of a transaction, it is directly stored
// in the offsets store.
OffsetAndMetadata previousValue = offsets.put(
@@ -873,6 +875,7 @@
metrics.incrementNumOffsets();
}
} else {
+ log.debug("Replaying transactional offset commit with producer id {}, key {}, value {}", producerId, key, value);
// Otherwise, the transaction offset is stored in the pending transactional
// offsets store. Pending offsets there are moved to the main store when
// the transaction is committed; or removed when the transaction is aborted.
@@ -892,6 +895,43 @@
}
/**
+ * Applies the given transaction marker.
+ *
+ * @param producerId The producer id.
+ * @param result The result of the transaction.
+ * @throws RuntimeException if the transaction can not be completed.
+ */
+ public void replayEndTransactionMarker(
+ long producerId,
+ TransactionResult result
+ ) throws RuntimeException {
+ Offsets pendingOffsets = pendingTransactionalOffsets.remove(producerId);
+
+ if (result == TransactionResult.COMMIT) {
+ log.debug("Committed transactional offset commits for producer id {}.", producerId);
+ if (pendingOffsets == null) return;
+
+ pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
+ topicOffsets.forEach((topicName, partitionOffsets) -> {
+ partitionOffsets.forEach((partitionId, offsetAndMetadata) -> {
+ log.debug("Committed transaction offset commit for producer id {} in group {} " +
+ "with topic {}, partition {}, and offset {}.",
+ producerId, groupId, topicName, partitionId, offsetAndMetadata);
+ offsets.put(
+ groupId,
+ topicName,
+ partitionId,
+ offsetAndMetadata
+ );
+ });
+ });
+ });
+ } else {
+ log.debug("Aborted transactional offset commits for producer id {}.", producerId);
+ }
+ }
+
+ /**
* A new metadata image is available.
*
* @param newImage The new metadata image.
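The replay path above buffers transactional offset commits per producer id and only materializes them into the main offsets store when the COMMIT marker is replayed; an ABORT drops them. A toy, self-contained model of that behaviour (none of these names exist in the real `OffsetMetadataManager`):

```scala
import scala.collection.mutable

object PendingTxnOffsetsModel {
  type GroupTopicPartition = (String, String, Int)

  // Offsets visible to consumers, and offsets still pending per transactional producer.
  private val committed = mutable.Map.empty[GroupTopicPartition, Long]
  private val pendingByProducer = mutable.Map.empty[Long, mutable.Map[GroupTopicPartition, Long]]

  def replayTxnOffsetCommit(producerId: Long, key: GroupTopicPartition, offset: Long): Unit =
    pendingByProducer.getOrElseUpdate(producerId, mutable.Map.empty).put(key, offset)

  def replayEndTransactionMarker(producerId: Long, commit: Boolean): Unit = {
    val pending = pendingByProducer.remove(producerId)
    if (commit) pending.foreach(committed ++= _) // ABORT simply drops the pending offsets
  }

  def main(args: Array[String]): Unit = {
    replayTxnOffsetCommit(100L, ("group", "topic", 0), 42L)
    replayEndTransactionMarker(100L, commit = true)
    println(committed) // Map((group,topic,0) -> 42)
  }
}
```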
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java
index 1fd8c97..9aa5400 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.coordinator.group.runtime;
+import org.apache.kafka.common.requests.TransactionResult;
+
/**
* The CoordinatorPlayback interface. This interface is used to replay
* records to the coordinator in order to update its state. This is
@@ -40,6 +42,20 @@
) throws RuntimeException;
/**
+ * Applies the given transaction marker.
+ *
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param result The result of the transaction.
+ * @throws RuntimeException if the transaction can not be completed.
+ */
+ void replayEndTransactionMarker(
+ long producerId,
+ short producerEpoch,
+ TransactionResult result
+ ) throws RuntimeException;
+
+ /**
* Invoke operations when a batch has been successfully loaded.
*
* @param offset the offset of the last record in the batch plus one.
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 75e9f1d..fffdd83 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -23,6 +23,7 @@
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -914,6 +915,164 @@
}
/**
+ * A coordinator event that applies and writes a transaction end marker.
+ */
+ class CoordinatorCompleteTransactionEvent implements CoordinatorEvent, DeferredEvent {
+ /**
+ * The topic partition that this write event is applied to.
+ */
+ final TopicPartition tp;
+
+ /**
+ * The operation name.
+ */
+ final String name;
+
+ /**
+ * The producer id.
+ */
+ final long producerId;
+
+ /**
+ * The producer epoch.
+ */
+ final short producerEpoch;
+
+ /**
+ * The coordinator epoch of the transaction coordinator.
+ */
+ final int coordinatorEpoch;
+
+ /**
+ * The transaction result.
+ */
+ final TransactionResult result;
+
+ /**
+ * Timeout value for the write operation.
+ */
+ final Duration writeTimeout;
+
+ /**
+ * The future that will be completed with the response
+ * generated by the write operation or an error.
+ */
+ final CompletableFuture<Void> future;
+
+ /**
+ * The time this event was created.
+ */
+ private final long createdTimeMs;
+
+ CoordinatorCompleteTransactionEvent(
+ String name,
+ TopicPartition tp,
+ long producerId,
+ short producerEpoch,
+ int coordinatorEpoch,
+ TransactionResult result,
+ Duration writeTimeout
+ ) {
+ this.name = name;
+ this.tp = tp;
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.coordinatorEpoch = coordinatorEpoch;
+ this.result = result;
+ this.writeTimeout = writeTimeout;
+ this.future = new CompletableFuture<>();
+ this.createdTimeMs = time.milliseconds();
+ }
+
+ /**
+ * @return The key used by the CoordinatorEventProcessor to ensure
+ * that events with the same key are not processed concurrently.
+ */
+ @Override
+ public TopicPartition key() {
+ return tp;
+ }
+
+ /**
+ * Called by the CoordinatorEventProcessor when the event is executed.
+ */
+ @Override
+ public void run() {
+ try {
+ withActiveContextOrThrow(tp, context -> {
+ long prevLastWrittenOffset = context.coordinator.lastWrittenOffset();
+
+ try {
+ context.coordinator.replayEndTransactionMarker(
+ producerId,
+ producerEpoch,
+ result
+ );
+
+ long offset = partitionWriter.appendEndTransactionMarker(
+ tp,
+ producerId,
+ producerEpoch,
+ coordinatorEpoch,
+ result
+ );
+ context.coordinator.updateLastWrittenOffset(offset);
+
+ if (!future.isDone()) {
+ context.deferredEventQueue.add(offset, this);
+ timer.add(new TimerTask(writeTimeout.toMillis()) {
+ @Override
+ public void run() {
+ if (!future.isDone()) {
+ scheduleInternalOperation(
+ "WriteTimeout(name=" + name + ", tp=" + tp + ")",
+ tp,
+ () -> complete(new TimeoutException("CoordinatorCompleteTransactionEvent " + name +
+ " timed out after " + writeTimeout.toMillis() + "ms"))
+ );
+ }
+ }
+ });
+ } else {
+ complete(null);
+ }
+ } catch (Throwable t) {
+ context.coordinator.revertLastWrittenOffset(prevLastWrittenOffset);
+ complete(t);
+ }
+ });
+ } catch (Throwable t) {
+ complete(t);
+ }
+ }
+
+ /**
+ * Completes the future with either the result of the write operation
+ * or the provided exception.
+ *
+ * @param exception The exception to complete the future with.
+ */
+ @Override
+ public void complete(Throwable exception) {
+ if (exception == null) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(exception);
+ }
+ }
+
+ @Override
+ public long createdTimeMs() {
+ return createdTimeMs;
+ }
+
+ @Override
+ public String toString() {
+ return "CoordinatorCompleteTransactionEvent(name=" + name + ")";
+ }
+ }
+
+ /**
* A coordinator internal event.
*/
class CoordinatorInternalEvent implements CoordinatorEvent {
@@ -1321,6 +1480,44 @@
}
/**
+ * Schedules the transaction completion.
+ *
+ * @param name The name of the operation.
+ * @param tp The address of the coordinator (aka its topic-partitions).
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param coordinatorEpoch The epoch of the transaction coordinator.
+ * @param result The transaction result.
+ *
+ * @return A future that will be completed with null when the operation is
+ * completed or an exception if the operation failed.
+ */
+ public CompletableFuture<Void> scheduleTransactionCompletion(
+ String name,
+ TopicPartition tp,
+ long producerId,
+ short producerEpoch,
+ int coordinatorEpoch,
+ TransactionResult result,
+ Duration timeout
+ ) {
+ throwIfNotRunning();
+ log.debug("Scheduled execution of transaction completion for {} with producer id={}, producer epoch={}, " +
+ "coordinator epoch={} and transaction result={}.", tp, producerId, producerEpoch, coordinatorEpoch, result);
+ CoordinatorCompleteTransactionEvent event = new CoordinatorCompleteTransactionEvent(
+ name,
+ tp,
+ producerId,
+ producerEpoch,
+ coordinatorEpoch,
+ result,
+ timeout
+ );
+ enqueue(event);
+ return event.future;
+ }
+
+ /**
* Schedules a read operation.
*
* @param name The name of the write operation.
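The ordering enforced by `CoordinatorCompleteTransactionEvent` above is: replay the marker into the in-memory state first, then append the control batch to the log, and revert the last written offset if either step throws, so the snapshotted state stays consistent with the log. A toy model of that sequencing (all names are illustrative stand-ins, not the real runtime types):

```scala
object CompleteTransactionSequencing {
  final class Shard {
    var lastWrittenOffset: Long = 0L
    def replayEndTransactionMarker(commit: Boolean): Unit = () // update the in-memory state
    def updateLastWrittenOffset(offset: Long): Unit = lastWrittenOffset = offset
    def revertLastWrittenOffset(offset: Long): Unit = lastWrittenOffset = offset
  }

  def completeTransaction(shard: Shard, appendMarker: () => Long, commit: Boolean): Unit = {
    val prev = shard.lastWrittenOffset
    try {
      shard.replayEndTransactionMarker(commit) // 1) apply the marker to the state machine
      val offset = appendMarker()              // 2) append the control batch to the log
      shard.updateLastWrittenOffset(offset)    // 3) snapshot state at the new offset
    } catch {
      case t: Throwable =>
        shard.revertLastWrittenOffset(prev)    // roll back to the previous snapshot
        throw t
    }
  }

  def main(args: Array[String]): Unit = {
    val shard = new Shard
    completeTransaction(shard, appendMarker = () => 42L, commit = true)
    println(shard.lastWrittenOffset) // 42
  }
}
```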
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java
index cd5f4e2..391acff 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group.runtime;
+import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@@ -51,11 +52,27 @@
/**
* Replay a record to update the state machine.
*
- * @param record The record to replay.
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param record The record to replay.
*/
void replay(
long producerId,
short producerEpoch,
U record
) throws RuntimeException;
+
+ /**
+ * Applies the end transaction marker.
+ *
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param result The result of the transaction.
+ * @throws RuntimeException if the transaction can not be completed.
+ */
+ default void replayEndTransactionMarker(
+ long producerId,
+ short producerEpoch,
+ TransactionResult result
+ ) throws RuntimeException {}
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
index e3efbfa..e4270dd 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
@@ -18,6 +18,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.TransactionResult;
import java.util.List;
@@ -96,4 +97,23 @@
short producerEpoch,
List<T> records
) throws KafkaException;
+
+ /**
+ * Write the transaction end marker.
+ *
+ * @param tp The partition to write records to.
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param coordinatorEpoch The epoch of the transaction coordinator.
+ * @param result The transaction result.
+ * @return The log end offset right after the written records.
+ * @throws KafkaException Any KafkaException caught during the write operation.
+ */
+ long appendEndTransactionMarker(
+ TopicPartition tp,
+ long producerId,
+ short producerEpoch,
+ int coordinatorEpoch,
+ TransactionResult result
+ ) throws KafkaException;
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
index 6351346..c5c6995 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.runtime;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@@ -112,6 +113,22 @@
}
/**
+ * Applies the end transaction marker.
+ *
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param result The result of the transaction.
+ */
+ @Override
+ public synchronized void replayEndTransactionMarker(
+ long producerId,
+ short producerEpoch,
+ TransactionResult result
+ ) {
+ coordinator.replayEndTransactionMarker(producerId, producerEpoch, result);
+ }
+
+ /**
* Updates the last written offset. This also creates a new snapshot
* in the snapshot registry.
*
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 719d166..b9b162d 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -59,6 +59,7 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.BufferSupplier;
@@ -93,8 +94,10 @@
import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -1870,6 +1873,8 @@
TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
.setGroupId("foo")
.setTransactionalId("transactional-id")
+ .setProducerId(10L)
+ .setProducerEpoch((short) 5)
.setMemberId("member-id")
.setGenerationId(10)
.setTopics(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
@@ -1885,9 +1890,12 @@
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())))));
- when(runtime.scheduleWriteOperation(
+ when(runtime.scheduleTransactionalWriteOperation(
ArgumentMatchers.eq("txn-commit-offset"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.eq("transactional-id"),
+ ArgumentMatchers.eq(10L),
+ ArgumentMatchers.eq((short) 5),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(response));
@@ -1900,4 +1908,82 @@
assertEquals(response, future.get());
}
+
+ @Test
+ public void testCompleteTransaction() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+ service.startup(() -> 1);
+
+ when(runtime.scheduleTransactionCompletion(
+ ArgumentMatchers.eq("write-txn-marker"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.eq(100L),
+ ArgumentMatchers.eq((short) 5),
+ ArgumentMatchers.eq(10),
+ ArgumentMatchers.eq(TransactionResult.COMMIT),
+ ArgumentMatchers.eq(Duration.ofMillis(100))
+ )).thenReturn(CompletableFuture.completedFuture(null));
+
+ CompletableFuture<Void> future = service.completeTransaction(
+ new TopicPartition("__consumer_offsets", 0),
+ 100L,
+ (short) 5,
+ 10,
+ TransactionResult.COMMIT,
+ Duration.ofMillis(100)
+ );
+
+ assertNull(future.get());
+ }
+
+ @Test
+ public void testCompleteTransactionWhenNotCoordinatorServiceStarted() {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+
+ CompletableFuture<Void> future = service.completeTransaction(
+ new TopicPartition("foo", 0),
+ 100L,
+ (short) 5,
+ 10,
+ TransactionResult.COMMIT,
+ Duration.ofMillis(100)
+ );
+
+ assertFutureThrows(future, CoordinatorNotAvailableException.class);
+ }
+
+ @Test
+ public void testCompleteTransactionWithUnexpectedPartition() {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+ service.startup(() -> 1);
+
+ CompletableFuture<Void> future = service.completeTransaction(
+ new TopicPartition("foo", 0),
+ 100L,
+ (short) 5,
+ 10,
+ TransactionResult.COMMIT,
+ Duration.ofMillis(100)
+ );
+
+ assertFutureThrows(future, IllegalStateException.class);
+ }
}
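The three tests above pin down the expected `completeTransaction` behaviour: fail fast with `CoordinatorNotAvailableException` before the service is started, reject partitions other than `__consumer_offsets`, and otherwise delegate to `scheduleTransactionCompletion`. A self-contained sketch of that validation flow follows, with stand-in types for the service and the runtime; the real GroupCoordinatorService may differ in its details.

    // Sketch of the validation flow exercised by the tests above.
    // Class, field and interface names are stand-ins, not the real service code.
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
    import org.apache.kafka.common.requests.TransactionResult;

    import java.time.Duration;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicBoolean;

    final class CompleteTransactionSketch {
        interface Runtime {
            CompletableFuture<Void> scheduleTransactionCompletion(
                String name, TopicPartition tp, long producerId, short producerEpoch,
                int coordinatorEpoch, TransactionResult result, Duration timeout);
        }

        private final AtomicBoolean isActive = new AtomicBoolean(false);
        private final Runtime runtime;

        CompleteTransactionSketch(Runtime runtime) {
            this.runtime = runtime;
        }

        void startup() {
            isActive.set(true);
        }

        CompletableFuture<Void> completeTransaction(
            TopicPartition tp, long producerId, short producerEpoch,
            int coordinatorEpoch, TransactionResult result, Duration timeout
        ) {
            if (!isActive.get()) {
                return failedFuture(new CoordinatorNotAvailableException("Not started."));
            }
            if (!"__consumer_offsets".equals(tp.topic())) {
                return failedFuture(new IllegalStateException(
                    "Completing a transaction for " + tp + " is not expected."));
            }
            return runtime.scheduleTransactionCompletion(
                "write-txn-marker", tp, producerId, producerEpoch, coordinatorEpoch, result, timeout);
        }

        private static CompletableFuture<Void> failedFuture(Throwable t) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            future.completeExceptionally(t);
            return future;
        }
    }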
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 9d0bccc..740ad24 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -27,6 +27,7 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -52,6 +53,8 @@
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
@@ -968,4 +971,34 @@
verify(groupMetadataManager, times(1)).maybeDeleteGroup(eq("group-id"), any());
verify(groupMetadataManager, times(0)).maybeDeleteGroup(eq("other-group-id"), any());
}
+
+ @ParameterizedTest
+ @EnumSource(value = TransactionResult.class)
+ public void testReplayEndTransactionMarker(TransactionResult result) {
+ GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(Time.SYSTEM),
+ mock(GroupCoordinatorConfig.class),
+ coordinatorMetrics,
+ metricsShard
+ );
+
+ coordinator.replayEndTransactionMarker(
+ 100L,
+ (short) 5,
+ result
+ );
+
+ verify(offsetMetadataManager, times(1)).replayEndTransactionMarker(
+ 100L,
+ result
+ );
+ }
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index bf1ef89..b86ddc7 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -41,6 +41,7 @@
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
@@ -442,6 +443,15 @@
lastWrittenOffset++;
}
+ private void replayEndTransactionMarker(
+ long producerId,
+ TransactionResult result
+ ) {
+ snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+ offsetMetadataManager.replayEndTransactionMarker(producerId, result);
+ lastWrittenOffset++;
+ }
+
public void testOffsetDeleteWith(
String groupId,
String topic,
@@ -2415,6 +2425,73 @@
}
@Test
+ public void testReplayTransactionEndMarkerWithCommit() {
+ OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+ // Add pending transactional commit for producer id 5.
+ verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new OffsetAndMetadata(
+ 100L,
+ OptionalInt.empty(),
+ "small",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ));
+
+ // Add pending transactional commit for producer id 6.
+ verifyTransactionalReplay(context, 6L, "foo", "bar", 1, new OffsetAndMetadata(
+ 200L,
+ OptionalInt.empty(),
+ "small",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ));
+
+ // Replaying an end marker with an unknown producer id should not fail.
+ context.replayEndTransactionMarker(1L, TransactionResult.COMMIT);
+
+ // Replaying an end marker to commit the transaction of producer id 5.
+ context.replayEndTransactionMarker(5L, TransactionResult.COMMIT);
+
+ // The pending offset is removed...
+ assertNull(context.offsetMetadataManager.pendingTransactionalOffset(
+ 5L,
+ "foo",
+ "bar",
+ 0
+ ));
+
+ // ... and added to the main offset storage.
+ assertEquals(new OffsetAndMetadata(
+ 100L,
+ OptionalInt.empty(),
+ "small",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ), context.offsetMetadataManager.offset(
+ "foo",
+ "bar",
+ 0
+ ));
+
+ // Replaying an end marker to abort the transaction of producer id 6.
+ context.replayEndTransactionMarker(6L, TransactionResult.ABORT);
+
+ // The pending offset is removed and is not added to the
+ // main offset storage.
+ assertNull(context.offsetMetadataManager.pendingTransactionalOffset(
+ 6L,
+ "foo",
+ "bar",
+ 1
+ ));
+ assertNull(context.offsetMetadataManager.offset(
+ "foo",
+ "bar",
+ 1
+ ));
+ }
+
+ @Test
public void testOffsetCommitsSensor() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
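The new `replayEndTransactionMarker` helper in the test context mirrors the regular replay helper: take a snapshot at the current last written offset, apply the marker, then bump the offset. That snapshot-per-offset pattern is what allows the state to be reverted to an exact offset later. A small standalone sketch with the timeline data structures, assuming one snapshot per replayed record or marker:

    // Sketch of the snapshot-by-offset pattern used by the test context above.
    // One snapshot per replayed record/marker allows reverting to any offset.
    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.timeline.SnapshotRegistry;
    import org.apache.kafka.timeline.TimelineHashMap;

    final class SnapshotByOffsetSketch {
        public static void main(String[] args) {
            SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
            TimelineHashMap<String, Long> offsets = new TimelineHashMap<>(snapshotRegistry, 0);
            long lastWrittenOffset = 0L;

            // Replay a record at offset 0.
            snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
            offsets.put("foo-0", 100L);
            lastWrittenOffset++;

            // Replay a marker at offset 1.
            snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
            offsets.put("foo-1", 200L);
            lastWrittenOffset++;

            // If the write at offset 1 fails, revert to the snapshot taken before it.
            snapshotRegistry.revertToSnapshot(1L);
            // The map now only contains the entry applied at offset 0.
        }
    }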
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index d768f4a..e0d4dd0 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -19,6 +19,8 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics;
@@ -29,8 +31,11 @@
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatcher;
import java.time.Duration;
@@ -58,6 +63,7 @@
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -167,14 +173,24 @@
*/
private static class MockPartitionWriter extends InMemoryPartitionWriter<String> {
private final int maxRecordsInBatch;
+ private final boolean failEndMarker;
public MockPartitionWriter() {
- this(Integer.MAX_VALUE);
+ this(Integer.MAX_VALUE, false);
}
public MockPartitionWriter(int maxRecordsInBatch) {
+ this(maxRecordsInBatch, false);
+ }
+
+ public MockPartitionWriter(boolean failEndMarker) {
+ this(Integer.MAX_VALUE, failEndMarker);
+ }
+
+ public MockPartitionWriter(int maxRecordsInBatch, boolean failEndMarker) {
super(false);
this.maxRecordsInBatch = maxRecordsInBatch;
+ this.failEndMarker = failEndMarker;
}
@Override
@@ -206,20 +222,44 @@
records.size(), maxRecordsInBatch));
}
}
+
+ @Override
+ public long appendEndTransactionMarker(
+ TopicPartition tp,
+ long producerId,
+ short producerEpoch,
+ int coordinatorEpoch,
+ TransactionResult result
+ ) throws KafkaException {
+ if (failEndMarker) {
+ throw new KafkaException("Can't write end marker.");
+ }
+ return super.appendEndTransactionMarker(
+ tp,
+ producerId,
+ producerEpoch,
+ coordinatorEpoch,
+ result
+ );
+ }
}
/**
* A simple Coordinator implementation that stores the records into a set.
*/
static class MockCoordinatorShard implements CoordinatorShard<String> {
+ private final SnapshotRegistry snapshotRegistry;
private final TimelineHashSet<String> records;
+ private final TimelineHashMap<Long, TimelineHashSet<String>> pendingRecords;
private final CoordinatorTimer<Void, String> timer;
MockCoordinatorShard(
SnapshotRegistry snapshotRegistry,
CoordinatorTimer<Void, String> timer
) {
+ this.snapshotRegistry = snapshotRegistry;
this.records = new TimelineHashSet<>(snapshotRegistry, 0);
+ this.pendingRecords = new TimelineHashMap<>(snapshotRegistry, 0);
this.timer = timer;
}
@@ -229,7 +269,34 @@
short producerEpoch,
String record
) throws RuntimeException {
- records.add(record);
+ if (producerId == RecordBatch.NO_PRODUCER_ID) {
+ records.add(record);
+ } else {
+ pendingRecords
+ .computeIfAbsent(producerId, __ -> new TimelineHashSet<>(snapshotRegistry, 0))
+ .add(record);
+ }
+ }
+
+ @Override
+ public void replayEndTransactionMarker(
+ long producerId,
+ short producerEpoch,
+ TransactionResult result
+ ) throws RuntimeException {
+ if (result == TransactionResult.COMMIT) {
+ TimelineHashSet<String> pending = pendingRecords.remove(producerId);
+ if (pending == null) return;
+ records.addAll(pending);
+ } else {
+ pendingRecords.remove(producerId);
+ }
+ }
+
+ Set<String> pendingRecords(long producerId) {
+ TimelineHashSet<String> pending = pendingRecords.get(producerId);
+ if (pending == null) return Collections.emptySet();
+ return Collections.unmodifiableSet(new HashSet<>(pending));
}
Set<String> records() {
@@ -739,7 +806,10 @@
// Records have been replayed to the coordinator.
assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
// Records have been written to the log.
- assertEquals(Arrays.asList("record1", "record2"), writer.records(TP));
+ assertEquals(Arrays.asList(
+ InMemoryPartitionWriter.LogEntry.value("record1"),
+ InMemoryPartitionWriter.LogEntry.value("record2")
+ ), writer.entries(TP));
// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
@@ -757,7 +827,11 @@
// Records have been replayed to the coordinator.
assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.coordinator().records());
// Records have been written to the log.
- assertEquals(Arrays.asList("record1", "record2", "record3"), writer.records(TP));
+ assertEquals(Arrays.asList(
+ InMemoryPartitionWriter.LogEntry.value("record1"),
+ InMemoryPartitionWriter.LogEntry.value("record2"),
+ InMemoryPartitionWriter.LogEntry.value("record3")
+ ), writer.entries(TP));
// Write #3 but without any records.
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
@@ -771,7 +845,11 @@
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.coordinator().records());
- assertEquals(Arrays.asList("record1", "record2", "record3"), writer.records(TP));
+ assertEquals(Arrays.asList(
+ InMemoryPartitionWriter.LogEntry.value("record1"),
+ InMemoryPartitionWriter.LogEntry.value("record2"),
+ InMemoryPartitionWriter.LogEntry.value("record3")
+ ), writer.entries(TP));
// Commit write #1.
writer.commit(TP, 2);
@@ -1073,6 +1151,326 @@
);
}
+ @ParameterizedTest
+ @EnumSource(value = TransactionResult.class)
+ public void testScheduleTransactionCompletion(TransactionResult result) throws ExecutionException, InterruptedException, TimeoutException {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+ // Transactional write #1.
+ CompletableFuture<String> write1 = runtime.scheduleTransactionalWriteOperation(
+ "write#1",
+ TP,
+ "transactional-id",
+ 100L,
+ (short) 5,
+ DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")
+ );
+
+ // Verify that the write is not committed yet.
+ assertFalse(write1.isDone());
+
+ // The last written offset is updated.
+ assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+ // The last committed offset does not change.
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ // A new snapshot is created.
+ assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+ // Records have been replayed to the coordinator. They are stored in
+ // the pending set for now.
+ assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(
+ 100L
+ ));
+ // Records have been written to the log.
+ assertEquals(Arrays.asList(
+ InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+ InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+ ), writer.entries(TP));
+
+ // Complete transaction #1.
+ CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
+ "complete#1",
+ TP,
+ 100L,
+ (short) 5,
+ 10,
+ result,
+ DEFAULT_WRITE_TIMEOUT
+ );
+
+ // Verify that the completion is not committed yet.
+ assertFalse(complete1.isDone());
+
+ // The last written offset is updated.
+ assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+ // The last committed offset does not change.
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ // A new snapshot is created.
+ assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+ // Records have been replayed to the coordinator.
+ if (result == TransactionResult.COMMIT) {
+ // They are now in the records set if committed.
+ assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
+ } else {
+ // Or they are gone if aborted.
+ assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+ }
+
+ // Records have been written to the log.
+ assertEquals(Arrays.asList(
+ InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+ InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"),
+ InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, result)
+ ), writer.entries(TP));
+
+ // Commit write #1.
+ writer.commit(TP, 2);
+
+ // The write is completed.
+ assertTrue(write1.isDone());
+ assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+
+ // Commit completion #1.
+ writer.commit(TP, 3);
+
+ // The transaction is completed.
+ assertTrue(complete1.isDone());
+ assertNull(complete1.get(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testScheduleTransactionCompletionWhenWriteTimesOut() throws InterruptedException {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .build();
+
+ // Loads the coordinator.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+ // Complete #1. We should get a TimeoutException because the HWM will not advance.
+ CompletableFuture<Void> timedOutCompletion = runtime.scheduleTransactionCompletion(
+ "complete#1",
+ TP,
+ 100L,
+ (short) 5,
+ 10,
+ TransactionResult.COMMIT,
+ Duration.ofMillis(3)
+ );
+
+ // Verify that the state has been updated.
+ assertEquals(1L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 1L), ctx.coordinator.snapshotRegistry().epochsList());
+
+ // Advance clock to time out Complete #1.
+ timer.advanceClock(4);
+
+ assertFutureThrows(timedOutCompletion, org.apache.kafka.common.errors.TimeoutException.class);
+
+ // Verify that the state is still the same. We don't revert when the
+ // operation times out because the marker has already been written to the log.
+ assertEquals(1L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 1L), ctx.coordinator.snapshotRegistry().epochsList());
+ }
+
+ @Test
+ public void testScheduleTransactionCompletionWhenWriteFails() {
+ MockTimer timer = new MockTimer();
+ // The partition writer accepts records but fails on markers.
+ MockPartitionWriter writer = new MockPartitionWriter(true);
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .build();
+
+ // Loads the coordinator.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+ // Write #1. It should succeed and be applied to the coordinator.
+ runtime.scheduleTransactionalWriteOperation(
+ "write#1",
+ TP,
+ "transactional-id",
+ 100L,
+ (short) 5,
+ DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
+
+ // Verify that the state has been updated.
+ assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+
+ // Complete transaction #1. It should fail.
+ CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
+ "complete#1",
+ TP,
+ 100L,
+ (short) 5,
+ 10,
+ TransactionResult.COMMIT,
+ DEFAULT_WRITE_TIMEOUT
+ );
+ assertFutureThrows(complete1, KafkaException.class);
+
+ // Verify that the state has not changed.
+ assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+ }
+
+ @Test
+ public void testScheduleTransactionCompletionWhenReplayFails() {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .build();
+
+ // Loads the coordinator.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+ // Override the coordinator with one that throws an exception
+ // when replayEndTransactionMarker is called.
+ SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry();
+ ctx.coordinator = new SnapshottableCoordinator<>(
+ new LogContext(),
+ snapshotRegistry,
+ new MockCoordinatorShard(snapshotRegistry, ctx.timer) {
+ @Override
+ public void replayEndTransactionMarker(
+ long producerId,
+ short producerEpoch,
+ TransactionResult result
+ ) throws RuntimeException {
+ throw new IllegalArgumentException("error");
+ }
+ },
+ TP
+ );
+
+ // Write #1. It should succeed and be applied to the coordinator.
+ runtime.scheduleTransactionalWriteOperation(
+ "write#1",
+ TP,
+ "transactional-id",
+ 100L,
+ (short) 5,
+ DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
+
+ // Verify that the state has been updated.
+ assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+ assertEquals(Arrays.asList(
+ InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+ InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+ ), writer.entries(TP));
+
+ // Complete transaction #1. It should fail.
+ CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
+ "complete#1",
+ TP,
+ 100L,
+ (short) 5,
+ 10,
+ TransactionResult.COMMIT,
+ DEFAULT_WRITE_TIMEOUT
+ );
+ assertFutureThrows(complete1, IllegalArgumentException.class);
+
+ // Verify that the state has not changed.
+ assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+ assertEquals(Arrays.asList(
+ InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+ InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+ ), writer.entries(TP));
+ }
+
@Test
public void testScheduleReadOp() throws ExecutionException, InterruptedException, TimeoutException {
MockTimer timer = new MockTimer();
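Taken together, the failure tests above establish an invariant of transaction completion: the marker is replayed against the snapshotted state and then appended to the log, and the state is rolled back to the previous snapshot if either step throws, so a failed completion leaves nothing half-applied. The sketch below illustrates that invariant with plain collections standing in for the runtime's timeline state; the ordering and names are inferred from the assertions above, not copied from CoordinatorRuntime.

    // Sketch of the "revert on failure" invariant the completion tests rely on.
    // Plain collections stand in for the runtime's snapshotted state; the real
    // CoordinatorRuntime uses the snapshot registry instead of manual copies.
    import org.apache.kafka.common.requests.TransactionResult;

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    final class CompletionRevertSketch {
        private final Set<String> records = new HashSet<>();
        private final Map<Long, Set<String>> pendingRecords = new HashMap<>();
        private long lastWrittenOffset = 0L;

        long completeTransaction(long producerId, TransactionResult result, Runnable appendMarkerToLog) {
            // Capture the state so it can be restored if replay or append fails.
            Set<String> recordsBefore = new HashSet<>(records);
            Map<Long, Set<String>> pendingBefore = new HashMap<>(pendingRecords);
            long offsetBefore = lastWrittenOffset;
            try {
                // Replay the marker against the state machine...
                Set<String> pending = pendingRecords.remove(producerId);
                if (pending != null && result == TransactionResult.COMMIT) {
                    records.addAll(pending);
                }
                // ...then append the marker to the log; either step may throw.
                appendMarkerToLog.run();
                return ++lastWrittenOffset;
            } catch (RuntimeException e) {
                // Restore the captured state so nothing is left half-applied.
                records.clear();
                records.addAll(recordsBefore);
                pendingRecords.clear();
                pendingRecords.putAll(pendingBefore);
                lastWrittenOffset = offsetBefore;
                throw e;
            }
        }
    }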
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java
index 759bbb2..3cf8d48 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java
@@ -18,13 +18,17 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.TransactionResult;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
/**
* An in-memory partition writer.
@@ -33,10 +37,141 @@
*/
public class InMemoryPartitionWriter<T> implements PartitionWriter<T> {
+ public static class LogEntry {
+ public static <T> LogEntry value(T value) {
+ return new LogValue<>(value);
+ }
+
+ public static <T> LogEntry value(
+ long producerId,
+ short producerEpoch,
+ T value
+ ) {
+ return new LogValue<>(
+ producerId,
+ producerEpoch,
+ value
+ );
+ }
+
+ public static LogEntry control(
+ long producerId,
+ short producerEpoch,
+ int coordinatorEpoch,
+ TransactionResult result
+ ) {
+ return new LogControl(
+ producerId,
+ producerEpoch,
+ coordinatorEpoch,
+ result
+ );
+ }
+ }
+
+ public static class LogValue<T> extends LogEntry {
+ public final long producerId;
+ public final short producerEpoch;
+ public final T value;
+
+ private LogValue(
+ long producerId,
+ short producerEpoch,
+ T value
+ ) {
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.value = value;
+ }
+
+ private LogValue(T value) {
+ this(
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ value
+ );
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ LogValue<?> that = (LogValue<?>) o;
+
+ return Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return value != null ? value.hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ return "LogValue(" +
+ "producerId=" + producerId +
+ ", producerEpoch=" + producerEpoch +
+ ", value=" + value +
+ ')';
+ }
+ }
+
+ public static class LogControl extends LogEntry {
+ public final long producerId;
+ public final short producerEpoch;
+ public final int coordinatorEpoch;
+ public final TransactionResult result;
+
+ private LogControl(
+ long producerId,
+ short producerEpoch,
+ int coordinatorEpoch,
+ TransactionResult result
+ ) {
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.coordinatorEpoch = coordinatorEpoch;
+ this.result = result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ LogControl that = (LogControl) o;
+
+ if (producerId != that.producerId) return false;
+ if (producerEpoch != that.producerEpoch) return false;
+ if (coordinatorEpoch != that.coordinatorEpoch) return false;
+ return result == that.result;
+ }
+
+ @Override
+ public int hashCode() {
+ int result1 = (int) (producerId ^ (producerId >>> 32));
+ result1 = 31 * result1 + (int) producerEpoch;
+ result1 = 31 * result1 + coordinatorEpoch;
+ result1 = 31 * result1 + (result != null ? result.hashCode() : 0);
+ return result1;
+ }
+
+ @Override
+ public String toString() {
+ return "LogControl(" +
+ "producerId=" + producerId +
+ ", producerEpoch=" + producerEpoch +
+ ", coordinatorEpoch=" + coordinatorEpoch +
+ ", result=" + result +
+ ')';
+ }
+ }
+
private class PartitionState {
private ReentrantLock lock = new ReentrantLock();
private List<Listener> listeners = new ArrayList<>();
- private List<T> records = new ArrayList<>();
+ private List<LogEntry> entries = new ArrayList<>();
private long endOffset = 0L;
private long committedOffset = 0L;
}
@@ -93,7 +228,11 @@
PartitionState state = partitionState(tp);
state.lock.lock();
try {
- state.records.addAll(records);
+ state.entries.addAll(records.stream().map(record -> new LogValue<T>(
+ producerId,
+ producerEpoch,
+ record
+ )).collect(Collectors.toList()));
state.endOffset += records.size();
if (autoCommit) commit(tp, state.endOffset);
return state.endOffset;
@@ -102,6 +241,31 @@
}
}
+ @Override
+ public long appendEndTransactionMarker(
+ TopicPartition tp,
+ long producerId,
+ short producerEpoch,
+ int coordinatorEpoch,
+ TransactionResult result
+ ) throws KafkaException {
+ PartitionState state = partitionState(tp);
+ state.lock.lock();
+ try {
+ state.entries.add(new LogControl(
+ producerId,
+ producerEpoch,
+ coordinatorEpoch,
+ result
+ ));
+ state.endOffset += 1;
+ if (autoCommit) commit(tp, state.endOffset);
+ return state.endOffset;
+ } finally {
+ state.lock.unlock();
+ }
+ }
+
public void commit(
TopicPartition tp,
long offset
@@ -133,13 +297,13 @@
}
}
- public List<T> records(
+ public List<LogEntry> entries(
TopicPartition tp
) {
PartitionState state = partitionState(tp);
state.lock.lock();
try {
- return Collections.unmodifiableList(state.records);
+ return Collections.unmodifiableList(state.entries);
} finally {
state.lock.unlock();
}
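With the writer exposing typed log entries, tests can now assert on transactional values and control markers separately. A short usage sketch (a fragment of a test method), assuming the `InMemoryPartitionWriter` and `LogEntry` helpers shown above:

    // Usage sketch for the typed log entries; assumes the helpers shown above.
    InMemoryPartitionWriter<String> writer = new InMemoryPartitionWriter<>(true);
    TopicPartition tp = new TopicPartition("__consumer_offsets", 0);

    // Append a COMMIT marker for producer id 100, epoch 5, coordinator epoch 10.
    writer.appendEndTransactionMarker(tp, 100L, (short) 5, 10, TransactionResult.COMMIT);

    // The marker shows up as a control entry in the typed log view.
    assertEquals(
        Collections.singletonList(
            InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, TransactionResult.COMMIT)
        ),
        writer.entries(tp)
    );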