KAFKA-14055; Txn markers should not be removed by matching records in the offset map (#12390)

When cleaning a topic with transactional data, if a key used in the user data happens to conflict with a key in the transaction markers, the markers can be removed before the corresponding data from the transaction is removed. This results in a hanging transaction, or in the loss of the transaction's atomicity, since its data would effectively get bundled into the next transaction in the log. Currently, control records are excluded when building the offset map, but not when doing the cleaning. This patch fixes the problem by also checking for control batches in the `shouldRetainRecord` callback.
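
As a minimal illustration of the collision (a sketch reusing the same internal `ControlRecordType` helpers exercised by the new test below), the key of a COMMIT marker is just a small fixed byte sequence, and nothing stops a client from producing a record whose key has exactly those bytes:

```scala
import java.nio.ByteBuffer
import org.apache.kafka.common.record.ControlRecordType

// Serialize the fixed key that every COMMIT marker carries
// (a version field followed by the control record type).
val markerKey: ByteBuffer = {
  val key = ByteBuffer.allocate(ControlRecordType.COMMIT.recordKey().sizeOf())
  ControlRecordType.COMMIT.recordKey().writeTo(key)
  key.flip()
  key
}

// A user record produced with this same key maps to the same entry in the
// cleaner's offset map and can shadow the marker during cleaning.
```

Before this patch, the cleaner would then treat the marker like any other record with a duplicate key and drop it.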

Reviewers: Jun Rao <junrao@gmail.com>
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 55d7952..74f9627 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -692,6 +692,10 @@
         if (discardBatchRecords)
           // The batch is only retained to preserve producer sequence information; the records can be removed
           false
+        else if (batch.isControlBatch)
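+          // Control batches (e.g. transaction markers) are excluded when building the
+          // offset map, so a user record with a conflicting key must not remove them here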
+          true
         else
           Cleaner.this.shouldRetainRecord(map, retainLegacyDeletesAndTxnMarkers, batch, record, stats, currentTime = currentTime)
       }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index b3a1a76..b1ba7cb 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1031,6 +1031,50 @@
     assertEquals(List(3, 4, 5), offsetsInLog(log))
   }
 
+
+  @Test
+  def testCleaningWithKeysConflictingWithTxnMarkerKeys(): Unit = {
+    val cleaner = makeCleaner(10)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    val leaderEpoch = 5
+    val producerEpoch = 0.toShort
+
+    // First we append one committed transaction
+    val producerId1 = 1L
+    val appendProducer = appendTransactionalAsLeader(log, producerId1, producerEpoch, leaderEpoch)
+    appendProducer(Seq(1))
+    log.appendAsLeader(commitMarker(producerId1, producerEpoch), leaderEpoch, origin = AppendOrigin.Coordinator)
+
+    // Now we append one transaction with a key which conflicts with the COMMIT marker appended above
+    def commitRecordKey(): ByteBuffer = {
+      val keySize = ControlRecordType.COMMIT.recordKey().sizeOf()
+      val key = ByteBuffer.allocate(keySize)
+      ControlRecordType.COMMIT.recordKey().writeTo(key)
+      key.flip()
+      key
+    }
+
+    val producerId2 = 2L
+    val records = MemoryRecords.withTransactionalRecords(
+      CompressionType.NONE,
+      producerId2,
+      producerEpoch,
+      0,
+      new SimpleRecord(time.milliseconds(), commitRecordKey(), ByteBuffer.wrap("foo".getBytes))
+    )
+    log.appendAsLeader(records, leaderEpoch, origin = AppendOrigin.Client)
+    log.appendAsLeader(commitMarker(producerId2, producerEpoch), leaderEpoch, origin = AppendOrigin.Coordinator)
+    log.roll()
+    assertEquals(List(0, 1, 2, 3), offsetsInLog(log))
+
+    // After cleaning, the marker should not be removed
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+    assertEquals(List(0, 1, 2, 3), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(0, 1, 2, 3), offsetsInLog(log))
+  }
+
   @Test
   def testPartialSegmentClean(): Unit = {
     // because loadFactor is 0.75, this means we can fit 1 message in the map
@@ -1934,19 +1978,31 @@
       partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
   }
 
-  private def appendTransactionalAsLeader(log: UnifiedLog,
-                                          producerId: Long,
-                                          producerEpoch: Short,
-                                          origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = {
-    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, origin = origin)
+  private def appendTransactionalAsLeader(
+    log: UnifiedLog,
+    producerId: Long,
+    producerEpoch: Short,
+    leaderEpoch: Int = 0,
+    origin: AppendOrigin = AppendOrigin.Client
+  ): Seq[Int] => LogAppendInfo = {
+    appendIdempotentAsLeader(
+      log,
+      producerId,
+      producerEpoch,
+      isTransactional = true,
+      leaderEpoch = leaderEpoch,
+      origin = origin
+    )
   }
 
-  private def appendIdempotentAsLeader(log: UnifiedLog,
-                                       producerId: Long,
-                                       producerEpoch: Short,
-                                       isTransactional: Boolean = false,
-                                       leaderEpoch: Int = 0,
-                                       origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = {
+  private def appendIdempotentAsLeader(
+    log: UnifiedLog,
+    producerId: Long,
+    producerEpoch: Short,
+    isTransactional: Boolean = false,
+    leaderEpoch: Int = 0,
+    origin: AppendOrigin = AppendOrigin.Client
+  ): Seq[Int] => LogAppendInfo = {
     var sequence = 0
     keys: Seq[Int] => {
       val simpleRecords = keys.map { key =>