More structured way to handle parse exceptions (#10336)

* More structured way to handle parse exceptions

* checkstyle; add more tests

* forbidden api; test

* address comment; new test

* address review comments

* javadoc for parseException; remove redundant parseException in streaming ingestion

* fix tests

* unnecessary catch

* unused imports

* appenderator test

* unused import
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
index 5efe00e..f7a690f 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
@@ -229,7 +229,6 @@
   {
     return new IncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
-        .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
index ef4ca7b..47b5317 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
@@ -226,7 +226,6 @@
   {
     return new IncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(metrics)
-        .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index d05da5e..1921b43 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -415,7 +415,6 @@
   {
     return new IncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
-        .setReportParseExceptions(false)
         .setConcurrentEventAdd(true)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
index 0da6c06..cc8d4a3 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
@@ -129,7 +129,6 @@
     return new IncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(aggs)
         .setDeserializeComplexMetrics(false)
-        .setReportParseExceptions(false)
         .setMaxRowCount(MAX_ROWS)
         .buildOnheap();
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
index bf7733d..fabb86a 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
@@ -310,7 +310,6 @@
   {
     return new IncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
-        .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
index 1a9ac01..a49e122 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
@@ -130,7 +130,6 @@
                 .withRollup(rollup)
                 .build()
         )
-        .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
index 02c6efd..8448358 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
@@ -111,7 +111,6 @@
                 .withRollup(rollup)
                 .build()
         )
-        .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment * 2)
         .buildOnheap();
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
index 80e9643..b677a96 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
@@ -218,7 +218,6 @@
                 .withRollup(rollup)
                 .build()
         )
-        .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
index 5b1a0ca..755947d 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
@@ -158,7 +158,6 @@
                 .withRollup(rollup)
                 .build()
         )
-        .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index 67d6273..f212fe3 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -584,7 +584,6 @@
                 .withRollup(withRollup)
                 .build()
         )
-        .setReportParseExceptions(false)
         .setConcurrentEventAdd(true)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
index 410b13e..b543076 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
@@ -321,7 +321,6 @@
   {
     return new IncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
-        .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
index d8ae55a..680a179 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
@@ -388,7 +388,6 @@
   {
     return new IncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
-        .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
index 931af6c..875192a 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
@@ -312,7 +312,6 @@
   {
     return new IncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
-        .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
index bbafeff..b290f30 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
@@ -294,7 +294,6 @@
   {
     return new IncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
-        .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
index 3272ecd..406e627 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
@@ -406,7 +406,6 @@
   {
     return new IncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
-        .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
         .buildOnheap();
   }
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java b/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
index f7c3443..8436ad8 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
@@ -21,15 +21,17 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -67,7 +69,7 @@
     return parse(inputRowSchema.getTimestampSpec(), inputRowSchema.getDimensionsSpec(), theMap);
   }
 
-  public static InputRow parse(
+  private static InputRow parse(
       TimestampSpec timestampSpec,
       DimensionsSpec dimensionsSpec,
       Map<String, Object> theMap
@@ -76,7 +78,8 @@
     return parse(timestampSpec, dimensionsSpec.getDimensionNames(), dimensionsSpec.getDimensionExclusions(), theMap);
   }
 
-  public static InputRow parse(
+  @VisibleForTesting
+  static InputRow parse(
       TimestampSpec timestampSpec,
       List<String> dimensions,
       Set<String> dimensionExclusions,
@@ -93,23 +96,42 @@
     final DateTime timestamp;
     try {
       timestamp = timestampSpec.extractTimestamp(theMap);
-      if (timestamp == null) {
-        final String input = theMap.toString();
-        throw new NullPointerException(
-            StringUtils.format(
-                "Null timestamp in input: %s",
-                input.length() < 100 ? input : input.substring(0, 100) + "..."
-            )
-        );
-      }
     }
     catch (Exception e) {
-      throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap);
+      throw new ParseException(
+          e,
+          "Timestamp[%s] is unparseable! Event: %s",
+          timestampSpec.getRawTimestamp(theMap),
+          rawMapToPrint(theMap)
+      );
     }
-
+    if (timestamp == null) {
+      throw new ParseException(
+          "Timestamp[%s] is unparseable! Event: %s",
+          timestampSpec.getRawTimestamp(theMap),
+          rawMapToPrint(theMap)
+      );
+    }
+    if (!Intervals.ETERNITY.contains(timestamp)) {
+      throw new ParseException(
+          "Encountered row with timestamp[%s] that cannot be represented as a long: [%s]",
+          timestamp,
+          rawMapToPrint(theMap)
+      );
+    }
     return new MapBasedInputRow(timestamp, dimensionsToUse, theMap);
   }
 
+  @Nullable
+  private static String rawMapToPrint(@Nullable Map<String, Object> rawMap)
+  {
+    if (rawMap == null) {
+      return null;
+    }
+    final String input = rawMap.toString();
+    return input.length() < 100 ? input : input.substring(0, 100) + "...";
+  }
+
   @JsonProperty
   @Override
   public ParseSpec getParseSpec()
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java
index 08db775..cdd6ebd 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java
@@ -90,12 +90,20 @@
     return missingValue;
   }
 
-  public DateTime extractTimestamp(Map<String, Object> input)
+  @Nullable
+  public DateTime extractTimestamp(@Nullable Map<String, Object> input)
   {
-    return parseDateTime(input.get(timestampColumn));
+    return parseDateTime(getRawTimestamp(input));
   }
 
-  public DateTime parseDateTime(Object input)
+  @Nullable
+  public Object getRawTimestamp(@Nullable Map<String, Object> input)
+  {
+    return input == null ? null : input.get(timestampColumn);
+  }
+
+  @Nullable
+  public DateTime parseDateTime(@Nullable Object input)
   {
     DateTime extracted = missingValue;
     if (input != null) {
diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ParseException.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ParseException.java
index e71121e..01d118e 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/parsers/ParseException.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ParseException.java
@@ -22,28 +22,42 @@
 import org.apache.druid.java.util.common.StringUtils;
 
 /**
+ * ParseException can be thrown on both ingestion side and query side.
+ *
+ * During ingestion, ParseException can be thrown in two places, i.e., {@code InputSourceReader#read()}
+ * and {@code IncrementalIndex#addToFacts()}. To easily handle ParseExceptions, consider using
+ * {@code FilteringCloseableInputRowIterator} and {@code ParseExceptionHandler} to iterate input rows and
+ * to add rows to IncrementalIndex, respectively.
+ *
+ * When you use {@code InputSourceReader#sample()}, the ParseException will not be thrown, but be stored in
+ * {@code InputRowListPlusRawValues}.
+ *
+ * During query, ParseException can be thrown in SQL planning. It should be never thrown once a query plan is
+ * constructed.
  */
 public class ParseException extends RuntimeException
 {
-  private boolean fromPartiallyValidRow = false;
+  private final boolean fromPartiallyValidRow;
 
   public ParseException(String formatText, Object... arguments)
   {
     super(StringUtils.nonStrictFormat(formatText, arguments));
+    this.fromPartiallyValidRow = false;
+  }
+
+  public ParseException(boolean fromPartiallyValidRow, String formatText, Object... arguments)
+  {
+    super(StringUtils.nonStrictFormat(formatText, arguments));
+    this.fromPartiallyValidRow = fromPartiallyValidRow;
   }
 
   public ParseException(Throwable cause, String formatText, Object... arguments)
   {
-    super(StringUtils.nonStrictFormat(formatText, arguments), cause);
+    this(false, StringUtils.nonStrictFormat(formatText, arguments), cause);
   }
 
   public boolean isFromPartiallyValidRow()
   {
     return fromPartiallyValidRow;
   }
-
-  public void setFromPartiallyValidRow(boolean fromPartiallyValidRow)
-  {
-    this.fromPartiallyValidRow = fromPartiallyValidRow;
-  }
 }
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/MapInputRowParserTest.java b/core/src/test/java/org/apache/druid/data/input/impl/MapInputRowParserTest.java
new file mode 100644
index 0000000..6a7d11b
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/data/input/impl/MapInputRowParserTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.List;
+import java.util.Set;
+
+public class MapInputRowParserTest
+{
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private final TimestampSpec timestampSpec = new TimestampSpec("time", null, null);
+  private final List<String> dimensions = ImmutableList.of("dim");
+  private final Set<String> dimensionExclusions = ImmutableSet.of();
+
+  @Test
+  public void testParseValidInput()
+  {
+    final InputRow inputRow = MapInputRowParser.parse(
+        timestampSpec,
+        dimensions,
+        dimensionExclusions,
+        ImmutableMap.of("time", "2020-01-01", "dim", 0, "met", 10)
+    );
+    Assert.assertEquals(dimensions, inputRow.getDimensions());
+    Assert.assertEquals(DateTimes.of("2020-01-01"), inputRow.getTimestamp());
+    Assert.assertEquals(ImmutableList.of("0"), inputRow.getDimension("dim"));
+    Assert.assertEquals(10, inputRow.getMetric("met"));
+  }
+
+  @Test
+  public void testParseInvalidTimestampThrowParseException()
+  {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("Timestamp[invalid timestamp] is unparseable!");
+    final InputRow inputRow = MapInputRowParser.parse(
+        timestampSpec,
+        dimensions,
+        dimensionExclusions,
+        ImmutableMap.of("time", "invalid timestamp", "dim", 0, "met", 10)
+    );
+  }
+
+  @Test
+  public void testParseMissingTimestampThrowParseException()
+  {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("Timestamp[null] is unparseable!");
+    final InputRow inputRow = MapInputRowParser.parse(
+        timestampSpec,
+        dimensions,
+        dimensionExclusions,
+        ImmutableMap.of("dim", 0, "met", 10)
+    );
+  }
+
+  @Test
+  public void testParseTimestampSmallerThanMinThrowParseException()
+  {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("Encountered row with timestamp[-146136543-09-08T08:23:32.095Z] that cannot be represented as a long");
+    MapInputRowParser.parse(
+        timestampSpec,
+        dimensions,
+        dimensionExclusions,
+        ImmutableMap.of("time", DateTimes.utc(JodaUtils.MIN_INSTANT - 1), "dim", 0, "met", 10)
+    );
+  }
+
+  @Test
+  public void testParseTimestampLargerThanMaxThrowParseException()
+  {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("Encountered row with timestamp[146140482-04-24T15:36:27.904Z] that cannot be represented as a long");
+    MapInputRowParser.parse(
+        timestampSpec,
+        dimensions,
+        dimensionExclusions,
+        ImmutableMap.of("time", DateTimes.utc(JodaUtils.MAX_INSTANT + 1), "dim", 0, "met", 10)
+    );
+  }
+}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
index 39e82c1..cf69872 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
@@ -115,8 +115,7 @@
   {
     return Collections.singletonList(
         MapInputRowParser.parse(
-            inputRowSchema.getTimestampSpec(),
-            inputRowSchema.getDimensionsSpec(),
+            inputRowSchema,
             recordFlattener.flatten(intermediateRow)
         )
     );
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index f02b7c4..b65910e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -36,7 +36,6 @@
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.utils.CircularBuffer;
 import org.apache.druid.utils.CollectionUtils;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.common.TopicPartition;
@@ -66,7 +65,6 @@
       KafkaIndexTask task,
       @Nullable InputRowParser<ByteBuffer> parser,
       AuthorizerMapper authorizerMapper,
-      CircularBuffer<Throwable> savedParseExceptions,
       LockGranularity lockGranularityToUse
   )
   {
@@ -74,7 +72,6 @@
         task,
         parser,
         authorizerMapper,
-        savedParseExceptions,
         lockGranularityToUse
     );
     this.task = task;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 48b2846..fb06353 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -128,7 +128,6 @@
         this,
         dataSchema.getParser(),
         authorizerMapper,
-        savedParseExceptions,
         lockGranularityToUse
     );
   }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 1a72ef4..021d916 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,7 +24,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.common.utils.IdUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
@@ -53,6 +52,7 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.joda.time.DateTime;
 
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index 5d1f4bf..55f69e1 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -24,7 +24,6 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.guice.annotations.Json;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
@@ -33,6 +32,7 @@
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index a69dbec..f4098fc 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -53,9 +53,6 @@
 import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersTotals;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
@@ -106,6 +103,9 @@
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
 import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -1362,7 +1362,7 @@
             "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=notanumber, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to long,]",
             "Unable to parse row [unparseable2]",
             "Unable to parse row [unparseable]",
-            "Encountered row with timestamp that cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
+            "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
         )
     );
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index e8f1e65..3768ceb 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -245,7 +245,7 @@
             .build(),
         null,
         true,
-        "Timestamp cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
+        "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
     ), it.next());
     Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
         null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index 40ec97a..7874a11 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -22,7 +22,6 @@
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -33,6 +32,7 @@
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.junit.Assert;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index b72461d..905abd1 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -38,7 +38,6 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.RealtimeIndexTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
@@ -77,6 +76,7 @@
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index 753363d..0fe5701 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -70,7 +70,6 @@
         this,
         dataSchema.getParser(),
         authorizerMapper,
-        savedParseExceptions,
         lockGranularityToUse
     );
   }
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index ca35b89..2c65cd0 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -36,7 +36,6 @@
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.utils.CircularBuffer;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -60,7 +59,6 @@
       KinesisIndexTask task,
       @Nullable InputRowParser<ByteBuffer> parser,
       AuthorizerMapper authorizerMapper,
-      CircularBuffer<Throwable> savedParseExceptions,
       LockGranularity lockGranularityToUse
   )
   {
@@ -68,7 +66,6 @@
         task,
         parser,
         authorizerMapper,
-        savedParseExceptions,
         lockGranularityToUse
     );
     this.task = task;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index d21e2fb..ba5129a 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -25,7 +25,6 @@
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
 import org.apache.druid.common.utils.IdUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
@@ -53,6 +52,7 @@
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.joda.time.DateTime;
 
 import java.util.ArrayList;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
index 3092d3a..f210ca6 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
@@ -26,7 +26,6 @@
 import com.google.inject.name.Named;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
 import org.apache.druid.guice.annotations.Json;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
 import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -36,6 +35,7 @@
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index f754656..9ab058c 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -26,12 +26,12 @@
 import org.apache.druid.common.aws.AWSCredentialsConfig;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.initialization.Initialization;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 8a05464..4aff50b 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -57,8 +57,6 @@
 import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
@@ -98,12 +96,12 @@
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
 import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
@@ -179,9 +177,7 @@
   private final Period intermediateHandoffPeriod = null;
   private int maxRecordsPerPoll;
 
-  private AppenderatorsManager appenderatorsManager;
   private final Set<Integer> checkpointRequestsHash = new HashSet<>();
-  private RowIngestionMetersFactory rowIngestionMetersFactory;
 
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();
@@ -226,8 +222,6 @@
 
     recordSupplier = mock(KinesisRecordSupplier.class);
 
-    appenderatorsManager = new TestAppenderatorsManager();
-
     // sleep required because of kinesalite
     Thread.sleep(500);
     makeToolboxFactory();
@@ -1336,10 +1330,10 @@
             "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=20.0, met1=notanumber}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [Unable to parse value[notanumber] for field[met1],]",
             "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=notanumber, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to float,]",
             "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=notanumber, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to long,]",
-            "Unparseable timestamp found! Event: {}",
+            "Timestamp[null] is unparseable! Event: {}",
             "Unable to parse row [unparseable2]",
             "Unable to parse row [unparseable]",
-            "Encountered row with timestamp that cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
+            "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
         )
     );
 
@@ -2838,7 +2832,6 @@
   {
     directory = tempFolder.newFolder();
     final TestUtils testUtils = new TestUtils();
-    rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
     final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
     objectMapper.setInjectableValues(((InjectableValues.Std) objectMapper.getInjectableValues()).addValue(
         AWSCredentialsConfig.class,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
index 1b2c07a..dadc6c7 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -262,7 +262,7 @@
             .build(),
         null,
         true,
-        "Timestamp cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
+        "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
     ), it.next());
     Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
         null,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 7ca3b04..1351662 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -36,7 +36,6 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.RealtimeIndexTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
@@ -76,6 +75,7 @@
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
index e9b6817..421bbed 100644
--- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
+++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
@@ -147,8 +147,7 @@
   {
     return Collections.singletonList(
         MapInputRowParser.parse(
-            inputRowSchema.getTimestampSpec(),
-            inputRowSchema.getDimensionsSpec(),
+            inputRowSchema,
             orcStructFlattener.flatten(intermediateRow)
         )
     );
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
index 1bdd011..2d2d87d 100644
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
@@ -136,8 +136,7 @@
   {
     return Collections.singletonList(
         MapInputRowParser.parse(
-            inputRowSchema.getTimestampSpec(),
-            inputRowSchema.getDimensionsSpec(),
+            inputRowSchema,
             flattener.flatten(intermediateRow)
         )
     );
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
index e2acce7..91982fd 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
@@ -304,7 +304,6 @@
 
     IncrementalIndex newIndex = new IncrementalIndex.Builder()
         .setIndexSchema(indexSchema)
-        .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows()) // only used by OffHeapIncrementalIndex
         .setMaxRowCount(tuningConfig.getRowFlushBoundary())
         .setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()))
         .buildOnheap();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
index f255a33..03452ac 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
@@ -21,10 +21,10 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index 009a407..5f6f392 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -39,7 +39,6 @@
 import org.apache.druid.indexing.common.actions.SegmentInsertAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
 import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
@@ -50,6 +49,7 @@
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentArchiver;
 import org.apache.druid.segment.loading.DataSegmentKiller;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 3f8063f..8c883a4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -37,7 +37,6 @@
 import org.apache.druid.guice.annotations.RemoteChatHandler;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
@@ -48,6 +47,7 @@
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentArchiver;
 import org.apache.druid.segment.loading.DataSegmentKiller;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
index f47a8f4..318f1c4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
@@ -21,17 +21,18 @@
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 
 import java.util.HashMap;
 import java.util.Map;
 
 public class DropwizardRowIngestionMeters implements RowIngestionMeters
 {
-  public static final String ONE_MINUTE_NAME = "1m";
-  public static final String FIVE_MINUTE_NAME = "5m";
-  public static final String FIFTEEN_MINUTE_NAME = "15m";
+  private static final String ONE_MINUTE_NAME = "1m";
+  private static final String FIVE_MINUTE_NAME = "5m";
+  private static final String FIFTEEN_MINUTE_NAME = "15m";
 
-  private final MetricRegistry metricRegistry;
   private final Meter processed;
   private final Meter processedWithError;
   private final Meter unparseable;
@@ -39,7 +40,7 @@
 
   public DropwizardRowIngestionMeters()
   {
-    this.metricRegistry = new MetricRegistry();
+    MetricRegistry metricRegistry = new MetricRegistry();
     this.processed = metricRegistry.meter(PROCESSED);
     this.processedWithError = metricRegistry.meter(PROCESSED_WITH_ERROR);
     this.unparseable = metricRegistry.meter(UNPARSEABLE);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersFactory.java
index d6b9928..46b1e12 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersFactory.java
@@ -19,6 +19,9 @@
 
 package org.apache.druid.indexing.common.stats;
 
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+
 public class DropwizardRowIngestionMetersFactory implements RowIngestionMetersFactory
 {
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
index ed1cff0..070d21b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
@@ -26,6 +26,8 @@
 import org.apache.druid.java.util.metrics.AbstractMonitor;
 import org.apache.druid.java.util.metrics.MonitorUtils;
 import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index eefda5a..f8596ec 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -19,10 +19,16 @@
 
 package org.apache.druid.indexing.common.task;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLock;
@@ -41,6 +47,10 @@
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
@@ -52,17 +62,21 @@
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
@@ -136,6 +150,55 @@
   }
 
   /**
+   * Returns an {@link InputRow} iterator which iterates over an input source.
+   * The returned iterator filters out rows which don't satisfy the given filter or cannot be parsed properly.
+   * The returned iterator can throw {@link org.apache.druid.java.util.common.parsers.ParseException}s in
+   * {@link Iterator#hasNext()} when it hits {@link ParseExceptionHandler#maxAllowedParseExceptions}.
+   */
+  public static FilteringCloseableInputRowIterator inputSourceReader(
+      File tmpDir,
+      DataSchema dataSchema,
+      InputSource inputSource,
+      @Nullable InputFormat inputFormat,
+      Predicate<InputRow> rowFilter,
+      RowIngestionMeters ingestionMeters,
+      ParseExceptionHandler parseExceptionHandler
+  ) throws IOException
+  {
+    final List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
+                                            .map(AggregatorFactory::getName)
+                                            .collect(Collectors.toList());
+    final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
+        inputSource.reader(
+            new InputRowSchema(
+                dataSchema.getTimestampSpec(),
+                dataSchema.getDimensionsSpec(),
+                metricsNames
+            ),
+            inputFormat,
+            tmpDir
+        )
+    );
+    return new FilteringCloseableInputRowIterator(
+        inputSourceReader.read(),
+        rowFilter,
+        ingestionMeters,
+        parseExceptionHandler
+    );
+  }
+
+  protected static Predicate<InputRow> defaultRowFilter(GranularitySpec granularitySpec)
+  {
+    return inputRow -> {
+      if (inputRow == null) {
+        return false;
+      }
+      final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+      return optInterval.isPresent();
+    };
+  }
+
+  /**
    * Registers a resource cleaner which is executed on abnormal exits.
    *
    * @see Task#stopGracefully
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 586abe0..064d2ca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -57,7 +57,6 @@
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
 import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
@@ -69,6 +68,8 @@
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.realtime.FireDepartment;
@@ -86,7 +87,7 @@
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
-import org.apache.druid.utils.CircularBuffer;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.GET;
@@ -151,21 +152,26 @@
   private volatile Thread runThread = null;
 
   @JsonIgnore
-  private CircularBuffer<Throwable> savedParseExceptions;
-
-  @JsonIgnore
   private final LockGranularity lockGranularity;
 
   @JsonIgnore
+  @MonotonicNonNull
+  private ParseExceptionHandler parseExceptionHandler;
+
+  @JsonIgnore
+  @MonotonicNonNull
   private IngestionState ingestionState;
 
   @JsonIgnore
+  @MonotonicNonNull
   private AuthorizerMapper authorizerMapper;
 
   @JsonIgnore
+  @MonotonicNonNull
   private RowIngestionMeters rowIngestionMeters;
 
   @JsonIgnore
+  @MonotonicNonNull
   private String errorMsg;
 
 
@@ -187,10 +193,6 @@
     this.spec = spec;
     this.pendingHandoffs = new ConcurrentLinkedQueue<>();
 
-    if (spec.getTuningConfig().getMaxSavedParseExceptions() > 0) {
-      savedParseExceptions = new CircularBuffer<>(spec.getTuningConfig().getMaxSavedParseExceptions());
-    }
-
     this.ingestionState = IngestionState.NOT_STARTED;
     this.lockGranularity = getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK)
                            ? LockGranularity.TIME_CHUNK
@@ -244,6 +246,12 @@
     runThread = Thread.currentThread();
     authorizerMapper = toolbox.getAuthorizerMapper();
     rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    parseExceptionHandler = new ParseExceptionHandler(
+        rowIngestionMeters,
+        spec.getTuningConfig().isLogParseExceptions(),
+        spec.getTuningConfig().getMaxParseExceptions(),
+        spec.getTuningConfig().getMaxSavedParseExceptions()
+    );
 
     setupTimeoutAlert();
 
@@ -371,12 +379,6 @@
               // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
               throw new ISE("Could not allocate segment for row with timestamp[%s]", inputRow.getTimestamp());
             }
-
-            if (addResult.getParseException() != null) {
-              handleParseException(addResult.getParseException());
-            } else {
-              rowIngestionMeters.incrementProcessed();
-            }
           }
         }
         catch (ParseException e) {
@@ -550,7 +552,9 @@
   )
   {
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
-    List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
+    List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
+        parseExceptionHandler.getSavedParseExceptions()
+    );
     return Response.ok(events).build();
   }
 
@@ -590,7 +594,8 @@
   {
     Map<String, Object> unparseableEventsMap = new HashMap<>();
     List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(
-        savedParseExceptions);
+        parseExceptionHandler.getSavedParseExceptions()
+    );
     if (buildSegmentsParseExceptionMessages != null) {
       unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages);
     }
@@ -619,8 +624,8 @@
       log.error(pe, "Encountered parse exception");
     }
 
-    if (savedParseExceptions != null) {
-      savedParseExceptions.add(pe);
+    if (parseExceptionHandler.getSavedParseExceptions() != null) {
+      parseExceptionHandler.getSavedParseExceptions().add(pe);
     }
 
     if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError()
@@ -760,7 +765,9 @@
         toolbox.getJoinableFactory(),
         toolbox.getCache(),
         toolbox.getCacheConfig(),
-        toolbox.getCachePopulatorStats()
+        toolbox.getCachePopulatorStats(),
+        rowIngestionMeters,
+        parseExceptionHandler
     );
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
index c6b756a..711f9d4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
@@ -21,6 +21,8 @@
 
 import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
@@ -38,7 +40,9 @@
       FireDepartmentMetrics metrics,
       TaskToolbox toolbox,
       DataSchema dataSchema,
-      AppenderatorConfig appenderatorConfig
+      AppenderatorConfig appenderatorConfig,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     return newAppenderator(
@@ -48,7 +52,9 @@
         toolbox,
         dataSchema,
         appenderatorConfig,
-        toolbox.getSegmentPusher()
+        toolbox.getSegmentPusher(),
+        rowIngestionMeters,
+        parseExceptionHandler
     );
   }
 
@@ -59,7 +65,9 @@
       TaskToolbox toolbox,
       DataSchema dataSchema,
       AppenderatorConfig appenderatorConfig,
-      DataSegmentPusher segmentPusher
+      DataSegmentPusher segmentPusher,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     return appenderatorsManager.createOfflineAppenderatorForTask(
@@ -70,7 +78,9 @@
         segmentPusher,
         toolbox.getJsonMapper(),
         toolbox.getIndexIO(),
-        toolbox.getIndexMergerV9()
+        toolbox.getIndexMergerV9(),
+        rowIngestionMeters,
+        parseExceptionHandler
     );
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
new file mode 100644
index 0000000..6a88a34
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+
+/**
+ * An {@link InputRow} iterator used by ingestion {@link Task}s. It can filter out rows which do not satisfy the given
+ * {@link #filter} or throw {@link ParseException} while parsing them. The relevant metric should be counted whenever
+ * it filters out rows based on the filter. ParseException handling is delegatged to {@link ParseExceptionHandler}.
+ */
+public class FilteringCloseableInputRowIterator implements CloseableIterator<InputRow>
+{
+  private final CloseableIterator<InputRow> delegate;
+  private final Predicate<InputRow> filter;
+  private final RowIngestionMeters rowIngestionMeters;
+  private final ParseExceptionHandler parseExceptionHandler;
+
+  private InputRow next;
+
+  public FilteringCloseableInputRowIterator(
+      CloseableIterator<InputRow> delegate,
+      Predicate<InputRow> filter,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
+  )
+  {
+    this.delegate = delegate;
+    this.filter = filter;
+    this.rowIngestionMeters = rowIngestionMeters;
+    this.parseExceptionHandler = parseExceptionHandler;
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    while (next == null && delegate.hasNext()) {
+      try {
+        // delegate.next() can throw ParseException
+        final InputRow row = delegate.next();
+        // filter.test() can throw ParseException
+        if (filter.test(row)) {
+          next = row;
+        } else {
+          rowIngestionMeters.incrementThrownAway();
+        }
+      }
+      catch (ParseException e) {
+        parseExceptionHandler.handle(e);
+      }
+    }
+    return next != null;
+  }
+
+  @Override
+  public InputRow next()
+  {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    final InputRow row = next;
+    next = null;
+    return row;
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    delegate.close();
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 576b7de..e24df91 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -49,12 +49,12 @@
 import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
 import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
 import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index e70a8ca..5e93675 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -37,9 +37,7 @@
 import org.apache.druid.data.input.FirehoseFactoryToInputSourceAdaptor;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.Rows;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.hll.HyperLogLogCollector;
@@ -58,7 +56,6 @@
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
 import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
 import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
@@ -69,7 +66,6 @@
 import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
@@ -77,9 +73,9 @@
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.BatchIOConfig;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.IngestionSpec;
@@ -102,7 +98,6 @@
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
-import org.apache.druid.utils.CircularBuffer;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -120,7 +115,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -133,7 +127,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
-import java.util.stream.Collectors;
+import java.util.function.Predicate;
 
 public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 {
@@ -161,9 +155,11 @@
 
   private IngestionState ingestionState;
 
-  private final CircularBuffer<Throwable> buildSegmentsSavedParseExceptions;
+  @MonotonicNonNull
+  private ParseExceptionHandler determinePartitionsParseExceptionHandler;
 
-  private final CircularBuffer<Throwable> determinePartitionsSavedParseExceptions;
+  @MonotonicNonNull
+  private ParseExceptionHandler buildSegmentsParseExceptionHandler;
 
   @MonotonicNonNull
   private AuthorizerMapper authorizerMapper;
@@ -213,18 +209,6 @@
         context
     );
     this.ingestionSchema = ingestionSchema;
-    if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() > 0) {
-      determinePartitionsSavedParseExceptions = new CircularBuffer<>(
-          ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
-      );
-
-      buildSegmentsSavedParseExceptions = new CircularBuffer<>(
-          ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
-      );
-    } else {
-      determinePartitionsSavedParseExceptions = null;
-      buildSegmentsSavedParseExceptions = null;
-    }
     this.ingestionState = IngestionState.NOT_STARTED;
   }
 
@@ -318,14 +302,18 @@
     if (needsDeterminePartitions) {
       events.put(
           RowIngestionMeters.DETERMINE_PARTITIONS,
-          IndexTaskUtils.getMessagesFromSavedParseExceptions(determinePartitionsSavedParseExceptions)
+          IndexTaskUtils.getMessagesFromSavedParseExceptions(
+              determinePartitionsParseExceptionHandler.getSavedParseExceptions()
+          )
       );
     }
 
     if (needsBuildSegments) {
       events.put(
           RowIngestionMeters.BUILD_SEGMENTS,
-          IndexTaskUtils.getMessagesFromSavedParseExceptions(buildSegmentsSavedParseExceptions)
+          IndexTaskUtils.getMessagesFromSavedParseExceptions(
+              buildSegmentsParseExceptionHandler.getSavedParseExceptions()
+          )
       );
     }
 
@@ -448,6 +436,18 @@
       this.authorizerMapper = toolbox.getAuthorizerMapper();
       this.determinePartitionsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
       this.buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+      this.determinePartitionsParseExceptionHandler = new ParseExceptionHandler(
+          determinePartitionsMeters,
+          ingestionSchema.getTuningConfig().isLogParseExceptions(),
+          ingestionSchema.getTuningConfig().getMaxParseExceptions(),
+          ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
+      );
+      this.buildSegmentsParseExceptionHandler = new ParseExceptionHandler(
+          buildSegmentsMeters,
+          ingestionSchema.getTuningConfig().isLogParseExceptions(),
+          ingestionSchema.getTuningConfig().getMaxParseExceptions(),
+          ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
+      );
 
       final boolean determineIntervals = !ingestionSchema.getDataSchema()
                                                          .getGranularitySpec()
@@ -530,9 +530,11 @@
   {
     Map<String, Object> unparseableEventsMap = new HashMap<>();
     List<String> determinePartitionsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(
-        determinePartitionsSavedParseExceptions);
+        determinePartitionsParseExceptionHandler.getSavedParseExceptions()
+    );
     List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(
-        buildSegmentsSavedParseExceptions);
+        buildSegmentsParseExceptionHandler.getSavedParseExceptions()
+    );
 
     if (determinePartitionsParseExceptionMessages != null || buildSegmentsParseExceptionMessages != null) {
       unparseableEventsMap.put(RowIngestionMeters.DETERMINE_PARTITIONS, determinePartitionsParseExceptionMessages);
@@ -709,82 +711,54 @@
         Comparators.intervalsByStartThenEnd()
     );
     final Granularity queryGranularity = granularitySpec.getQueryGranularity();
-    final List<String> metricsNames = Arrays.stream(ingestionSchema.getDataSchema().getAggregators())
-                                            .map(AggregatorFactory::getName)
-                                            .collect(Collectors.toList());
-    final InputSourceReader inputSourceReader = ingestionSchema.getDataSchema().getTransformSpec().decorate(
-        inputSource.reader(
-            new InputRowSchema(
-                ingestionSchema.getDataSchema().getTimestampSpec(),
-                ingestionSchema.getDataSchema().getDimensionsSpec(),
-                metricsNames
-            ),
-            inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null,
-            tmpDir
-        )
-    );
+    final Predicate<InputRow> rowFilter = inputRow -> {
+      if (inputRow == null) {
+        return false;
+      }
+      if (determineIntervals) {
+        return true;
+      }
+      final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+      return optInterval.isPresent();
+    };
 
-    try (final CloseableIterator<InputRow> inputRowIterator = inputSourceReader.read()) {
+    try (final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+        tmpDir,
+        ingestionSchema.getDataSchema(),
+        inputSource,
+        inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null,
+        rowFilter,
+        determinePartitionsMeters,
+        determinePartitionsParseExceptionHandler
+    )) {
       while (inputRowIterator.hasNext()) {
-        try {
-          final InputRow inputRow = inputRowIterator.next();
+        final InputRow inputRow = inputRowIterator.next();
 
-          // The null inputRow means the caller must skip this row.
-          if (inputRow == null) {
-            determinePartitionsMeters.incrementThrownAway();
-            continue;
-          }
-
-          final Interval interval;
-          if (determineIntervals) {
-            interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
-          } else {
-            if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) {
-              final String errorMsg = StringUtils.format(
-                  "Encountered row with timestamp that cannot be represented as a long: [%s]",
-                  inputRow
-              );
-              throw new ParseException(errorMsg);
-            }
-
-            final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
-            if (!optInterval.isPresent()) {
-              determinePartitionsMeters.incrementThrownAway();
-              continue;
-            }
-            interval = optInterval.get();
-          }
-
-          if (partitionsSpec.needsDeterminePartitions(false)) {
-            hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector()));
-
-            List<Object> groupKey = Rows.toGroupKey(
-                queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
-                inputRow
-            );
-            hllCollectors.get(interval).get()
-                         .add(HASH_FUNCTION.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
-          } else {
-            // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
-            // for the interval and don't instantiate a HLL collector
-            hllCollectors.putIfAbsent(interval, Optional.absent());
-          }
-          determinePartitionsMeters.incrementProcessed();
+        final Interval interval;
+        if (determineIntervals) {
+          interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
+        } else {
+          final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+          // this interval must exist since it passed the rowFilter
+          assert optInterval.isPresent();
+          interval = optInterval.get();
         }
-        catch (ParseException e) {
-          if (ingestionSchema.getTuningConfig().isLogParseExceptions()) {
-            log.error(e, "Encountered parse exception");
-          }
 
-          if (determinePartitionsSavedParseExceptions != null) {
-            determinePartitionsSavedParseExceptions.add(e);
-          }
+        if (partitionsSpec.needsDeterminePartitions(false)) {
+          hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector()));
 
-          determinePartitionsMeters.incrementUnparseable();
-          if (determinePartitionsMeters.getUnparseable() > ingestionSchema.getTuningConfig().getMaxParseExceptions()) {
-            throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
-          }
+          List<Object> groupKey = Rows.toGroupKey(
+              queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
+              inputRow
+          );
+          hllCollectors.get(interval).get()
+                       .add(HASH_FUNCTION.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
+        } else {
+          // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
+          // for the interval and don't instantiate a HLL collector
+          hllCollectors.putIfAbsent(interval, Optional.absent());
         }
+        determinePartitionsMeters.incrementProcessed();
       }
     }
 
@@ -889,28 +863,26 @@
         buildSegmentsFireDepartmentMetrics,
         toolbox,
         dataSchema,
-        tuningConfig
+        tuningConfig,
+        buildSegmentsMeters,
+        buildSegmentsParseExceptionHandler
     );
     boolean exceptionOccurred = false;
     try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
       driver.startJob();
 
-      final InputSourceProcessor inputSourceProcessor = new InputSourceProcessor(
-          buildSegmentsMeters,
-          buildSegmentsSavedParseExceptions,
-          tuningConfig.isLogParseExceptions(),
-          tuningConfig.getMaxParseExceptions(),
-          pushTimeout,
-          new DefaultIndexTaskInputRowIteratorBuilder()
-      );
-      inputSourceProcessor.process(
+      InputSourceProcessor.process(
           dataSchema,
           driver,
           partitionsSpec,
           inputSource,
           inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null,
           tmpDir,
-          sequenceNameFunction
+          sequenceNameFunction,
+          new DefaultIndexTaskInputRowIteratorBuilder(),
+          buildSegmentsMeters,
+          buildSegmentsParseExceptionHandler,
+          pushTimeout
       );
 
       // If we use timeChunk lock, then we don't have to specify what segments will be overwritten because
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index 740ca20..abc6b92 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.common.task;
 
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.server.security.Access;
@@ -40,7 +41,7 @@
 public class IndexTaskUtils
 {
   @Nullable
-  public static List<String> getMessagesFromSavedParseExceptions(CircularBuffer<Throwable> savedParseExceptions)
+  public static List<String> getMessagesFromSavedParseExceptions(CircularBuffer<ParseException> savedParseExceptions)
   {
     if (savedParseExceptions == null) {
       return null;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
index e88dab2..05ac79e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
@@ -23,64 +23,33 @@
 import org.apache.druid.data.input.HandlingInputRowIterator;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
 import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
 import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
-import org.apache.druid.utils.CircularBuffer;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 
 public class InputSourceProcessor
 {
   private static final Logger LOG = new Logger(InputSourceProcessor.class);
 
-  private final RowIngestionMeters buildSegmentsMeters;
-  @Nullable
-  private final CircularBuffer<Throwable> buildSegmentsSavedParseExceptions;
-  private final boolean logParseExceptions;
-  private final int maxParseExceptions;
-  private final long pushTimeout;
-  private final IndexTaskInputRowIteratorBuilder inputRowIteratorBuilder;
-
-  public InputSourceProcessor(
-      RowIngestionMeters buildSegmentsMeters,
-      @Nullable CircularBuffer<Throwable> buildSegmentsSavedParseExceptions,
-      boolean logParseExceptions,
-      int maxParseExceptions,
-      long pushTimeout,
-      IndexTaskInputRowIteratorBuilder inputRowIteratorBuilder
-  )
-  {
-    this.buildSegmentsMeters = buildSegmentsMeters;
-    this.buildSegmentsSavedParseExceptions = buildSegmentsSavedParseExceptions;
-    this.logParseExceptions = logParseExceptions;
-    this.maxParseExceptions = maxParseExceptions;
-    this.pushTimeout = pushTimeout;
-    this.inputRowIteratorBuilder = inputRowIteratorBuilder;
-  }
-
   /**
    * This method opens the given {@link InputSource} and processes data via {@link InputSourceReader}.
    * All read data is consumed by {@link BatchAppenderatorDriver} which creates new segments.
@@ -88,14 +57,18 @@
    *
    * @return {@link SegmentsAndCommitMetadata} for the pushed segments.
    */
-  public SegmentsAndCommitMetadata process(
+  public static SegmentsAndCommitMetadata process(
       DataSchema dataSchema,
       BatchAppenderatorDriver driver,
       PartitionsSpec partitionsSpec,
       InputSource inputSource,
       @Nullable InputFormat inputFormat,
       File tmpDir,
-      SequenceNameFunction sequenceNameFunction
+      SequenceNameFunction sequenceNameFunction,
+      IndexTaskInputRowIteratorBuilder inputRowIteratorBuilder,
+      RowIngestionMeters buildSegmentsMeters,
+      ParseExceptionHandler parseExceptionHandler,
+      long pushTimeout
   ) throws IOException, InterruptedException, ExecutionException, TimeoutException
   {
     @Nullable
@@ -103,73 +76,53 @@
                                                         ? (DynamicPartitionsSpec) partitionsSpec
                                                         : null;
     final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
-
-    final List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
-                                            .map(AggregatorFactory::getName)
-                                            .collect(Collectors.toList());
-    final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
-        inputSource.reader(
-            new InputRowSchema(
-                dataSchema.getTimestampSpec(),
-                dataSchema.getDimensionsSpec(),
-                metricsNames
-            ),
-            inputFormat,
-            tmpDir
-        )
-    );
     try (
-        final CloseableIterator<InputRow> inputRowIterator = inputSourceReader.read();
-        HandlingInputRowIterator iterator = inputRowIteratorBuilder
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            tmpDir,
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        final HandlingInputRowIterator iterator = inputRowIteratorBuilder
             .delegate(inputRowIterator)
             .granularitySpec(granularitySpec)
-            .nullRowRunnable(buildSegmentsMeters::incrementThrownAway)
-            .absentBucketIntervalConsumer(inputRow -> buildSegmentsMeters.incrementThrownAway())
             .build()
     ) {
       while (iterator.hasNext()) {
-        try {
-          final InputRow inputRow = iterator.next();
-          if (inputRow == null) {
-            continue;
-          }
-
-          // IndexTaskInputRowIteratorBuilder.absentBucketIntervalConsumer() ensures the interval will be present here
-          Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
-          @SuppressWarnings("OptionalGetWithoutIsPresent")
-          final Interval interval = optInterval.get();
-
-          final String sequenceName = sequenceNameFunction.getSequenceName(interval, inputRow);
-          final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
-
-          if (addResult.isOk()) {
-            // incremental segment publishment is allowed only when rollup doesn't have to be perfect.
-            if (dynamicPartitionsSpec != null) {
-              final boolean isPushRequired = addResult.isPushRequired(
-                  dynamicPartitionsSpec.getMaxRowsPerSegment(),
-                  dynamicPartitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
-              );
-              if (isPushRequired) {
-                // There can be some segments waiting for being pushed even though no more rows will be added to them
-                // in the future.
-                // If those segments are not pushed here, the remaining available space in appenderator will be kept
-                // small which could lead to smaller segments.
-                final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
-                LOG.debugSegments(pushed.getSegments(), "Pushed segments");
-              }
-            }
-          } else {
-            throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
-          }
-
-          if (addResult.getParseException() != null) {
-            handleParseException(addResult.getParseException());
-          } else {
-            buildSegmentsMeters.incrementProcessed();
-          }
+        final InputRow inputRow = iterator.next();
+        if (inputRow == null) {
+          continue;
         }
-        catch (ParseException e) {
-          handleParseException(e);
+
+        // IndexTaskInputRowIteratorBuilder.absentBucketIntervalConsumer() ensures the interval will be present here
+        Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+        @SuppressWarnings("OptionalGetWithoutIsPresent")
+        final Interval interval = optInterval.get();
+
+        final String sequenceName = sequenceNameFunction.getSequenceName(interval, inputRow);
+        final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
+
+        if (addResult.isOk()) {
+          // incremental segment publishment is allowed only when rollup doesn't have to be perfect.
+          if (dynamicPartitionsSpec != null) {
+            final boolean isPushRequired = addResult.isPushRequired(
+                dynamicPartitionsSpec.getMaxRowsPerSegment(),
+                dynamicPartitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
+            );
+            if (isPushRequired) {
+              // There can be some segments waiting for being pushed even though no more rows will be added to them
+              // in the future.
+              // If those segments are not pushed here, the remaining available space in appenderator will be kept
+              // small which could lead to smaller segments.
+              final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
+              LOG.debugSegments(pushed.getSegments(), "Pushed segments");
+            }
+          }
+        } else {
+          throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
         }
       }
 
@@ -179,26 +132,4 @@
       return pushed;
     }
   }
-
-  private void handleParseException(ParseException e)
-  {
-    if (e.isFromPartiallyValidRow()) {
-      buildSegmentsMeters.incrementProcessedWithError();
-    } else {
-      buildSegmentsMeters.incrementUnparseable();
-    }
-
-    if (logParseExceptions) {
-      LOG.error(e, "Encountered parse exception");
-    }
-
-    if (buildSegmentsSavedParseExceptions != null) {
-      buildSegmentsSavedParseExceptions.add(e);
-    }
-
-    if (buildSegmentsMeters.getUnparseable() + buildSegmentsMeters.getProcessedWithError() > maxParseExceptions) {
-      LOG.error("Max parse exceptions exceeded, terminating task...");
-      throw new RuntimeException("Max parse exceptions exceeded, terminating task...", e);
-    }
-  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index d71102b..53019dd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -29,37 +29,33 @@
 import org.apache.druid.data.input.HandlingInputRowIterator;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.Rows;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
 import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
-import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
 import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 /**
  * The worker task of {@link PartialDimensionDistributionParallelIndexTaskRunner}. This task
@@ -189,41 +185,38 @@
     InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
         ingestionSchema.getDataSchema().getParser()
     );
-    List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
-                                      .map(AggregatorFactory::getName)
-                                      .collect(Collectors.toList());
     InputFormat inputFormat = inputSource.needsFormat()
                               ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
                               : null;
-    InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
-        inputSource.reader(
-            new InputRowSchema(
-                dataSchema.getTimestampSpec(),
-                dataSchema.getDimensionsSpec(),
-                metricsNames
-            ),
-            inputFormat,
-            toolbox.getIndexingTmpDir()
-        )
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
     );
 
     try (
-        CloseableIterator<InputRow> inputRowIterator = inputSourceReader.read();
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
         HandlingInputRowIterator iterator =
             new RangePartitionIndexTaskInputRowIteratorBuilder(partitionDimension, SKIP_NULL)
                 .delegate(inputRowIterator)
                 .granularitySpec(granularitySpec)
-                .nullRowRunnable(IndexTaskInputRowIteratorBuilder.NOOP_RUNNABLE)
-                .absentBucketIntervalConsumer(IndexTaskInputRowIteratorBuilder.NOOP_CONSUMER)
                 .build()
     ) {
       Map<Interval, StringDistribution> distribution = determineDistribution(
           iterator,
           granularitySpec,
           partitionDimension,
-          isAssumeGrouped,
-          tuningConfig.isLogParseExceptions(),
-          tuningConfig.getMaxParseExceptions()
+          isAssumeGrouped
       );
       sendReport(toolbox, new DimensionDistributionReport(getId(), distribution));
     }
@@ -235,9 +228,7 @@
       HandlingInputRowIterator inputRowIterator,
       GranularitySpec granularitySpec,
       String partitionDimension,
-      boolean isAssumeGrouped,
-      boolean isLogParseExceptions,
-      int maxParseExceptions
+      boolean isAssumeGrouped
   )
   {
     Map<Interval, StringDistribution> intervalToDistribution = new HashMap<>();
@@ -246,36 +237,22 @@
         ? dedupInputRowFilterSupplier.get()
         : new PassthroughInputRowFilter();
 
-    int numParseExceptions = 0;
-
     while (inputRowIterator.hasNext()) {
-      try {
-        InputRow inputRow = inputRowIterator.next();
-        if (inputRow == null) {
-          continue;
-        }
-
-        DateTime timestamp = inputRow.getTimestamp();
-
-        //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
-        Interval interval = granularitySpec.bucketInterval(timestamp).get();
-        String partitionDimensionValue = Iterables.getOnlyElement(inputRow.getDimension(partitionDimension));
-
-        if (inputRowFilter.accept(interval, partitionDimensionValue, inputRow)) {
-          StringDistribution stringDistribution =
-              intervalToDistribution.computeIfAbsent(interval, k -> new StringSketch());
-          stringDistribution.put(partitionDimensionValue);
-        }
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {
+        continue;
       }
-      catch (ParseException e) {
-        if (isLogParseExceptions) {
-          LOG.error(e, "Encountered parse exception");
-        }
 
-        numParseExceptions++;
-        if (numParseExceptions > maxParseExceptions) {
-          throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
-        }
+      DateTime timestamp = inputRow.getTimestamp();
+
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+      String partitionDimensionValue = Iterables.getOnlyElement(inputRow.getDimension(partitionDimension));
+
+      if (inputRowFilter.accept(interval, partitionDimensionValue, inputRow)) {
+        StringDistribution stringDistribution =
+            intervalToDistribution.computeIfAbsent(interval, k -> new StringSketch());
+        stringDistribution.put(partitionDimensionValue);
       }
     }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index b65d4f2..27160c9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -23,7 +23,6 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.task.BatchAppenderators;
 import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
 import org.apache.druid.indexing.common.task.InputSourceProcessor;
@@ -33,6 +32,8 @@
 import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
 import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
 import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.realtime.FireDepartment;
@@ -157,6 +158,12 @@
     final SegmentAllocatorForBatch segmentAllocator = createSegmentAllocator(toolbox, taskClient);
     final SequenceNameFunction sequenceNameFunction = segmentAllocator.getSequenceNameFunction();
 
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
     final Appenderator appenderator = BatchAppenderators.newAppenderator(
         getId(),
         toolbox.getAppenderatorsManager(),
@@ -164,28 +171,26 @@
         toolbox,
         dataSchema,
         tuningConfig,
-        new ShuffleDataSegmentPusher(supervisorTaskId, getId(), toolbox.getIntermediaryDataManager())
+        new ShuffleDataSegmentPusher(supervisorTaskId, getId(), toolbox.getIntermediaryDataManager()),
+        buildSegmentsMeters,
+        parseExceptionHandler
     );
     boolean exceptionOccurred = false;
     try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
       driver.startJob();
 
-      final InputSourceProcessor inputSourceProcessor = new InputSourceProcessor(
-          buildSegmentsMeters,
-          null,
-          tuningConfig.isLogParseExceptions(),
-          tuningConfig.getMaxParseExceptions(),
-          pushTimeout,
-          inputRowIteratorBuilder
-      );
-      final SegmentsAndCommitMetadata pushed = inputSourceProcessor.process(
+      final SegmentsAndCommitMetadata pushed = InputSourceProcessor.process(
           dataSchema,
           driver,
           partitionsSpec,
           inputSource,
           inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null,
           tmpDir,
-          sequenceNameFunction
+          sequenceNameFunction,
+          inputRowIteratorBuilder,
+          buildSegmentsMeters,
+          parseExceptionHandler,
+          pushTimeout
       );
 
       return pushed.getSegments();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index ba1316f..543de61 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -24,9 +24,7 @@
 import com.google.common.base.Optional;
 import com.google.common.collect.FluentIterable;
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -39,14 +37,12 @@
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.query.DruidMetrics;
-import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
@@ -69,7 +65,6 @@
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -77,7 +72,6 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 
 /**
  * The worker task of {@link SinglePhaseParallelIndexTaskRunner}. Similar to {@link IndexTask}, but this task
@@ -304,95 +298,81 @@
         partitionsSpec
     );
 
+    final RowIngestionMeters rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        rowIngestionMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
     final Appenderator appenderator = BatchAppenderators.newAppenderator(
         getId(),
         toolbox.getAppenderatorsManager(),
         fireDepartmentMetrics,
         toolbox,
         dataSchema,
-        tuningConfig
-    );
-    final List<String> metricsNames = Arrays.stream(ingestionSchema.getDataSchema().getAggregators())
-                                            .map(AggregatorFactory::getName)
-                                            .collect(Collectors.toList());
-    final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
-        inputSource.reader(
-            new InputRowSchema(
-                ingestionSchema.getDataSchema().getTimestampSpec(),
-                ingestionSchema.getDataSchema().getDimensionsSpec(),
-                metricsNames
-            ),
-            inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null,
-            tmpDir
+        tuningConfig,
+        rowIngestionMeters,
+        new ParseExceptionHandler(
+            rowIngestionMeters,
+            tuningConfig.isLogParseExceptions(),
+            tuningConfig.getMaxParseExceptions(),
+            tuningConfig.getMaxSavedParseExceptions()
         )
     );
-
     boolean exceptionOccurred = false;
     try (
         final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator);
-        final CloseableIterator<InputRow> inputRowIterator = inputSourceReader.read()
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            tmpDir,
+            dataSchema,
+            inputSource,
+            inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null,
+            inputRow -> {
+              if (inputRow == null) {
+                return false;
+              }
+              if (explicitIntervals) {
+                final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+                return optInterval.isPresent();
+              }
+              return true;
+            },
+            rowIngestionMeters,
+            parseExceptionHandler
+        )
     ) {
       driver.startJob();
 
       final Set<DataSegment> pushedSegments = new HashSet<>();
 
       while (inputRowIterator.hasNext()) {
-        try {
-          final InputRow inputRow = inputRowIterator.next();
+        final InputRow inputRow = inputRowIterator.next();
 
-          if (inputRow == null) {
-            fireDepartmentMetrics.incrementThrownAway();
-            continue;
+        // Segments are created as needed, using a single sequence name. They may be allocated from the overlord
+        // (in append mode) or may be created on our own authority (in overwrite mode).
+        final String sequenceName = getId();
+        final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
+
+        if (addResult.isOk()) {
+          final boolean isPushRequired = addResult.isPushRequired(
+              partitionsSpec.getMaxRowsPerSegment(),
+              partitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
+          );
+          if (isPushRequired) {
+            // There can be some segments waiting for being published even though any rows won't be added to them.
+            // If those segments are not published here, the available space in appenderator will be kept to be small
+            // which makes the size of segments smaller.
+            final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
+            pushedSegments.addAll(pushed.getSegments());
+            LOG.info("Pushed [%s] segments", pushed.getSegments().size());
+            LOG.infoSegments(pushed.getSegments(), "Pushed segments");
           }
-
-          if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) {
-            final String errorMsg = StringUtils.format(
-                "Encountered row with timestamp that cannot be represented as a long: [%s]",
-                inputRow
-            );
-            throw new ParseException(errorMsg);
-          }
-
-          if (explicitIntervals) {
-            final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
-            if (!optInterval.isPresent()) {
-              fireDepartmentMetrics.incrementThrownAway();
-              continue;
-            }
-          }
-
-          // Segments are created as needed, using a single sequence name. They may be allocated from the overlord
-          // (in append mode) or may be created on our own authority (in overwrite mode).
-          final String sequenceName = getId();
-          final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
-
-          if (addResult.isOk()) {
-            final boolean isPushRequired = addResult.isPushRequired(
-                partitionsSpec.getMaxRowsPerSegment(),
-                partitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
-            );
-            if (isPushRequired) {
-              // There can be some segments waiting for being published even though any rows won't be added to them.
-              // If those segments are not published here, the available space in appenderator will be kept to be small
-              // which makes the size of segments smaller.
-              final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
-              pushedSegments.addAll(pushed.getSegments());
-              LOG.info("Pushed [%s] segments", pushed.getSegments().size());
-              LOG.infoSegments(pushed.getSegments(), "Pushed segments");
-            }
-          } else {
-            throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
-          }
-
-          fireDepartmentMetrics.incrementProcessed();
+        } else {
+          throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
         }
-        catch (ParseException e) {
-          if (tuningConfig.isReportParseExceptions()) {
-            throw e;
-          } else {
-            fireDepartmentMetrics.incrementUnparseable();
-          }
-        }
+
+        fireDepartmentMetrics.incrementProcessed();
       }
 
       final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilder.java
index b2a9463..aa74f11 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilder.java
@@ -19,38 +19,22 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel.iterator;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.data.input.HandlingInputRowIterator;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.indexing.common.task.IndexTask;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
-import org.joda.time.Interval;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.function.Consumer;
 
 /**
  * <pre>
  * Build a default {@link HandlingInputRowIterator} for {@link IndexTask}s. Each {@link InputRow} is
- * processed by the following handlers, in order:
- *
- *   1. Null row: If {@link InputRow} is null, invoke the null row {@link Runnable} callback.
- *
- *   2. Invalid timestamp: If {@link InputRow} has an invalid timestamp, throw a {@link ParseException}.
- *
- *   3. Absent bucket interval: If {@link InputRow} has a timestamp that does not match the
- *      {@link GranularitySpec} bucket intervals, invoke the absent bucket interval {@link Consumer}
- *      callback.
- *
- *   4. Any additional handlers in the order they are added by calls to
- *      {@link #appendInputRowHandler(HandlingInputRowIterator.InputRowHandler)}.
+ * processed by the registered handlers in the order that they are registered by calls to
+ * {@link #appendInputRowHandler(HandlingInputRowIterator.InputRowHandler)}.
  *
  * If any of the handlers invoke their respective callback, the {@link HandlingInputRowIterator} will yield
  * a null {@link InputRow} next; otherwise, the next {@link InputRow} is yielded.
@@ -62,8 +46,6 @@
 {
   private CloseableIterator<InputRow> delegate = null;
   private GranularitySpec granularitySpec = null;
-  private HandlingInputRowIterator.InputRowHandler nullRowHandler = null;
-  private HandlingInputRowIterator.InputRowHandler absentBucketIntervalHandler = null;
   private final List<HandlingInputRowIterator.InputRowHandler> appendedInputRowHandlers = new ArrayList<>();
 
   @Override
@@ -81,46 +63,12 @@
   }
 
   @Override
-  public DefaultIndexTaskInputRowIteratorBuilder nullRowRunnable(Runnable nullRowRunnable)
-  {
-    this.nullRowHandler = inputRow -> {
-      if (inputRow == null) {
-        nullRowRunnable.run();
-        return true;
-      }
-      return false;
-    };
-    return this;
-  }
-
-  @Override
-  public DefaultIndexTaskInputRowIteratorBuilder absentBucketIntervalConsumer(
-      Consumer<InputRow> absentBucketIntervalConsumer
-  )
-  {
-    this.absentBucketIntervalHandler = inputRow -> {
-      Optional<Interval> intervalOpt = granularitySpec.bucketInterval(inputRow.getTimestamp());
-      if (!intervalOpt.isPresent()) {
-        absentBucketIntervalConsumer.accept(inputRow);
-        return true;
-      }
-      return false;
-    };
-    return this;
-  }
-
-  @Override
   public HandlingInputRowIterator build()
   {
     Preconditions.checkNotNull(delegate, "delegate required");
     Preconditions.checkNotNull(granularitySpec, "granularitySpec required");
-    Preconditions.checkNotNull(nullRowHandler, "nullRowRunnable required");
-    Preconditions.checkNotNull(absentBucketIntervalHandler, "absentBucketIntervalConsumer required");
 
     ImmutableList.Builder<HandlingInputRowIterator.InputRowHandler> handlersBuilder = ImmutableList.<HandlingInputRowIterator.InputRowHandler>builder()
-        .add(nullRowHandler)
-        .add(createInvalidTimestampHandler())
-        .add(absentBucketIntervalHandler)
         .addAll(appendedInputRowHandlers);
 
     return new HandlingInputRowIterator(delegate, handlersBuilder.build());
@@ -134,18 +82,4 @@
     this.appendedInputRowHandlers.add(inputRowHandler);
     return this;
   }
-
-  private HandlingInputRowIterator.InputRowHandler createInvalidTimestampHandler()
-  {
-    return inputRow -> {
-      if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) {
-        String errorMsg = StringUtils.format(
-            "Encountered row with timestamp that cannot be represented as a long: [%s]",
-            inputRow
-        );
-        throw new ParseException(errorMsg);
-      }
-      return false;
-    };
-  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilder.java
index 80b4ea8..89afdbb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilder.java
@@ -25,16 +25,8 @@
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 
-import java.util.function.Consumer;
-
 public interface IndexTaskInputRowIteratorBuilder
 {
-  Runnable NOOP_RUNNABLE = () -> {
-  };
-
-  Consumer<InputRow> NOOP_CONSUMER = inputRow -> {
-  };
-
   /**
    * @param inputRowIterator Source of {@link InputRow}s.
    */
@@ -46,16 +38,5 @@
    */
   IndexTaskInputRowIteratorBuilder granularitySpec(GranularitySpec granularitySpec);
 
-  /**
-   * @param nullRowRunnable Runnable for when {@link Firehose} yields a null row.
-   */
-  IndexTaskInputRowIteratorBuilder nullRowRunnable(Runnable nullRowRunnable);
-
-  /**
-   * @param absentBucketIntervalConsumer Consumer for when {@link Firehose} yields a row with a timestamp that does not
-   *                                     match the {@link GranularitySpec} bucket intervals.
-   */
-  IndexTaskInputRowIteratorBuilder absentBucketIntervalConsumer(Consumer<InputRow> absentBucketIntervalConsumer);
-
   HandlingInputRowIterator build();
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java
index 171d6b8..30d34bb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java
@@ -24,26 +24,16 @@
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 
 import java.util.List;
-import java.util.function.Consumer;
 
 /**
  * <pre>
  * Build an {@link HandlingInputRowIterator} for {@link IndexTask}s used for range partitioning. Each {@link
  * InputRow} is processed by the following handlers, in order:
  *
- *   1. Null row: If {@link InputRow} is null, invoke the null row {@link Runnable} callback.
- *
- *   2. Invalid timestamp: If {@link InputRow} has an invalid timestamp, throw a {@link ParseException}.
- *
- *   3. Absent bucket interval: If {@link InputRow} has a timestamp that does not match the
- *      {@link GranularitySpec} bucket intervals, invoke the absent bucket interval {@link Consumer}
- *      callback.
- *
- *   4. Filter for rows with only a single dimension value count for the specified partition dimension.
+ *   1. Filter for rows with only a single dimension value count for the specified partition dimension.
  *
  * If any of the handlers invoke their respective callback, the {@link HandlingInputRowIterator} will yield
  * a null {@link InputRow} next; otherwise, the next {@link InputRow} is yielded.
@@ -83,18 +73,6 @@
   }
 
   @Override
-  public IndexTaskInputRowIteratorBuilder nullRowRunnable(Runnable nullRowRunnable)
-  {
-    return delegate.nullRowRunnable(nullRowRunnable);
-  }
-
-  @Override
-  public IndexTaskInputRowIteratorBuilder absentBucketIntervalConsumer(Consumer<InputRow> absentBucketIntervalConsumer)
-  {
-    return delegate.absentBucketIntervalConsumer(absentBucketIntervalConsumer);
-  }
-
-  @Override
   public HandlingInputRowIterator build()
   {
     return delegate.build();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index 130f041..b0f4702 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -34,7 +34,6 @@
 import org.apache.druid.indexing.overlord.sampler.SamplerResponse.SamplerResponseRow;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
@@ -43,7 +42,6 @@
 import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.incremental.IncrementalIndex;
-import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.indexing.DataSchema;
 
@@ -134,18 +132,11 @@
           }
 
           for (InputRow row : inputRowListPlusRawValues.getInputRows()) {
-            if (!Intervals.ETERNITY.contains(row.getTimestamp())) {
-              throw new ParseException("Timestamp cannot be represented as a long: [%s]", row);
-            }
-            IncrementalIndexAddResult result = index.add(new SamplerInputRow(row, counter), true);
-            if (result.getParseException() != null) {
-              throw result.getParseException();
-            } else {
-              // store the raw value; will be merged with the data from the IncrementalIndex later
-              responseRows[counter] = new SamplerResponseRow(rawColumns, null, null, null);
-              counter++;
-              numRowsIndexed++;
-            }
+            index.add(new SamplerInputRow(row, counter), true);
+            // store the raw value; will be merged with the data from the IncrementalIndex later
+            responseRows[counter] = new SamplerResponseRow(rawColumns, null, null, null);
+            counter++;
+            numRowsIndexed++;
           }
         }
         catch (ParseException e) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 4fdb33d..54292d8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -37,13 +37,13 @@
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
-import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.NoopQueryRunner;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
@@ -51,7 +51,6 @@
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
-import org.apache.druid.utils.CircularBuffer;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 import javax.annotation.Nullable;
@@ -68,7 +67,6 @@
   protected final SeekableStreamIndexTaskTuningConfig tuningConfig;
   protected final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
   protected final Map<String, Object> context;
-  protected final CircularBuffer<Throwable> savedParseExceptions;
   protected final LockGranularity lockGranularityToUse;
 
   // Lazily initialized, to avoid calling it on the overlord when tasks are instantiated.
@@ -99,11 +97,6 @@
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
     this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
     this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
-    if (tuningConfig.getMaxSavedParseExceptions() > 0) {
-      savedParseExceptions = new CircularBuffer<>(tuningConfig.getMaxSavedParseExceptions());
-    } else {
-      savedParseExceptions = null;
-    }
     this.context = context;
     this.runnerSupplier = Suppliers.memoize(this::createTaskRunner);
     this.lockGranularityToUse = getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK)
@@ -179,7 +172,12 @@
     return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext);
   }
 
-  public Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
+  public Appenderator newAppenderator(
+      TaskToolbox toolbox,
+      FireDepartmentMetrics metrics,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
+  )
   {
     return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask(
         getId(),
@@ -197,7 +195,9 @@
         toolbox.getJoinableFactory(),
         toolbox.getCache(),
         toolbox.getCacheConfig(),
-        toolbox.getCachePopulatorStats()
+        toolbox.getCachePopulatorStats(),
+        rowIngestionMeters,
+        parseExceptionHandler
     );
   }
 
@@ -240,14 +240,6 @@
     final boolean afterMaximumMessageTime = ioConfig.getMaximumMessageTime().isPresent()
                                             && ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp());
 
-    if (!Intervals.ETERNITY.contains(row.getTimestamp())) {
-      final String errorMsg = StringUtils.format(
-          "Encountered row with timestamp that cannot be represented as a long: [%s]",
-          row
-      );
-      throw new ParseException(errorMsg);
-    }
-
     if (log.isDebugEnabled()) {
       if (beforeMinimumMessageTime) {
         log.debug(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index c62584f..2a2bf2d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -58,7 +58,6 @@
 import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
 import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
 import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.RealtimeIndexTask;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
@@ -69,10 +68,10 @@
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.collect.Utils;
-import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
@@ -85,7 +84,6 @@
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.utils.CircularBuffer;
 import org.apache.druid.utils.CollectionUtils;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.joda.time.DateTime;
@@ -203,7 +201,6 @@
   private final InputFormat inputFormat;
   @Nullable
   private final InputRowParser<ByteBuffer> parser;
-  private final CircularBuffer<Throwable> savedParseExceptions;
   private final String stream;
 
   private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
@@ -214,6 +211,8 @@
 
   @MonotonicNonNull
   private RowIngestionMeters rowIngestionMeters;
+  @MonotonicNonNull
+  private ParseExceptionHandler parseExceptionHandler;
 
   @MonotonicNonNull
   private AuthorizerMapper authorizerMapper;
@@ -236,7 +235,6 @@
       final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task,
       @Nullable final InputRowParser<ByteBuffer> parser,
       final AuthorizerMapper authorizerMapper,
-      final CircularBuffer<Throwable> savedParseExceptions,
       final LockGranularity lockGranularityToUse
   )
   {
@@ -254,7 +252,6 @@
     this.inputFormat = ioConfig.getInputFormat();
     this.parser = parser;
     this.authorizerMapper = authorizerMapper;
-    this.savedParseExceptions = savedParseExceptions;
     this.stream = ioConfig.getStartSequenceNumbers().getStream();
     this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
     this.sequences = new CopyOnWriteArrayList<>();
@@ -363,20 +360,29 @@
 
     setToolbox(toolbox);
 
+    authorizerMapper = toolbox.getAuthorizerMapper();
+    rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    parseExceptionHandler = new ParseExceptionHandler(
+        rowIngestionMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
     // Now we can initialize StreamChunkReader with the given toolbox.
     final StreamChunkParser parser = new StreamChunkParser(
         this.parser,
         inputFormat,
         inputRowSchema,
         task.getDataSchema().getTransformSpec(),
-        toolbox.getIndexingTmpDir()
+        toolbox.getIndexingTmpDir(),
+        row -> row != null && task.withinMinMaxRecordTime(row),
+        rowIngestionMeters,
+        parseExceptionHandler
     );
 
     initializeSequences();
 
-    authorizerMapper = toolbox.getAuthorizerMapper();
-    rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
-
     log.debug("Found chat handler of class[%s]", toolbox.getChatHandlerProvider().getClass().getName());
     toolbox.getChatHandlerProvider().register(task.getId(), this, false);
 
@@ -412,7 +418,7 @@
         toolbox.getDataSegmentServerAnnouncer().announce();
         toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
       }
-      appenderator = task.newAppenderator(fireDepartmentMetrics, toolbox);
+      appenderator = task.newAppenderator(toolbox, fireDepartmentMetrics, rowIngestionMeters, parseExceptionHandler);
       driver = task.newDriver(appenderator, toolbox, fireDepartmentMetrics);
 
       // Start up, set up initial sequences.
@@ -615,94 +621,74 @@
             );
 
             if (shouldProcess) {
-              try {
-                final List<byte[]> valueBytess = record.getData();
-                final List<InputRow> rows;
-                if (valueBytess == null || valueBytess.isEmpty()) {
-                  rows = Utils.nullableListOf((InputRow) null);
-                } else {
-                  rows = parser.parse(valueBytess);
-                }
-                boolean isPersistRequired = false;
+              final List<byte[]> valueBytess = record.getData();
+              final List<InputRow> rows = parser.parse(valueBytess);
+              boolean isPersistRequired = false;
 
-                final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceToUse = sequences
-                    .stream()
-                    .filter(sequenceMetadata -> sequenceMetadata.canHandle(this, record))
-                    .findFirst()
-                    .orElse(null);
+              final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceToUse = sequences
+                  .stream()
+                  .filter(sequenceMetadata -> sequenceMetadata.canHandle(this, record))
+                  .findFirst()
+                  .orElse(null);
 
-                if (sequenceToUse == null) {
-                  throw new ISE(
-                      "Cannot find any valid sequence for record with partition [%s] and sequenceNumber [%s]. Current sequences: %s",
-                      record.getPartitionId(),
-                      record.getSequenceNumber(),
-                      sequences
+              if (sequenceToUse == null) {
+                throw new ISE(
+                    "Cannot find any valid sequence for record with partition [%s] and sequenceNumber [%s]. Current sequences: %s",
+                    record.getPartitionId(),
+                    record.getSequenceNumber(),
+                    sequences
+                );
+              }
+
+              for (InputRow row : rows) {
+                final AppenderatorDriverAddResult addResult = driver.add(
+                    row,
+                    sequenceToUse.getSequenceName(),
+                    committerSupplier,
+                    true,
+                    // do not allow incremental persists to happen until all the rows from this batch
+                    // of rows are indexed
+                    false
+                );
+
+                if (addResult.isOk()) {
+                  // If the number of rows in the segment exceeds the threshold after adding a row,
+                  // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
+                  final boolean isPushRequired = addResult.isPushRequired(
+                      tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(),
+                      tuningConfig.getPartitionsSpec()
+                                  .getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
                   );
-                }
-
-                for (InputRow row : rows) {
-                  if (row != null && task.withinMinMaxRecordTime(row)) {
-                    final AppenderatorDriverAddResult addResult = driver.add(
-                        row,
-                        sequenceToUse.getSequenceName(),
-                        committerSupplier,
-                        true,
-                        // do not allow incremental persists to happen until all the rows from this batch
-                        // of rows are indexed
-                        false
-                    );
-
-                    if (addResult.isOk()) {
-                      // If the number of rows in the segment exceeds the threshold after adding a row,
-                      // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
-                      final boolean isPushRequired = addResult.isPushRequired(
-                          tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(),
-                          tuningConfig.getPartitionsSpec()
-                                      .getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
-                      );
-                      if (isPushRequired && !sequenceToUse.isCheckpointed()) {
-                        sequenceToCheckpoint = sequenceToUse;
-                      }
-                      isPersistRequired |= addResult.isPersistRequired();
-                    } else {
-                      // Failure to allocate segment puts determinism at risk, bail out to be safe.
-                      // May want configurable behavior here at some point.
-                      // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
-                      throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
-                    }
-
-                    if (addResult.getParseException() != null) {
-                      handleParseException(addResult.getParseException(), record);
-                    } else {
-                      rowIngestionMeters.incrementProcessed();
-                    }
-                  } else {
-                    rowIngestionMeters.incrementThrownAway();
+                  if (isPushRequired && !sequenceToUse.isCheckpointed()) {
+                    sequenceToCheckpoint = sequenceToUse;
                   }
-                }
-                if (isPersistRequired) {
-                  Futures.addCallback(
-                      driver.persistAsync(committerSupplier.get()),
-                      new FutureCallback<Object>()
-                      {
-                        @Override
-                        public void onSuccess(@Nullable Object result)
-                        {
-                          log.debug("Persist completed with metadata: %s", result);
-                        }
-
-                        @Override
-                        public void onFailure(Throwable t)
-                        {
-                          log.error("Persist failed, dying");
-                          backgroundThreadException = t;
-                        }
-                      }
-                  );
+                  isPersistRequired |= addResult.isPersistRequired();
+                } else {
+                  // Failure to allocate segment puts determinism at risk, bail out to be safe.
+                  // May want configurable behavior here at some point.
+                  // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
+                  throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
                 }
               }
-              catch (ParseException e) {
-                handleParseException(e, record);
+              if (isPersistRequired) {
+                Futures.addCallback(
+                    driver.persistAsync(committerSupplier.get()),
+                    new FutureCallback<Object>()
+                    {
+                      @Override
+                      public void onSuccess(@Nullable Object result)
+                      {
+                        log.debug("Persist completed with metadata: %s", result);
+                      }
+
+                      @Override
+                      public void onFailure(Throwable t)
+                      {
+                        log.error("Persist failed, dying");
+                        backgroundThreadException = t;
+                      }
+                    }
+                );
               }
 
               // in kafka, we can easily get the next offset by adding 1, but for kinesis, there's no way
@@ -1082,7 +1068,7 @@
   {
     Map<String, Object> unparseableEventsMap = new HashMap<>();
     List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(
-        savedParseExceptions
+        parseExceptionHandler.getSavedParseExceptions()
     );
     if (buildSegmentsParseExceptionMessages != null) {
       unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages);
@@ -1297,34 +1283,6 @@
     return false;
   }
 
-
-  private void handleParseException(ParseException e, OrderedPartitionableRecord record)
-  {
-    if (e.isFromPartiallyValidRow()) {
-      rowIngestionMeters.incrementProcessedWithError();
-    } else {
-      rowIngestionMeters.incrementUnparseable();
-    }
-
-    if (tuningConfig.isLogParseExceptions()) {
-      log.info(
-          e,
-          "Row at partition[%s] offset[%s] was unparseable.",
-          record.getPartitionId(),
-          record.getSequenceNumber()
-      );
-    }
-
-    if (savedParseExceptions != null) {
-      savedParseExceptions.add(e);
-    }
-
-    if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError()
-        > tuningConfig.getMaxParseExceptions()) {
-      throw new RuntimeException("Max parse exceptions exceeded");
-    }
-  }
-
   private boolean isPaused()
   {
     return status == Status.PAUSED;
@@ -1588,7 +1546,9 @@
   )
   {
     authorizationCheck(req, Action.READ);
-    List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
+    List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
+        parseExceptionHandler.getSavedParseExceptions()
+    );
     return Response.ok(events).build();
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
index 6061aff..c4089c9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
@@ -19,13 +19,18 @@
 
 package org.apache.druid.indexing.seekablestream;
 
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.indexing.common.task.FilteringCloseableInputRowIterator;
+import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.transform.TransformSpec;
 
 import javax.annotation.Nullable;
@@ -33,7 +38,9 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.function.Predicate;
 
 /**
  * Abstraction for parsing stream data which internally uses {@link org.apache.druid.data.input.InputEntityReader}
@@ -45,6 +52,9 @@
   private final InputRowParser<ByteBuffer> parser;
   @Nullable
   private final SettableByteEntityReader byteEntityReader;
+  private final Predicate<InputRow> rowFilter;
+  private final RowIngestionMeters rowIngestionMeters;
+  private final ParseExceptionHandler parseExceptionHandler;
 
   /**
    * Either parser or inputFormat shouldn't be null.
@@ -54,12 +64,16 @@
       @Nullable InputFormat inputFormat,
       InputRowSchema inputRowSchema,
       TransformSpec transformSpec,
-      File indexingTmpDir
+      File indexingTmpDir,
+      Predicate<InputRow> rowFilter,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     if (parser == null && inputFormat == null) {
       throw new IAE("Either parser or inputFormat should be set");
     }
+    // parser is already decorated with transformSpec in DataSchema
     this.parser = parser;
     if (inputFormat != null) {
       this.byteEntityReader = new SettableByteEntityReader(
@@ -71,27 +85,41 @@
     } else {
       this.byteEntityReader = null;
     }
+    this.rowFilter = rowFilter;
+    this.rowIngestionMeters = rowIngestionMeters;
+    this.parseExceptionHandler = parseExceptionHandler;
   }
 
-  List<InputRow> parse(List<byte[]> streamChunk) throws IOException
+  List<InputRow> parse(@Nullable List<byte[]> streamChunk) throws IOException
   {
-    if (byteEntityReader != null) {
-      return parseWithInputFormat(byteEntityReader, streamChunk);
+    if (streamChunk == null || streamChunk.isEmpty()) {
+      rowIngestionMeters.incrementThrownAway();
+      return Collections.emptyList();
     } else {
-      return parseWithParser(parser, streamChunk);
+      if (byteEntityReader != null) {
+        return parseWithInputFormat(byteEntityReader, streamChunk);
+      } else {
+        return parseWithParser(parser, streamChunk);
+      }
     }
   }
 
-  private static List<InputRow> parseWithParser(InputRowParser<ByteBuffer> parser, List<byte[]> valueBytess)
+  private List<InputRow> parseWithParser(InputRowParser<ByteBuffer> parser, List<byte[]> valueBytess)
   {
-    final List<InputRow> rows = new ArrayList<>();
-    for (byte[] valueBytes : valueBytess) {
-      rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes)));
-    }
-    return rows;
+    final FluentIterable<InputRow> iterable = FluentIterable
+        .from(valueBytess)
+        .transformAndConcat(bytes -> parser.parseBatch(ByteBuffer.wrap(bytes)));
+
+    final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+        CloseableIterators.withEmptyBaggage(iterable.iterator()),
+        rowFilter,
+        rowIngestionMeters,
+        parseExceptionHandler
+    );
+    return Lists.newArrayList(rowIterator);
   }
 
-  private static List<InputRow> parseWithInputFormat(
+  private List<InputRow> parseWithInputFormat(
       SettableByteEntityReader byteEntityReader,
       List<byte[]> valueBytess
   ) throws IOException
@@ -99,7 +127,12 @@
     final List<InputRow> rows = new ArrayList<>();
     for (byte[] valueBytes : valueBytess) {
       byteEntityReader.setEntity(new ByteEntity(valueBytes));
-      try (CloseableIterator<InputRow> rowIterator = byteEntityReader.read()) {
+      try (FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+          byteEntityReader.read(),
+          rowFilter,
+          rowIngestionMeters,
+          parseExceptionHandler
+      )) {
         rowIterator.forEachRemaining(rows::add);
       }
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 2fdf85b..da6dc8b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -45,7 +45,6 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IndexTaskClient;
 import org.apache.druid.indexing.common.TaskInfoProvider;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -82,6 +81,7 @@
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.joda.time.DateTime;
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index ad26c20..50be464 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -26,7 +26,6 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.guice.annotations.Json;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
@@ -35,6 +34,7 @@
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
index c2bf244..c36a072 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
@@ -32,7 +32,6 @@
 import org.apache.druid.data.input.impl.NoopInputSource;
 import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.NoopIndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
@@ -44,6 +43,7 @@
 import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.loading.LocalDataSegmentPuller;
 import org.apache.druid.segment.loading.LocalLoadSpec;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 290263b..a9d82b1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -65,7 +65,6 @@
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
 import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
@@ -110,6 +109,7 @@
 import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -123,6 +123,7 @@
 import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -157,7 +158,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
-public class AppenderatorDriverRealtimeIndexTaskTest
+public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHandlingTest
 {
   private static final Logger log = new Logger(AppenderatorDriverRealtimeIndexTaskTest.class);
   private static final ServiceEmitter EMITTER = new ServiceEmitter(
@@ -683,8 +684,7 @@
 
     // Wait for the task to finish.
     TaskStatus status = statusFuture.get();
-    Assert.assertTrue(status.getErrorMsg()
-                            .contains("java.lang.RuntimeException: Max parse exceptions exceeded, terminating task..."));
+    Assert.assertTrue(status.getErrorMsg().contains("org.apache.druid.java.util.common.RE: Max parse exceptions[0] exceeded"));
 
     IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
 
@@ -898,10 +898,10 @@
     Map<String, Object> expectedUnparseables = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
         Arrays.asList(
-            "Unparseable timestamp found! Event: {dim1=foo, met1=2.0, __fail__=x}",
+            "Timestamp[null] is unparseable! Event: {dim1=foo, met1=2.0, __fail__=x}",
             "Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, dimLong=notnumber, dimFloat=notnumber, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,could not convert value [notnumber] to float,Unable to parse value[foo] for field[met1],]",
             "Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [Unable to parse value[foo] for field[met1],]",
-            "Unparseable timestamp found! Event: null"
+            "Timestamp[null] is unparseable! Event: null"
         )
     );
     Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents());
@@ -979,10 +979,10 @@
     Map<String, Object> expectedUnparseables = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
         Arrays.asList(
-            "Unparseable timestamp found! Event: {dim1=foo, met1=2.0, __fail__=x}",
+            "Timestamp[null] is unparseable! Event: {dim1=foo, met1=2.0, __fail__=x}",
             "Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, dimLong=notnumber, dimFloat=notnumber, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,could not convert value [notnumber] to float,Unable to parse value[foo] for field[met1],]",
             "Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [Unable to parse value[foo] for field[met1],]",
-            "Unparseable timestamp found! Event: null"
+            "Timestamp[null] is unparseable! Event: null"
         )
     );
     Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index f63fa22..0692c60 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -41,7 +41,6 @@
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
 import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.HumanReadableBytes;
@@ -50,6 +49,7 @@
 import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
 import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index e635381..c82f1e7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -62,7 +62,6 @@
 import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.CompactionTask.Builder;
 import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager;
 import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider;
@@ -104,6 +103,7 @@
 import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.data.ListIndexed;
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.TuningConfig;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
new file mode 100644
index 0000000..b26c736
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public class FilteringCloseableInputRowIteratorTest
+{
+  private static final List<String> DIMENSIONS = ImmutableList.of("dim1", "dim2");
+
+  private RowIngestionMeters rowIngestionMeters;
+  private ParseExceptionHandler parseExceptionHandler;
+
+  @Before
+  public void setup()
+  {
+    rowIngestionMeters = new SimpleRowIngestionMeters();
+    parseExceptionHandler = new ParseExceptionHandler(
+        rowIngestionMeters,
+        false,
+        Integer.MAX_VALUE,
+        1024 // do not use Integer.MAX_VALUE since it will create an object array of this length
+    );
+  }
+
+  @Test
+  public void testFilterOutRows()
+  {
+    final List<InputRow> rows = ImmutableList.of(
+        newRow(DateTimes.of("2020-01-01"), 10, 200),
+        newRow(DateTimes.of("2020-01-01"), 10, 400),
+        newRow(DateTimes.of("2020-01-01"), 20, 400),
+        newRow(DateTimes.of("2020-01-01"), 10, 800),
+        newRow(DateTimes.of("2020-01-01"), 30, 200),
+        newRow(DateTimes.of("2020-01-01"), 10, 300)
+    );
+    final Predicate<InputRow> filter = row -> (Integer) row.getRaw("dim1") == 10;
+    final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+        CloseableIterators.withEmptyBaggage(rows.iterator()),
+        filter,
+        rowIngestionMeters,
+        parseExceptionHandler
+    );
+    final List<InputRow> filteredRows = new ArrayList<>();
+    rowIterator.forEachRemaining(filteredRows::add);
+    Assert.assertEquals(
+        rows.stream().filter(filter).collect(Collectors.toList()),
+        filteredRows
+    );
+    Assert.assertEquals(2, rowIngestionMeters.getThrownAway());
+  }
+
+  @Test
+  public void testParseExceptionInDelegateNext()
+  {
+    final List<InputRow> rows = ImmutableList.of(
+        newRow(DateTimes.of("2020-01-01"), 10, 200),
+        newRow(DateTimes.of("2020-01-01"), 10, 400),
+        newRow(DateTimes.of("2020-01-01"), 20, 400),
+        newRow(DateTimes.of("2020-01-01"), 10, 800),
+        newRow(DateTimes.of("2020-01-01"), 30, 200),
+        newRow(DateTimes.of("2020-01-01"), 10, 300)
+    );
+
+    // This iterator throws ParseException every other call to next().
+    final CloseableIterator<InputRow> parseExceptionThrowingIterator = new CloseableIterator<InputRow>()
+    {
+      final int numRowsToIterate = rows.size() * 2;
+      int nextIdx = 0;
+
+      @Override
+      public boolean hasNext()
+      {
+        return nextIdx < numRowsToIterate;
+      }
+
+      @Override
+      public InputRow next()
+      {
+        final int currentIdx = nextIdx++;
+        if (currentIdx % 2 == 0) {
+          return rows.get(currentIdx / 2);
+        } else {
+          throw new ParseException("Parse exception at ", currentIdx);
+        }
+      }
+
+      @Override
+      public void close()
+      {
+      }
+    };
+
+    final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+        parseExceptionThrowingIterator,
+        row -> true,
+        rowIngestionMeters,
+        parseExceptionHandler
+    );
+
+    final List<InputRow> filteredRows = new ArrayList<>();
+    rowIterator.forEachRemaining(filteredRows::add);
+    Assert.assertEquals(rows, filteredRows);
+    Assert.assertEquals(rows.size(), rowIngestionMeters.getUnparseable());
+  }
+
+  @Test
+  public void testParseExceptionInPredicateTest()
+  {
+    final List<InputRow> rows = ImmutableList.of(
+        newRow(DateTimes.of("2020-01-01"), 10, 200),
+        newRow(DateTimes.of("2020-01-01"), 10, 400),
+        newRow(DateTimes.of("2020-01-01"), 20, 400),
+        newRow(DateTimes.of("2020-01-01"), 10, 800),
+        newRow(DateTimes.of("2020-01-01"), 30, 200),
+        newRow(DateTimes.of("2020-01-01"), 10, 300)
+    );
+
+    final CloseableIterator<InputRow> parseExceptionThrowingIterator = CloseableIterators.withEmptyBaggage(
+        rows.iterator()
+    );
+    // This filter throws ParseException every other call to test().
+    final Predicate<InputRow> filter = new Predicate<InputRow>()
+    {
+      boolean throwParseException = false;
+
+      @Override
+      public boolean test(InputRow inputRow)
+      {
+        if (throwParseException) {
+          throwParseException = false;
+          throw new ParseException("test");
+        } else {
+          throwParseException = true;
+          return true;
+        }
+      }
+    };
+
+    final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+        parseExceptionThrowingIterator,
+        filter,
+        rowIngestionMeters,
+        parseExceptionHandler
+    );
+
+    final List<InputRow> filteredRows = new ArrayList<>();
+    rowIterator.forEachRemaining(filteredRows::add);
+    final List<InputRow> expectedRows = ImmutableList.of(
+        rows.get(0),
+        rows.get(2),
+        rows.get(4)
+    );
+    Assert.assertEquals(expectedRows, filteredRows);
+    Assert.assertEquals(rows.size() - expectedRows.size(), rowIngestionMeters.getUnparseable());
+  }
+
+  @Test
+  public void testCloseDelegateIsClosed() throws IOException
+  {
+    final MutableBoolean closed = new MutableBoolean(false);
+    final CloseableIterator<InputRow> delegate = CloseableIterators.wrap(
+        Collections.emptyIterator(),
+        closed::setTrue
+    );
+    final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+        delegate,
+        row -> true,
+        rowIngestionMeters,
+        parseExceptionHandler
+    );
+    rowIterator.close();
+    Assert.assertTrue(closed.isTrue());
+  }
+
+  private static InputRow newRow(DateTime timestamp, Object dim1Val, Object dim2Val)
+  {
+    return new MapBasedInputRow(timestamp, DIMENSIONS, ImmutableMap.of("dim1", dim1Val, "dim2", dim2Val));
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 7bb2579..39ee7cf 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -50,8 +50,6 @@
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
@@ -72,6 +70,8 @@
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
 import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@@ -1030,7 +1030,7 @@
         RowIngestionMeters.DETERMINE_PARTITIONS,
         new ArrayList<>(),
         RowIngestionMeters.BUILD_SEGMENTS,
-        Collections.singletonList("Unparseable timestamp found! Event: {time=unparseable, d=a, val=1}")
+        Collections.singletonList("Timestamp[unparseable] is unparseable! Event: {time=unparseable, d=a, val=1}")
     );
     IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
     Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents());
@@ -1151,19 +1151,19 @@
         RowIngestionMeters.DETERMINE_PARTITIONS,
         Arrays.asList(
             "Unable to parse row [this is not JSON]",
-            "Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+            "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
             "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
-            "Unparseable timestamp found! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
+            "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
         ),
         RowIngestionMeters.BUILD_SEGMENTS,
         Arrays.asList(
             "Unable to parse row [this is not JSON]",
-            "Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+            "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
             "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
             "Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=4.0, val=notnumber}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [Unable to parse value[notnumber] for field[val],]",
             "Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=notnumber, val=1}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to float,]",
             "Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=notnumber, dimFloat=3.0, val=1}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,]",
-            "Unparseable timestamp found! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
+            "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
         )
     );
 
@@ -1281,9 +1281,9 @@
         new ArrayList<>(),
         RowIngestionMeters.BUILD_SEGMENTS,
         Arrays.asList(
-            "Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
-            "Unparseable timestamp found! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
-            "Unparseable timestamp found! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
+            "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+            "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
+            "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
         )
     );
 
@@ -1399,9 +1399,9 @@
     Map<String, Object> expectedUnparseables = ImmutableMap.of(
         RowIngestionMeters.DETERMINE_PARTITIONS,
         Arrays.asList(
-            "Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
-            "Unparseable timestamp found! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
-            "Unparseable timestamp found! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
+            "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+            "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
+            "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
         ),
         RowIngestionMeters.BUILD_SEGMENTS,
         new ArrayList<>()
@@ -1554,7 +1554,7 @@
         new ArrayList<>(),
         RowIngestionMeters.BUILD_SEGMENTS,
         Collections.singletonList(
-            "Unparseable timestamp found! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}")
+            "Timestamp[null] is unparseable! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}")
     );
     Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents());
   }
@@ -1697,7 +1697,7 @@
     // full stacktrace will be too long and make tests brittle (e.g. if line # changes), just match the main message
     Assert.assertThat(
         status.getErrorMsg(),
-        CoreMatchers.containsString("Max parse exceptions exceeded")
+        CoreMatchers.containsString("Max parse exceptions")
     );
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index d6990c0..80ce36f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -34,7 +34,6 @@
 import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.actions.TaskActionToolbox;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskLockbox;
@@ -55,6 +54,7 @@
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 869bfcb..b87dcf7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -113,6 +113,7 @@
 import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
 import org.easymock.EasyMock;
 import org.hamcrest.CoreMatchers;
@@ -139,7 +140,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 
-public class RealtimeIndexTaskTest
+public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
 {
   private static final Logger log = new Logger(RealtimeIndexTaskTest.class);
   private static final ServiceEmitter EMITTER = new ServiceEmitter(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index ac02949..9dfbd4e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -34,7 +34,6 @@
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
@@ -44,6 +43,7 @@
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index 337fda0..2a342b7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -30,6 +30,8 @@
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -64,7 +66,9 @@
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
-      CachePopulatorStats cachePopulatorStats
+      CachePopulatorStats cachePopulatorStats,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     realtimeAppenderator = Appenderators.createRealtime(
@@ -83,7 +87,9 @@
         joinableFactory,
         cache,
         cacheConfig,
-        cachePopulatorStats
+        cachePopulatorStats,
+        rowIngestionMeters,
+        parseExceptionHandler
     );
     return realtimeAppenderator;
   }
@@ -97,7 +103,9 @@
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
       IndexIO indexIO,
-      IndexMerger indexMerger
+      IndexMerger indexMerger,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     return Appenderators.createOffline(
@@ -108,7 +116,9 @@
         dataSegmentPusher,
         objectMapper,
         indexIO,
-        indexMerger
+        indexMerger,
+        rowIngestionMeters,
+        parseExceptionHandler
     );
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 94d1b3f..5ef3120 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -56,7 +56,6 @@
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.CompactionTask;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.IngestionTestBase;
@@ -76,6 +75,7 @@
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
 import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.LocalDataSegmentPuller;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
index d3da888..87de8c2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
@@ -34,11 +34,13 @@
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
 import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.testing.junit.LoggerCaptureRule;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
@@ -149,7 +151,7 @@
     public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
     @Rule
-    public LoggerCaptureRule logger = new LoggerCaptureRule(PartialDimensionDistributionTask.class);
+    public LoggerCaptureRule logger = new LoggerCaptureRule(ParseExceptionHandler.class);
 
     private Capture<SubTaskReport> reportCapture;
     private TaskToolbox taskToolbox;
@@ -180,6 +182,7 @@
           }
       );
       EasyMock.expect(taskToolbox.getIndexingServiceClient()).andReturn(new NoopIndexingServiceClient());
+      EasyMock.expect(taskToolbox.getRowIngestionMetersFactory()).andReturn(new DropwizardRowIngestionMetersFactory());
       EasyMock.replay(taskToolbox);
     }
 
@@ -253,7 +256,7 @@
           .build();
 
       exception.expect(RuntimeException.class);
-      exception.expectMessage("Max parse exceptions exceeded");
+      exception.expectMessage("Max parse exceptions[0] exceeded");
 
       task.runTask(taskToolbox);
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 75cf7af..b7f107d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -186,9 +186,10 @@
     // Ingest all data.
     runTestTask(inputInterval, Granularities.DAY, false, Collections.emptyList());
 
-    final Interval interval = inputInterval == null ? Intervals.ETERNITY : inputInterval;
     final Collection<DataSegment> allSegments = new HashSet<>(
-        getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE)
+        inputInterval == null
+        ? getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
+        : getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE)
     );
 
     // Reingest the same data. Each segment should get replaced by a segment with a newer version.
@@ -204,10 +205,18 @@
 
     // Verify that the segment has been replaced.
     final Collection<DataSegment> newSegments =
-        getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
+        inputInterval == null
+        ? getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
+        : getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE);
+    Assert.assertFalse(newSegments.isEmpty());
     allSegments.addAll(newSegments);
     final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(allSegments);
-    final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
+
+    final Interval timelineInterval = inputInterval == null ? Intervals.ETERNITY : inputInterval;
+    final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(
+        timelineInterval,
+        Partitions.ONLY_COMPLETE
+    );
     Assert.assertEquals(new HashSet<>(newSegments), visibles);
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilderTest.java
index c5dd5ed..ddf0fbd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilderTest.java
@@ -23,7 +23,6 @@
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.easymock.EasyMock;
 import org.joda.time.DateTime;
@@ -36,7 +35,6 @@
 
 import java.util.Collections;
 import java.util.List;
-import java.util.function.Consumer;
 
 @RunWith(Enclosed.class)
 public class DefaultIndexTaskInputRowIteratorBuilderTest
@@ -45,9 +43,6 @@
   {
     private static final CloseableIterator<InputRow> ITERATOR = EasyMock.mock(CloseableIterator.class);
     private static final GranularitySpec GRANULARITY_SPEC = EasyMock.mock(GranularitySpec.class);
-    private static final Runnable NULL_ROW_RUNNABLE = IndexTaskInputRowIteratorBuilder.NOOP_RUNNABLE;
-    private static final Consumer<InputRow> ABSENT_BUCKET_INTERVAL_CONSUMER =
-        IndexTaskInputRowIteratorBuilder.NOOP_CONSUMER;
 
     @Rule
     public ExpectedException exception = ExpectedException.none();
@@ -60,8 +55,6 @@
 
       new DefaultIndexTaskInputRowIteratorBuilder()
           .granularitySpec(GRANULARITY_SPEC)
-          .nullRowRunnable(NULL_ROW_RUNNABLE)
-          .absentBucketIntervalConsumer(ABSENT_BUCKET_INTERVAL_CONSUMER)
           .build();
     }
 
@@ -73,34 +66,6 @@
 
       new DefaultIndexTaskInputRowIteratorBuilder()
           .delegate(ITERATOR)
-          .nullRowRunnable(NULL_ROW_RUNNABLE)
-          .absentBucketIntervalConsumer(ABSENT_BUCKET_INTERVAL_CONSUMER)
-          .build();
-    }
-
-    @Test
-    public void requiresNullRowHandler()
-    {
-      exception.expect(NullPointerException.class);
-      exception.expectMessage("nullRowRunnable required");
-
-      new DefaultIndexTaskInputRowIteratorBuilder()
-          .delegate(ITERATOR)
-          .granularitySpec(GRANULARITY_SPEC)
-          .absentBucketIntervalConsumer(ABSENT_BUCKET_INTERVAL_CONSUMER)
-          .build();
-    }
-
-    @Test
-    public void requiresAbsentBucketIntervalHandler()
-    {
-      exception.expect(NullPointerException.class);
-      exception.expectMessage("absentBucketIntervalConsumer required");
-
-      new DefaultIndexTaskInputRowIteratorBuilder()
-          .delegate(ITERATOR)
-          .granularitySpec(GRANULARITY_SPEC)
-          .nullRowRunnable(NULL_ROW_RUNNABLE)
           .build();
     }
 
@@ -110,8 +75,6 @@
       new DefaultIndexTaskInputRowIteratorBuilder()
           .delegate(ITERATOR)
           .granularitySpec(GRANULARITY_SPEC)
-          .nullRowRunnable(NULL_ROW_RUNNABLE)
-          .absentBucketIntervalConsumer(ABSENT_BUCKET_INTERVAL_CONSUMER)
           .build();
     }
   }
@@ -128,69 +91,6 @@
     public ExpectedException exception = ExpectedException.none();
 
     @Test
-    public void invokesNullRowHandlerFirst()
-    {
-      DateTime invalidTimestamp = DateTimes.utc(Long.MAX_VALUE);
-      CloseableIterator<InputRow> nullInputRowIterator =
-          IndexTaskInputRowIteratorBuilderTestingFactory.createInputRowIterator(null);
-      GranularitySpec absentBucketIntervalGranularitySpec =
-          IndexTaskInputRowIteratorBuilderTestingFactory.createAbsentBucketIntervalGranularitySpec(invalidTimestamp);
-
-      List<IndexTaskInputRowIteratorBuilderTestingFactory.HandlerTester.Handler> handlerInvocationHistory =
-          HANDLER_TESTER.invokeHandlers(
-              nullInputRowIterator,
-              absentBucketIntervalGranularitySpec,
-              NO_NEXT_INPUT_ROW
-          );
-
-      Assert.assertEquals(
-          Collections.singletonList(IndexTaskInputRowIteratorBuilderTestingFactory.HandlerTester.Handler.NULL_ROW),
-          handlerInvocationHistory
-      );
-    }
-
-    @Test
-    public void invokesInvalidTimestampHandlerBeforeAbsentBucketIntervalHandler()
-    {
-      DateTime invalidTimestamp = DateTimes.utc(Long.MAX_VALUE);
-      InputRow inputRow = IndexTaskInputRowIteratorBuilderTestingFactory.createInputRow(invalidTimestamp);
-      CloseableIterator<InputRow> inputRowIterator =
-          IndexTaskInputRowIteratorBuilderTestingFactory.createInputRowIterator(inputRow);
-      GranularitySpec absentBucketIntervalGranularitySpec =
-          IndexTaskInputRowIteratorBuilderTestingFactory.createAbsentBucketIntervalGranularitySpec(invalidTimestamp);
-
-      exception.expect(ParseException.class);
-      exception.expectMessage("Encountered row with timestamp that cannot be represented as a long");
-
-      HANDLER_TESTER.invokeHandlers(inputRowIterator, absentBucketIntervalGranularitySpec, NO_NEXT_INPUT_ROW);
-    }
-
-    @Test
-    public void invokesAbsentBucketIntervalHandlerLast()
-    {
-      DateTime timestamp = IndexTaskInputRowIteratorBuilderTestingFactory.TIMESTAMP;
-      InputRow inputRow = IndexTaskInputRowIteratorBuilderTestingFactory.createInputRow(timestamp);
-      CloseableIterator<InputRow> inputRowIterator =
-          IndexTaskInputRowIteratorBuilderTestingFactory.createInputRowIterator(inputRow);
-      GranularitySpec absentBucketIntervalGranularitySpec =
-          IndexTaskInputRowIteratorBuilderTestingFactory.createAbsentBucketIntervalGranularitySpec(timestamp);
-
-      List<IndexTaskInputRowIteratorBuilderTestingFactory.HandlerTester.Handler> handlerInvocationHistory =
-          HANDLER_TESTER.invokeHandlers(
-              inputRowIterator,
-              absentBucketIntervalGranularitySpec,
-              NO_NEXT_INPUT_ROW
-          );
-
-      Assert.assertEquals(
-          Collections.singletonList(
-              IndexTaskInputRowIteratorBuilderTestingFactory.HandlerTester.Handler.ABSENT_BUCKET_INTERVAL
-          ),
-          handlerInvocationHistory
-      );
-    }
-
-    @Test
     public void invokesAppendedHandlersLast()
     {
       DateTime timestamp = IndexTaskInputRowIteratorBuilderTestingFactory.TIMESTAMP;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilderTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilderTestingFactory.java
index 39300ac..0ca2712 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilderTestingFactory.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilderTestingFactory.java
@@ -143,9 +143,7 @@
       List<Handler> handlerInvocationHistory = new ArrayList<>();
       IndexTaskInputRowIteratorBuilder iteratorBuilder = iteratorBuilderSupplier.get()
           .delegate(inputRowIterator)
-          .granularitySpec(granularitySpec)
-          .nullRowRunnable(() -> handlerInvocationHistory.add(Handler.NULL_ROW))
-          .absentBucketIntervalConsumer(row -> handlerInvocationHistory.add(Handler.ABSENT_BUCKET_INTERVAL));
+          .granularitySpec(granularitySpec);
 
       if (iteratorBuilder instanceof DefaultIndexTaskInputRowIteratorBuilder) {
         appendedHandlers.stream()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
index 3ebed1e..af5c988 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
@@ -1175,13 +1175,13 @@
   private String getUnparseableTimestampString()
   {
     return ParserType.STR_CSV.equals(parserType)
-           ? "Unparseable timestamp found! Event: {t=bad_timestamp, dim1=foo, dim2=null, met1=6}"
-           : "Unparseable timestamp found! Event: {t=bad_timestamp, dim1=foo, met1=6}";
+           ? "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, dim2=null, met1=6}"
+           : "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6}";
   }
 
   private String unparseableTimestampErrorString(Map<String, Object> rawColumns)
   {
-    return StringUtils.format("Unparseable timestamp found! Event: %s", rawColumns);
+    return StringUtils.format("Timestamp[null] is unparseable! Event: %s", rawColumns);
   }
 
   @Nullable
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
index 1073292..526430d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
@@ -33,6 +33,9 @@
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -58,6 +61,14 @@
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
+  private final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters();
+  private final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+      rowIngestionMeters,
+      false,
+      0,
+      0
+  );
+
   @Test
   public void testWithParserAndNullInputformatParseProperly() throws IOException
   {
@@ -77,7 +88,10 @@
         null,
         null,
         null,
-        null
+        null,
+        row -> true,
+        rowIngestionMeters,
+        parseExceptionHandler
     );
     parseAndAssertResult(chunkParser);
   }
@@ -91,7 +105,10 @@
         inputFormat,
         new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
         TransformSpec.NONE,
-        temporaryFolder.newFolder()
+        temporaryFolder.newFolder(),
+        row -> true,
+        rowIngestionMeters,
+        parseExceptionHandler
     );
     parseAndAssertResult(chunkParser);
   }
@@ -106,7 +123,10 @@
         null,
         null,
         null,
-        null
+        null,
+        row -> true,
+        rowIngestionMeters,
+        parseExceptionHandler
     );
   }
 
@@ -132,7 +152,10 @@
         inputFormat,
         new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
         TransformSpec.NONE,
-        temporaryFolder.newFolder()
+        temporaryFolder.newFolder(),
+        row -> true,
+        rowIngestionMeters,
+        parseExceptionHandler
     );
     parseAndAssertResult(chunkParser);
     Assert.assertTrue(inputFormat.used);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 25c9009..6bfbd7f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -30,7 +30,6 @@
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -67,6 +66,7 @@
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
diff --git a/processing/pom.xml b/processing/pom.xml
index 59873ca..035feb3 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -234,6 +234,16 @@
             <artifactId>hamcrest-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index 437d30e..a718b53 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -223,7 +223,6 @@
   private final AggregatorFactory[] metrics;
   private final AggregatorType[] aggs;
   private final boolean deserializeComplexMetrics;
-  private final boolean reportParseExceptions; // only used by OffHeapIncrementalIndex
   private final Metadata metadata;
 
   private final Map<String, MetricDesc> metricDescs;
@@ -252,14 +251,11 @@
    * @param incrementalIndexSchema    the schema to use for incremental index
    * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
    *                                  value for aggregators that return metrics other than float.
-   * @param reportParseExceptions     flag whether or not to report ParseExceptions that occur while extracting values
-   *                                  from input rows
    * @param concurrentEventAdd        flag whether ot not adding of input rows should be thread-safe
    */
   protected IncrementalIndex(
       final IncrementalIndexSchema incrementalIndexSchema,
       final boolean deserializeComplexMetrics,
-      final boolean reportParseExceptions,
       final boolean concurrentEventAdd
   )
   {
@@ -270,7 +266,6 @@
     this.metrics = incrementalIndexSchema.getMetrics();
     this.rowTransformers = new CopyOnWriteArrayList<>();
     this.deserializeComplexMetrics = deserializeComplexMetrics;
-    this.reportParseExceptions = reportParseExceptions;
 
     this.timeAndMetricsColumnCapabilities = new HashMap<>();
     this.metadata = new Metadata(
@@ -329,7 +324,6 @@
     @Nullable
     private IncrementalIndexSchema incrementalIndexSchema;
     private boolean deserializeComplexMetrics;
-    private boolean reportParseExceptions;
     private boolean concurrentEventAdd;
     private boolean sortFacts;
     private int maxRowCount;
@@ -339,7 +333,6 @@
     {
       incrementalIndexSchema = null;
       deserializeComplexMetrics = true;
-      reportParseExceptions = true;
       concurrentEventAdd = false;
       sortFacts = true;
       maxRowCount = 0;
@@ -391,12 +384,6 @@
       return this;
     }
 
-    public Builder setReportParseExceptions(final boolean reportParseExceptions)
-    {
-      this.reportParseExceptions = reportParseExceptions;
-      return this;
-    }
-
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
       this.concurrentEventAdd = concurrentEventAdd;
@@ -431,7 +418,6 @@
       return new OnheapIncrementalIndex(
           Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema is null"),
           deserializeComplexMetrics,
-          reportParseExceptions,
           concurrentEventAdd,
           sortFacts,
           maxRowCount,
@@ -448,7 +434,6 @@
       return new OffheapIncrementalIndex(
           Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
           deserializeComplexMetrics,
-          reportParseExceptions,
           concurrentEventAdd,
           sortFacts,
           maxRowCount,
@@ -610,7 +595,7 @@
         skipMaxRowsInMemoryCheck
     );
     updateMaxIngestedTime(row.getTimestamp());
-    ParseException parseException = getCombinedParseException(
+    @Nullable ParseException parseException = getCombinedParseException(
         row,
         incrementalIndexRowResult.getParseExceptionMessages(),
         addToFactsResult.getParseExceptionMessages()
@@ -753,13 +738,12 @@
     if (numAdded == 0) {
       return null;
     }
-    ParseException pe = new ParseException(
+    return new ParseException(
+        true,
         "Found unparseable columns in row: [%s], exceptions: [%s]",
         row,
         stringBuilder.toString()
     );
-    pe.setFromPartiallyValidRow(true);
-    return pe;
   }
 
   private synchronized void updateMaxIngestedTime(DateTime eventTime)
@@ -784,11 +768,6 @@
     return deserializeComplexMetrics;
   }
 
-  boolean getReportParseExceptions()
-  {
-    return reportParseExceptions;
-  }
-
   AtomicInteger getNumEntries()
   {
     return numEntries;
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAddResult.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAddResult.java
index c483e50..077f162 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAddResult.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAddResult.java
@@ -31,9 +31,9 @@
   @Nullable
   private final ParseException parseException;
   @Nullable
-  private String reasonOfNotAdded;
+  private final String reasonOfNotAdded;
 
-  public IncrementalIndexAddResult(
+  private IncrementalIndexAddResult(
       int rowCount,
       long bytesInMemory,
       @Nullable ParseException parseException,
@@ -55,6 +55,23 @@
     this(rowCount, bytesInMemory, parseException, null);
   }
 
+  public IncrementalIndexAddResult(
+      int rowCount,
+      long bytesInMemory,
+      String reasonOfNotAdded
+  )
+  {
+    this(rowCount, bytesInMemory, null, reasonOfNotAdded);
+  }
+
+  public IncrementalIndexAddResult(
+      int rowCount,
+      long bytesInMemory
+  )
+  {
+    this(rowCount, bytesInMemory, null, null);
+  }
+
   public int getRowCount()
   {
     return rowCount;
@@ -65,12 +82,22 @@
     return bytesInMemory;
   }
 
+  public boolean hasParseException()
+  {
+    return parseException != null;
+  }
+
   @Nullable
   public ParseException getParseException()
   {
     return parseException;
   }
 
+  public boolean isRowAdded()
+  {
+    return reasonOfNotAdded == null && parseException == null;
+  }
+
   @Nullable
   public String getReasonOfNotAdded()
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java b/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
new file mode 100644
index 0000000..3ce1138
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * This class is used only in {@code RealtimeIndexTask} which is deprecated now.
+ *
+ * Consider using {@link RowIngestionMetersFactory} instead.
+ */
+public class NoopRowIngestionMeters implements RowIngestionMeters
+{
+  private static final RowIngestionMetersTotals EMPTY_TOTALS = new RowIngestionMetersTotals(0, 0, 0, 0);
+
+  @Override
+  public long getProcessed()
+  {
+    return 0;
+  }
+
+  @Override
+  public void incrementProcessed()
+  {
+
+  }
+
+  @Override
+  public long getProcessedWithError()
+  {
+    return 0;
+  }
+
+  @Override
+  public void incrementProcessedWithError()
+  {
+
+  }
+
+  @Override
+  public long getUnparseable()
+  {
+    return 0;
+  }
+
+  @Override
+  public void incrementUnparseable()
+  {
+
+  }
+
+  @Override
+  public long getThrownAway()
+  {
+    return 0;
+  }
+
+  @Override
+  public void incrementThrownAway()
+  {
+
+  }
+
+  @Override
+  public RowIngestionMetersTotals getTotals()
+  {
+    return EMPTY_TOTALS;
+  }
+
+  @Override
+  public Map<String, Object> getMovingAverages()
+  {
+    return Collections.emptyMap();
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
index 490f625..97da994 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -34,7 +34,6 @@
 import org.apache.druid.segment.ColumnSelectorFactory;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -77,14 +76,13 @@
   OffheapIncrementalIndex(
       IncrementalIndexSchema incrementalIndexSchema,
       boolean deserializeComplexMetrics,
-      boolean reportParseExceptions,
       boolean concurrentEventAdd,
       boolean sortFacts,
       int maxRowCount,
       NonBlockingPool<ByteBuffer> bufferPool
   )
   {
-    super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd);
+    super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd);
     this.maxRowCount = maxRowCount;
     this.bufferPool = bufferPool;
 
@@ -224,6 +222,7 @@
 
     rowContainer.set(row);
 
+    final List<String> parseExceptionMessages = new ArrayList<>();
     for (int i = 0; i < getMetrics().length; i++) {
       final BufferAggregator agg = getAggs()[i];
 
@@ -233,16 +232,13 @@
         }
         catch (ParseException e) {
           // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
-          if (getReportParseExceptions()) {
-            throw new ParseException(e, "Encountered parse error for aggregator[%s]", getMetricAggs()[i].getName());
-          } else {
-            log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
-          }
+          log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
+          parseExceptionMessages.add(e.getMessage());
         }
       }
     }
     rowContainer.set(null);
-    return new AddToFactsResult(getNumEntries().get(), 0, new ArrayList<>());
+    return new AddToFactsResult(getNumEntries().get(), 0, parseExceptionMessages);
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 04cdaba..9bbdf60 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -69,14 +69,13 @@
   OnheapIncrementalIndex(
       IncrementalIndexSchema incrementalIndexSchema,
       boolean deserializeComplexMetrics,
-      boolean reportParseExceptions,
       boolean concurrentEventAdd,
       boolean sortFacts,
       int maxRowCount,
       long maxBytesInMemory
   )
   {
-    super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd);
+    super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd);
     this.maxRowCount = maxRowCount;
     this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory;
     this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
@@ -150,7 +149,7 @@
       boolean skipMaxRowsInMemoryCheck
   ) throws IndexSizeExceededException
   {
-    List<String> parseExceptionMessages;
+    final List<String> parseExceptionMessages = new ArrayList<>();
     final int priorIndex = facts.getPriorIndex(key);
 
     Aggregator[] aggs;
@@ -159,11 +158,11 @@
     final AtomicLong sizeInBytes = getBytesInMemory();
     if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
       aggs = concurrentGet(priorIndex);
-      parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
+      doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
     } else {
       aggs = new Aggregator[metrics.length];
       factorizeAggs(metrics, aggs, rowContainer, row);
-      parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
+      doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
 
       final int rowIndex = indexIncrement.getAndIncrement();
       concurrentSet(rowIndex, aggs);
@@ -185,8 +184,9 @@
         sizeInBytes.addAndGet(estimatedRowSize);
       } else {
         // We lost a race
+        parseExceptionMessages.clear();
         aggs = concurrentGet(prev);
-        parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
+        doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
         // Free up the misfire
         concurrentRemove(rowIndex);
         // This is expected to occur ~80% of the time in the worst scenarios
@@ -235,14 +235,14 @@
     rowContainer.set(null);
   }
 
-  private List<String> doAggregate(
+  private void doAggregate(
       AggregatorFactory[] metrics,
       Aggregator[] aggs,
       ThreadLocal<InputRow> rowContainer,
-      InputRow row
+      InputRow row,
+      List<String> parseExceptionsHolder
   )
   {
-    List<String> parseExceptionMessages = new ArrayList<>();
     rowContainer.set(row);
 
     for (int i = 0; i < aggs.length; i++) {
@@ -254,13 +254,12 @@
         catch (ParseException e) {
           // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
           log.debug(e, "Encountered parse error, skipping aggregator[%s].", metrics[i].getName());
-          parseExceptionMessages.add(e.getMessage());
+          parseExceptionsHolder.add(e.getMessage());
         }
       }
     }
 
     rowContainer.set(null);
-    return parseExceptionMessages;
   }
 
   private void closeAggregators()
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/ParseExceptionHandler.java b/processing/src/main/java/org/apache/druid/segment/incremental/ParseExceptionHandler.java
new file mode 100644
index 0000000..20faa2c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/ParseExceptionHandler.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.CircularBuffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * A handler for {@link ParseException}s thrown during ingestion. Based on the given configuration, this handler can
+ *
+ * - log ParseExceptions.
+ * - keep most recent N ParseExceptions in memory.
+ * - throw a RuntimeException when it sees more ParseExceptions than {@link #maxAllowedParseExceptions}.
+ *
+ * No matter what the handler does, the relevant metric should be updated first.
+ */
+public class ParseExceptionHandler
+{
+  private static final Logger LOG = new Logger(ParseExceptionHandler.class);
+
+  private final RowIngestionMeters rowIngestionMeters;
+  private final boolean logParseExceptions;
+  private final int maxAllowedParseExceptions;
+  @Nullable
+  private final CircularBuffer<ParseException> savedParseExceptions;
+
+  public ParseExceptionHandler(
+      RowIngestionMeters rowIngestionMeters,
+      boolean logParseExceptions,
+      int maxAllowedParseExceptions,
+      int maxSavedParseExceptions
+  )
+  {
+    this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
+    this.logParseExceptions = logParseExceptions;
+    this.maxAllowedParseExceptions = maxAllowedParseExceptions;
+    if (maxSavedParseExceptions > 0) {
+      this.savedParseExceptions = new CircularBuffer<>(maxSavedParseExceptions);
+    } else {
+      this.savedParseExceptions = null;
+    }
+  }
+
+  public void handle(@Nullable ParseException e)
+  {
+    if (e == null) {
+      return;
+    }
+    if (e.isFromPartiallyValidRow()) {
+      rowIngestionMeters.incrementProcessedWithError();
+    } else {
+      rowIngestionMeters.incrementUnparseable();
+    }
+
+    if (logParseExceptions) {
+      LOG.error(e, "Encountered parse exception");
+    }
+
+    if (savedParseExceptions != null) {
+      savedParseExceptions.add(e);
+    }
+
+    if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError() > maxAllowedParseExceptions) {
+      throw new RE("Max parse exceptions[%s] exceeded", maxAllowedParseExceptions);
+    }
+  }
+
+  @Nullable
+  public CircularBuffer<ParseException> getSavedParseExceptions()
+  {
+    return savedParseExceptions;
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMeters.java b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
similarity index 97%
rename from indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMeters.java
rename to processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
index 725a5a2..efefca7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMeters.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.common.stats;
+package org.apache.druid.segment.incremental;
 
 import org.apache.druid.guice.annotations.ExtensionPoint;
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMetersFactory.java b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersFactory.java
similarity index 94%
rename from indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMetersFactory.java
rename to processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersFactory.java
index 4035dbe..c395dcd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMetersFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersFactory.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.common.stats;
+package org.apache.druid.segment.incremental;
 
 public interface RowIngestionMetersFactory
 {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMetersTotals.java b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
similarity index 97%
rename from indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMetersTotals.java
rename to processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
index 022548f..d9c6ce5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMetersTotals.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.common.stats;
+package org.apache.druid.segment.incremental;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
index 8e0eb0a..a2e994e 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@@ -147,7 +147,6 @@
                 .withRollup(withRollup)
                 .build()
         )
-        .setReportParseExceptions(false)
         .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
         .buildOnheap();
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index 3399377..357a1e2 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -154,7 +154,6 @@
                 .withRollup(withRollup)
                 .build()
         )
-        .setReportParseExceptions(false)
         .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
         .buildOnheap();
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
index b729e7d..4f9b1ff 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -141,7 +141,6 @@
                 .withRollup(withRollup)
                 .build()
         )
-        .setReportParseExceptions(false)
         .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
         .buildOnheap();
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
index fc5535f..9d793c7 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
@@ -140,7 +140,6 @@
                 ))
                 .build()
         )
-        .setReportParseExceptions(false)
         .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
         .buildOnheap();
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
index b4525c0..ccc6ff7 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
@@ -125,7 +125,6 @@
                     )
                 ).build()
         )
-        .setReportParseExceptions(false)
         .setMaxRowCount(NUM_POINTS)
         .buildOnheap();
 
@@ -301,7 +300,6 @@
                       )
                   ).build()
           )
-          .setReportParseExceptions(false)
           .setMaxRowCount(1000)
           .buildOnheap();
 
@@ -329,7 +327,6 @@
                       )
                   ).build()
           )
-          .setReportParseExceptions(false)
           .setMaxRowCount(1000)
           .buildOnheap();
 
@@ -357,7 +354,6 @@
                       )
                   ).build()
           )
-          .setReportParseExceptions(false)
           .setMaxRowCount(NUM_POINTS)
           .buildOnheap();
 
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
index 9328904..a6207aa 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
@@ -133,7 +133,6 @@
                     )
                 ).build()
         )
-        .setReportParseExceptions(false)
         .setMaxRowCount(NUM_POINTS)
         .buildOnheap();
 
@@ -282,7 +281,6 @@
 
                   ).build()
           )
-          .setReportParseExceptions(false)
           .setMaxRowCount(NUM_POINTS)
           .buildOnheap();
 
@@ -305,7 +303,6 @@
                       )
                   ).build()
           )
-          .setReportParseExceptions(false)
           .setMaxRowCount(NUM_POINTS)
           .buildOnheap();
 
@@ -329,7 +326,6 @@
 
                   ).build()
           )
-          .setReportParseExceptions(false)
           .setMaxRowCount(NUM_POINTS)
           .buildOnheap();
 
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
index 9d13c31..744b25d 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
@@ -137,7 +137,6 @@
                     )
                 ).build()
         )
-        .setReportParseExceptions(false)
         .setMaxRowCount(NUM_POINTS)
         .buildOnheap();
 
@@ -303,7 +302,6 @@
                       )
                   ).build()
           )
-          .setReportParseExceptions(false)
           .setMaxRowCount(1000)
           .buildOnheap();
 
@@ -330,7 +328,6 @@
                       )
                   ).build()
           )
-          .setReportParseExceptions(false)
           .setMaxRowCount(1000)
           .buildOnheap();
 
@@ -357,7 +354,6 @@
                       )
                   ).build()
           )
-          .setReportParseExceptions(false)
           .setMaxRowCount(NUM_POINTS)
           .buildOnheap();
 
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAddResultTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAddResultTest.java
new file mode 100644
index 0000000..39c9402
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAddResultTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IncrementalIndexAddResultTest
+{
+  @Test
+  public void testIsRowAdded()
+  {
+    Assert.assertTrue(new IncrementalIndexAddResult(0, 0L).isRowAdded());
+    Assert.assertFalse(new IncrementalIndexAddResult(0, 0L, "test").isRowAdded());
+    Assert.assertFalse(new IncrementalIndexAddResult(0, 0L, new ParseException("test")).isRowAdded());
+  }
+
+  @Test
+  public void testHasParseException()
+  {
+    Assert.assertFalse(new IncrementalIndexAddResult(0, 0L).hasParseException());
+    Assert.assertFalse(new IncrementalIndexAddResult(0, 0L, "test").hasParseException());
+    Assert.assertTrue(new IncrementalIndexAddResult(0, 0L, new ParseException("test")).hasParseException());
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
index 55dba3f..7b77839 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
@@ -36,7 +36,6 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.FinalizeResultsQueryRunner;
 import org.apache.druid.query.QueryPlus;
@@ -112,7 +111,6 @@
     public MapIncrementalIndex(
         IncrementalIndexSchema incrementalIndexSchema,
         boolean deserializeComplexMetrics,
-        boolean reportParseExceptions,
         boolean concurrentEventAdd,
         boolean sortFacts,
         int maxRowCount,
@@ -122,7 +120,6 @@
       super(
           incrementalIndexSchema,
           deserializeComplexMetrics,
-          reportParseExceptions,
           concurrentEventAdd,
           sortFacts,
           maxRowCount,
@@ -145,7 +142,6 @@
               .withMetrics(metrics)
               .build(),
           true,
-          true,
           false,
           true,
           maxRowCount,
@@ -222,15 +218,7 @@
 
       for (Aggregator agg : aggs) {
         synchronized (agg) {
-          try {
-            agg.aggregate();
-          }
-          catch (ParseException e) {
-            // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
-            if (getReportParseExceptions()) {
-              throw e;
-            }
-          }
+          agg.aggregate();
         }
       }
 
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/ParseExceptionHandlerTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/ParseExceptionHandlerTest.java
new file mode 100644
index 0000000..713aa7f
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/ParseExceptionHandlerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.testing.junit.LoggerCaptureRule;
+import org.apache.logging.log4j.core.LogEvent;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+public class ParseExceptionHandlerTest
+{
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Rule
+  public LoggerCaptureRule logger = new LoggerCaptureRule(ParseExceptionHandler.class);
+
+  @Test
+  public void testMetricWhenAllConfigurationsAreTurnedOff()
+  {
+    final ParseException parseException = new ParseException("test");
+    final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        rowIngestionMeters,
+        false,
+        Integer.MAX_VALUE,
+        0
+    );
+
+    IntStream.range(0, 100).forEach(i -> {
+      parseExceptionHandler.handle(parseException);
+      Assert.assertEquals(i + 1, rowIngestionMeters.getUnparseable());
+    });
+  }
+
+  @Test
+  public void testLogParseExceptions()
+  {
+    final ParseException parseException = new ParseException("test");
+    final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        rowIngestionMeters,
+        true,
+        Integer.MAX_VALUE,
+        0
+    );
+    parseExceptionHandler.handle(parseException);
+
+    List<LogEvent> logEvents = logger.getLogEvents();
+    Assert.assertEquals(1, logEvents.size());
+    String logMessage = logEvents.get(0).getMessage().getFormattedMessage();
+    Assert.assertTrue(logMessage.contains("Encountered parse exception"));
+  }
+
+  @Test
+  public void testGetSavedParseExceptionsReturnNullWhenMaxSavedParseExceptionsIsZero()
+  {
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        new SimpleRowIngestionMeters(),
+        false,
+        Integer.MAX_VALUE,
+        0
+    );
+    Assert.assertNull(parseExceptionHandler.getSavedParseExceptions());
+  }
+
+  @Test
+  public void testMaxAllowedParseExceptionsThrowExceptionWhenItHitsMax()
+  {
+    final ParseException parseException = new ParseException("test");
+    final int maxAllowedParseExceptions = 3;
+    final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        rowIngestionMeters,
+        false,
+        maxAllowedParseExceptions,
+        0
+    );
+
+    IntStream.range(0, maxAllowedParseExceptions).forEach(i -> parseExceptionHandler.handle(parseException));
+    Assert.assertEquals(3, rowIngestionMeters.getUnparseable());
+
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage("Max parse exceptions[3] exceeded");
+    try {
+      parseExceptionHandler.handle(parseException);
+    }
+    catch (RuntimeException e) {
+      Assert.assertEquals(4, rowIngestionMeters.getUnparseable());
+      throw e;
+    }
+  }
+
+  @Test
+  public void testGetSavedParseExceptionsReturnMostRecentParseExceptions()
+  {
+    final int maxSavedParseExceptions = 3;
+    final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        rowIngestionMeters,
+        false,
+        Integer.MAX_VALUE,
+        maxSavedParseExceptions
+    );
+    Assert.assertNotNull(parseExceptionHandler.getSavedParseExceptions());
+    int exceptionCounter = 0;
+    for (; exceptionCounter < maxSavedParseExceptions; exceptionCounter++) {
+      parseExceptionHandler.handle(new ParseException(StringUtils.format("test %d", exceptionCounter)));
+    }
+    Assert.assertEquals(3, rowIngestionMeters.getUnparseable());
+    Assert.assertEquals(maxSavedParseExceptions, parseExceptionHandler.getSavedParseExceptions().size());
+    for (int i = 0; i < maxSavedParseExceptions; i++) {
+      Assert.assertEquals(
+          StringUtils.format("test %d", i),
+          parseExceptionHandler.getSavedParseExceptions().get(i).getMessage()
+      );
+    }
+    for (; exceptionCounter < 5; exceptionCounter++) {
+      parseExceptionHandler.handle(new ParseException(StringUtils.format("test %d", exceptionCounter)));
+    }
+    Assert.assertEquals(5, rowIngestionMeters.getUnparseable());
+
+    Assert.assertEquals(maxSavedParseExceptions, parseExceptionHandler.getSavedParseExceptions().size());
+    for (int i = 0; i < maxSavedParseExceptions; i++) {
+      Assert.assertEquals(
+          StringUtils.format("test %d", i + 2),
+          parseExceptionHandler.getSavedParseExceptions().get(i).getMessage()
+      );
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java b/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java
new file mode 100644
index 0000000..3fd14e0
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import java.util.Map;
+
+public class SimpleRowIngestionMeters implements RowIngestionMeters
+{
+  private long processed;
+  private long processedWithError;
+  private long unparseable;
+  private long thrownAway;
+
+  @Override
+  public long getProcessed()
+  {
+    return processed;
+  }
+
+  @Override
+  public void incrementProcessed()
+  {
+    processed++;
+  }
+
+  @Override
+  public long getProcessedWithError()
+  {
+    return processedWithError;
+  }
+
+  @Override
+  public void incrementProcessedWithError()
+  {
+    processedWithError++;
+  }
+
+  @Override
+  public long getUnparseable()
+  {
+    return unparseable;
+  }
+
+  @Override
+  public void incrementUnparseable()
+  {
+    unparseable++;
+  }
+
+  @Override
+  public long getThrownAway()
+  {
+    return thrownAway;
+  }
+
+  @Override
+  public void incrementThrownAway()
+  {
+    thrownAway++;
+  }
+
+  @Override
+  public RowIngestionMetersTotals getTotals()
+  {
+    return new RowIngestionMetersTotals(processed, processedWithError, thrownAway, unparseable);
+  }
+
+  @Override
+  public Map<String, Object> getMovingAverages()
+  {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
index 4657158c..f0508df 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
@@ -80,8 +80,7 @@
   {
     return Collections.singletonList(
         MapInputRowParser.parse(
-            inputRowSchema.getTimestampSpec(),
-            inputRowSchema.getDimensionsSpec(),
+            inputRowSchema,
             intermediateRow
         )
     );
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
index 56c3a5d..14796df 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
@@ -24,7 +24,6 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.data.input.Committer;
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.query.QuerySegmentWalker;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
 
@@ -227,20 +226,15 @@
     private final int numRowsInSegment;
     private final boolean isPersistRequired;
 
-    @Nullable
-    private final ParseException parseException;
-
     AppenderatorAddResult(
         SegmentIdWithShardSpec identifier,
         int numRowsInSegment,
-        boolean isPersistRequired,
-        @Nullable ParseException parseException
+        boolean isPersistRequired
     )
     {
       this.segmentIdentifier = identifier;
       this.numRowsInSegment = numRowsInSegment;
       this.isPersistRequired = isPersistRequired;
-      this.parseException = parseException;
     }
 
     SegmentIdWithShardSpec getSegmentIdentifier()
@@ -257,11 +251,5 @@
     {
       return isPersistRequired;
     }
-
-    @Nullable
-    public ParseException getParseException()
-    {
-      return parseException;
-    }
   }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java
index 62ee322..92641e3 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java
@@ -19,8 +19,6 @@
 
 package org.apache.druid.segment.realtime.appenderator;
 
-import org.apache.druid.java.util.common.parsers.ParseException;
-
 import javax.annotation.Nullable;
 
 /**
@@ -35,44 +33,37 @@
   private final long totalNumRowsInAppenderator;
   private final boolean isPersistRequired;
 
-  @Nullable
-  private final ParseException parseException;
-
   public static AppenderatorDriverAddResult ok(
       SegmentIdWithShardSpec segmentIdentifier,
       int numRowsInSegment,
       long totalNumRowsInAppenderator,
-      boolean isPersistRequired,
-      @Nullable ParseException parseException
+      boolean isPersistRequired
   )
   {
     return new AppenderatorDriverAddResult(
         segmentIdentifier,
         numRowsInSegment,
         totalNumRowsInAppenderator,
-        isPersistRequired,
-        parseException
+        isPersistRequired
     );
   }
 
   public static AppenderatorDriverAddResult fail()
   {
-    return new AppenderatorDriverAddResult(null, 0, 0, false, null);
+    return new AppenderatorDriverAddResult(null, 0, 0, false);
   }
 
   private AppenderatorDriverAddResult(
       @Nullable SegmentIdWithShardSpec segmentIdentifier,
       int numRowsInSegment,
       long totalNumRowsInAppenderator,
-      boolean isPersistRequired,
-      @Nullable ParseException parseException
+      boolean isPersistRequired
   )
   {
     this.segmentIdentifier = segmentIdentifier;
     this.numRowsInSegment = numRowsInSegment;
     this.totalNumRowsInAppenderator = totalNumRowsInAppenderator;
     this.isPersistRequired = isPersistRequired;
-    this.parseException = parseException;
   }
 
   public boolean isOk()
@@ -111,10 +102,4 @@
     }
     return overThreshold;
   }
-
-  @Nullable
-  public ParseException getParseException()
-  {
-    return parseException;
-  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 62a5ba9..c65c2ad 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -61,6 +61,8 @@
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.TuningConfigs;
 import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -130,6 +132,8 @@
   private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
   private final AtomicInteger totalRows = new AtomicInteger();
   private final AtomicLong bytesCurrentlyInMemory = new AtomicLong();
+  private final RowIngestionMeters rowIngestionMeters;
+  private final ParseExceptionHandler parseExceptionHandler;
   // Synchronize persisting commitMetadata so that multiple persist threads (if present)
   // and abandon threads do not step over each other
   private final Lock commitLock = new ReentrantLock();
@@ -168,7 +172,9 @@
       @Nullable SinkQuerySegmentWalker sinkQuerySegmentWalker,
       IndexIO indexIO,
       IndexMerger indexMerger,
-      Cache cache
+      Cache cache,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     this.myId = id;
@@ -182,6 +188,8 @@
     this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
     this.cache = cache;
     this.texasRanger = sinkQuerySegmentWalker;
+    this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
+    this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
 
     if (sinkQuerySegmentWalker == null) {
       this.sinkTimeline = new VersionedIntervalTimeline<>(
@@ -267,6 +275,12 @@
       throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
     }
 
+    if (addResult.isRowAdded()) {
+      rowIngestionMeters.incrementProcessed();
+    } else if (addResult.hasParseException()) {
+      parseExceptionHandler.handle(addResult.getParseException());
+    }
+
     final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd;
     rowsCurrentlyInMemory.addAndGet(numAddedRows);
     bytesCurrentlyInMemory.addAndGet(bytesInMemoryAfterAdd - bytesInMemoryBeforeAdd);
@@ -329,7 +343,7 @@
         isPersistRequired = true;
       }
     }
-    return new AppenderatorAddResult(identifier, sink.getNumRows(), isPersistRequired, addResult.getParseException());
+    return new AppenderatorAddResult(identifier, sink.getNumRows(), isPersistRequired);
   }
 
   @Override
@@ -392,7 +406,6 @@
           identifier.getVersion(),
           tuningConfig.getMaxRowsInMemory(),
           maxBytesTuningConfig,
-          tuningConfig.isReportParseExceptions(),
           null
       );
 
@@ -1110,7 +1123,6 @@
             identifier.getVersion(),
             tuningConfig.getMaxRowsInMemory(),
             maxBytesTuningConfig,
-            tuningConfig.isReportParseExceptions(),
             null,
             hydrants
         );
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java
index 28c3ac7..2aa5d43 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java
@@ -159,7 +159,7 @@
     try {
       final Appenderator.AppenderatorAddResult addResult = appenderator.add(identifier, row, committerSupplier);
       lastCommitterSupplier = committerSupplier;
-      return new IncrementalIndexAddResult(addResult.getNumRowsInSegment(), 0, addResult.getParseException());
+      return new IncrementalIndexAddResult(addResult.getNumRowsInSegment(), 0);
     }
     catch (SegmentNotWritableException e) {
       // Segment already started handoff
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index 7ddb409..c59e4e0 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -28,12 +28,14 @@
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
-import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 
 import java.util.concurrent.ExecutorService;
@@ -56,7 +58,9 @@
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
-      CachePopulatorStats cachePopulatorStats
+      CachePopulatorStats cachePopulatorStats,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     return new AppenderatorImpl(
@@ -83,7 +87,9 @@
         ),
         indexIO,
         indexMerger,
-        cache
+        cache,
+        rowIngestionMeters,
+        parseExceptionHandler
     );
   }
 
@@ -95,7 +101,9 @@
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
       IndexIO indexIO,
-      IndexMerger indexMerger
+      IndexMerger indexMerger,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     return new AppenderatorImpl(
@@ -105,36 +113,13 @@
         metrics,
         dataSegmentPusher,
         objectMapper,
-        new DataSegmentAnnouncer()
-        {
-          @Override
-          public void announceSegment(DataSegment segment)
-          {
-            // Do nothing
-          }
-
-          @Override
-          public void unannounceSegment(DataSegment segment)
-          {
-            // Do nothing
-          }
-
-          @Override
-          public void announceSegments(Iterable<DataSegment> segments)
-          {
-            // Do nothing
-          }
-
-          @Override
-          public void unannounceSegments(Iterable<DataSegment> segments)
-          {
-            // Do nothing
-          }
-        },
+        new NoopDataSegmentAnnouncer(),
         null,
         indexIO,
         indexMerger,
-        null
+        null,
+        rowIngestionMeters,
+        parseExceptionHandler
     );
   }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index c0d4b9c..76c64d2 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -30,6 +30,8 @@
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -78,7 +80,9 @@
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
-      CachePopulatorStats cachePopulatorStats
+      CachePopulatorStats cachePopulatorStats,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   );
 
   /**
@@ -92,7 +96,9 @@
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
       IndexIO indexIO,
-      IndexMerger indexMerger
+      IndexMerger indexMerger,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   );
 
   /**
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 5ecefeb..dc16d59 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -413,8 +413,7 @@
             identifier,
             result.getNumRowsInSegment(),
             appenderator.getTotalRowCount(),
-            result.isPersistRequired(),
-            result.getParseException()
+            result.isPersistRequired()
         );
       }
       catch (SegmentNotWritableException e) {
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
index ed53c2c..7a0f1dc 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
@@ -24,6 +24,9 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -54,6 +57,7 @@
   @Override
   public Appenderator build(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics)
   {
+    final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters();
     return Appenderators.createOffline(
         schema.getDataSource(),
         schema,
@@ -62,7 +66,14 @@
         dataSegmentPusher,
         objectMapper,
         indexIO,
-        indexMerger
+        indexMerger,
+        rowIngestionMeters,
+        new ParseExceptionHandler(
+            rowIngestionMeters,
+            false,
+            config.isReportParseExceptions() ? 0 : Integer.MAX_VALUE,
+            0
+        )
     );
   }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
index 969ed32..6ee6b4b 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
@@ -30,6 +30,9 @@
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.join.JoinableFactory;
@@ -92,6 +95,7 @@
       final FireDepartmentMetrics metrics
   )
   {
+    final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters();
     return Appenderators.createRealtime(
         schema.getDataSource(),
         schema,
@@ -114,7 +118,14 @@
         joinableFactory,
         cache,
         cacheConfig,
-        cachePopulatorStats
+        cachePopulatorStats,
+        rowIngestionMeters,
+        new ParseExceptionHandler(
+            rowIngestionMeters,
+            false,
+            config.isReportParseExceptions() ? 0 : Integer.MAX_VALUE,
+            0
+        )
     );
   }
 
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index 66ce98b..87de244 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -31,6 +31,8 @@
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -69,7 +71,9 @@
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
-      CachePopulatorStats cachePopulatorStats
+      CachePopulatorStats cachePopulatorStats,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     throw new UOE(ERROR_MSG);
@@ -84,7 +88,9 @@
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
       IndexIO indexIO,
-      IndexMerger indexMerger
+      IndexMerger indexMerger,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     throw new UOE(ERROR_MSG);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index be21051..7fa4f4c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -31,6 +31,8 @@
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -75,7 +77,9 @@
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
-      CachePopulatorStats cachePopulatorStats
+      CachePopulatorStats cachePopulatorStats,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     if (realtimeAppenderator != null) {
@@ -99,7 +103,9 @@
           joinableFactory,
           cache,
           cacheConfig,
-          cachePopulatorStats
+          cachePopulatorStats,
+          rowIngestionMeters,
+          parseExceptionHandler
       );
     }
     return realtimeAppenderator;
@@ -114,7 +120,9 @@
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
       IndexIO indexIO,
-      IndexMerger indexMerger
+      IndexMerger indexMerger,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     // CompactionTask does run multiple sub-IndexTasks, so we allow multiple batch appenderators
@@ -129,7 +137,9 @@
           dataSegmentPusher,
           objectMapper,
           indexIO,
-          indexMerger
+          indexMerger,
+          rowIngestionMeters,
+          parseExceptionHandler
       );
       return batchAppenderator;
     }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 885067a..7fae74c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -50,6 +50,8 @@
 import org.apache.druid.segment.ProgressIndicator;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -158,7 +160,9 @@
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
-      CachePopulatorStats cachePopulatorStats
+      CachePopulatorStats cachePopulatorStats,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     synchronized (this) {
@@ -178,7 +182,9 @@
           datasourceBundle.getWalker(),
           indexIO,
           wrapIndexMerger(indexMerger),
-          cache
+          cache,
+          rowIngestionMeters,
+          parseExceptionHandler
       );
 
       datasourceBundle.addAppenderator(taskId, appenderator);
@@ -195,7 +201,9 @@
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
       IndexIO indexIO,
-      IndexMerger indexMerger
+      IndexMerger indexMerger,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     synchronized (this) {
@@ -212,7 +220,9 @@
           dataSegmentPusher,
           objectMapper,
           indexIO,
-          wrapIndexMerger(indexMerger)
+          wrapIndexMerger(indexMerger),
+          rowIngestionMeters,
+          parseExceptionHandler
       );
       datasourceBundle.addAppenderator(taskId, appenderator);
       return appenderator;
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Plumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Plumber.java
index d81f485..e7fab4a 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Plumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Plumber.java
@@ -29,9 +29,9 @@
 
 public interface Plumber
 {
-  IncrementalIndexAddResult THROWAWAY = new IncrementalIndexAddResult(-1, -1, null, "row too late");
-  IncrementalIndexAddResult NOT_WRITABLE = new IncrementalIndexAddResult(-1, -1, null, "not writable");
-  IncrementalIndexAddResult DUPLICATE = new IncrementalIndexAddResult(-2, -1, null, "duplicate row");
+  IncrementalIndexAddResult THROWAWAY = new IncrementalIndexAddResult(-1, -1, "row too late");
+  IncrementalIndexAddResult NOT_WRITABLE = new IncrementalIndexAddResult(-1, -1, "not writable");
+  IncrementalIndexAddResult DUPLICATE = new IncrementalIndexAddResult(-2, -1, "duplicate row");
 
   /**
    * Perform any initial setup. Should be called before using any other methods, and should be paired
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
index 6210754..bae600c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -116,7 +116,6 @@
       String.CASE_INSENSITIVE_ORDER
   );
   private final QuerySegmentWalker texasRanger;
-
   private final Cache cache;
 
   private volatile long nextFlush = 0;
@@ -263,7 +262,6 @@
           versioningPolicy.getVersion(sinkInterval),
           config.getMaxRowsInMemory(),
           TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
-          config.isReportParseExceptions(),
           config.getDedupColumn()
       );
       addSink(retVal);
@@ -727,7 +725,6 @@
           versioningPolicy.getVersion(sinkInterval),
           config.getMaxRowsInMemory(),
           TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
-          config.isReportParseExceptions(),
           config.getDedupColumn(),
           hydrants
       );
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
index e7e6c33..d6cb223 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
@@ -56,7 +56,7 @@
 public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
 {
   private static final IncrementalIndexAddResult ALREADY_SWAPPED =
-      new IncrementalIndexAddResult(-1, -1, null, "write after index swapped");
+      new IncrementalIndexAddResult(-1, -1, "write after index swapped");
 
   private final Object hydrantLock = new Object();
   private final Interval interval;
@@ -65,15 +65,15 @@
   private final String version;
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
-  private final boolean reportParseExceptions;
   private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<>();
   private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
   private final AtomicInteger numRowsExcludingCurrIndex = new AtomicInteger();
-  private volatile FireHydrant currHydrant;
-  private volatile boolean writable = true;
   private final String dedupColumn;
   private final Set<Long> dedupSet = new HashSet<>();
 
+  private volatile FireHydrant currHydrant;
+  private volatile boolean writable = true;
+
   public Sink(
       Interval interval,
       DataSchema schema,
@@ -81,7 +81,6 @@
       String version,
       int maxRowsInMemory,
       long maxBytesInMemory,
-      boolean reportParseExceptions,
       String dedupColumn
   )
   {
@@ -92,7 +91,6 @@
         version,
         maxRowsInMemory,
         maxBytesInMemory,
-        reportParseExceptions,
         dedupColumn,
         Collections.emptyList()
     );
@@ -105,7 +103,6 @@
       String version,
       int maxRowsInMemory,
       long maxBytesInMemory,
-      boolean reportParseExceptions,
       String dedupColumn,
       List<FireHydrant> hydrants
   )
@@ -116,7 +113,6 @@
     this.version = version;
     this.maxRowsInMemory = maxRowsInMemory;
     this.maxBytesInMemory = maxBytesInMemory;
-    this.reportParseExceptions = reportParseExceptions;
     this.dedupColumn = dedupColumn;
 
     int maxCount = -1;
@@ -328,7 +324,6 @@
         .build();
     final IncrementalIndex newIndex = new IncrementalIndex.Builder()
         .setIndexSchema(indexSchema)
-        .setReportParseExceptions(reportParseExceptions)
         .setMaxRowCount(maxRowsInMemory)
         .setMaxBytesInMemory(maxBytesInMemory)
         .buildOnheap();
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
index 607a3c9..19ee7ef 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
@@ -43,6 +43,8 @@
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.realtime.plumber.Committers;
 import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -481,6 +483,23 @@
   }
 
   @Test
+  public void testVerifyRowIngestionMetrics() throws Exception
+  {
+    final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+    try (final AppenderatorTester tester = new AppenderatorTester(5, 1000L, null, false, rowIngestionMeters)) {
+      final Appenderator appenderator = tester.getAppenderator();
+      appenderator.startJob();
+      appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"), Committers.nilSupplier());
+      appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Committers.nilSupplier());
+
+      Assert.assertEquals(1, rowIngestionMeters.getProcessed());
+      Assert.assertEquals(1, rowIngestionMeters.getProcessedWithError());
+      Assert.assertEquals(0, rowIngestionMeters.getUnparseable());
+      Assert.assertEquals(0, rowIngestionMeters.getThrownAway());
+    }
+  }
+
+  @Test
   public void testQueryByIntervals() throws Exception
   {
     try (final AppenderatorTester tester = new AppenderatorTester(2, true)) {
@@ -792,7 +811,7 @@
     );
   }
 
-  static InputRow ir(String ts, String dim, long met)
+  static InputRow ir(String ts, String dim, Object met)
   {
     return new MapBasedInputRow(
         DateTimes.of(ts).getMillis(),
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
index eaa7957..9e57d28 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -54,6 +54,9 @@
 import org.apache.druid.segment.IndexMerger;
 import org.apache.druid.segment.IndexMergerV9;
 import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -61,7 +64,7 @@
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.druid.server.coordination.DataSegmentAnnouncer;
+import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 
@@ -116,11 +119,22 @@
 
   public AppenderatorTester(
       final int maxRowsInMemory,
-      long maxSizeInBytes,
+      final long maxSizeInBytes,
       final File basePersistDirectory,
       final boolean enablePushFailure
   )
   {
+    this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, new SimpleRowIngestionMeters());
+  }
+
+  public AppenderatorTester(
+      final int maxRowsInMemory,
+      final long maxSizeInBytes,
+      final File basePersistDirectory,
+      final boolean enablePushFailure,
+      final RowIngestionMeters rowIngestionMeters
+  )
+  {
     objectMapper = new DefaultObjectMapper();
     objectMapper.registerSubtypes(LinearShardSpec.class);
 
@@ -147,10 +161,9 @@
         null,
         objectMapper
     );
-    maxSizeInBytes = maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes;
     tuningConfig = new RealtimeTuningConfig(
         maxRowsInMemory,
-        maxSizeInBytes,
+        maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
         null,
         null,
         basePersistDirectory,
@@ -255,38 +268,15 @@
                 )
             )
         ),
-        new DataSegmentAnnouncer()
-        {
-          @Override
-          public void announceSegment(DataSegment segment)
-          {
-
-          }
-
-          @Override
-          public void unannounceSegment(DataSegment segment)
-          {
-
-          }
-
-          @Override
-          public void announceSegments(Iterable<DataSegment> segments)
-          {
-
-          }
-
-          @Override
-          public void unannounceSegments(Iterable<DataSegment> segments)
-          {
-
-          }
-        },
+        new NoopDataSegmentAnnouncer(),
         emitter,
         queryExecutor,
         NoopJoinableFactory.INSTANCE,
         MapCache.create(2048),
         new CacheConfig(),
-        new CachePopulatorStats()
+        new CachePopulatorStats(),
+        rowIngestionMeters,
+        new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0)
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 16d3c90..4f9cd3c 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -418,7 +418,7 @@
     {
       rows.computeIfAbsent(identifier, k -> new ArrayList<>()).add(row);
       numRows++;
-      return new AppenderatorAddResult(identifier, numRows, false, null);
+      return new AppenderatorAddResult(identifier, numRows, false);
     }
 
     @Override
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index a98bb98..0eb62e8 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -280,7 +280,6 @@
         DateTimes.of("2014-12-01T12:34:56.789").toString(),
         tuningConfig.getMaxRowsInMemory(),
         TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions(),
         tuningConfig.getDedupColumn()
     );
     plumber.getSinks().put(0L, sink);
@@ -326,7 +325,6 @@
         DateTimes.of("2014-12-01T12:34:56.789").toString(),
         tuningConfig.getMaxRowsInMemory(),
         TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions(),
         tuningConfig.getDedupColumn()
     );
     plumber.getSinks().put(0L, sink);
@@ -377,7 +375,6 @@
         DateTimes.of("2014-12-01T12:34:56.789").toString(),
         tuningConfig.getMaxRowsInMemory(),
         TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions(),
         tuningConfig.getDedupColumn()
     );
     plumber2.getSinks().put(0L, sink);
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
index df59fa7..bf94b2f 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
@@ -93,7 +93,6 @@
         version,
         tuningConfig.getMaxRowsInMemory(),
         TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions(),
         tuningConfig.getDedupColumn()
     );
 
@@ -248,7 +247,6 @@
         version,
         tuningConfig.getMaxRowsInMemory(),
         TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions(),
         tuningConfig.getDedupColumn()
     );
 
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index 19ed6bc..531746f 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -47,7 +47,6 @@
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
 import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
@@ -63,6 +62,7 @@
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.input.InputSourceModule;
 import org.apache.druid.query.lookup.LookupSerdeModule;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 8ef1eca..e08e5ea 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -60,7 +60,6 @@
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
 import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
@@ -100,6 +99,7 @@
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.input.InputSourceModule;
 import org.apache.druid.query.lookup.LookupSerdeModule;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 914c7f1..6f7cacd 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -74,7 +74,6 @@
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.batch.parallel.HttpShuffleClient;
@@ -94,6 +93,7 @@
 import org.apache.druid.metadata.input.InputSourceModule;
 import org.apache.druid.query.QuerySegmentWalker;
 import org.apache.druid.query.lookup.LookupModule;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.loading.DataSegmentArchiver;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.loading.DataSegmentMover;