KAFKA-8564; Fix NPE on deleted partition dir when no segments remain (#6968)
Kafka should not NPE while loading a deleted partition dir with no log segments. This patch ensures that there will always be at least one segment after initialization.
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9ab6fda..7a9b76a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -572,6 +572,15 @@
activeSegment.resizeIndexes(config.maxIndexSize)
nextOffset
} else {
+ if (logSegments.isEmpty) {
+ addSegment(LogSegment.open(dir = dir,
+ baseOffset = 0,
+ config,
+ time = time,
+ fileAlreadyExists = false,
+ initFileSize = this.initFileSize,
+ preallocate = false))
+ }
0
}
}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index dc9a423..e529fe9 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -3736,7 +3736,17 @@
assertEquals(new AbortedTransaction(pid, 0), fetchDataInfo.abortedTransactions.get.head)
}
- private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
+ @Test
+ def testLoadPartitionDirWithNoSegmentsShouldNotThrow() {
+ val dirName = Log.logDeleteDirName(new TopicPartition("foo", 3))
+ val logDir = new File(tmpDir, dirName)
+ logDir.mkdirs()
+ val logConfig = LogTest.createLogConfig()
+ val log = createLog(logDir, logConfig)
+ assertEquals(1, log.numberOfSegments)
+ }
+
+ private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Int => Unit = {
var sequence = 0