KAFKA-6292; Improve FileLogInputStream batch position checks to avoid type overflow (#4928)
Switch from sum operations to subtraction to avoid type casting in checks and type overflow during `FlieLogInputStream` work, especially in cases where property `log.segment.bytes` was set close to the `Integer.MAX_VALUE` and used as a `position` inside `nextBatch()` function.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 75eb1b3..045ef4b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -59,7 +59,7 @@
@Override
public FileChannelRecordBatch nextBatch() throws IOException {
- if (position + HEADER_SIZE_UP_TO_MAGIC >= end)
+ if (position >= end - HEADER_SIZE_UP_TO_MAGIC)
return null;
logHeaderBuffer.rewind();
@@ -73,7 +73,7 @@
if (size < LegacyRecord.RECORD_OVERHEAD_V0)
throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", LegacyRecord.RECORD_OVERHEAD_V0));
- if (position + LOG_OVERHEAD + size > end)
+ if (position > end - LOG_OVERHEAD - size)
return null;
byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
index d5de4bd..4f04de1 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -114,8 +114,8 @@
SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
-
};
+
SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
new SimpleRecord(897839L, null, "4".getBytes()),
@@ -155,8 +155,8 @@
SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
-
};
+
SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
new SimpleRecord(897839L, null, "4".getBytes()),
@@ -209,6 +209,22 @@
}
}
+ @Test
+ public void testNextBatchSelectionWithMaxedParams() throws IOException {
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), Integer.MAX_VALUE, Integer.MAX_VALUE);
+ assertNull(logInputStream.nextBatch());
+ }
+ }
+
+ @Test
+ public void testNextBatchSelectionWithZeroedParams() throws IOException {
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0, 0);
+ assertNull(logInputStream.nextBatch());
+ }
+ }
+
private void assertProducerData(RecordBatch batch, long producerId, short producerEpoch, int baseSequence,
boolean isTransactional, SimpleRecord ... records) {
assertEquals(producerId, batch.producerId());