eliminate extra object instantiation (#11345)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index e85e6fb..662b8b0 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -46,7 +46,7 @@
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -91,19 +91,19 @@
TaskToolbox toolbox
) throws Exception
{
- // Handles OffsetOutOfRangeException, which is thrown if the seeked-to
- // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
- // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
- List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> records = new ArrayList<>();
try {
- records = recordSupplier.poll(task.getIOConfig().getPollTimeout());
+ return recordSupplier.poll(task.getIOConfig().getPollTimeout());
}
catch (OffsetOutOfRangeException e) {
+ //
+ // Handles OffsetOutOfRangeException, which is thrown if the seeked-to
+ // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
+ // that has not been written yet (which is totally legitimate). So let's wait for it to show up
+ //
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), recordSupplier, toolbox);
+ return Collections.emptyList();
}
-
- return records;
}
@Override