eliminate extra object instantiation (#11345)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index e85e6fb..662b8b0 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -46,7 +46,7 @@
 import javax.validation.constraints.NotNull;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -91,19 +91,19 @@
       TaskToolbox toolbox
   ) throws Exception
   {
-    // Handles OffsetOutOfRangeException, which is thrown if the seeked-to
-    // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
-    // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
-    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> records = new ArrayList<>();
     try {
-      records = recordSupplier.poll(task.getIOConfig().getPollTimeout());
+      return recordSupplier.poll(task.getIOConfig().getPollTimeout());
     }
     catch (OffsetOutOfRangeException e) {
+      //
+      // Handles OffsetOutOfRangeException, which is thrown if the seeked-to
+      // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
+      // that has not been written yet (which is totally legitimate). So let's wait for it to show up
+      //
       log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
       possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), recordSupplier, toolbox);
+      return Collections.emptyList();
     }
-
-    return records;
   }
 
   @Override