More structured way to handle parse exceptions (#10336)
* More structured way to handle parse exceptions
* checkstyle; add more tests
* forbidden api; test
* address comment; new test
* address review comments
* javadoc for parseException; remove redundant parseException in streaming ingestion
* fix tests
* unnecessary catch
* unused imports
* appenderator test
* unused import
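
In short: this patch replaces the ad-hoc parse-exception plumbing (the CircularBuffer<Throwable> savedParseExceptions, setReportParseExceptions(), and per-task catch blocks removed below) with two reusable pieces: FilteringCloseableInputRowIterator for reading input rows and ParseExceptionHandler for logging, saving, and limiting parse errors. A minimal sketch of how they are meant to compose during ingestion, not part of the patch itself; the ParseExceptionHandler#handle call follows the API this patch introduces, and "sink" is a hypothetical stand-in for whatever adds rows to an index:

import java.io.IOException;
import java.util.function.Consumer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexing.common.task.FilteringCloseableInputRowIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.incremental.ParseExceptionHandler;

class ParseExceptionHandlingSketch
{
  static void consume(
      FilteringCloseableInputRowIterator rows,     // filters rows and meters unparseable ones
      ParseExceptionHandler parseExceptionHandler, // logs/saves errors and enforces the allowed maximum
      Consumer<InputRow> sink                      // hypothetical: e.g. an IncrementalIndex or Appenderator add
  ) throws IOException
  {
    try (FilteringCloseableInputRowIterator iterator = rows) {
      // hasNext() itself throws ParseException once maxAllowedParseExceptions is exceeded.
      while (iterator.hasNext()) {
        final InputRow row = iterator.next();
        try {
          sink.accept(row);
        }
        catch (ParseException e) {
          // e.g. unparseable column values from IncrementalIndex#addToFacts
          parseExceptionHandler.handle(e);
        }
      }
    }
  }
}
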
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
index 5efe00e..f7a690f 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
@@ -229,7 +229,6 @@
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
- .setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
index ef4ca7b..47b5317 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
@@ -226,7 +226,6 @@
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(metrics)
- .setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index d05da5e..1921b43 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -415,7 +415,6 @@
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
- .setReportParseExceptions(false)
.setConcurrentEventAdd(true)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
index 0da6c06..cc8d4a3 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
@@ -129,7 +129,6 @@
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(aggs)
.setDeserializeComplexMetrics(false)
- .setReportParseExceptions(false)
.setMaxRowCount(MAX_ROWS)
.buildOnheap();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
index bf7733d..fabb86a 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
@@ -310,7 +310,6 @@
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
- .setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
index 1a9ac01..a49e122 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
@@ -130,7 +130,6 @@
.withRollup(rollup)
.build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
index 02c6efd..8448358 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
@@ -111,7 +111,6 @@
.withRollup(rollup)
.build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment * 2)
.buildOnheap();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
index 80e9643..b677a96 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
@@ -218,7 +218,6 @@
.withRollup(rollup)
.build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
index 5b1a0ca..755947d 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
@@ -158,7 +158,6 @@
.withRollup(rollup)
.build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index 67d6273..f212fe3 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -584,7 +584,6 @@
.withRollup(withRollup)
.build()
)
- .setReportParseExceptions(false)
.setConcurrentEventAdd(true)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
index 410b13e..b543076 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
@@ -321,7 +321,6 @@
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
- .setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
index d8ae55a..680a179 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
@@ -388,7 +388,6 @@
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
- .setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
index 931af6c..875192a 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
@@ -312,7 +312,6 @@
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
- .setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
index bbafeff..b290f30 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
@@ -294,7 +294,6 @@
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
- .setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
index 3272ecd..406e627 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
@@ -406,7 +406,6 @@
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
- .setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java b/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
index f7c3443..8436ad8 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
@@ -21,15 +21,17 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.joda.time.DateTime;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -67,7 +69,7 @@
return parse(inputRowSchema.getTimestampSpec(), inputRowSchema.getDimensionsSpec(), theMap);
}
- public static InputRow parse(
+ private static InputRow parse(
TimestampSpec timestampSpec,
DimensionsSpec dimensionsSpec,
Map<String, Object> theMap
@@ -76,7 +78,8 @@
return parse(timestampSpec, dimensionsSpec.getDimensionNames(), dimensionsSpec.getDimensionExclusions(), theMap);
}
- public static InputRow parse(
+ @VisibleForTesting
+ static InputRow parse(
TimestampSpec timestampSpec,
List<String> dimensions,
Set<String> dimensionExclusions,
@@ -93,23 +96,42 @@
final DateTime timestamp;
try {
timestamp = timestampSpec.extractTimestamp(theMap);
- if (timestamp == null) {
- final String input = theMap.toString();
- throw new NullPointerException(
- StringUtils.format(
- "Null timestamp in input: %s",
- input.length() < 100 ? input : input.substring(0, 100) + "..."
- )
- );
- }
}
catch (Exception e) {
- throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap);
+ throw new ParseException(
+ e,
+ "Timestamp[%s] is unparseable! Event: %s",
+ timestampSpec.getRawTimestamp(theMap),
+ rawMapToPrint(theMap)
+ );
}
-
+ if (timestamp == null) {
+ throw new ParseException(
+ "Timestamp[%s] is unparseable! Event: %s",
+ timestampSpec.getRawTimestamp(theMap),
+ rawMapToPrint(theMap)
+ );
+ }
+ if (!Intervals.ETERNITY.contains(timestamp)) {
+ throw new ParseException(
+ "Encountered row with timestamp[%s] that cannot be represented as a long: [%s]",
+ timestamp,
+ rawMapToPrint(theMap)
+ );
+ }
return new MapBasedInputRow(timestamp, dimensionsToUse, theMap);
}
+ @Nullable
+ private static String rawMapToPrint(@Nullable Map<String, Object> rawMap)
+ {
+ if (rawMap == null) {
+ return null;
+ }
+ final String input = rawMap.toString();
+ return input.length() < 100 ? input : input.substring(0, 100) + "...";
+ }
+
@JsonProperty
@Override
public ParseSpec getParseSpec()
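Not part of the patch: the new range check works because Intervals.ETERNITY spans the instants Druid can represent as a long, from JodaUtils.MIN_INSTANT up to JodaUtils.MAX_INSTANT. A boundary sketch, assuming druid-core on the classpath:

import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;

class EternityBoundsSketch
{
  public static void main(String[] args)
  {
    // An ordinary timestamp lies inside ETERNITY, so parse() accepts it.
    System.out.println(Intervals.ETERNITY.contains(DateTimes.of("2020-01-01"))); // true
    // One millisecond beyond either bound falls outside, so parse() throws
    // "Encountered row with timestamp[...] that cannot be represented as a long".
    System.out.println(Intervals.ETERNITY.contains(DateTimes.utc(JodaUtils.MAX_INSTANT + 1))); // false
    System.out.println(Intervals.ETERNITY.contains(DateTimes.utc(JodaUtils.MIN_INSTANT - 1))); // false
  }
}
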
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java
index 08db775..cdd6ebd 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java
@@ -90,12 +90,20 @@
return missingValue;
}
- public DateTime extractTimestamp(Map<String, Object> input)
+ @Nullable
+ public DateTime extractTimestamp(@Nullable Map<String, Object> input)
{
- return parseDateTime(input.get(timestampColumn));
+ return parseDateTime(getRawTimestamp(input));
}
- public DateTime parseDateTime(Object input)
+ @Nullable
+ public Object getRawTimestamp(@Nullable Map<String, Object> input)
+ {
+ return input == null ? null : input.get(timestampColumn);
+ }
+
+ @Nullable
+ public DateTime parseDateTime(@Nullable Object input)
{
DateTime extracted = missingValue;
if (input != null) {
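Not part of the patch: the getRawTimestamp()/parseDateTime() split lets callers such as MapInputRowParser report the raw, unparsed timestamp value in error messages. A sketch of the now null-tolerant accessors, assuming druid-core on the classpath:

import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.TimestampSpec;

class TimestampSpecSketch
{
  public static void main(String[] args)
  {
    final TimestampSpec spec = new TimestampSpec("time", "auto", null);
    // getRawTimestamp() only looks up the column; it never parses, and it tolerates a null row.
    System.out.println(spec.getRawTimestamp(ImmutableMap.<String, Object>of("time", "not a time"))); // not a time
    System.out.println(spec.getRawTimestamp(null)); // null
    // extractTimestamp() is now parseDateTime(getRawTimestamp(input)).
    System.out.println(spec.extractTimestamp(ImmutableMap.<String, Object>of("time", "2020-01-01"))); // 2020-01-01T00:00:00.000Z
  }
}
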
diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ParseException.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ParseException.java
index e71121e..01d118e 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/parsers/ParseException.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ParseException.java
@@ -22,28 +22,42 @@
import org.apache.druid.java.util.common.StringUtils;
/**
+ * ParseException can be thrown on both the ingestion side and the query side.
+ *
+ * During ingestion, ParseException can be thrown in two places: {@code InputSourceReader#read()}
+ * and {@code IncrementalIndex#addToFacts()}. To handle ParseExceptions easily, consider using
+ * {@code FilteringCloseableInputRowIterator} and {@code ParseExceptionHandler} to iterate input rows
+ * and to add rows to an IncrementalIndex, respectively.
+ *
+ * When you use {@code InputSourceReader#sample()}, the ParseException is not thrown, but is stored in
+ * {@code InputRowListPlusRawValues}.
+ *
+ * On the query side, ParseException can be thrown during SQL planning. It should never be thrown once
+ * a query plan is constructed.
*/
public class ParseException extends RuntimeException
{
- private boolean fromPartiallyValidRow = false;
+ private final boolean fromPartiallyValidRow;
public ParseException(String formatText, Object... arguments)
{
super(StringUtils.nonStrictFormat(formatText, arguments));
+ this.fromPartiallyValidRow = false;
+ }
+
+ public ParseException(boolean fromPartiallyValidRow, String formatText, Object... arguments)
+ {
+ super(StringUtils.nonStrictFormat(formatText, arguments));
+ this.fromPartiallyValidRow = fromPartiallyValidRow;
}
public ParseException(Throwable cause, String formatText, Object... arguments)
{
super(StringUtils.nonStrictFormat(formatText, arguments), cause);
+ this.fromPartiallyValidRow = false;
}
public boolean isFromPartiallyValidRow()
{
return fromPartiallyValidRow;
}
-
- public void setFromPartiallyValidRow(boolean fromPartiallyValidRow)
- {
- this.fromPartiallyValidRow = fromPartiallyValidRow;
- }
}
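Not part of the patch: with the setter gone, fromPartiallyValidRow is fixed at construction time. A quick sketch of the resulting contract; the row strings are placeholders:

import org.apache.druid.java.util.common.parsers.ParseException;

class PartiallyValidRowSketch
{
  public static void main(String[] args)
  {
    // A row that could not be parsed at all:
    final ParseException wholeRowBad = new ParseException("Unable to parse row [%s]", "unparseable");
    // A row that parsed, but some column values did not; the row is still ingested,
    // so a handler can count it as processedWithError rather than unparseable:
    final ParseException someColumnsBad = new ParseException(true, "Found unparseable columns in row: [%s]", "row");
    System.out.println(wholeRowBad.isFromPartiallyValidRow());    // false
    System.out.println(someColumnsBad.isFromPartiallyValidRow()); // true
  }
}
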
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/MapInputRowParserTest.java b/core/src/test/java/org/apache/druid/data/input/impl/MapInputRowParserTest.java
new file mode 100644
index 0000000..6a7d11b
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/data/input/impl/MapInputRowParserTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.List;
+import java.util.Set;
+
+public class MapInputRowParserTest
+{
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private final TimestampSpec timestampSpec = new TimestampSpec("time", null, null);
+ private final List<String> dimensions = ImmutableList.of("dim");
+ private final Set<String> dimensionExclusions = ImmutableSet.of();
+
+ @Test
+ public void testParseValidInput()
+ {
+ final InputRow inputRow = MapInputRowParser.parse(
+ timestampSpec,
+ dimensions,
+ dimensionExclusions,
+ ImmutableMap.of("time", "2020-01-01", "dim", 0, "met", 10)
+ );
+ Assert.assertEquals(dimensions, inputRow.getDimensions());
+ Assert.assertEquals(DateTimes.of("2020-01-01"), inputRow.getTimestamp());
+ Assert.assertEquals(ImmutableList.of("0"), inputRow.getDimension("dim"));
+ Assert.assertEquals(10, inputRow.getMetric("met"));
+ }
+
+ @Test
+ public void testParseInvalidTimestampThrowParseException()
+ {
+ expectedException.expect(ParseException.class);
+ expectedException.expectMessage("Timestamp[invalid timestamp] is unparseable!");
+ MapInputRowParser.parse(
+ timestampSpec,
+ dimensions,
+ dimensionExclusions,
+ ImmutableMap.of("time", "invalid timestamp", "dim", 0, "met", 10)
+ );
+ }
+
+ @Test
+ public void testParseMissingTimestampThrowParseException()
+ {
+ expectedException.expect(ParseException.class);
+ expectedException.expectMessage("Timestamp[null] is unparseable!");
+ MapInputRowParser.parse(
+ timestampSpec,
+ dimensions,
+ dimensionExclusions,
+ ImmutableMap.of("dim", 0, "met", 10)
+ );
+ }
+
+ @Test
+ public void testParseTimestampSmallerThanMinThrowParseException()
+ {
+ expectedException.expect(ParseException.class);
+ expectedException.expectMessage("Encountered row with timestamp[-146136543-09-08T08:23:32.095Z] that cannot be represented as a long");
+ MapInputRowParser.parse(
+ timestampSpec,
+ dimensions,
+ dimensionExclusions,
+ ImmutableMap.of("time", DateTimes.utc(JodaUtils.MIN_INSTANT - 1), "dim", 0, "met", 10)
+ );
+ }
+
+ @Test
+ public void testParseTimestampLargerThanMaxThrowParseException()
+ {
+ expectedException.expect(ParseException.class);
+ expectedException.expectMessage("Encountered row with timestamp[146140482-04-24T15:36:27.904Z] that cannot be represented as a long");
+ MapInputRowParser.parse(
+ timestampSpec,
+ dimensions,
+ dimensionExclusions,
+ ImmutableMap.of("time", DateTimes.utc(JodaUtils.MAX_INSTANT + 1), "dim", 0, "met", 10)
+ );
+ }
+}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
index 39e82c1..cf69872 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
@@ -115,8 +115,7 @@
{
return Collections.singletonList(
MapInputRowParser.parse(
- inputRowSchema.getTimestampSpec(),
- inputRowSchema.getDimensionsSpec(),
+ inputRowSchema,
recordFlattener.flatten(intermediateRow)
)
);
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index f02b7c4..b65910e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -36,7 +36,6 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.utils.CircularBuffer;
import org.apache.druid.utils.CollectionUtils;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
@@ -66,7 +65,6 @@
KafkaIndexTask task,
@Nullable InputRowParser<ByteBuffer> parser,
AuthorizerMapper authorizerMapper,
- CircularBuffer<Throwable> savedParseExceptions,
LockGranularity lockGranularityToUse
)
{
@@ -74,7 +72,6 @@
task,
parser,
authorizerMapper,
- savedParseExceptions,
lockGranularityToUse
);
this.task = task;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 48b2846..fb06353 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -128,7 +128,6 @@
this,
dataSchema.getParser(),
authorizerMapper,
- savedParseExceptions,
lockGranularityToUse
);
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 1a72ef4..021d916 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,7 +24,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.common.utils.IdUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
@@ -53,6 +52,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.joda.time.DateTime;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index 5d1f4bf..55f69e1 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -24,7 +24,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.guice.annotations.Json;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
@@ -33,6 +32,7 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index a69dbec..f4098fc 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -53,9 +53,6 @@
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersTotals;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
@@ -106,6 +103,9 @@
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -1362,7 +1362,7 @@
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=notanumber, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to long,]",
"Unable to parse row [unparseable2]",
"Unable to parse row [unparseable]",
- "Encountered row with timestamp that cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
+ "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
)
);
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index e8f1e65..3768ceb 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -245,7 +245,7 @@
.build(),
null,
true,
- "Timestamp cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
+ "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
), it.next());
Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index 40ec97a..7874a11 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -33,6 +32,7 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.junit.Assert;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index b72461d..905abd1 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -38,7 +38,6 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
@@ -77,6 +76,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index 753363d..0fe5701 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -70,7 +70,6 @@
this,
dataSchema.getParser(),
authorizerMapper,
- savedParseExceptions,
lockGranularityToUse
);
}
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index ca35b89..2c65cd0 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -36,7 +36,6 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.utils.CircularBuffer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -60,7 +59,6 @@
KinesisIndexTask task,
@Nullable InputRowParser<ByteBuffer> parser,
AuthorizerMapper authorizerMapper,
- CircularBuffer<Throwable> savedParseExceptions,
LockGranularity lockGranularityToUse
)
{
@@ -68,7 +66,6 @@
task,
parser,
authorizerMapper,
- savedParseExceptions,
lockGranularityToUse
);
this.task = task;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index d21e2fb..ba5129a 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -25,7 +25,6 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.utils.IdUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
@@ -53,6 +52,7 @@
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.joda.time.DateTime;
import java.util.ArrayList;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
index 3092d3a..f210ca6 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
@@ -26,7 +26,6 @@
import com.google.inject.name.Named;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.guice.annotations.Json;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -36,6 +35,7 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index f754656..9ab058c 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -26,12 +26,12 @@
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.initialization.Initialization;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 8a05464..4aff50b 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -57,8 +57,6 @@
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
@@ -98,12 +96,12 @@
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
@@ -179,9 +177,7 @@
private final Period intermediateHandoffPeriod = null;
private int maxRecordsPerPoll;
- private AppenderatorsManager appenderatorsManager;
private final Set<Integer> checkpointRequestsHash = new HashSet<>();
- private RowIngestionMetersFactory rowIngestionMetersFactory;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@@ -226,8 +222,6 @@
recordSupplier = mock(KinesisRecordSupplier.class);
- appenderatorsManager = new TestAppenderatorsManager();
-
// sleep required because of kinesalite
Thread.sleep(500);
makeToolboxFactory();
@@ -1336,10 +1330,10 @@
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=20.0, met1=notanumber}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [Unable to parse value[notanumber] for field[met1],]",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=notanumber, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to float,]",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=notanumber, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to long,]",
- "Unparseable timestamp found! Event: {}",
+ "Timestamp[null] is unparseable! Event: {}",
"Unable to parse row [unparseable2]",
"Unable to parse row [unparseable]",
- "Encountered row with timestamp that cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
+ "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
)
);
@@ -2838,7 +2832,6 @@
{
directory = tempFolder.newFolder();
final TestUtils testUtils = new TestUtils();
- rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
objectMapper.setInjectableValues(((InjectableValues.Std) objectMapper.getInjectableValues()).addValue(
AWSCredentialsConfig.class,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
index 1b2c07a..dadc6c7 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -262,7 +262,7 @@
.build(),
null,
true,
- "Timestamp cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
+ "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
), it.next());
Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
null,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 7ca3b04..1351662 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -36,7 +36,6 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
@@ -76,6 +75,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
index e9b6817..421bbed 100644
--- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
+++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
@@ -147,8 +147,7 @@
{
return Collections.singletonList(
MapInputRowParser.parse(
- inputRowSchema.getTimestampSpec(),
- inputRowSchema.getDimensionsSpec(),
+ inputRowSchema,
orcStructFlattener.flatten(intermediateRow)
)
);
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
index 1bdd011..2d2d87d 100644
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
@@ -136,8 +136,7 @@
{
return Collections.singletonList(
MapInputRowParser.parse(
- inputRowSchema.getTimestampSpec(),
- inputRowSchema.getDimensionsSpec(),
+ inputRowSchema,
flattener.flatten(intermediateRow)
)
);
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
index e2acce7..91982fd 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
@@ -304,7 +304,6 @@
IncrementalIndex newIndex = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
- .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows()) // only used by OffHeapIncrementalIndex
.setMaxRowCount(tuningConfig.getRowFlushBoundary())
.setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()))
.buildOnheap();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
index f255a33..03452ac 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
@@ -21,10 +21,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index 009a407..5f6f392 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -39,7 +39,6 @@
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
@@ -50,6 +49,7 @@
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 3f8063f..8c883a4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -37,7 +37,6 @@
import org.apache.druid.guice.annotations.RemoteChatHandler;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
@@ -48,6 +47,7 @@
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
index f47a8f4..318f1c4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
@@ -21,17 +21,18 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import java.util.HashMap;
import java.util.Map;
public class DropwizardRowIngestionMeters implements RowIngestionMeters
{
- public static final String ONE_MINUTE_NAME = "1m";
- public static final String FIVE_MINUTE_NAME = "5m";
- public static final String FIFTEEN_MINUTE_NAME = "15m";
+ private static final String ONE_MINUTE_NAME = "1m";
+ private static final String FIVE_MINUTE_NAME = "5m";
+ private static final String FIFTEEN_MINUTE_NAME = "15m";
- private final MetricRegistry metricRegistry;
private final Meter processed;
private final Meter processedWithError;
private final Meter unparseable;
@@ -39,7 +40,7 @@
public DropwizardRowIngestionMeters()
{
- this.metricRegistry = new MetricRegistry();
+ MetricRegistry metricRegistry = new MetricRegistry();
this.processed = metricRegistry.meter(PROCESSED);
this.processedWithError = metricRegistry.meter(PROCESSED_WITH_ERROR);
this.unparseable = metricRegistry.meter(UNPARSEABLE);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersFactory.java
index d6b9928..46b1e12 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersFactory.java
@@ -19,6 +19,9 @@
package org.apache.druid.indexing.common.stats;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+
public class DropwizardRowIngestionMetersFactory implements RowIngestionMetersFactory
{
@Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
index ed1cff0..070d21b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
@@ -26,6 +26,8 @@
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.druid.java.util.metrics.MonitorUtils;
import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index eefda5a..f8596ec 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -19,10 +19,16 @@
package org.apache.druid.indexing.common.task;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
@@ -41,6 +47,10 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
@@ -52,17 +62,21 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@@ -136,6 +150,55 @@
}
/**
+ * Returns an iterator over the {@link InputRow}s read from the given input source.
+ * The returned iterator filters out rows which don't satisfy the given filter or cannot be parsed properly.
+ * It can throw {@link org.apache.druid.java.util.common.parsers.ParseException}s from
+ * {@link Iterator#hasNext()} once it hits {@link ParseExceptionHandler#maxAllowedParseExceptions}.
+ */
+ public static FilteringCloseableInputRowIterator inputSourceReader(
+ File tmpDir,
+ DataSchema dataSchema,
+ InputSource inputSource,
+ @Nullable InputFormat inputFormat,
+ Predicate<InputRow> rowFilter,
+ RowIngestionMeters ingestionMeters,
+ ParseExceptionHandler parseExceptionHandler
+ ) throws IOException
+ {
+ final List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
+ .map(AggregatorFactory::getName)
+ .collect(Collectors.toList());
+ final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
+ inputSource.reader(
+ new InputRowSchema(
+ dataSchema.getTimestampSpec(),
+ dataSchema.getDimensionsSpec(),
+ metricsNames
+ ),
+ inputFormat,
+ tmpDir
+ )
+ );
+ return new FilteringCloseableInputRowIterator(
+ inputSourceReader.read(),
+ rowFilter,
+ ingestionMeters,
+ parseExceptionHandler
+ );
+ }
+
+ protected static Predicate<InputRow> defaultRowFilter(GranularitySpec granularitySpec)
+ {
+ return inputRow -> {
+ if (inputRow == null) {
+ return false;
+ }
+ final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+ return optInterval.isPresent();
+ };
+ }
+
+ /**
* Registers a resource cleaner which is executed on abnormal exits.
*
* @see Task#stopGracefully
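
A minimal sketch of how a batch task is expected to wire these helpers together. The locals (tmpDir, dataSchema, inputSource, inputFormat, tuningConfig, toolbox) are placeholders assumed to be in scope in the task, not part of this patch:

    // Build the shared meters/handler pair, then read through the filtering iterator.
    final RowIngestionMeters meters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
    final ParseExceptionHandler handler = new ParseExceptionHandler(
        meters,
        tuningConfig.isLogParseExceptions(),
        tuningConfig.getMaxParseExceptions(),
        tuningConfig.getMaxSavedParseExceptions()
    );
    try (final CloseableIterator<InputRow> rows = AbstractBatchIndexTask.inputSourceReader(
        tmpDir,
        dataSchema,
        inputSource,
        inputFormat,
        AbstractBatchIndexTask.defaultRowFilter(dataSchema.getGranularitySpec()),
        meters,
        handler
    )) {
      // hasNext() filters, counts, and can abort once too many parse exceptions are seen
      while (rows.hasNext()) {
        final InputRow row = rows.next();
        // ... hand the row to an appenderator or collector ...
      }
    }
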
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 586abe0..064d2ca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -57,7 +57,6 @@
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
@@ -69,6 +68,8 @@
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.realtime.FireDepartment;
@@ -86,7 +87,7 @@
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
-import org.apache.druid.utils.CircularBuffer;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
@@ -151,21 +152,26 @@
private volatile Thread runThread = null;
@JsonIgnore
- private CircularBuffer<Throwable> savedParseExceptions;
-
- @JsonIgnore
private final LockGranularity lockGranularity;
@JsonIgnore
+ @MonotonicNonNull
+ private ParseExceptionHandler parseExceptionHandler;
+
+ @JsonIgnore
+ @MonotonicNonNull
private IngestionState ingestionState;
@JsonIgnore
+ @MonotonicNonNull
private AuthorizerMapper authorizerMapper;
@JsonIgnore
+ @MonotonicNonNull
private RowIngestionMeters rowIngestionMeters;
@JsonIgnore
+ @MonotonicNonNull
private String errorMsg;
@@ -187,10 +193,6 @@
this.spec = spec;
this.pendingHandoffs = new ConcurrentLinkedQueue<>();
- if (spec.getTuningConfig().getMaxSavedParseExceptions() > 0) {
- savedParseExceptions = new CircularBuffer<>(spec.getTuningConfig().getMaxSavedParseExceptions());
- }
-
this.ingestionState = IngestionState.NOT_STARTED;
this.lockGranularity = getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK)
? LockGranularity.TIME_CHUNK
@@ -244,6 +246,12 @@
runThread = Thread.currentThread();
authorizerMapper = toolbox.getAuthorizerMapper();
rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+ parseExceptionHandler = new ParseExceptionHandler(
+ rowIngestionMeters,
+ spec.getTuningConfig().isLogParseExceptions(),
+ spec.getTuningConfig().getMaxParseExceptions(),
+ spec.getTuningConfig().getMaxSavedParseExceptions()
+ );
setupTimeoutAlert();
@@ -371,12 +379,6 @@
// If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
throw new ISE("Could not allocate segment for row with timestamp[%s]", inputRow.getTimestamp());
}
-
- if (addResult.getParseException() != null) {
- handleParseException(addResult.getParseException());
- } else {
- rowIngestionMeters.incrementProcessed();
- }
}
}
catch (ParseException e) {
@@ -550,7 +552,9 @@
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
- List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
+ List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
+ parseExceptionHandler.getSavedParseExceptions()
+ );
return Response.ok(events).build();
}
@@ -590,7 +594,8 @@
{
Map<String, Object> unparseableEventsMap = new HashMap<>();
List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(
- savedParseExceptions);
+ parseExceptionHandler.getSavedParseExceptions()
+ );
if (buildSegmentsParseExceptionMessages != null) {
unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages);
}
@@ -619,8 +624,8 @@
log.error(pe, "Encountered parse exception");
}
- if (savedParseExceptions != null) {
- savedParseExceptions.add(pe);
+ if (parseExceptionHandler.getSavedParseExceptions() != null) {
+ parseExceptionHandler.getSavedParseExceptions().add(pe);
}
if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError()
@@ -760,7 +765,9 @@
toolbox.getJoinableFactory(),
toolbox.getCache(),
toolbox.getCacheConfig(),
- toolbox.getCachePopulatorStats()
+ toolbox.getCachePopulatorStats(),
+ rowIngestionMeters,
+ parseExceptionHandler
);
}
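
The per-task handleParseException methods removed in this patch all share the same shape; for reference, this is the behavior ParseExceptionHandler.handle() is assumed to consolidate, reconstructed from the deleted code rather than from the handler's actual implementation:

    // Assumed consolidation of the deleted per-task handlers (sketch, not the real class).
    public void handle(ParseException e)
    {
      if (e.isFromPartiallyValidRow()) {
        rowIngestionMeters.incrementProcessedWithError();
      } else {
        rowIngestionMeters.incrementUnparseable();
      }
      if (logParseExceptions) {
        log.error(e, "Encountered parse exception");
      }
      if (savedParseExceptions != null) {
        savedParseExceptions.add(e);
      }
      if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError() > maxAllowedParseExceptions) {
        throw new RuntimeException("Max parse exceptions exceeded, terminating task...", e);
      }
    }
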
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
index c6b756a..711f9d4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
@@ -21,6 +21,8 @@
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
@@ -38,7 +40,9 @@
FireDepartmentMetrics metrics,
TaskToolbox toolbox,
DataSchema dataSchema,
- AppenderatorConfig appenderatorConfig
+ AppenderatorConfig appenderatorConfig,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
return newAppenderator(
@@ -48,7 +52,9 @@
toolbox,
dataSchema,
appenderatorConfig,
- toolbox.getSegmentPusher()
+ toolbox.getSegmentPusher(),
+ rowIngestionMeters,
+ parseExceptionHandler
);
}
@@ -59,7 +65,9 @@
TaskToolbox toolbox,
DataSchema dataSchema,
AppenderatorConfig appenderatorConfig,
- DataSegmentPusher segmentPusher
+ DataSegmentPusher segmentPusher,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
return appenderatorsManager.createOfflineAppenderatorForTask(
@@ -70,7 +78,9 @@
segmentPusher,
toolbox.getJsonMapper(),
toolbox.getIndexIO(),
- toolbox.getIndexMergerV9()
+ toolbox.getIndexMergerV9(),
+ rowIngestionMeters,
+ parseExceptionHandler
);
}
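
A call-site sketch for the widened newAppenderator signature; everything except the two new trailing arguments comes from the enclosing task as before:

    // Hypothetical call site; mirrors the batch tasks updated later in this patch.
    final Appenderator appenderator = BatchAppenderators.newAppenderator(
        getId(),
        toolbox.getAppenderatorsManager(),
        fireDepartmentMetrics,
        toolbox,
        dataSchema,
        tuningConfig,
        rowIngestionMeters,
        parseExceptionHandler
    );
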
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
new file mode 100644
index 0000000..6a88a34
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+
+/**
+ * An {@link InputRow} iterator used by ingestion {@link Task}s. It filters out rows that do not satisfy the given
+ * {@link #filter} and handles rows that throw {@link ParseException} while being parsed. The thrown-away metric is
+ * incremented whenever a row is filtered out; ParseException handling is delegated to {@link ParseExceptionHandler}.
+ */
+public class FilteringCloseableInputRowIterator implements CloseableIterator<InputRow>
+{
+ private final CloseableIterator<InputRow> delegate;
+ private final Predicate<InputRow> filter;
+ private final RowIngestionMeters rowIngestionMeters;
+ private final ParseExceptionHandler parseExceptionHandler;
+
+ private InputRow next;
+
+ public FilteringCloseableInputRowIterator(
+ CloseableIterator<InputRow> delegate,
+ Predicate<InputRow> filter,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
+ )
+ {
+ this.delegate = delegate;
+ this.filter = filter;
+ this.rowIngestionMeters = rowIngestionMeters;
+ this.parseExceptionHandler = parseExceptionHandler;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ while (next == null && delegate.hasNext()) {
+ try {
+ // delegate.next() can throw ParseException
+ final InputRow row = delegate.next();
+ // filter.test() can throw ParseException
+ if (filter.test(row)) {
+ next = row;
+ } else {
+ rowIngestionMeters.incrementThrownAway();
+ }
+ }
+ catch (ParseException e) {
+ parseExceptionHandler.handle(e);
+ }
+ }
+ return next != null;
+ }
+
+ @Override
+ public InputRow next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ final InputRow row = next;
+ next = null;
+ return row;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ delegate.close();
+ }
+}
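
A standalone usage sketch for the new iterator. CloseableIterators.withEmptyBaggage is assumed here as the utility for wrapping a plain Iterator; the meters and handler are created as in the tasks above:

    // Illustrative only: drop null rows from an in-memory delegate and count them as thrown away.
    final CloseableIterator<InputRow> delegate =
        CloseableIterators.withEmptyBaggage(inMemoryRows.iterator()); // assumed wrapper utility
    final FilteringCloseableInputRowIterator rows = new FilteringCloseableInputRowIterator(
        delegate,
        row -> row != null,      // rows failing the predicate increment thrownAway
        rowIngestionMeters,
        parseExceptionHandler    // ParseExceptions from next()/test() are routed here
    );
    try {
      while (rows.hasNext()) {
        consume(rows.next());    // hypothetical consumer
      }
    }
    finally {
      rows.close();
    }
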
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 576b7de..e24df91 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -49,12 +49,12 @@
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index e70a8ca..5e93675 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -37,9 +37,7 @@
import org.apache.druid.data.input.FirehoseFactoryToInputSourceAdaptor;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.Rows;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.hll.HyperLogLogCollector;
@@ -58,7 +56,6 @@
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
@@ -69,7 +66,6 @@
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
@@ -77,9 +73,9 @@
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.BatchIOConfig;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.IngestionSpec;
@@ -102,7 +98,6 @@
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
-import org.apache.druid.utils.CircularBuffer;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -120,7 +115,6 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -133,7 +127,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
-import java.util.stream.Collectors;
+import java.util.function.Predicate;
public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
{
@@ -161,9 +155,11 @@
private IngestionState ingestionState;
- private final CircularBuffer<Throwable> buildSegmentsSavedParseExceptions;
+ @MonotonicNonNull
+ private ParseExceptionHandler determinePartitionsParseExceptionHandler;
- private final CircularBuffer<Throwable> determinePartitionsSavedParseExceptions;
+ @MonotonicNonNull
+ private ParseExceptionHandler buildSegmentsParseExceptionHandler;
@MonotonicNonNull
private AuthorizerMapper authorizerMapper;
@@ -213,18 +209,6 @@
context
);
this.ingestionSchema = ingestionSchema;
- if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() > 0) {
- determinePartitionsSavedParseExceptions = new CircularBuffer<>(
- ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
- );
-
- buildSegmentsSavedParseExceptions = new CircularBuffer<>(
- ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
- );
- } else {
- determinePartitionsSavedParseExceptions = null;
- buildSegmentsSavedParseExceptions = null;
- }
this.ingestionState = IngestionState.NOT_STARTED;
}
@@ -318,14 +302,18 @@
if (needsDeterminePartitions) {
events.put(
RowIngestionMeters.DETERMINE_PARTITIONS,
- IndexTaskUtils.getMessagesFromSavedParseExceptions(determinePartitionsSavedParseExceptions)
+ IndexTaskUtils.getMessagesFromSavedParseExceptions(
+ determinePartitionsParseExceptionHandler.getSavedParseExceptions()
+ )
);
}
if (needsBuildSegments) {
events.put(
RowIngestionMeters.BUILD_SEGMENTS,
- IndexTaskUtils.getMessagesFromSavedParseExceptions(buildSegmentsSavedParseExceptions)
+ IndexTaskUtils.getMessagesFromSavedParseExceptions(
+ buildSegmentsParseExceptionHandler.getSavedParseExceptions()
+ )
);
}
@@ -448,6 +436,18 @@
this.authorizerMapper = toolbox.getAuthorizerMapper();
this.determinePartitionsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
this.buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+ this.determinePartitionsParseExceptionHandler = new ParseExceptionHandler(
+ determinePartitionsMeters,
+ ingestionSchema.getTuningConfig().isLogParseExceptions(),
+ ingestionSchema.getTuningConfig().getMaxParseExceptions(),
+ ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
+ );
+ this.buildSegmentsParseExceptionHandler = new ParseExceptionHandler(
+ buildSegmentsMeters,
+ ingestionSchema.getTuningConfig().isLogParseExceptions(),
+ ingestionSchema.getTuningConfig().getMaxParseExceptions(),
+ ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
+ );
final boolean determineIntervals = !ingestionSchema.getDataSchema()
.getGranularitySpec()
@@ -530,9 +530,11 @@
{
Map<String, Object> unparseableEventsMap = new HashMap<>();
List<String> determinePartitionsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(
- determinePartitionsSavedParseExceptions);
+ determinePartitionsParseExceptionHandler.getSavedParseExceptions()
+ );
List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(
- buildSegmentsSavedParseExceptions);
+ buildSegmentsParseExceptionHandler.getSavedParseExceptions()
+ );
if (determinePartitionsParseExceptionMessages != null || buildSegmentsParseExceptionMessages != null) {
unparseableEventsMap.put(RowIngestionMeters.DETERMINE_PARTITIONS, determinePartitionsParseExceptionMessages);
@@ -709,82 +711,54 @@
Comparators.intervalsByStartThenEnd()
);
final Granularity queryGranularity = granularitySpec.getQueryGranularity();
- final List<String> metricsNames = Arrays.stream(ingestionSchema.getDataSchema().getAggregators())
- .map(AggregatorFactory::getName)
- .collect(Collectors.toList());
- final InputSourceReader inputSourceReader = ingestionSchema.getDataSchema().getTransformSpec().decorate(
- inputSource.reader(
- new InputRowSchema(
- ingestionSchema.getDataSchema().getTimestampSpec(),
- ingestionSchema.getDataSchema().getDimensionsSpec(),
- metricsNames
- ),
- inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null,
- tmpDir
- )
- );
+ final Predicate<InputRow> rowFilter = inputRow -> {
+ if (inputRow == null) {
+ return false;
+ }
+ if (determineIntervals) {
+ return true;
+ }
+ final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+ return optInterval.isPresent();
+ };
- try (final CloseableIterator<InputRow> inputRowIterator = inputSourceReader.read()) {
+ try (final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+ tmpDir,
+ ingestionSchema.getDataSchema(),
+ inputSource,
+ inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null,
+ rowFilter,
+ determinePartitionsMeters,
+ determinePartitionsParseExceptionHandler
+ )) {
while (inputRowIterator.hasNext()) {
- try {
- final InputRow inputRow = inputRowIterator.next();
+ final InputRow inputRow = inputRowIterator.next();
- // The null inputRow means the caller must skip this row.
- if (inputRow == null) {
- determinePartitionsMeters.incrementThrownAway();
- continue;
- }
-
- final Interval interval;
- if (determineIntervals) {
- interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
- } else {
- if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) {
- final String errorMsg = StringUtils.format(
- "Encountered row with timestamp that cannot be represented as a long: [%s]",
- inputRow
- );
- throw new ParseException(errorMsg);
- }
-
- final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
- if (!optInterval.isPresent()) {
- determinePartitionsMeters.incrementThrownAway();
- continue;
- }
- interval = optInterval.get();
- }
-
- if (partitionsSpec.needsDeterminePartitions(false)) {
- hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector()));
-
- List<Object> groupKey = Rows.toGroupKey(
- queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
- inputRow
- );
- hllCollectors.get(interval).get()
- .add(HASH_FUNCTION.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
- } else {
- // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
- // for the interval and don't instantiate a HLL collector
- hllCollectors.putIfAbsent(interval, Optional.absent());
- }
- determinePartitionsMeters.incrementProcessed();
+ final Interval interval;
+ if (determineIntervals) {
+ interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
+ } else {
+ final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+ // this interval must exist since it passed the rowFilter
+ assert optInterval.isPresent();
+ interval = optInterval.get();
}
- catch (ParseException e) {
- if (ingestionSchema.getTuningConfig().isLogParseExceptions()) {
- log.error(e, "Encountered parse exception");
- }
- if (determinePartitionsSavedParseExceptions != null) {
- determinePartitionsSavedParseExceptions.add(e);
- }
+ if (partitionsSpec.needsDeterminePartitions(false)) {
+ hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector()));
- determinePartitionsMeters.incrementUnparseable();
- if (determinePartitionsMeters.getUnparseable() > ingestionSchema.getTuningConfig().getMaxParseExceptions()) {
- throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
- }
+ List<Object> groupKey = Rows.toGroupKey(
+ queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
+ inputRow
+ );
+ hllCollectors.get(interval).get()
+ .add(HASH_FUNCTION.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
+ } else {
+ // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
+ // for the interval and don't instantiate a HLL collector
+ hllCollectors.putIfAbsent(interval, Optional.absent());
}
+ determinePartitionsMeters.incrementProcessed();
}
}
@@ -889,28 +863,26 @@
buildSegmentsFireDepartmentMetrics,
toolbox,
dataSchema,
- tuningConfig
+ tuningConfig,
+ buildSegmentsMeters,
+ buildSegmentsParseExceptionHandler
);
boolean exceptionOccurred = false;
try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
driver.startJob();
- final InputSourceProcessor inputSourceProcessor = new InputSourceProcessor(
- buildSegmentsMeters,
- buildSegmentsSavedParseExceptions,
- tuningConfig.isLogParseExceptions(),
- tuningConfig.getMaxParseExceptions(),
- pushTimeout,
- new DefaultIndexTaskInputRowIteratorBuilder()
- );
- inputSourceProcessor.process(
+ InputSourceProcessor.process(
dataSchema,
driver,
partitionsSpec,
inputSource,
inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null,
tmpDir,
- sequenceNameFunction
+ sequenceNameFunction,
+ new DefaultIndexTaskInputRowIteratorBuilder(),
+ buildSegmentsMeters,
+ buildSegmentsParseExceptionHandler,
+ pushTimeout
);
// If we use timeChunk lock, then we don't have to specify what segments will be overwritten because
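
With the try/catch removed from the loop above, "max parse exceptions exceeded" now surfaces from the iterator rather than from the loop body. The resulting control flow, assuming the handler throws once the limit is crossed as the deleted code did:

    // Sketch: parse errors are counted, logged, and saved inside hasNext(); the loop
    // body only ever sees rows that passed rowFilter and parsed successfully.
    while (inputRowIterator.hasNext()) { // throws once the parse-exception limit is exceeded
      final InputRow inputRow = inputRowIterator.next();
      // ... bucket the row and update HLL collectors ...
      determinePartitionsMeters.incrementProcessed();
    }
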
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index 740ca20..abc6b92 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.server.security.Access;
@@ -40,7 +41,7 @@
public class IndexTaskUtils
{
@Nullable
- public static List<String> getMessagesFromSavedParseExceptions(CircularBuffer<Throwable> savedParseExceptions)
+ public static List<String> getMessagesFromSavedParseExceptions(CircularBuffer<ParseException> savedParseExceptions)
{
if (savedParseExceptions == null) {
return null;
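
Callers now feed the handler's saved buffer straight into this method when building the unparseable-events report; a sketch matching the IndexTask changes above (unparseableEventsMap is a local placeholder):

    final Map<String, Object> unparseableEventsMap = new HashMap<>();
    final List<String> messages = IndexTaskUtils.getMessagesFromSavedParseExceptions(
        parseExceptionHandler.getSavedParseExceptions() // now a CircularBuffer<ParseException>
    );
    if (messages != null) { // null when no saved-exception buffer was configured
      unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, messages);
    }
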
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
index e88dab2..05ac79e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
@@ -23,64 +23,33 @@
import org.apache.druid.data.input.HandlingInputRowIterator;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
-import org.apache.druid.utils.CircularBuffer;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
public class InputSourceProcessor
{
private static final Logger LOG = new Logger(InputSourceProcessor.class);
- private final RowIngestionMeters buildSegmentsMeters;
- @Nullable
- private final CircularBuffer<Throwable> buildSegmentsSavedParseExceptions;
- private final boolean logParseExceptions;
- private final int maxParseExceptions;
- private final long pushTimeout;
- private final IndexTaskInputRowIteratorBuilder inputRowIteratorBuilder;
-
- public InputSourceProcessor(
- RowIngestionMeters buildSegmentsMeters,
- @Nullable CircularBuffer<Throwable> buildSegmentsSavedParseExceptions,
- boolean logParseExceptions,
- int maxParseExceptions,
- long pushTimeout,
- IndexTaskInputRowIteratorBuilder inputRowIteratorBuilder
- )
- {
- this.buildSegmentsMeters = buildSegmentsMeters;
- this.buildSegmentsSavedParseExceptions = buildSegmentsSavedParseExceptions;
- this.logParseExceptions = logParseExceptions;
- this.maxParseExceptions = maxParseExceptions;
- this.pushTimeout = pushTimeout;
- this.inputRowIteratorBuilder = inputRowIteratorBuilder;
- }
-
/**
* This method opens the given {@link InputSource} and processes data via {@link InputSourceReader}.
* All read data is consumed by {@link BatchAppenderatorDriver} which creates new segments.
@@ -88,14 +57,18 @@
*
* @return {@link SegmentsAndCommitMetadata} for the pushed segments.
*/
- public SegmentsAndCommitMetadata process(
+ public static SegmentsAndCommitMetadata process(
DataSchema dataSchema,
BatchAppenderatorDriver driver,
PartitionsSpec partitionsSpec,
InputSource inputSource,
@Nullable InputFormat inputFormat,
File tmpDir,
- SequenceNameFunction sequenceNameFunction
+ SequenceNameFunction sequenceNameFunction,
+ IndexTaskInputRowIteratorBuilder inputRowIteratorBuilder,
+ RowIngestionMeters buildSegmentsMeters,
+ ParseExceptionHandler parseExceptionHandler,
+ long pushTimeout
) throws IOException, InterruptedException, ExecutionException, TimeoutException
{
@Nullable
@@ -103,73 +76,53 @@
? (DynamicPartitionsSpec) partitionsSpec
: null;
final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
-
- final List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
- .map(AggregatorFactory::getName)
- .collect(Collectors.toList());
- final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
- inputSource.reader(
- new InputRowSchema(
- dataSchema.getTimestampSpec(),
- dataSchema.getDimensionsSpec(),
- metricsNames
- ),
- inputFormat,
- tmpDir
- )
- );
try (
- final CloseableIterator<InputRow> inputRowIterator = inputSourceReader.read();
- HandlingInputRowIterator iterator = inputRowIteratorBuilder
+ final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+ tmpDir,
+ dataSchema,
+ inputSource,
+ inputFormat,
+ AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+ buildSegmentsMeters,
+ parseExceptionHandler
+ );
+ final HandlingInputRowIterator iterator = inputRowIteratorBuilder
.delegate(inputRowIterator)
.granularitySpec(granularitySpec)
- .nullRowRunnable(buildSegmentsMeters::incrementThrownAway)
- .absentBucketIntervalConsumer(inputRow -> buildSegmentsMeters.incrementThrownAway())
.build()
) {
while (iterator.hasNext()) {
- try {
- final InputRow inputRow = iterator.next();
- if (inputRow == null) {
- continue;
- }
-
- // IndexTaskInputRowIteratorBuilder.absentBucketIntervalConsumer() ensures the interval will be present here
- Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
- @SuppressWarnings("OptionalGetWithoutIsPresent")
- final Interval interval = optInterval.get();
-
- final String sequenceName = sequenceNameFunction.getSequenceName(interval, inputRow);
- final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
-
- if (addResult.isOk()) {
- // incremental segment publishment is allowed only when rollup doesn't have to be perfect.
- if (dynamicPartitionsSpec != null) {
- final boolean isPushRequired = addResult.isPushRequired(
- dynamicPartitionsSpec.getMaxRowsPerSegment(),
- dynamicPartitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
- );
- if (isPushRequired) {
- // There can be some segments waiting for being pushed even though no more rows will be added to them
- // in the future.
- // If those segments are not pushed here, the remaining available space in appenderator will be kept
- // small which could lead to smaller segments.
- final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
- LOG.debugSegments(pushed.getSegments(), "Pushed segments");
- }
- }
- } else {
- throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
- }
-
- if (addResult.getParseException() != null) {
- handleParseException(addResult.getParseException());
- } else {
- buildSegmentsMeters.incrementProcessed();
- }
+ final InputRow inputRow = iterator.next();
+ if (inputRow == null) {
+ continue;
}
- catch (ParseException e) {
- handleParseException(e);
+
+      // the interval is always present here because rows with absent bucket intervals are filtered out by defaultRowFilter
+ Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+ @SuppressWarnings("OptionalGetWithoutIsPresent")
+ final Interval interval = optInterval.get();
+
+ final String sequenceName = sequenceNameFunction.getSequenceName(interval, inputRow);
+ final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
+
+ if (addResult.isOk()) {
+        // incremental segment publishing is allowed only when rollup doesn't have to be perfect.
+ if (dynamicPartitionsSpec != null) {
+ final boolean isPushRequired = addResult.isPushRequired(
+ dynamicPartitionsSpec.getMaxRowsPerSegment(),
+ dynamicPartitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
+ );
+ if (isPushRequired) {
+            // There can be some segments waiting to be pushed even though no more rows will be added to them
+            // in the future.
+            // If those segments are not pushed here, the remaining available space in the appenderator will be
+            // kept small, which could lead to smaller segments.
+ final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
+ LOG.debugSegments(pushed.getSegments(), "Pushed segments");
+ }
+ }
+ } else {
+ throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
}
}
@@ -179,26 +132,4 @@
return pushed;
}
}
-
- private void handleParseException(ParseException e)
- {
- if (e.isFromPartiallyValidRow()) {
- buildSegmentsMeters.incrementProcessedWithError();
- } else {
- buildSegmentsMeters.incrementUnparseable();
- }
-
- if (logParseExceptions) {
- LOG.error(e, "Encountered parse exception");
- }
-
- if (buildSegmentsSavedParseExceptions != null) {
- buildSegmentsSavedParseExceptions.add(e);
- }
-
- if (buildSegmentsMeters.getUnparseable() + buildSegmentsMeters.getProcessedWithError() > maxParseExceptions) {
- LOG.error("Max parse exceptions exceeded, terminating task...");
- throw new RuntimeException("Max parse exceptions exceeded, terminating task...", e);
- }
- }
}
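
Since process() is now static, callers pass the iterator builder, meters, and handler directly instead of constructing an InputSourceProcessor; a call sketch matching the IndexTask call site above:

    // Hypothetical call site inside a batch task's segment-building phase.
    final SegmentsAndCommitMetadata pushed = InputSourceProcessor.process(
        dataSchema,
        driver,
        partitionsSpec,
        inputSource,
        inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null,
        tmpDir,
        sequenceNameFunction,
        new DefaultIndexTaskInputRowIteratorBuilder(),
        buildSegmentsMeters,
        buildSegmentsParseExceptionHandler,
        pushTimeout
    );
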
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index d71102b..53019dd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -29,37 +29,33 @@
import org.apache.druid.data.input.HandlingInputRowIterator;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.Rows;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
-import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
/**
* The worker task of {@link PartialDimensionDistributionParallelIndexTaskRunner}. This task
@@ -189,41 +185,38 @@
InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
ingestionSchema.getDataSchema().getParser()
);
- List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
- .map(AggregatorFactory::getName)
- .collect(Collectors.toList());
InputFormat inputFormat = inputSource.needsFormat()
? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
: null;
- InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
- inputSource.reader(
- new InputRowSchema(
- dataSchema.getTimestampSpec(),
- dataSchema.getDimensionsSpec(),
- metricsNames
- ),
- inputFormat,
- toolbox.getIndexingTmpDir()
- )
+ final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+ final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+ buildSegmentsMeters,
+ tuningConfig.isLogParseExceptions(),
+ tuningConfig.getMaxParseExceptions(),
+ tuningConfig.getMaxSavedParseExceptions()
);
try (
- CloseableIterator<InputRow> inputRowIterator = inputSourceReader.read();
+ final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+ toolbox.getIndexingTmpDir(),
+ dataSchema,
+ inputSource,
+ inputFormat,
+ AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+ buildSegmentsMeters,
+ parseExceptionHandler
+ );
HandlingInputRowIterator iterator =
new RangePartitionIndexTaskInputRowIteratorBuilder(partitionDimension, SKIP_NULL)
.delegate(inputRowIterator)
.granularitySpec(granularitySpec)
- .nullRowRunnable(IndexTaskInputRowIteratorBuilder.NOOP_RUNNABLE)
- .absentBucketIntervalConsumer(IndexTaskInputRowIteratorBuilder.NOOP_CONSUMER)
.build()
) {
Map<Interval, StringDistribution> distribution = determineDistribution(
iterator,
granularitySpec,
partitionDimension,
- isAssumeGrouped,
- tuningConfig.isLogParseExceptions(),
- tuningConfig.getMaxParseExceptions()
+ isAssumeGrouped
);
sendReport(toolbox, new DimensionDistributionReport(getId(), distribution));
}
@@ -235,9 +228,7 @@
HandlingInputRowIterator inputRowIterator,
GranularitySpec granularitySpec,
String partitionDimension,
- boolean isAssumeGrouped,
- boolean isLogParseExceptions,
- int maxParseExceptions
+ boolean isAssumeGrouped
)
{
Map<Interval, StringDistribution> intervalToDistribution = new HashMap<>();
@@ -246,36 +237,22 @@
? dedupInputRowFilterSupplier.get()
: new PassthroughInputRowFilter();
- int numParseExceptions = 0;
-
while (inputRowIterator.hasNext()) {
- try {
- InputRow inputRow = inputRowIterator.next();
- if (inputRow == null) {
- continue;
- }
-
- DateTime timestamp = inputRow.getTimestamp();
-
- //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
- Interval interval = granularitySpec.bucketInterval(timestamp).get();
- String partitionDimensionValue = Iterables.getOnlyElement(inputRow.getDimension(partitionDimension));
-
- if (inputRowFilter.accept(interval, partitionDimensionValue, inputRow)) {
- StringDistribution stringDistribution =
- intervalToDistribution.computeIfAbsent(interval, k -> new StringSketch());
- stringDistribution.put(partitionDimensionValue);
- }
+ InputRow inputRow = inputRowIterator.next();
+ if (inputRow == null) {
+ continue;
}
- catch (ParseException e) {
- if (isLogParseExceptions) {
- LOG.error(e, "Encountered parse exception");
- }
- numParseExceptions++;
- if (numParseExceptions > maxParseExceptions) {
- throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
- }
+ DateTime timestamp = inputRow.getTimestamp();
+
+ //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+ Interval interval = granularitySpec.bucketInterval(timestamp).get();
+ String partitionDimensionValue = Iterables.getOnlyElement(inputRow.getDimension(partitionDimension));
+
+ if (inputRowFilter.accept(interval, partitionDimensionValue, inputRow)) {
+ StringDistribution stringDistribution =
+ intervalToDistribution.computeIfAbsent(interval, k -> new StringSketch());
+ stringDistribution.put(partitionDimensionValue);
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index b65d4f2..27160c9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -23,7 +23,6 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.task.BatchAppenderators;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.InputSourceProcessor;
@@ -33,6 +32,8 @@
import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.realtime.FireDepartment;
@@ -157,6 +158,12 @@
final SegmentAllocatorForBatch segmentAllocator = createSegmentAllocator(toolbox, taskClient);
final SequenceNameFunction sequenceNameFunction = segmentAllocator.getSequenceNameFunction();
+ final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+ buildSegmentsMeters,
+ tuningConfig.isLogParseExceptions(),
+ tuningConfig.getMaxParseExceptions(),
+ tuningConfig.getMaxSavedParseExceptions()
+ );
final Appenderator appenderator = BatchAppenderators.newAppenderator(
getId(),
toolbox.getAppenderatorsManager(),
@@ -164,28 +171,26 @@
toolbox,
dataSchema,
tuningConfig,
- new ShuffleDataSegmentPusher(supervisorTaskId, getId(), toolbox.getIntermediaryDataManager())
+ new ShuffleDataSegmentPusher(supervisorTaskId, getId(), toolbox.getIntermediaryDataManager()),
+ buildSegmentsMeters,
+ parseExceptionHandler
);
boolean exceptionOccurred = false;
try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
driver.startJob();
- final InputSourceProcessor inputSourceProcessor = new InputSourceProcessor(
- buildSegmentsMeters,
- null,
- tuningConfig.isLogParseExceptions(),
- tuningConfig.getMaxParseExceptions(),
- pushTimeout,
- inputRowIteratorBuilder
- );
- final SegmentsAndCommitMetadata pushed = inputSourceProcessor.process(
+ final SegmentsAndCommitMetadata pushed = InputSourceProcessor.process(
dataSchema,
driver,
partitionsSpec,
inputSource,
inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null,
tmpDir,
- sequenceNameFunction
+ sequenceNameFunction,
+ inputRowIteratorBuilder,
+ buildSegmentsMeters,
+ parseExceptionHandler,
+ pushTimeout
);
return pushed.getSegments();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index ba1316f..543de61 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -24,9 +24,7 @@
import com.google.common.base.Optional;
import com.google.common.collect.FluentIterable;
import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -39,14 +37,12 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.DruidMetrics;
-import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
@@ -69,7 +65,6 @@
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -77,7 +72,6 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
/**
* The worker task of {@link SinglePhaseParallelIndexTaskRunner}. Similar to {@link IndexTask}, but this task
@@ -304,95 +298,81 @@
partitionsSpec
);
+ final RowIngestionMeters rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+ final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+ rowIngestionMeters,
+ tuningConfig.isLogParseExceptions(),
+ tuningConfig.getMaxParseExceptions(),
+ tuningConfig.getMaxSavedParseExceptions()
+ );
final Appenderator appenderator = BatchAppenderators.newAppenderator(
getId(),
toolbox.getAppenderatorsManager(),
fireDepartmentMetrics,
toolbox,
dataSchema,
- tuningConfig
- );
- final List<String> metricsNames = Arrays.stream(ingestionSchema.getDataSchema().getAggregators())
- .map(AggregatorFactory::getName)
- .collect(Collectors.toList());
- final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
- inputSource.reader(
- new InputRowSchema(
- ingestionSchema.getDataSchema().getTimestampSpec(),
- ingestionSchema.getDataSchema().getDimensionsSpec(),
- metricsNames
- ),
- inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null,
- tmpDir
+ tuningConfig,
+ rowIngestionMeters,
+ new ParseExceptionHandler(
+ rowIngestionMeters,
+ tuningConfig.isLogParseExceptions(),
+ tuningConfig.getMaxParseExceptions(),
+ tuningConfig.getMaxSavedParseExceptions()
)
);
-
boolean exceptionOccurred = false;
try (
final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator);
- final CloseableIterator<InputRow> inputRowIterator = inputSourceReader.read()
+ final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+ tmpDir,
+ dataSchema,
+ inputSource,
+ inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null,
+ inputRow -> {
+ if (inputRow == null) {
+ return false;
+ }
+ if (explicitIntervals) {
+ final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+ return optInterval.isPresent();
+ }
+ return true;
+ },
+ rowIngestionMeters,
+ parseExceptionHandler
+ )
) {
driver.startJob();
final Set<DataSegment> pushedSegments = new HashSet<>();
while (inputRowIterator.hasNext()) {
- try {
- final InputRow inputRow = inputRowIterator.next();
+ final InputRow inputRow = inputRowIterator.next();
- if (inputRow == null) {
- fireDepartmentMetrics.incrementThrownAway();
- continue;
+ // Segments are created as needed, using a single sequence name. They may be allocated from the overlord
+ // (in append mode) or may be created on our own authority (in overwrite mode).
+ final String sequenceName = getId();
+ final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
+
+ if (addResult.isOk()) {
+ final boolean isPushRequired = addResult.isPushRequired(
+ partitionsSpec.getMaxRowsPerSegment(),
+ partitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
+ );
+ if (isPushRequired) {
+              // There can be some segments waiting to be published even though no more rows will be added to them.
+              // If those segments are not published here, the available space in the appenderator will stay small,
+              // which could lead to smaller segments.
+ final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
+ pushedSegments.addAll(pushed.getSegments());
+ LOG.info("Pushed [%s] segments", pushed.getSegments().size());
+ LOG.infoSegments(pushed.getSegments(), "Pushed segments");
}
-
- if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) {
- final String errorMsg = StringUtils.format(
- "Encountered row with timestamp that cannot be represented as a long: [%s]",
- inputRow
- );
- throw new ParseException(errorMsg);
- }
-
- if (explicitIntervals) {
- final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
- if (!optInterval.isPresent()) {
- fireDepartmentMetrics.incrementThrownAway();
- continue;
- }
- }
-
- // Segments are created as needed, using a single sequence name. They may be allocated from the overlord
- // (in append mode) or may be created on our own authority (in overwrite mode).
- final String sequenceName = getId();
- final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
-
- if (addResult.isOk()) {
- final boolean isPushRequired = addResult.isPushRequired(
- partitionsSpec.getMaxRowsPerSegment(),
- partitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
- );
- if (isPushRequired) {
- // There can be some segments waiting for being published even though any rows won't be added to them.
- // If those segments are not published here, the available space in appenderator will be kept to be small
- // which makes the size of segments smaller.
- final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
- pushedSegments.addAll(pushed.getSegments());
- LOG.info("Pushed [%s] segments", pushed.getSegments().size());
- LOG.infoSegments(pushed.getSegments(), "Pushed segments");
- }
- } else {
- throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
- }
-
- fireDepartmentMetrics.incrementProcessed();
+ } else {
+ throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
}
- catch (ParseException e) {
- if (tuningConfig.isReportParseExceptions()) {
- throw e;
- } else {
- fireDepartmentMetrics.incrementUnparseable();
- }
- }
+
+ fireDepartmentMetrics.incrementProcessed();
}
final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilder.java
index b2a9463..aa74f11 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilder.java
@@ -19,38 +19,22 @@
package org.apache.druid.indexing.common.task.batch.parallel.iterator;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.HandlingInputRowIterator;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexing.common.task.IndexTask;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
-import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.List;
-import java.util.function.Consumer;
/**
* <pre>
* Build a default {@link HandlingInputRowIterator} for {@link IndexTask}s. Each {@link InputRow} is
- * processed by the following handlers, in order:
- *
- * 1. Null row: If {@link InputRow} is null, invoke the null row {@link Runnable} callback.
- *
- * 2. Invalid timestamp: If {@link InputRow} has an invalid timestamp, throw a {@link ParseException}.
- *
- * 3. Absent bucket interval: If {@link InputRow} has a timestamp that does not match the
- * {@link GranularitySpec} bucket intervals, invoke the absent bucket interval {@link Consumer}
- * callback.
- *
- * 4. Any additional handlers in the order they are added by calls to
- * {@link #appendInputRowHandler(HandlingInputRowIterator.InputRowHandler)}.
+ * processed by the registered handlers, in the order in which they were added by calls to
+ * {@link #appendInputRowHandler(HandlingInputRowIterator.InputRowHandler)}.
*
* If any of the handlers invoke their respective callback, the {@link HandlingInputRowIterator} will yield
* a null {@link InputRow} next; otherwise, the next {@link InputRow} is yielded.
@@ -62,8 +46,6 @@
{
private CloseableIterator<InputRow> delegate = null;
private GranularitySpec granularitySpec = null;
- private HandlingInputRowIterator.InputRowHandler nullRowHandler = null;
- private HandlingInputRowIterator.InputRowHandler absentBucketIntervalHandler = null;
private final List<HandlingInputRowIterator.InputRowHandler> appendedInputRowHandlers = new ArrayList<>();
@Override
@@ -81,46 +63,12 @@
}
@Override
- public DefaultIndexTaskInputRowIteratorBuilder nullRowRunnable(Runnable nullRowRunnable)
- {
- this.nullRowHandler = inputRow -> {
- if (inputRow == null) {
- nullRowRunnable.run();
- return true;
- }
- return false;
- };
- return this;
- }
-
- @Override
- public DefaultIndexTaskInputRowIteratorBuilder absentBucketIntervalConsumer(
- Consumer<InputRow> absentBucketIntervalConsumer
- )
- {
- this.absentBucketIntervalHandler = inputRow -> {
- Optional<Interval> intervalOpt = granularitySpec.bucketInterval(inputRow.getTimestamp());
- if (!intervalOpt.isPresent()) {
- absentBucketIntervalConsumer.accept(inputRow);
- return true;
- }
- return false;
- };
- return this;
- }
-
- @Override
public HandlingInputRowIterator build()
{
Preconditions.checkNotNull(delegate, "delegate required");
Preconditions.checkNotNull(granularitySpec, "granularitySpec required");
- Preconditions.checkNotNull(nullRowHandler, "nullRowRunnable required");
- Preconditions.checkNotNull(absentBucketIntervalHandler, "absentBucketIntervalConsumer required");
ImmutableList.Builder<HandlingInputRowIterator.InputRowHandler> handlersBuilder = ImmutableList.<HandlingInputRowIterator.InputRowHandler>builder()
- .add(nullRowHandler)
- .add(createInvalidTimestampHandler())
- .add(absentBucketIntervalHandler)
.addAll(appendedInputRowHandlers);
return new HandlingInputRowIterator(delegate, handlersBuilder.build());
@@ -134,18 +82,4 @@
this.appendedInputRowHandlers.add(inputRowHandler);
return this;
}
-
- private HandlingInputRowIterator.InputRowHandler createInvalidTimestampHandler()
- {
- return inputRow -> {
- if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) {
- String errorMsg = StringUtils.format(
- "Encountered row with timestamp that cannot be represented as a long: [%s]",
- inputRow
- );
- throw new ParseException(errorMsg);
- }
- return false;
- };
- }
}
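
With the implicit null-row, invalid-timestamp, and absent-bucket-interval handlers gone, whoever builds the iterator owns any per-row handling it still needs. A minimal caller-side sketch, assuming the source setter is named `delegate(...)` (matching the field above) and that `InputRowHandler` is a `boolean handle(InputRow)` functional interface; returning true marks a row handled, so the iterator yields null for it:

```java
// Hypothetical wiring sketch; only explicitly appended handlers run now.
HandlingInputRowIterator iterator = new DefaultIndexTaskInputRowIteratorBuilder()
    .delegate(inputRowIterator)       // CloseableIterator<InputRow> source (assumed setter name)
    .granularitySpec(granularitySpec)
    .appendInputRowHandler(row ->
        // example handler: swallow null rows and rows outside the bucket intervals,
        // mirroring two of the checks the builder used to add implicitly
        row == null || !granularitySpec.bucketInterval(row.getTimestamp()).isPresent()
    )
    .build();
```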
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilder.java
index 80b4ea8..89afdbb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilder.java
@@ -25,16 +25,8 @@
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
-import java.util.function.Consumer;
-
public interface IndexTaskInputRowIteratorBuilder
{
- Runnable NOOP_RUNNABLE = () -> {
- };
-
- Consumer<InputRow> NOOP_CONSUMER = inputRow -> {
- };
-
/**
* @param inputRowIterator Source of {@link InputRow}s.
*/
@@ -46,16 +38,5 @@
*/
IndexTaskInputRowIteratorBuilder granularitySpec(GranularitySpec granularitySpec);
- /**
- * @param nullRowRunnable Runnable for when {@link Firehose} yields a null row.
- */
- IndexTaskInputRowIteratorBuilder nullRowRunnable(Runnable nullRowRunnable);
-
- /**
- * @param absentBucketIntervalConsumer Consumer for when {@link Firehose} yields a row with a timestamp that does not
- * match the {@link GranularitySpec} bucket intervals.
- */
- IndexTaskInputRowIteratorBuilder absentBucketIntervalConsumer(Consumer<InputRow> absentBucketIntervalConsumer);
-
HandlingInputRowIterator build();
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java
index 171d6b8..30d34bb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java
@@ -24,26 +24,16 @@
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import java.util.List;
-import java.util.function.Consumer;
/**
* <pre>
* Build an {@link HandlingInputRowIterator} for {@link IndexTask}s used for range partitioning. Each {@link
* InputRow} is processed by the following handlers, in order:
*
- * 1. Null row: If {@link InputRow} is null, invoke the null row {@link Runnable} callback.
- *
- * 2. Invalid timestamp: If {@link InputRow} has an invalid timestamp, throw a {@link ParseException}.
- *
- * 3. Absent bucket interval: If {@link InputRow} has a timestamp that does not match the
- * {@link GranularitySpec} bucket intervals, invoke the absent bucket interval {@link Consumer}
- * callback.
- *
- * 4. Filter for rows with only a single dimension value count for the specified partition dimension.
+ * 1. Filter for rows that have exactly one value for the specified partition dimension.
*
* If any of the handlers invoke their respective callback, the {@link HandlingInputRowIterator} will yield
* a null {@link InputRow} next; otherwise, the next {@link InputRow} is yielded.
@@ -83,18 +73,6 @@
}
@Override
- public IndexTaskInputRowIteratorBuilder nullRowRunnable(Runnable nullRowRunnable)
- {
- return delegate.nullRowRunnable(nullRowRunnable);
- }
-
- @Override
- public IndexTaskInputRowIteratorBuilder absentBucketIntervalConsumer(Consumer<InputRow> absentBucketIntervalConsumer)
- {
- return delegate.absentBucketIntervalConsumer(absentBucketIntervalConsumer);
- }
-
- @Override
public HandlingInputRowIterator build()
{
return delegate.build();
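
After this cleanup, the only behavior the range-partition builder layers on top of its wrapped DefaultIndexTaskInputRowIteratorBuilder is the single-value filter described in the javadoc above. A hypothetical shape of that appended handler (the `partitionDimension` variable is illustrative, and the `IAE` import above suggests the real handler may instead reject multi-valued rows outright):

```java
// Illustrative only: mark multi-valued rows as handled so the iterator
// yields null for them and range partitioning sees single-valued rows only.
delegate.appendInputRowHandler(row -> {
  final List<String> values = row.getDimension(partitionDimension);
  return values == null || values.size() != 1;
});
```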
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index 130f041..b0f4702 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -34,7 +34,6 @@
import org.apache.druid.indexing.overlord.sampler.SamplerResponse.SamplerResponseRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
@@ -43,7 +42,6 @@
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.IncrementalIndex;
-import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.indexing.DataSchema;
@@ -134,18 +132,11 @@
}
for (InputRow row : inputRowListPlusRawValues.getInputRows()) {
- if (!Intervals.ETERNITY.contains(row.getTimestamp())) {
- throw new ParseException("Timestamp cannot be represented as a long: [%s]", row);
- }
- IncrementalIndexAddResult result = index.add(new SamplerInputRow(row, counter), true);
- if (result.getParseException() != null) {
- throw result.getParseException();
- } else {
- // store the raw value; will be merged with the data from the IncrementalIndex later
- responseRows[counter] = new SamplerResponseRow(rawColumns, null, null, null);
- counter++;
- numRowsIndexed++;
- }
+ index.add(new SamplerInputRow(row, counter), true);
+ // store the raw value; will be merged with the data from the IncrementalIndex later
+ responseRows[counter] = new SamplerResponseRow(rawColumns, null, null, null);
+ counter++;
+ numRowsIndexed++;
}
}
catch (ParseException e) {
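
This hunk inverts the sampler's old control flow: rather than inspecting the IncrementalIndexAddResult for an embedded ParseException and rethrowing it, the sampler now relies on index.add(...) throwing directly, which the pre-existing catch clause (the last context line above) already covers. Schematically:

```java
// Before: parse errors came back inside the result object and were rethrown
// manually just to reach the catch block.
IncrementalIndexAddResult result = index.add(new SamplerInputRow(row, counter), true);
if (result.getParseException() != null) {
  throw result.getParseException();
}

// After: index.add(...) throws ParseException itself, so the happy path is
// linear and the single catch (ParseException e) below the loop does the rest.
index.add(new SamplerInputRow(row, counter), true);
responseRows[counter] = new SamplerResponseRow(rawColumns, null, null, null);
```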
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 4fdb33d..54292d8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -37,13 +37,13 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
-import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
@@ -51,7 +51,6 @@
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
-import org.apache.druid.utils.CircularBuffer;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.annotation.Nullable;
@@ -68,7 +67,6 @@
protected final SeekableStreamIndexTaskTuningConfig tuningConfig;
protected final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
protected final Map<String, Object> context;
- protected final CircularBuffer<Throwable> savedParseExceptions;
protected final LockGranularity lockGranularityToUse;
// Lazily initialized, to avoid calling it on the overlord when tasks are instantiated.
@@ -99,11 +97,6 @@
this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
- if (tuningConfig.getMaxSavedParseExceptions() > 0) {
- savedParseExceptions = new CircularBuffer<>(tuningConfig.getMaxSavedParseExceptions());
- } else {
- savedParseExceptions = null;
- }
this.context = context;
this.runnerSupplier = Suppliers.memoize(this::createTaskRunner);
this.lockGranularityToUse = getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK)
@@ -179,7 +172,12 @@
return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext);
}
- public Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
+ public Appenderator newAppenderator(
+ TaskToolbox toolbox,
+ FireDepartmentMetrics metrics,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
+ )
{
return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask(
getId(),
@@ -197,7 +195,9 @@
toolbox.getJoinableFactory(),
toolbox.getCache(),
toolbox.getCacheConfig(),
- toolbox.getCachePopulatorStats()
+ toolbox.getCachePopulatorStats(),
+ rowIngestionMeters,
+ parseExceptionHandler
);
}
@@ -240,14 +240,6 @@
final boolean afterMaximumMessageTime = ioConfig.getMaximumMessageTime().isPresent()
&& ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp());
- if (!Intervals.ETERNITY.contains(row.getTimestamp())) {
- final String errorMsg = StringUtils.format(
- "Encountered row with timestamp that cannot be represented as a long: [%s]",
- row
- );
- throw new ParseException(errorMsg);
- }
-
if (log.isDebugEnabled()) {
if (beforeMinimumMessageTime) {
log.debug(
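
The widened newAppenderator(...) signature means the runner creates the meters and the parse-exception handler once and threads both into the appenderator. Assembled from this hunk and the SeekableStreamIndexTaskRunner hunks below, the wiring reads:

```java
// Shown as one sequence for readability; each line appears in the diff below.
RowIngestionMeters rowIngestionMeters =
    toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
    rowIngestionMeters,
    tuningConfig.isLogParseExceptions(),
    tuningConfig.getMaxParseExceptions(),
    tuningConfig.getMaxSavedParseExceptions()
);
Appenderator appenderator =
    task.newAppenderator(toolbox, fireDepartmentMetrics, rowIngestionMeters, parseExceptionHandler);
```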
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index c62584f..2a2bf2d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -58,7 +58,6 @@
import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
@@ -69,10 +68,10 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.collect.Utils;
-import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
@@ -85,7 +84,6 @@
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.utils.CircularBuffer;
import org.apache.druid.utils.CollectionUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
@@ -203,7 +201,6 @@
private final InputFormat inputFormat;
@Nullable
private final InputRowParser<ByteBuffer> parser;
- private final CircularBuffer<Throwable> savedParseExceptions;
private final String stream;
private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
@@ -214,6 +211,8 @@
@MonotonicNonNull
private RowIngestionMeters rowIngestionMeters;
+ @MonotonicNonNull
+ private ParseExceptionHandler parseExceptionHandler;
@MonotonicNonNull
private AuthorizerMapper authorizerMapper;
@@ -236,7 +235,6 @@
final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task,
@Nullable final InputRowParser<ByteBuffer> parser,
final AuthorizerMapper authorizerMapper,
- final CircularBuffer<Throwable> savedParseExceptions,
final LockGranularity lockGranularityToUse
)
{
@@ -254,7 +252,6 @@
this.inputFormat = ioConfig.getInputFormat();
this.parser = parser;
this.authorizerMapper = authorizerMapper;
- this.savedParseExceptions = savedParseExceptions;
this.stream = ioConfig.getStartSequenceNumbers().getStream();
this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
this.sequences = new CopyOnWriteArrayList<>();
@@ -363,20 +360,29 @@
setToolbox(toolbox);
+ authorizerMapper = toolbox.getAuthorizerMapper();
+ rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+ parseExceptionHandler = new ParseExceptionHandler(
+ rowIngestionMeters,
+ tuningConfig.isLogParseExceptions(),
+ tuningConfig.getMaxParseExceptions(),
+ tuningConfig.getMaxSavedParseExceptions()
+ );
+
// Now we can initialize StreamChunkReader with the given toolbox.
final StreamChunkParser parser = new StreamChunkParser(
this.parser,
inputFormat,
inputRowSchema,
task.getDataSchema().getTransformSpec(),
- toolbox.getIndexingTmpDir()
+ toolbox.getIndexingTmpDir(),
+ row -> row != null && task.withinMinMaxRecordTime(row),
+ rowIngestionMeters,
+ parseExceptionHandler
);
initializeSequences();
- authorizerMapper = toolbox.getAuthorizerMapper();
- rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
-
log.debug("Found chat handler of class[%s]", toolbox.getChatHandlerProvider().getClass().getName());
toolbox.getChatHandlerProvider().register(task.getId(), this, false);
@@ -412,7 +418,7 @@
toolbox.getDataSegmentServerAnnouncer().announce();
toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
}
- appenderator = task.newAppenderator(fireDepartmentMetrics, toolbox);
+ appenderator = task.newAppenderator(toolbox, fireDepartmentMetrics, rowIngestionMeters, parseExceptionHandler);
driver = task.newDriver(appenderator, toolbox, fireDepartmentMetrics);
// Start up, set up initial sequences.
@@ -615,94 +621,74 @@
);
if (shouldProcess) {
- try {
- final List<byte[]> valueBytess = record.getData();
- final List<InputRow> rows;
- if (valueBytess == null || valueBytess.isEmpty()) {
- rows = Utils.nullableListOf((InputRow) null);
- } else {
- rows = parser.parse(valueBytess);
- }
- boolean isPersistRequired = false;
+ final List<byte[]> valueBytess = record.getData();
+ final List<InputRow> rows = parser.parse(valueBytess);
+ boolean isPersistRequired = false;
- final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceToUse = sequences
- .stream()
- .filter(sequenceMetadata -> sequenceMetadata.canHandle(this, record))
- .findFirst()
- .orElse(null);
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceToUse = sequences
+ .stream()
+ .filter(sequenceMetadata -> sequenceMetadata.canHandle(this, record))
+ .findFirst()
+ .orElse(null);
- if (sequenceToUse == null) {
- throw new ISE(
- "Cannot find any valid sequence for record with partition [%s] and sequenceNumber [%s]. Current sequences: %s",
- record.getPartitionId(),
- record.getSequenceNumber(),
- sequences
+ if (sequenceToUse == null) {
+ throw new ISE(
+ "Cannot find any valid sequence for record with partition [%s] and sequenceNumber [%s]. Current sequences: %s",
+ record.getPartitionId(),
+ record.getSequenceNumber(),
+ sequences
+ );
+ }
+
+ for (InputRow row : rows) {
+ final AppenderatorDriverAddResult addResult = driver.add(
+ row,
+ sequenceToUse.getSequenceName(),
+ committerSupplier,
+ true,
+ // do not allow incremental persists to happen until all the rows from this batch
+ // of rows are indexed
+ false
+ );
+
+ if (addResult.isOk()) {
+ // If the number of rows in the segment exceeds the threshold after adding a row,
+ // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
+ final boolean isPushRequired = addResult.isPushRequired(
+ tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(),
+ tuningConfig.getPartitionsSpec()
+ .getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
);
- }
-
- for (InputRow row : rows) {
- if (row != null && task.withinMinMaxRecordTime(row)) {
- final AppenderatorDriverAddResult addResult = driver.add(
- row,
- sequenceToUse.getSequenceName(),
- committerSupplier,
- true,
- // do not allow incremental persists to happen until all the rows from this batch
- // of rows are indexed
- false
- );
-
- if (addResult.isOk()) {
- // If the number of rows in the segment exceeds the threshold after adding a row,
- // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
- final boolean isPushRequired = addResult.isPushRequired(
- tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(),
- tuningConfig.getPartitionsSpec()
- .getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
- );
- if (isPushRequired && !sequenceToUse.isCheckpointed()) {
- sequenceToCheckpoint = sequenceToUse;
- }
- isPersistRequired |= addResult.isPersistRequired();
- } else {
- // Failure to allocate segment puts determinism at risk, bail out to be safe.
- // May want configurable behavior here at some point.
- // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
- throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
- }
-
- if (addResult.getParseException() != null) {
- handleParseException(addResult.getParseException(), record);
- } else {
- rowIngestionMeters.incrementProcessed();
- }
- } else {
- rowIngestionMeters.incrementThrownAway();
+ if (isPushRequired && !sequenceToUse.isCheckpointed()) {
+ sequenceToCheckpoint = sequenceToUse;
}
- }
- if (isPersistRequired) {
- Futures.addCallback(
- driver.persistAsync(committerSupplier.get()),
- new FutureCallback<Object>()
- {
- @Override
- public void onSuccess(@Nullable Object result)
- {
- log.debug("Persist completed with metadata: %s", result);
- }
-
- @Override
- public void onFailure(Throwable t)
- {
- log.error("Persist failed, dying");
- backgroundThreadException = t;
- }
- }
- );
+ isPersistRequired |= addResult.isPersistRequired();
+ } else {
+ // Failure to allocate segment puts determinism at risk, bail out to be safe.
+ // May want configurable behavior here at some point.
+ // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
+ throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
}
}
- catch (ParseException e) {
- handleParseException(e, record);
+ if (isPersistRequired) {
+ Futures.addCallback(
+ driver.persistAsync(committerSupplier.get()),
+ new FutureCallback<Object>()
+ {
+ @Override
+ public void onSuccess(@Nullable Object result)
+ {
+ log.debug("Persist completed with metadata: %s", result);
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ log.error("Persist failed, dying");
+ backgroundThreadException = t;
+ }
+ }
+ );
}
// in kafka, we can easily get the next offset by adding 1, but for kinesis, there's no way
@@ -1082,7 +1068,7 @@
{
Map<String, Object> unparseableEventsMap = new HashMap<>();
List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(
- savedParseExceptions
+ parseExceptionHandler.getSavedParseExceptions()
);
if (buildSegmentsParseExceptionMessages != null) {
unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages);
@@ -1297,34 +1283,6 @@
return false;
}
-
- private void handleParseException(ParseException e, OrderedPartitionableRecord record)
- {
- if (e.isFromPartiallyValidRow()) {
- rowIngestionMeters.incrementProcessedWithError();
- } else {
- rowIngestionMeters.incrementUnparseable();
- }
-
- if (tuningConfig.isLogParseExceptions()) {
- log.info(
- e,
- "Row at partition[%s] offset[%s] was unparseable.",
- record.getPartitionId(),
- record.getSequenceNumber()
- );
- }
-
- if (savedParseExceptions != null) {
- savedParseExceptions.add(e);
- }
-
- if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError()
- > tuningConfig.getMaxParseExceptions()) {
- throw new RuntimeException("Max parse exceptions exceeded");
- }
- }
-
private boolean isPaused()
{
return status == Status.PAUSED;
@@ -1588,7 +1546,9 @@
)
{
authorizationCheck(req, Action.READ);
- List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
+ List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
+ parseExceptionHandler.getSavedParseExceptions()
+ );
return Response.ok(events).build();
}
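
The deleted handleParseException(...) above is the best guide to what ParseExceptionHandler now centralizes. A rough behavioral sketch, inferred from that method, the constructor arguments used in this file, and the "Max parse exceptions[N] exceeded" RE message asserted in the tests below; method and field names are assumptions, not the actual class body:

```java
// Inferred sketch of org.apache.druid.segment.incremental.ParseExceptionHandler.
public void handle(ParseException e)
{
  if (e.isFromPartiallyValidRow()) {
    rowIngestionMeters.incrementProcessedWithError();
  } else {
    rowIngestionMeters.incrementUnparseable();
  }
  if (logParseExceptions) {
    log.error(e, "Encountered parse exception");
  }
  if (savedParseExceptions != null) {
    savedParseExceptions.add(e); // CircularBuffer capped at maxSavedParseExceptions
  }
  if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError()
      > maxParseExceptions) {
    throw new RE("Max parse exceptions[%d] exceeded", maxParseExceptions);
  }
}
```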
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
index 6061aff..c4089c9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
@@ -19,13 +19,18 @@
package org.apache.druid.indexing.seekablestream;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.indexing.common.task.FilteringCloseableInputRowIterator;
+import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.transform.TransformSpec;
import javax.annotation.Nullable;
@@ -33,7 +38,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.function.Predicate;
/**
* Abstraction for parsing stream data which internally uses {@link org.apache.druid.data.input.InputEntityReader}
@@ -45,6 +52,9 @@
private final InputRowParser<ByteBuffer> parser;
@Nullable
private final SettableByteEntityReader byteEntityReader;
+ private final Predicate<InputRow> rowFilter;
+ private final RowIngestionMeters rowIngestionMeters;
+ private final ParseExceptionHandler parseExceptionHandler;
/**
* Either parser or inputFormat shouldn't be null.
@@ -54,12 +64,16 @@
@Nullable InputFormat inputFormat,
InputRowSchema inputRowSchema,
TransformSpec transformSpec,
- File indexingTmpDir
+ File indexingTmpDir,
+ Predicate<InputRow> rowFilter,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
if (parser == null && inputFormat == null) {
throw new IAE("Either parser or inputFormat should be set");
}
+ // parser is already decorated with transformSpec in DataSchema
this.parser = parser;
if (inputFormat != null) {
this.byteEntityReader = new SettableByteEntityReader(
@@ -71,27 +85,41 @@
} else {
this.byteEntityReader = null;
}
+ this.rowFilter = rowFilter;
+ this.rowIngestionMeters = rowIngestionMeters;
+ this.parseExceptionHandler = parseExceptionHandler;
}
- List<InputRow> parse(List<byte[]> streamChunk) throws IOException
+ List<InputRow> parse(@Nullable List<byte[]> streamChunk) throws IOException
{
- if (byteEntityReader != null) {
- return parseWithInputFormat(byteEntityReader, streamChunk);
+ if (streamChunk == null || streamChunk.isEmpty()) {
+ rowIngestionMeters.incrementThrownAway();
+ return Collections.emptyList();
} else {
- return parseWithParser(parser, streamChunk);
+ if (byteEntityReader != null) {
+ return parseWithInputFormat(byteEntityReader, streamChunk);
+ } else {
+ return parseWithParser(parser, streamChunk);
+ }
}
}
- private static List<InputRow> parseWithParser(InputRowParser<ByteBuffer> parser, List<byte[]> valueBytess)
+ private List<InputRow> parseWithParser(InputRowParser<ByteBuffer> parser, List<byte[]> valueBytess)
{
- final List<InputRow> rows = new ArrayList<>();
- for (byte[] valueBytes : valueBytess) {
- rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes)));
- }
- return rows;
+ final FluentIterable<InputRow> iterable = FluentIterable
+ .from(valueBytess)
+ .transformAndConcat(bytes -> parser.parseBatch(ByteBuffer.wrap(bytes)));
+
+ final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+ CloseableIterators.withEmptyBaggage(iterable.iterator()),
+ rowFilter,
+ rowIngestionMeters,
+ parseExceptionHandler
+ );
+ return Lists.newArrayList(rowIterator);
}
- private static List<InputRow> parseWithInputFormat(
+ private List<InputRow> parseWithInputFormat(
SettableByteEntityReader byteEntityReader,
List<byte[]> valueBytess
) throws IOException
@@ -99,7 +127,12 @@
final List<InputRow> rows = new ArrayList<>();
for (byte[] valueBytes : valueBytess) {
byteEntityReader.setEntity(new ByteEntity(valueBytes));
- try (CloseableIterator<InputRow> rowIterator = byteEntityReader.read()) {
+ try (FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+ byteEntityReader.read(),
+ rowFilter,
+ rowIngestionMeters,
+ parseExceptionHandler
+ )) {
rowIterator.forEachRemaining(rows::add);
}
}
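
With filtering and parse-exception accounting folded into StreamChunkParser, the caller-facing contract stays small. A usage sketch based on the hunks above (the driver.add arguments are copied from the runner diff):

```java
// parse() accepts a possibly-null payload and does the bookkeeping itself:
// - null/empty chunks increment thrownAway and yield an empty list
// - rows rejected by rowFilter increment thrownAway and are dropped
// - ParseExceptions from the parser or the filter go to parseExceptionHandler
final List<InputRow> rows = parser.parse(record.getData());
for (InputRow row : rows) {
  driver.add(row, sequenceToUse.getSequenceName(), committerSupplier, true, false);
}
```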
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 2fdf85b..da6dc8b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -45,7 +45,6 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IndexTaskClient;
import org.apache.druid.indexing.common.TaskInfoProvider;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -82,6 +81,7 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.joda.time.DateTime;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index ad26c20..50be464 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -26,7 +26,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.guice.annotations.Json;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
@@ -35,6 +34,7 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
index c2bf244..c36a072 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
@@ -32,7 +32,6 @@
import org.apache.druid.data.input.impl.NoopInputSource;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.NoopIndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
@@ -44,6 +43,7 @@
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalLoadSpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 290263b..a9d82b1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -65,7 +65,6 @@
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
@@ -110,6 +109,7 @@
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -123,6 +123,7 @@
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -157,7 +158,7 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-public class AppenderatorDriverRealtimeIndexTaskTest
+public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHandlingTest
{
private static final Logger log = new Logger(AppenderatorDriverRealtimeIndexTaskTest.class);
private static final ServiceEmitter EMITTER = new ServiceEmitter(
@@ -683,8 +684,7 @@
// Wait for the task to finish.
TaskStatus status = statusFuture.get();
- Assert.assertTrue(status.getErrorMsg()
- .contains("java.lang.RuntimeException: Max parse exceptions exceeded, terminating task..."));
+ Assert.assertTrue(status.getErrorMsg().contains("org.apache.druid.java.util.common.RE: Max parse exceptions[0] exceeded"));
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
@@ -898,10 +898,10 @@
Map<String, Object> expectedUnparseables = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
- "Unparseable timestamp found! Event: {dim1=foo, met1=2.0, __fail__=x}",
+ "Timestamp[null] is unparseable! Event: {dim1=foo, met1=2.0, __fail__=x}",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, dimLong=notnumber, dimFloat=notnumber, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,could not convert value [notnumber] to float,Unable to parse value[foo] for field[met1],]",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [Unable to parse value[foo] for field[met1],]",
- "Unparseable timestamp found! Event: null"
+ "Timestamp[null] is unparseable! Event: null"
)
);
Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents());
@@ -979,10 +979,10 @@
Map<String, Object> expectedUnparseables = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
- "Unparseable timestamp found! Event: {dim1=foo, met1=2.0, __fail__=x}",
+ "Timestamp[null] is unparseable! Event: {dim1=foo, met1=2.0, __fail__=x}",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, dimLong=notnumber, dimFloat=notnumber, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,could not convert value [notnumber] to float,Unable to parse value[foo] for field[met1],]",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [Unable to parse value[foo] for field[met1],]",
- "Unparseable timestamp found! Event: null"
+ "Timestamp[null] is unparseable! Event: null"
)
);
Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index f63fa22..0692c60 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -41,7 +41,6 @@
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes;
@@ -50,6 +49,7 @@
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index e635381..c82f1e7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -62,7 +62,6 @@
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager;
import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider;
@@ -104,6 +103,7 @@
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfig;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
new file mode 100644
index 0000000..b26c736
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public class FilteringCloseableInputRowIteratorTest
+{
+ private static final List<String> DIMENSIONS = ImmutableList.of("dim1", "dim2");
+
+ private RowIngestionMeters rowIngestionMeters;
+ private ParseExceptionHandler parseExceptionHandler;
+
+ @Before
+ public void setup()
+ {
+ rowIngestionMeters = new SimpleRowIngestionMeters();
+ parseExceptionHandler = new ParseExceptionHandler(
+ rowIngestionMeters,
+ false,
+ Integer.MAX_VALUE,
+ 1024 // do not use Integer.MAX_VALUE since it will create an object array of this length
+ );
+ }
+
+ @Test
+ public void testFilterOutRows()
+ {
+ final List<InputRow> rows = ImmutableList.of(
+ newRow(DateTimes.of("2020-01-01"), 10, 200),
+ newRow(DateTimes.of("2020-01-01"), 10, 400),
+ newRow(DateTimes.of("2020-01-01"), 20, 400),
+ newRow(DateTimes.of("2020-01-01"), 10, 800),
+ newRow(DateTimes.of("2020-01-01"), 30, 200),
+ newRow(DateTimes.of("2020-01-01"), 10, 300)
+ );
+ final Predicate<InputRow> filter = row -> (Integer) row.getRaw("dim1") == 10;
+ final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+ CloseableIterators.withEmptyBaggage(rows.iterator()),
+ filter,
+ rowIngestionMeters,
+ parseExceptionHandler
+ );
+ final List<InputRow> filteredRows = new ArrayList<>();
+ rowIterator.forEachRemaining(filteredRows::add);
+ Assert.assertEquals(
+ rows.stream().filter(filter).collect(Collectors.toList()),
+ filteredRows
+ );
+ Assert.assertEquals(2, rowIngestionMeters.getThrownAway());
+ }
+
+ @Test
+ public void testParseExceptionInDelegateNext()
+ {
+ final List<InputRow> rows = ImmutableList.of(
+ newRow(DateTimes.of("2020-01-01"), 10, 200),
+ newRow(DateTimes.of("2020-01-01"), 10, 400),
+ newRow(DateTimes.of("2020-01-01"), 20, 400),
+ newRow(DateTimes.of("2020-01-01"), 10, 800),
+ newRow(DateTimes.of("2020-01-01"), 30, 200),
+ newRow(DateTimes.of("2020-01-01"), 10, 300)
+ );
+
+ // This iterator throws ParseException every other call to next().
+ final CloseableIterator<InputRow> parseExceptionThrowingIterator = new CloseableIterator<InputRow>()
+ {
+ final int numRowsToIterate = rows.size() * 2;
+ int nextIdx = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return nextIdx < numRowsToIterate;
+ }
+
+ @Override
+ public InputRow next()
+ {
+ final int currentIdx = nextIdx++;
+ if (currentIdx % 2 == 0) {
+ return rows.get(currentIdx / 2);
+ } else {
+ throw new ParseException("Parse exception at ", currentIdx);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ }
+ };
+
+ final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+ parseExceptionThrowingIterator,
+ row -> true,
+ rowIngestionMeters,
+ parseExceptionHandler
+ );
+
+ final List<InputRow> filteredRows = new ArrayList<>();
+ rowIterator.forEachRemaining(filteredRows::add);
+ Assert.assertEquals(rows, filteredRows);
+ Assert.assertEquals(rows.size(), rowIngestionMeters.getUnparseable());
+ }
+
+ @Test
+ public void testParseExceptionInPredicateTest()
+ {
+ final List<InputRow> rows = ImmutableList.of(
+ newRow(DateTimes.of("2020-01-01"), 10, 200),
+ newRow(DateTimes.of("2020-01-01"), 10, 400),
+ newRow(DateTimes.of("2020-01-01"), 20, 400),
+ newRow(DateTimes.of("2020-01-01"), 10, 800),
+ newRow(DateTimes.of("2020-01-01"), 30, 200),
+ newRow(DateTimes.of("2020-01-01"), 10, 300)
+ );
+
+ final CloseableIterator<InputRow> parseExceptionThrowingIterator = CloseableIterators.withEmptyBaggage(
+ rows.iterator()
+ );
+ // This filter throws ParseException every other call to test().
+ final Predicate<InputRow> filter = new Predicate<InputRow>()
+ {
+ boolean throwParseException = false;
+
+ @Override
+ public boolean test(InputRow inputRow)
+ {
+ if (throwParseException) {
+ throwParseException = false;
+ throw new ParseException("test");
+ } else {
+ throwParseException = true;
+ return true;
+ }
+ }
+ };
+
+ final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+ parseExceptionThrowingIterator,
+ filter,
+ rowIngestionMeters,
+ parseExceptionHandler
+ );
+
+ final List<InputRow> filteredRows = new ArrayList<>();
+ rowIterator.forEachRemaining(filteredRows::add);
+ final List<InputRow> expectedRows = ImmutableList.of(
+ rows.get(0),
+ rows.get(2),
+ rows.get(4)
+ );
+ Assert.assertEquals(expectedRows, filteredRows);
+ Assert.assertEquals(rows.size() - expectedRows.size(), rowIngestionMeters.getUnparseable());
+ }
+
+ @Test
+ public void testCloseDelegateIsClosed() throws IOException
+ {
+ final MutableBoolean closed = new MutableBoolean(false);
+ final CloseableIterator<InputRow> delegate = CloseableIterators.wrap(
+ Collections.emptyIterator(),
+ closed::setTrue
+ );
+ final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+ delegate,
+ row -> true,
+ rowIngestionMeters,
+ parseExceptionHandler
+ );
+ rowIterator.close();
+ Assert.assertTrue(closed.isTrue());
+ }
+
+ private static InputRow newRow(DateTime timestamp, Object dim1Val, Object dim2Val)
+ {
+ return new MapBasedInputRow(timestamp, DIMENSIONS, ImmutableMap.of("dim1", dim1Val, "dim2", dim2Val));
+ }
+}
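
The class under test is not part of this patch section, but the three tests pin its contract down. An inferred sketch (the real implementation is org.apache.druid.indexing.common.task.FilteringCloseableInputRowIterator; this body is reconstructed from the assertions above, not copied from the source):

```java
// Inferred behavior: skip filtered rows (counting them as thrownAway) and
// route ParseExceptions thrown by either delegate.next() or filter.test(row)
// to the ParseExceptionHandler, then keep iterating.
public class FilteringCloseableInputRowIteratorSketch implements CloseableIterator<InputRow>
{
  private final CloseableIterator<InputRow> delegate;
  private final Predicate<InputRow> filter;
  private final RowIngestionMeters rowIngestionMeters;
  private final ParseExceptionHandler parseExceptionHandler;
  private InputRow next = null;

  public FilteringCloseableInputRowIteratorSketch(
      CloseableIterator<InputRow> delegate,
      Predicate<InputRow> filter,
      RowIngestionMeters rowIngestionMeters,
      ParseExceptionHandler parseExceptionHandler
  )
  {
    this.delegate = delegate;
    this.filter = filter;
    this.rowIngestionMeters = rowIngestionMeters;
    this.parseExceptionHandler = parseExceptionHandler;
  }

  @Override
  public boolean hasNext()
  {
    while (next == null && delegate.hasNext()) {
      try {
        final InputRow row = delegate.next();
        if (filter.test(row)) {
          next = row;
        } else {
          rowIngestionMeters.incrementThrownAway();
        }
      }
      catch (ParseException e) {
        // counts unparseable/processedWithError, saves the message, and may
        // throw RE("Max parse exceptions...") once the limit is exceeded
        parseExceptionHandler.handle(e);
      }
    }
    return next != null;
  }

  @Override
  public InputRow next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final InputRow row = next;
    next = null;
    return row;
  }

  @Override
  public void close() throws IOException
  {
    delegate.close();
  }
}
```

(Imports match those already in the test file above, plus java.util.NoSuchElementException.)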
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 7bb2579..39ee7cf 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -50,8 +50,6 @@
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
@@ -72,6 +70,8 @@
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@@ -1030,7 +1030,7 @@
RowIngestionMeters.DETERMINE_PARTITIONS,
new ArrayList<>(),
RowIngestionMeters.BUILD_SEGMENTS,
- Collections.singletonList("Unparseable timestamp found! Event: {time=unparseable, d=a, val=1}")
+ Collections.singletonList("Timestamp[unparseable] is unparseable! Event: {time=unparseable, d=a, val=1}")
);
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents());
@@ -1151,19 +1151,19 @@
RowIngestionMeters.DETERMINE_PARTITIONS,
Arrays.asList(
"Unable to parse row [this is not JSON]",
- "Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+ "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
"Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
- "Unparseable timestamp found! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
+ "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
),
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
"Unable to parse row [this is not JSON]",
- "Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+ "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
"Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=4.0, val=notnumber}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [Unable to parse value[notnumber] for field[val],]",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=notnumber, val=1}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to float,]",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=notnumber, dimFloat=3.0, val=1}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,]",
- "Unparseable timestamp found! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
+ "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
)
);
@@ -1281,9 +1281,9 @@
new ArrayList<>(),
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
- "Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
- "Unparseable timestamp found! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
- "Unparseable timestamp found! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
+ "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+ "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
+ "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
)
);
@@ -1399,9 +1399,9 @@
Map<String, Object> expectedUnparseables = ImmutableMap.of(
RowIngestionMeters.DETERMINE_PARTITIONS,
Arrays.asList(
- "Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
- "Unparseable timestamp found! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
- "Unparseable timestamp found! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
+ "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+ "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
+ "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
),
RowIngestionMeters.BUILD_SEGMENTS,
new ArrayList<>()
@@ -1554,7 +1554,7 @@
new ArrayList<>(),
RowIngestionMeters.BUILD_SEGMENTS,
Collections.singletonList(
- "Unparseable timestamp found! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}")
+ "Timestamp[null] is unparseable! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}")
);
Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents());
}
@@ -1697,7 +1697,7 @@
// full stacktrace will be too long and make tests brittle (e.g. if line # changes), just match the main message
Assert.assertThat(
status.getErrorMsg(),
- CoreMatchers.containsString("Max parse exceptions exceeded")
+ CoreMatchers.containsString("Max parse exceptions")
);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index d6990c0..80ce36f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -34,7 +34,6 @@
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
@@ -55,6 +54,7 @@
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 869bfcb..b87dcf7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -113,6 +113,7 @@
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
@@ -139,7 +140,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-public class RealtimeIndexTaskTest
+public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
{
private static final Logger log = new Logger(RealtimeIndexTaskTest.class);
private static final ServiceEmitter EMITTER = new ServiceEmitter(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index ac02949..9dfbd4e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -34,7 +34,6 @@
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
@@ -44,6 +43,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index 337fda0..2a342b7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -30,6 +30,8 @@
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -64,7 +66,9 @@
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
- CachePopulatorStats cachePopulatorStats
+ CachePopulatorStats cachePopulatorStats,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
realtimeAppenderator = Appenderators.createRealtime(
@@ -83,7 +87,9 @@
joinableFactory,
cache,
cacheConfig,
- cachePopulatorStats
+ cachePopulatorStats,
+ rowIngestionMeters,
+ parseExceptionHandler
);
return realtimeAppenderator;
}
@@ -97,7 +103,9 @@
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
IndexIO indexIO,
- IndexMerger indexMerger
+ IndexMerger indexMerger,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
return Appenderators.createOffline(
@@ -108,7 +116,9 @@
dataSegmentPusher,
objectMapper,
indexIO,
- indexMerger
+ indexMerger,
+ rowIngestionMeters,
+ parseExceptionHandler
);
}
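
Both factory methods now thread the meters and the handler through to the created appenderator. A condensed view of the new call shape for createOffline, with the earlier arguments elided and the two variable names purely illustrative:

    Appenderators.createOffline(
        // ... id, schema, tuning config, metrics, data segment pusher,
        // object mapper, indexIO ...
        indexMerger,
        rowIngestionMeters,    // counts processed / processedWithError / unparseable rows
        parseExceptionHandler  // applies logging, saved-exception, and max-allowed policies
    );
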
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 94d1b3f..5ef3120 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -56,7 +56,6 @@
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.IngestionTestBase;
@@ -76,6 +75,7 @@
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
index d3da888..87de8c2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
@@ -34,11 +34,13 @@
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.testing.junit.LoggerCaptureRule;
import org.apache.druid.timeline.partition.PartitionBoundaries;
@@ -149,7 +151,7 @@
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
- public LoggerCaptureRule logger = new LoggerCaptureRule(PartialDimensionDistributionTask.class);
+ public LoggerCaptureRule logger = new LoggerCaptureRule(ParseExceptionHandler.class);
private Capture<SubTaskReport> reportCapture;
private TaskToolbox taskToolbox;
@@ -180,6 +182,7 @@
}
);
EasyMock.expect(taskToolbox.getIndexingServiceClient()).andReturn(new NoopIndexingServiceClient());
+ EasyMock.expect(taskToolbox.getRowIngestionMetersFactory()).andReturn(new DropwizardRowIngestionMetersFactory());
EasyMock.replay(taskToolbox);
}
@@ -253,7 +256,7 @@
.build();
exception.expect(RuntimeException.class);
- exception.expectMessage("Max parse exceptions exceeded");
+ exception.expectMessage("Max parse exceptions[0] exceeded");
task.runTask(taskToolbox);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 75cf7af..b7f107d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -186,9 +186,10 @@
// Ingest all data.
runTestTask(inputInterval, Granularities.DAY, false, Collections.emptyList());
- final Interval interval = inputInterval == null ? Intervals.ETERNITY : inputInterval;
final Collection<DataSegment> allSegments = new HashSet<>(
- getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE)
+ inputInterval == null
+ ? getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
+ : getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE)
);
// Reingest the same data. Each segment should get replaced by a segment with a newer version.
@@ -204,10 +205,18 @@
// Verify that the segment has been replaced.
final Collection<DataSegment> newSegments =
- getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
+ inputInterval == null
+ ? getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
+ : getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE);
+ Assert.assertFalse(newSegments.isEmpty());
allSegments.addAll(newSegments);
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(allSegments);
- final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
+
+ final Interval timelineInterval = inputInterval == null ? Intervals.ETERNITY : inputInterval;
+ final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(
+ timelineInterval,
+ Partitions.ONLY_COMPLETE
+ );
Assert.assertEquals(new HashSet<>(newSegments), visibles);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilderTest.java
index c5dd5ed..ddf0fbd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/DefaultIndexTaskInputRowIteratorBuilderTest.java
@@ -23,7 +23,6 @@
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
@@ -36,7 +35,6 @@
import java.util.Collections;
import java.util.List;
-import java.util.function.Consumer;
@RunWith(Enclosed.class)
public class DefaultIndexTaskInputRowIteratorBuilderTest
@@ -45,9 +43,6 @@
{
private static final CloseableIterator<InputRow> ITERATOR = EasyMock.mock(CloseableIterator.class);
private static final GranularitySpec GRANULARITY_SPEC = EasyMock.mock(GranularitySpec.class);
- private static final Runnable NULL_ROW_RUNNABLE = IndexTaskInputRowIteratorBuilder.NOOP_RUNNABLE;
- private static final Consumer<InputRow> ABSENT_BUCKET_INTERVAL_CONSUMER =
- IndexTaskInputRowIteratorBuilder.NOOP_CONSUMER;
@Rule
public ExpectedException exception = ExpectedException.none();
@@ -60,8 +55,6 @@
new DefaultIndexTaskInputRowIteratorBuilder()
.granularitySpec(GRANULARITY_SPEC)
- .nullRowRunnable(NULL_ROW_RUNNABLE)
- .absentBucketIntervalConsumer(ABSENT_BUCKET_INTERVAL_CONSUMER)
.build();
}
@@ -73,34 +66,6 @@
new DefaultIndexTaskInputRowIteratorBuilder()
.delegate(ITERATOR)
- .nullRowRunnable(NULL_ROW_RUNNABLE)
- .absentBucketIntervalConsumer(ABSENT_BUCKET_INTERVAL_CONSUMER)
- .build();
- }
-
- @Test
- public void requiresNullRowHandler()
- {
- exception.expect(NullPointerException.class);
- exception.expectMessage("nullRowRunnable required");
-
- new DefaultIndexTaskInputRowIteratorBuilder()
- .delegate(ITERATOR)
- .granularitySpec(GRANULARITY_SPEC)
- .absentBucketIntervalConsumer(ABSENT_BUCKET_INTERVAL_CONSUMER)
- .build();
- }
-
- @Test
- public void requiresAbsentBucketIntervalHandler()
- {
- exception.expect(NullPointerException.class);
- exception.expectMessage("absentBucketIntervalConsumer required");
-
- new DefaultIndexTaskInputRowIteratorBuilder()
- .delegate(ITERATOR)
- .granularitySpec(GRANULARITY_SPEC)
- .nullRowRunnable(NULL_ROW_RUNNABLE)
.build();
}
@@ -110,8 +75,6 @@
new DefaultIndexTaskInputRowIteratorBuilder()
.delegate(ITERATOR)
.granularitySpec(GRANULARITY_SPEC)
- .nullRowRunnable(NULL_ROW_RUNNABLE)
- .absentBucketIntervalConsumer(ABSENT_BUCKET_INTERVAL_CONSUMER)
.build();
}
}
@@ -128,69 +91,6 @@
public ExpectedException exception = ExpectedException.none();
@Test
- public void invokesNullRowHandlerFirst()
- {
- DateTime invalidTimestamp = DateTimes.utc(Long.MAX_VALUE);
- CloseableIterator<InputRow> nullInputRowIterator =
- IndexTaskInputRowIteratorBuilderTestingFactory.createInputRowIterator(null);
- GranularitySpec absentBucketIntervalGranularitySpec =
- IndexTaskInputRowIteratorBuilderTestingFactory.createAbsentBucketIntervalGranularitySpec(invalidTimestamp);
-
- List<IndexTaskInputRowIteratorBuilderTestingFactory.HandlerTester.Handler> handlerInvocationHistory =
- HANDLER_TESTER.invokeHandlers(
- nullInputRowIterator,
- absentBucketIntervalGranularitySpec,
- NO_NEXT_INPUT_ROW
- );
-
- Assert.assertEquals(
- Collections.singletonList(IndexTaskInputRowIteratorBuilderTestingFactory.HandlerTester.Handler.NULL_ROW),
- handlerInvocationHistory
- );
- }
-
- @Test
- public void invokesInvalidTimestampHandlerBeforeAbsentBucketIntervalHandler()
- {
- DateTime invalidTimestamp = DateTimes.utc(Long.MAX_VALUE);
- InputRow inputRow = IndexTaskInputRowIteratorBuilderTestingFactory.createInputRow(invalidTimestamp);
- CloseableIterator<InputRow> inputRowIterator =
- IndexTaskInputRowIteratorBuilderTestingFactory.createInputRowIterator(inputRow);
- GranularitySpec absentBucketIntervalGranularitySpec =
- IndexTaskInputRowIteratorBuilderTestingFactory.createAbsentBucketIntervalGranularitySpec(invalidTimestamp);
-
- exception.expect(ParseException.class);
- exception.expectMessage("Encountered row with timestamp that cannot be represented as a long");
-
- HANDLER_TESTER.invokeHandlers(inputRowIterator, absentBucketIntervalGranularitySpec, NO_NEXT_INPUT_ROW);
- }
-
- @Test
- public void invokesAbsentBucketIntervalHandlerLast()
- {
- DateTime timestamp = IndexTaskInputRowIteratorBuilderTestingFactory.TIMESTAMP;
- InputRow inputRow = IndexTaskInputRowIteratorBuilderTestingFactory.createInputRow(timestamp);
- CloseableIterator<InputRow> inputRowIterator =
- IndexTaskInputRowIteratorBuilderTestingFactory.createInputRowIterator(inputRow);
- GranularitySpec absentBucketIntervalGranularitySpec =
- IndexTaskInputRowIteratorBuilderTestingFactory.createAbsentBucketIntervalGranularitySpec(timestamp);
-
- List<IndexTaskInputRowIteratorBuilderTestingFactory.HandlerTester.Handler> handlerInvocationHistory =
- HANDLER_TESTER.invokeHandlers(
- inputRowIterator,
- absentBucketIntervalGranularitySpec,
- NO_NEXT_INPUT_ROW
- );
-
- Assert.assertEquals(
- Collections.singletonList(
- IndexTaskInputRowIteratorBuilderTestingFactory.HandlerTester.Handler.ABSENT_BUCKET_INTERVAL
- ),
- handlerInvocationHistory
- );
- }
-
- @Test
public void invokesAppendedHandlersLast()
{
DateTime timestamp = IndexTaskInputRowIteratorBuilderTestingFactory.TIMESTAMP;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilderTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilderTestingFactory.java
index 39300ac..0ca2712 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilderTestingFactory.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilderTestingFactory.java
@@ -143,9 +143,7 @@
List<Handler> handlerInvocationHistory = new ArrayList<>();
IndexTaskInputRowIteratorBuilder iteratorBuilder = iteratorBuilderSupplier.get()
.delegate(inputRowIterator)
- .granularitySpec(granularitySpec)
- .nullRowRunnable(() -> handlerInvocationHistory.add(Handler.NULL_ROW))
- .absentBucketIntervalConsumer(row -> handlerInvocationHistory.add(Handler.ABSENT_BUCKET_INTERVAL));
+ .granularitySpec(granularitySpec);
if (iteratorBuilder instanceof DefaultIndexTaskInputRowIteratorBuilder) {
appendedHandlers.stream()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
index 3ebed1e..af5c988 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
@@ -1175,13 +1175,13 @@
private String getUnparseableTimestampString()
{
return ParserType.STR_CSV.equals(parserType)
- ? "Unparseable timestamp found! Event: {t=bad_timestamp, dim1=foo, dim2=null, met1=6}"
- : "Unparseable timestamp found! Event: {t=bad_timestamp, dim1=foo, met1=6}";
+ ? "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, dim2=null, met1=6}"
+ : "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6}";
}
private String unparseableTimestampErrorString(Map<String, Object> rawColumns)
{
- return StringUtils.format("Unparseable timestamp found! Event: %s", rawColumns);
+ return StringUtils.format("Timestamp[null] is unparseable! Event: %s", rawColumns);
}
@Nullable
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
index 1073292..526430d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
@@ -33,6 +33,9 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.transform.TransformSpec;
import org.junit.Assert;
import org.junit.Rule;
@@ -58,6 +61,14 @@
@Rule
public ExpectedException expectedException = ExpectedException.none();
+ private final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters();
+ private final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+ rowIngestionMeters,
+ false,
+ 0,
+ 0
+ );
+
@Test
public void testWithParserAndNullInputformatParseProperly() throws IOException
{
@@ -77,7 +88,10 @@
null,
null,
null,
- null
+ null,
+ row -> true,
+ rowIngestionMeters,
+ parseExceptionHandler
);
parseAndAssertResult(chunkParser);
}
@@ -91,7 +105,10 @@
inputFormat,
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
TransformSpec.NONE,
- temporaryFolder.newFolder()
+ temporaryFolder.newFolder(),
+ row -> true,
+ rowIngestionMeters,
+ parseExceptionHandler
);
parseAndAssertResult(chunkParser);
}
@@ -106,7 +123,10 @@
null,
null,
null,
- null
+ null,
+ row -> true,
+ rowIngestionMeters,
+ parseExceptionHandler
);
}
@@ -132,7 +152,10 @@
inputFormat,
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
TransformSpec.NONE,
- temporaryFolder.newFolder()
+ temporaryFolder.newFolder(),
+ row -> true,
+ rowIngestionMeters,
+ parseExceptionHandler
);
parseAndAssertResult(chunkParser);
Assert.assertTrue(inputFormat.used);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 25c9009..6bfbd7f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -30,7 +30,6 @@
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -67,6 +66,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
diff --git a/processing/pom.xml b/processing/pom.xml
index 59873ca..035feb3 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -234,6 +234,16 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index 437d30e..a718b53 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -223,7 +223,6 @@
private final AggregatorFactory[] metrics;
private final AggregatorType[] aggs;
private final boolean deserializeComplexMetrics;
- private final boolean reportParseExceptions; // only used by OffHeapIncrementalIndex
private final Metadata metadata;
private final Map<String, MetricDesc> metricDescs;
@@ -252,14 +251,11 @@
* @param incrementalIndexSchema the schema to use for incremental index
* @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
* value for aggregators that return metrics other than float.
- * @param reportParseExceptions flag whether or not to report ParseExceptions that occur while extracting values
- * from input rows
* @param concurrentEventAdd flag whether or not adding input rows should be thread-safe
*/
protected IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics,
- final boolean reportParseExceptions,
final boolean concurrentEventAdd
)
{
@@ -270,7 +266,6 @@
this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = new CopyOnWriteArrayList<>();
this.deserializeComplexMetrics = deserializeComplexMetrics;
- this.reportParseExceptions = reportParseExceptions;
this.timeAndMetricsColumnCapabilities = new HashMap<>();
this.metadata = new Metadata(
@@ -329,7 +324,6 @@
@Nullable
private IncrementalIndexSchema incrementalIndexSchema;
private boolean deserializeComplexMetrics;
- private boolean reportParseExceptions;
private boolean concurrentEventAdd;
private boolean sortFacts;
private int maxRowCount;
@@ -339,7 +333,6 @@
{
incrementalIndexSchema = null;
deserializeComplexMetrics = true;
- reportParseExceptions = true;
concurrentEventAdd = false;
sortFacts = true;
maxRowCount = 0;
@@ -391,12 +384,6 @@
return this;
}
- public Builder setReportParseExceptions(final boolean reportParseExceptions)
- {
- this.reportParseExceptions = reportParseExceptions;
- return this;
- }
-
public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
{
this.concurrentEventAdd = concurrentEventAdd;
@@ -431,7 +418,6 @@
return new OnheapIncrementalIndex(
Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
deserializeComplexMetrics,
- reportParseExceptions,
concurrentEventAdd,
sortFacts,
maxRowCount,
@@ -448,7 +434,6 @@
return new OffheapIncrementalIndex(
Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
deserializeComplexMetrics,
- reportParseExceptions,
concurrentEventAdd,
sortFacts,
maxRowCount,
@@ -610,7 +595,7 @@
skipMaxRowsInMemoryCheck
);
updateMaxIngestedTime(row.getTimestamp());
- ParseException parseException = getCombinedParseException(
+ @Nullable ParseException parseException = getCombinedParseException(
row,
incrementalIndexRowResult.getParseExceptionMessages(),
addToFactsResult.getParseExceptionMessages()
@@ -753,13 +738,12 @@
if (numAdded == 0) {
return null;
}
- ParseException pe = new ParseException(
+ return new ParseException(
+ true,
"Found unparseable columns in row: [%s], exceptions: [%s]",
row,
stringBuilder.toString()
);
- pe.setFromPartiallyValidRow(true);
- return pe;
}
private synchronized void updateMaxIngestedTime(DateTime eventTime)
@@ -784,11 +768,6 @@
return deserializeComplexMetrics;
}
- boolean getReportParseExceptions()
- {
- return reportParseExceptions;
- }
-
AtomicInteger getNumEntries()
{
return numEntries;
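
The boolean argument on the new ParseException constructor replaces the old two-step setFromPartiallyValidRow(true) dance, and the handler introduced later in this patch reads the flag to decide which meter to bump. In outline, matching the hunk above:

    // fromPartiallyValidRow = true: the row itself was ingested, but some of
    // its columns failed to parse, so ParseExceptionHandler counts it as
    // processedWithError rather than unparseable.
    return new ParseException(
        true,
        "Found unparseable columns in row: [%s], exceptions: [%s]",
        row,
        stringBuilder.toString()
    );
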
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAddResult.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAddResult.java
index c483e50..077f162 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAddResult.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAddResult.java
@@ -31,9 +31,9 @@
@Nullable
private final ParseException parseException;
@Nullable
- private String reasonOfNotAdded;
+ private final String reasonOfNotAdded;
- public IncrementalIndexAddResult(
+ private IncrementalIndexAddResult(
int rowCount,
long bytesInMemory,
@Nullable ParseException parseException,
@@ -55,6 +55,23 @@
this(rowCount, bytesInMemory, parseException, null);
}
+ public IncrementalIndexAddResult(
+ int rowCount,
+ long bytesInMemory,
+ String reasonOfNotAdded
+ )
+ {
+ this(rowCount, bytesInMemory, null, reasonOfNotAdded);
+ }
+
+ public IncrementalIndexAddResult(
+ int rowCount,
+ long bytesInMemory
+ )
+ {
+ this(rowCount, bytesInMemory, null, null);
+ }
+
public int getRowCount()
{
return rowCount;
@@ -65,12 +82,22 @@
return bytesInMemory;
}
+ public boolean hasParseException()
+ {
+ return parseException != null;
+ }
+
@Nullable
public ParseException getParseException()
{
return parseException;
}
+ public boolean isRowAdded()
+ {
+ return reasonOfNotAdded == null && parseException == null;
+ }
+
@Nullable
public String getReasonOfNotAdded()
{
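
The constructor is now private and reachable only through outcome-specific overloads, and the two new predicates let call sites branch without null checks. The semantics, mirrored by the IncrementalIndexAddResultTest added below (the "duplicate" reason string is illustrative):

    new IncrementalIndexAddResult(1, 100L).isRowAdded();            // true: no exception, no reason
    new IncrementalIndexAddResult(0, 0L, "duplicate").isRowAdded(); // false: rejected with a reason
    new IncrementalIndexAddResult(0, 0L, new ParseException("bad"))
        .hasParseException();                                       // true: routed to error handling
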
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java b/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
new file mode 100644
index 0000000..3ce1138
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * This class is used only in {@code RealtimeIndexTask}, which is now deprecated.
+ *
+ * Consider using {@link RowIngestionMetersFactory} instead.
+ */
+public class NoopRowIngestionMeters implements RowIngestionMeters
+{
+ private static final RowIngestionMetersTotals EMPTY_TOTALS = new RowIngestionMetersTotals(0, 0, 0, 0);
+
+ @Override
+ public long getProcessed()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementProcessed()
+ {
+
+ }
+
+ @Override
+ public long getProcessedWithError()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementProcessedWithError()
+ {
+
+ }
+
+ @Override
+ public long getUnparseable()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementUnparseable()
+ {
+
+ }
+
+ @Override
+ public long getThrownAway()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementThrownAway()
+ {
+
+ }
+
+ @Override
+ public RowIngestionMetersTotals getTotals()
+ {
+ return EMPTY_TOTALS;
+ }
+
+ @Override
+ public Map<String, Object> getMovingAverages()
+ {
+ return Collections.emptyMap();
+ }
+}
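
One subtlety: ParseExceptionHandler (added below) reads its counts back from the meters when enforcing the limit, so a handler backed by NoopRowIngestionMeters can never trip maxAllowedParseExceptions; the sum it checks stays at zero. A hypothetical pairing, as StreamChunkParserTest does above with (false, 0, 0):

    RowIngestionMeters meters = new NoopRowIngestionMeters();
    ParseExceptionHandler handler = new ParseExceptionHandler(meters, false, 0, 0);
    handler.handle(new ParseException("bad row")); // counted nowhere, never throws
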
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
index 490f625..97da994 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -34,7 +34,6 @@
import org.apache.druid.segment.ColumnSelectorFactory;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -77,14 +76,13 @@
OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
- boolean reportParseExceptions,
boolean concurrentEventAdd,
boolean sortFacts,
int maxRowCount,
NonBlockingPool<ByteBuffer> bufferPool
)
{
- super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd);
+ super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd);
this.maxRowCount = maxRowCount;
this.bufferPool = bufferPool;
@@ -224,6 +222,7 @@
rowContainer.set(row);
+ final List<String> parseExceptionMessages = new ArrayList<>();
for (int i = 0; i < getMetrics().length; i++) {
final BufferAggregator agg = getAggs()[i];
@@ -233,16 +232,13 @@
}
catch (ParseException e) {
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
- if (getReportParseExceptions()) {
- throw new ParseException(e, "Encountered parse error for aggregator[%s]", getMetricAggs()[i].getName());
- } else {
- log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
- }
+ log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
+ parseExceptionMessages.add(e.getMessage());
}
}
}
rowContainer.set(null);
- return new AddToFactsResult(getNumEntries().get(), 0, new ArrayList<>());
+ return new AddToFactsResult(getNumEntries().get(), 0, parseExceptionMessages);
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 04cdaba..9bbdf60 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -69,14 +69,13 @@
OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
- boolean reportParseExceptions,
boolean concurrentEventAdd,
boolean sortFacts,
int maxRowCount,
long maxBytesInMemory
)
{
- super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd);
+ super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd);
this.maxRowCount = maxRowCount;
this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory;
this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
@@ -150,7 +149,7 @@
boolean skipMaxRowsInMemoryCheck
) throws IndexSizeExceededException
{
- List<String> parseExceptionMessages;
+ final List<String> parseExceptionMessages = new ArrayList<>();
final int priorIndex = facts.getPriorIndex(key);
Aggregator[] aggs;
@@ -159,11 +158,11 @@
final AtomicLong sizeInBytes = getBytesInMemory();
if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
aggs = concurrentGet(priorIndex);
- parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
+ doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
} else {
aggs = new Aggregator[metrics.length];
factorizeAggs(metrics, aggs, rowContainer, row);
- parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
+ doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
final int rowIndex = indexIncrement.getAndIncrement();
concurrentSet(rowIndex, aggs);
@@ -185,8 +184,9 @@
sizeInBytes.addAndGet(estimatedRowSize);
} else {
// We lost a race
+ parseExceptionMessages.clear();
aggs = concurrentGet(prev);
- parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
+ doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
// Free up the misfire
concurrentRemove(rowIndex);
// This is expected to occur ~80% of the time in the worst scenarios
@@ -235,14 +235,14 @@
rowContainer.set(null);
}
- private List<String> doAggregate(
+ private void doAggregate(
AggregatorFactory[] metrics,
Aggregator[] aggs,
ThreadLocal<InputRow> rowContainer,
- InputRow row
+ InputRow row,
+ List<String> parseExceptionsHolder
)
{
- List<String> parseExceptionMessages = new ArrayList<>();
rowContainer.set(row);
for (int i = 0; i < aggs.length; i++) {
@@ -254,13 +254,12 @@
catch (ParseException e) {
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
log.debug(e, "Encountered parse error, skipping aggregator[%s].", metrics[i].getName());
- parseExceptionMessages.add(e.getMessage());
+ parseExceptionsHolder.add(e.getMessage());
}
}
}
rowContainer.set(null);
- return parseExceptionMessages;
}
private void closeAggregators()
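
doAggregate() now appends into a caller-owned holder instead of returning a fresh list, and the lost-race branch clears that holder before re-aggregating so messages from the discarded attempt are not double-reported. Condensed from the hunks above:

    final List<String> parseExceptionMessages = new ArrayList<>();
    doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
    // on the lost-race path:
    parseExceptionMessages.clear(); // drop messages from the discarded attempt
    aggs = concurrentGet(prev);
    doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
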
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/ParseExceptionHandler.java b/processing/src/main/java/org/apache/druid/segment/incremental/ParseExceptionHandler.java
new file mode 100644
index 0000000..20faa2c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/ParseExceptionHandler.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.CircularBuffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * A handler for {@link ParseException}s thrown during ingestion. Based on the given configuration, this handler can
+ *
+ * - log ParseExceptions.
+ * - keep most recent N ParseExceptions in memory.
+ * - throw a RuntimeException when it sees more ParseExceptions than {@link #maxAllowedParseExceptions}.
+ *
+ * Whichever of these actions it takes, the handler always updates the relevant ingestion metric first.
+ */
+public class ParseExceptionHandler
+{
+ private static final Logger LOG = new Logger(ParseExceptionHandler.class);
+
+ private final RowIngestionMeters rowIngestionMeters;
+ private final boolean logParseExceptions;
+ private final int maxAllowedParseExceptions;
+ @Nullable
+ private final CircularBuffer<ParseException> savedParseExceptions;
+
+ public ParseExceptionHandler(
+ RowIngestionMeters rowIngestionMeters,
+ boolean logParseExceptions,
+ int maxAllowedParseExceptions,
+ int maxSavedParseExceptions
+ )
+ {
+ this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
+ this.logParseExceptions = logParseExceptions;
+ this.maxAllowedParseExceptions = maxAllowedParseExceptions;
+ if (maxSavedParseExceptions > 0) {
+ this.savedParseExceptions = new CircularBuffer<>(maxSavedParseExceptions);
+ } else {
+ this.savedParseExceptions = null;
+ }
+ }
+
+ public void handle(@Nullable ParseException e)
+ {
+ if (e == null) {
+ return;
+ }
+ if (e.isFromPartiallyValidRow()) {
+ rowIngestionMeters.incrementProcessedWithError();
+ } else {
+ rowIngestionMeters.incrementUnparseable();
+ }
+
+ if (logParseExceptions) {
+ LOG.error(e, "Encountered parse exception");
+ }
+
+ if (savedParseExceptions != null) {
+ savedParseExceptions.add(e);
+ }
+
+ if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError() > maxAllowedParseExceptions) {
+ throw new RE("Max parse exceptions[%s] exceeded", maxAllowedParseExceptions);
+ }
+ }
+
+ @Nullable
+ public CircularBuffer<ParseException> getSavedParseExceptions()
+ {
+ return savedParseExceptions;
+ }
+}
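
A minimal end-to-end sketch of the handler, using the SimpleRowIngestionMeters test helper added later in this patch; the driver class itself is illustrative, not part of the change:

    package org.apache.druid.segment.incremental;

    import org.apache.druid.java.util.common.parsers.ParseException;

    public class ParseExceptionHandlerSketch
    {
      public static void main(String[] args)
      {
        final RowIngestionMeters meters = new SimpleRowIngestionMeters();
        // Log every exception, allow at most 2 overall, remember the last 2.
        final ParseExceptionHandler handler = new ParseExceptionHandler(meters, true, 2, 2);

        for (int i = 0; i < 3; i++) {
          try {
            handler.handle(new ParseException("row[%d] is unparseable", i));
          }
          catch (RuntimeException e) {
            // Thrown on the third call, after the meter is bumped:
            // "Max parse exceptions[2] exceeded", with getUnparseable() == 3.
            System.out.println(e.getMessage());
          }
        }
      }
    }
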
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMeters.java b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
similarity index 97%
rename from indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMeters.java
rename to processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
index 725a5a2..efefca7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMeters.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.indexing.common.stats;
+package org.apache.druid.segment.incremental;
import org.apache.druid.guice.annotations.ExtensionPoint;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMetersFactory.java b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersFactory.java
similarity index 94%
rename from indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMetersFactory.java
rename to processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersFactory.java
index 4035dbe..c395dcd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMetersFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersFactory.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.indexing.common.stats;
+package org.apache.druid.segment.incremental;
public interface RowIngestionMetersFactory
{
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMetersTotals.java b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
similarity index 97%
rename from indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMetersTotals.java
rename to processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
index 022548f..d9c6ce5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/RowIngestionMetersTotals.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.indexing.common.stats;
+package org.apache.druid.segment.incremental;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
index 8e0eb0a..a2e994e 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@@ -147,7 +147,6 @@
.withRollup(withRollup)
.build()
)
- .setReportParseExceptions(false)
.setConcurrentEventAdd(true)
.setMaxRowCount(1000)
.buildOnheap();
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index 3399377..357a1e2 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -154,7 +154,6 @@
.withRollup(withRollup)
.build()
)
- .setReportParseExceptions(false)
.setConcurrentEventAdd(true)
.setMaxRowCount(1000)
.buildOnheap();
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
index b729e7d..4f9b1ff 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -141,7 +141,6 @@
.withRollup(withRollup)
.build()
)
- .setReportParseExceptions(false)
.setConcurrentEventAdd(true)
.setMaxRowCount(1000)
.buildOnheap();
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
index fc5535f..9d793c7 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
@@ -140,7 +140,6 @@
))
.build()
)
- .setReportParseExceptions(false)
.setConcurrentEventAdd(true)
.setMaxRowCount(1000)
.buildOnheap();
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
index b4525c0..ccc6ff7 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
@@ -125,7 +125,6 @@
)
).build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
@@ -301,7 +300,6 @@
)
).build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(1000)
.buildOnheap();
@@ -329,7 +327,6 @@
)
).build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(1000)
.buildOnheap();
@@ -357,7 +354,6 @@
)
).build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
index 9328904..a6207aa 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
@@ -133,7 +133,6 @@
)
).build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
@@ -282,7 +281,6 @@
).build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
@@ -305,7 +303,6 @@
)
).build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
@@ -329,7 +326,6 @@
).build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
index 9d13c31..744b25d 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
@@ -137,7 +137,6 @@
)
).build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
@@ -303,7 +302,6 @@
)
).build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(1000)
.buildOnheap();
@@ -330,7 +328,6 @@
)
).build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(1000)
.buildOnheap();
@@ -357,7 +354,6 @@
)
).build()
)
- .setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAddResultTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAddResultTest.java
new file mode 100644
index 0000000..39c9402
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAddResultTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IncrementalIndexAddResultTest
+{
+ @Test
+ public void testIsRowAdded()
+ {
+ Assert.assertTrue(new IncrementalIndexAddResult(0, 0L).isRowAdded());
+ Assert.assertFalse(new IncrementalIndexAddResult(0, 0L, "test").isRowAdded());
+ Assert.assertFalse(new IncrementalIndexAddResult(0, 0L, new ParseException("test")).isRowAdded());
+ }
+
+ @Test
+ public void testHasParseException()
+ {
+ Assert.assertFalse(new IncrementalIndexAddResult(0, 0L).hasParseException());
+ Assert.assertFalse(new IncrementalIndexAddResult(0, 0L, "test").hasParseException());
+ Assert.assertTrue(new IncrementalIndexAddResult(0, 0L, new ParseException("test")).hasParseException());
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
index 55dba3f..7b77839 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
@@ -36,7 +36,6 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.QueryPlus;
@@ -112,7 +111,6 @@
public MapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
- boolean reportParseExceptions,
boolean concurrentEventAdd,
boolean sortFacts,
int maxRowCount,
@@ -122,7 +120,6 @@
super(
incrementalIndexSchema,
deserializeComplexMetrics,
- reportParseExceptions,
concurrentEventAdd,
sortFacts,
maxRowCount,
@@ -145,7 +142,6 @@
.withMetrics(metrics)
.build(),
true,
- true,
false,
true,
maxRowCount,
@@ -222,15 +218,7 @@
for (Aggregator agg : aggs) {
synchronized (agg) {
- try {
- agg.aggregate();
- }
- catch (ParseException e) {
- // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
- if (getReportParseExceptions()) {
- throw e;
- }
- }
+ agg.aggregate();
}
}
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/ParseExceptionHandlerTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/ParseExceptionHandlerTest.java
new file mode 100644
index 0000000..713aa7f
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/ParseExceptionHandlerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.testing.junit.LoggerCaptureRule;
+import org.apache.logging.log4j.core.LogEvent;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+public class ParseExceptionHandlerTest
+{
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
+ @Rule
+ public LoggerCaptureRule logger = new LoggerCaptureRule(ParseExceptionHandler.class);
+
+ @Test
+ public void testMetricWhenAllConfigurationsAreTurnedOff()
+ {
+ final ParseException parseException = new ParseException("test");
+ final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+ final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+ rowIngestionMeters,
+ false,
+ Integer.MAX_VALUE,
+ 0
+ );
+
+ IntStream.range(0, 100).forEach(i -> {
+ parseExceptionHandler.handle(parseException);
+ Assert.assertEquals(i + 1, rowIngestionMeters.getUnparseable());
+ });
+ }
+
+ @Test
+ public void testLogParseExceptions()
+ {
+ final ParseException parseException = new ParseException("test");
+ final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+ final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+ rowIngestionMeters,
+ true,
+ Integer.MAX_VALUE,
+ 0
+ );
+ parseExceptionHandler.handle(parseException);
+
+ List<LogEvent> logEvents = logger.getLogEvents();
+ Assert.assertEquals(1, logEvents.size());
+ String logMessage = logEvents.get(0).getMessage().getFormattedMessage();
+ Assert.assertTrue(logMessage.contains("Encountered parse exception"));
+ }
+
+ @Test
+ public void testGetSavedParseExceptionsReturnNullWhenMaxSavedParseExceptionsIsZero()
+ {
+ final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+ new SimpleRowIngestionMeters(),
+ false,
+ Integer.MAX_VALUE,
+ 0
+ );
+ Assert.assertNull(parseExceptionHandler.getSavedParseExceptions());
+ }
+
+ @Test
+ public void testMaxAllowedParseExceptionsThrowExceptionWhenItHitsMax()
+ {
+ final ParseException parseException = new ParseException("test");
+ final int maxAllowedParseExceptions = 3;
+ final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+ final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+ rowIngestionMeters,
+ false,
+ maxAllowedParseExceptions,
+ 0
+ );
+
+ IntStream.range(0, maxAllowedParseExceptions).forEach(i -> parseExceptionHandler.handle(parseException));
+ Assert.assertEquals(3, rowIngestionMeters.getUnparseable());
+
+ expectedException.expect(RuntimeException.class);
+ expectedException.expectMessage("Max parse exceptions[3] exceeded");
+ try {
+ parseExceptionHandler.handle(parseException);
+ }
+ catch (RuntimeException e) {
+ Assert.assertEquals(4, rowIngestionMeters.getUnparseable());
+ throw e;
+ }
+ }
+
+ @Test
+ public void testGetSavedParseExceptionsReturnMostRecentParseExceptions()
+ {
+ final int maxSavedParseExceptions = 3;
+ final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+ final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+ rowIngestionMeters,
+ false,
+ Integer.MAX_VALUE,
+ maxSavedParseExceptions
+ );
+ Assert.assertNotNull(parseExceptionHandler.getSavedParseExceptions());
+ int exceptionCounter = 0;
+ for (; exceptionCounter < maxSavedParseExceptions; exceptionCounter++) {
+ parseExceptionHandler.handle(new ParseException(StringUtils.format("test %d", exceptionCounter)));
+ }
+ Assert.assertEquals(3, rowIngestionMeters.getUnparseable());
+ Assert.assertEquals(maxSavedParseExceptions, parseExceptionHandler.getSavedParseExceptions().size());
+ for (int i = 0; i < maxSavedParseExceptions; i++) {
+ Assert.assertEquals(
+ StringUtils.format("test %d", i),
+ parseExceptionHandler.getSavedParseExceptions().get(i).getMessage()
+ );
+ }
+ for (; exceptionCounter < 5; exceptionCounter++) {
+ parseExceptionHandler.handle(new ParseException(StringUtils.format("test %d", exceptionCounter)));
+ }
+ Assert.assertEquals(5, rowIngestionMeters.getUnparseable());
+
+ Assert.assertEquals(maxSavedParseExceptions, parseExceptionHandler.getSavedParseExceptions().size());
+ for (int i = 0; i < maxSavedParseExceptions; i++) {
+ Assert.assertEquals(
+ StringUtils.format("test %d", i + 2),
+ parseExceptionHandler.getSavedParseExceptions().get(i).getMessage()
+ );
+ }
+ }
+}
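
The last test also pins down the eviction order: CircularBuffer retains the N most recent entries and get() walks them oldest-first, as the assertions above require. Compacted:

    CircularBuffer<ParseException> buf = new CircularBuffer<>(3);
    for (int i = 0; i < 5; i++) {
      buf.add(new ParseException("test " + i));
    }
    buf.size();              // 3
    buf.get(0).getMessage(); // "test 2" (oldest retained)
    buf.get(2).getMessage(); // "test 4" (most recent)
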
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java b/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java
new file mode 100644
index 0000000..3fd14e0
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import java.util.Map;
+
+public class SimpleRowIngestionMeters implements RowIngestionMeters
+{
+ private long processed;
+ private long processedWithError;
+ private long unparseable;
+ private long thrownAway;
+
+ @Override
+ public long getProcessed()
+ {
+ return processed;
+ }
+
+ @Override
+ public void incrementProcessed()
+ {
+ processed++;
+ }
+
+ @Override
+ public long getProcessedWithError()
+ {
+ return processedWithError;
+ }
+
+ @Override
+ public void incrementProcessedWithError()
+ {
+ processedWithError++;
+ }
+
+ @Override
+ public long getUnparseable()
+ {
+ return unparseable;
+ }
+
+ @Override
+ public void incrementUnparseable()
+ {
+ unparseable++;
+ }
+
+ @Override
+ public long getThrownAway()
+ {
+ return thrownAway;
+ }
+
+ @Override
+ public void incrementThrownAway()
+ {
+ thrownAway++;
+ }
+
+ @Override
+ public RowIngestionMetersTotals getTotals()
+ {
+ return new RowIngestionMetersTotals(processed, processedWithError, thrownAway, unparseable);
+ }
+
+ @Override
+ public Map<String, Object> getMovingAverages()
+ {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
index 4657158c..f0508df 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
@@ -80,8 +80,7 @@
{
return Collections.singletonList(
MapInputRowParser.parse(
- inputRowSchema.getTimestampSpec(),
- inputRowSchema.getDimensionsSpec(),
+ inputRowSchema,
intermediateRow
)
);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
index 56c3a5d..14796df 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
@@ -24,7 +24,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
-import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
@@ -227,20 +226,15 @@
private final int numRowsInSegment;
private final boolean isPersistRequired;
- @Nullable
- private final ParseException parseException;
-
AppenderatorAddResult(
SegmentIdWithShardSpec identifier,
int numRowsInSegment,
- boolean isPersistRequired,
- @Nullable ParseException parseException
+ boolean isPersistRequired
)
{
this.segmentIdentifier = identifier;
this.numRowsInSegment = numRowsInSegment;
this.isPersistRequired = isPersistRequired;
- this.parseException = parseException;
}
SegmentIdWithShardSpec getSegmentIdentifier()
@@ -257,11 +251,5 @@
{
return isPersistRequired;
}
-
- @Nullable
- public ParseException getParseException()
- {
- return parseException;
- }
}
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java
index 62ee322..92641e3 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java
@@ -19,8 +19,6 @@
package org.apache.druid.segment.realtime.appenderator;
-import org.apache.druid.java.util.common.parsers.ParseException;
-
import javax.annotation.Nullable;
/**
@@ -35,44 +33,37 @@
private final long totalNumRowsInAppenderator;
private final boolean isPersistRequired;
- @Nullable
- private final ParseException parseException;
-
public static AppenderatorDriverAddResult ok(
SegmentIdWithShardSpec segmentIdentifier,
int numRowsInSegment,
long totalNumRowsInAppenderator,
- boolean isPersistRequired,
- @Nullable ParseException parseException
+ boolean isPersistRequired
)
{
return new AppenderatorDriverAddResult(
segmentIdentifier,
numRowsInSegment,
totalNumRowsInAppenderator,
- isPersistRequired,
- parseException
+ isPersistRequired
);
}
public static AppenderatorDriverAddResult fail()
{
- return new AppenderatorDriverAddResult(null, 0, 0, false, null);
+ return new AppenderatorDriverAddResult(null, 0, 0, false);
}
private AppenderatorDriverAddResult(
@Nullable SegmentIdWithShardSpec segmentIdentifier,
int numRowsInSegment,
long totalNumRowsInAppenderator,
- boolean isPersistRequired,
- @Nullable ParseException parseException
+ boolean isPersistRequired
)
{
this.segmentIdentifier = segmentIdentifier;
this.numRowsInSegment = numRowsInSegment;
this.totalNumRowsInAppenderator = totalNumRowsInAppenderator;
this.isPersistRequired = isPersistRequired;
- this.parseException = parseException;
}
public boolean isOk()
@@ -111,10 +102,4 @@
}
return overThreshold;
}
-
- @Nullable
- public ParseException getParseException()
- {
- return parseException;
- }
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 62a5ba9..c65c2ad 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -61,6 +61,8 @@
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -130,6 +132,8 @@
private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
private final AtomicInteger totalRows = new AtomicInteger();
private final AtomicLong bytesCurrentlyInMemory = new AtomicLong();
+ private final RowIngestionMeters rowIngestionMeters;
+ private final ParseExceptionHandler parseExceptionHandler;
// Synchronize persisting commitMetadata so that multiple persist threads (if present)
// and abandon threads do not step over each other
private final Lock commitLock = new ReentrantLock();
@@ -168,7 +172,9 @@
@Nullable SinkQuerySegmentWalker sinkQuerySegmentWalker,
IndexIO indexIO,
IndexMerger indexMerger,
- Cache cache
+ Cache cache,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
this.myId = id;
@@ -182,6 +188,8 @@
this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
this.cache = cache;
this.texasRanger = sinkQuerySegmentWalker;
+ this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
+ this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
if (sinkQuerySegmentWalker == null) {
this.sinkTimeline = new VersionedIntervalTimeline<>(
@@ -267,6 +275,12 @@
throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
}
+ if (addResult.isRowAdded()) {
+ rowIngestionMeters.incrementProcessed();
+ } else if (addResult.hasParseException()) {
+ parseExceptionHandler.handle(addResult.getParseException());
+ }
+
final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd;
rowsCurrentlyInMemory.addAndGet(numAddedRows);
bytesCurrentlyInMemory.addAndGet(bytesInMemoryAfterAdd - bytesInMemoryBeforeAdd);
@@ -329,7 +343,7 @@
isPersistRequired = true;
}
}
- return new AppenderatorAddResult(identifier, sink.getNumRows(), isPersistRequired, addResult.getParseException());
+ return new AppenderatorAddResult(identifier, sink.getNumRows(), isPersistRequired);
}
@Override
@@ -392,7 +406,6 @@
identifier.getVersion(),
tuningConfig.getMaxRowsInMemory(),
maxBytesTuningConfig,
- tuningConfig.isReportParseExceptions(),
null
);
@@ -1110,7 +1123,6 @@
identifier.getVersion(),
tuningConfig.getMaxRowsInMemory(),
maxBytesTuningConfig,
- tuningConfig.isReportParseExceptions(),
null,
hydrants
);
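With this change a parse exception no longer rides along on AppenderatorAddResult; the appenderator meters the row and hands any exception to the ParseExceptionHandler at the point of ingestion. A rough sketch of the contract handle() is expected to fulfill, inferred from the tests earlier in this patch; this is illustrative only, and saveAndMaybeLog is a hypothetical helper standing in for the saved/logged-exception paths:

    // Inferred behavior: bump the right meter, save/log if configured, and fail
    // the ingestion once too many parse exceptions have accumulated.
    void handle(@Nullable ParseException e)
    {
      if (e == null) {
        return;
      }
      if (e.isFromPartiallyValidRow()) {
        rowIngestionMeters.incrementProcessedWithError(); // row added, some fields bad
      } else {
        rowIngestionMeters.incrementUnparseable();        // row unusable
      }
      saveAndMaybeLog(e);
      if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError()
          > maxAllowedParseExceptions) {
        throw new RE("Max parse exceptions exceeded");
      }
    }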
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java
index 28c3ac7..2aa5d43 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java
@@ -159,7 +159,7 @@
try {
final Appenderator.AppenderatorAddResult addResult = appenderator.add(identifier, row, committerSupplier);
lastCommitterSupplier = committerSupplier;
- return new IncrementalIndexAddResult(addResult.getNumRowsInSegment(), 0, addResult.getParseException());
+ return new IncrementalIndexAddResult(addResult.getNumRowsInSegment(), 0);
}
catch (SegmentNotWritableException e) {
// Segment already started handoff
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index 7ddb409..c59e4e0 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -28,12 +28,14 @@
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
-import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import java.util.concurrent.ExecutorService;
@@ -56,7 +58,9 @@
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
- CachePopulatorStats cachePopulatorStats
+ CachePopulatorStats cachePopulatorStats,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
return new AppenderatorImpl(
@@ -83,7 +87,9 @@
),
indexIO,
indexMerger,
- cache
+ cache,
+ rowIngestionMeters,
+ parseExceptionHandler
);
}
@@ -95,7 +101,9 @@
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
IndexIO indexIO,
- IndexMerger indexMerger
+ IndexMerger indexMerger,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
return new AppenderatorImpl(
@@ -105,36 +113,13 @@
metrics,
dataSegmentPusher,
objectMapper,
- new DataSegmentAnnouncer()
- {
- @Override
- public void announceSegment(DataSegment segment)
- {
- // Do nothing
- }
-
- @Override
- public void unannounceSegment(DataSegment segment)
- {
- // Do nothing
- }
-
- @Override
- public void announceSegments(Iterable<DataSegment> segments)
- {
- // Do nothing
- }
-
- @Override
- public void unannounceSegments(Iterable<DataSegment> segments)
- {
- // Do nothing
- }
- },
+ new NoopDataSegmentAnnouncer(),
null,
indexIO,
indexMerger,
- null
+ null,
+ rowIngestionMeters,
+ parseExceptionHandler
);
}
}
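The inline anonymous announcer gives way to a shared no-op implementation; judging from the four methods deleted above, NoopDataSegmentAnnouncer is presumably nothing more than:

    public class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer
    {
      @Override
      public void announceSegment(DataSegment segment)
      {
        // Do nothing
      }

      @Override
      public void unannounceSegment(DataSegment segment)
      {
        // Do nothing
      }

      @Override
      public void announceSegments(Iterable<DataSegment> segments)
      {
        // Do nothing
      }

      @Override
      public void unannounceSegments(Iterable<DataSegment> segments)
      {
        // Do nothing
      }
    }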
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index c0d4b9c..76c64d2 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -30,6 +30,8 @@
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -78,7 +80,9 @@
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
- CachePopulatorStats cachePopulatorStats
+ CachePopulatorStats cachePopulatorStats,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
);
/**
@@ -92,7 +96,9 @@
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
IndexIO indexIO,
- IndexMerger indexMerger
+ IndexMerger indexMerger,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
);
/**
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 5ecefeb..dc16d59 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -413,8 +413,7 @@
identifier,
result.getNumRowsInSegment(),
appenderator.getTotalRowCount(),
- result.isPersistRequired(),
- result.getParseException()
+ result.isPersistRequired()
);
}
catch (SegmentNotWritableException e) {
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
index ed53c2c..7a0f1dc 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
@@ -24,6 +24,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -54,6 +57,7 @@
@Override
public Appenderator build(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics)
{
+ final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters();
return Appenderators.createOffline(
schema.getDataSource(),
schema,
@@ -62,7 +66,14 @@
dataSegmentPusher,
objectMapper,
indexIO,
- indexMerger
+ indexMerger,
+ rowIngestionMeters,
+ new ParseExceptionHandler(
+ rowIngestionMeters,
+ false,
+ config.isReportParseExceptions() ? 0 : Integer.MAX_VALUE,
+ 0
+ )
);
}
}
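The ternary preserves the legacy flag's semantics in the new model: reportParseExceptions=true means zero tolerance (the first parse exception fails the ingestion), while false makes the limit effectively unbounded:

    // legacy boolean -> new maxAllowedParseExceptions threshold
    int maxAllowedParseExceptions = config.isReportParseExceptions() ? 0 : Integer.MAX_VALUE;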
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
index 969ed32..6ee6b4b 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
@@ -30,6 +30,9 @@
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.join.JoinableFactory;
@@ -92,6 +95,7 @@
final FireDepartmentMetrics metrics
)
{
+ final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters();
return Appenderators.createRealtime(
schema.getDataSource(),
schema,
@@ -114,7 +118,14 @@
joinableFactory,
cache,
cacheConfig,
- cachePopulatorStats
+ cachePopulatorStats,
+ rowIngestionMeters,
+ new ParseExceptionHandler(
+ rowIngestionMeters,
+ false,
+ config.isReportParseExceptions() ? 0 : Integer.MAX_VALUE,
+ 0
+ )
);
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index 66ce98b..87de244 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -31,6 +31,8 @@
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -69,7 +71,9 @@
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
- CachePopulatorStats cachePopulatorStats
+ CachePopulatorStats cachePopulatorStats,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
throw new UOE(ERROR_MSG);
@@ -84,7 +88,9 @@
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
IndexIO indexIO,
- IndexMerger indexMerger
+ IndexMerger indexMerger,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
throw new UOE(ERROR_MSG);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index be21051..7fa4f4c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -31,6 +31,8 @@
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -75,7 +77,9 @@
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
- CachePopulatorStats cachePopulatorStats
+ CachePopulatorStats cachePopulatorStats,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
if (realtimeAppenderator != null) {
@@ -99,7 +103,9 @@
joinableFactory,
cache,
cacheConfig,
- cachePopulatorStats
+ cachePopulatorStats,
+ rowIngestionMeters,
+ parseExceptionHandler
);
}
return realtimeAppenderator;
@@ -114,7 +120,9 @@
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
IndexIO indexIO,
- IndexMerger indexMerger
+ IndexMerger indexMerger,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
// CompactionTask does run multiple sub-IndexTasks, so we allow multiple batch appenderators
@@ -129,7 +137,9 @@
dataSegmentPusher,
objectMapper,
indexIO,
- indexMerger
+ indexMerger,
+ rowIngestionMeters,
+ parseExceptionHandler
);
return batchAppenderator;
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 885067a..7fae74c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -50,6 +50,8 @@
import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -158,7 +160,9 @@
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
- CachePopulatorStats cachePopulatorStats
+ CachePopulatorStats cachePopulatorStats,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
synchronized (this) {
@@ -178,7 +182,9 @@
datasourceBundle.getWalker(),
indexIO,
wrapIndexMerger(indexMerger),
- cache
+ cache,
+ rowIngestionMeters,
+ parseExceptionHandler
);
datasourceBundle.addAppenderator(taskId, appenderator);
@@ -195,7 +201,9 @@
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
IndexIO indexIO,
- IndexMerger indexMerger
+ IndexMerger indexMerger,
+ RowIngestionMeters rowIngestionMeters,
+ ParseExceptionHandler parseExceptionHandler
)
{
synchronized (this) {
@@ -212,7 +220,9 @@
dataSegmentPusher,
objectMapper,
indexIO,
- wrapIndexMerger(indexMerger)
+ wrapIndexMerger(indexMerger),
+ rowIngestionMeters,
+ parseExceptionHandler
);
datasourceBundle.addAppenderator(taskId, appenderator);
return appenderator;
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Plumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Plumber.java
index d81f485..e7fab4a 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Plumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Plumber.java
@@ -29,9 +29,9 @@
public interface Plumber
{
- IncrementalIndexAddResult THROWAWAY = new IncrementalIndexAddResult(-1, -1, null, "row too late");
- IncrementalIndexAddResult NOT_WRITABLE = new IncrementalIndexAddResult(-1, -1, null, "not writable");
- IncrementalIndexAddResult DUPLICATE = new IncrementalIndexAddResult(-2, -1, null, "duplicate row");
+ IncrementalIndexAddResult THROWAWAY = new IncrementalIndexAddResult(-1, -1, "row too late");
+ IncrementalIndexAddResult NOT_WRITABLE = new IncrementalIndexAddResult(-1, -1, "not writable");
+ IncrementalIndexAddResult DUPLICATE = new IncrementalIndexAddResult(-2, -1, "duplicate row");
/**
* Perform any initial setup. Should be called before using any other methods, and should be paired
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
index 6210754..bae600c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -116,7 +116,6 @@
String.CASE_INSENSITIVE_ORDER
);
private final QuerySegmentWalker texasRanger;
-
private final Cache cache;
private volatile long nextFlush = 0;
@@ -263,7 +262,6 @@
versioningPolicy.getVersion(sinkInterval),
config.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
- config.isReportParseExceptions(),
config.getDedupColumn()
);
addSink(retVal);
@@ -727,7 +725,6 @@
versioningPolicy.getVersion(sinkInterval),
config.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
- config.isReportParseExceptions(),
config.getDedupColumn(),
hydrants
);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
index e7e6c33..d6cb223 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
@@ -56,7 +56,7 @@
public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
{
private static final IncrementalIndexAddResult ALREADY_SWAPPED =
- new IncrementalIndexAddResult(-1, -1, null, "write after index swapped");
+ new IncrementalIndexAddResult(-1, -1, "write after index swapped");
private final Object hydrantLock = new Object();
private final Interval interval;
@@ -65,15 +65,15 @@
private final String version;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
- private final boolean reportParseExceptions;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<>();
private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
private final AtomicInteger numRowsExcludingCurrIndex = new AtomicInteger();
- private volatile FireHydrant currHydrant;
- private volatile boolean writable = true;
private final String dedupColumn;
private final Set<Long> dedupSet = new HashSet<>();
+ private volatile FireHydrant currHydrant;
+ private volatile boolean writable = true;
+
public Sink(
Interval interval,
DataSchema schema,
@@ -81,7 +81,6 @@
String version,
int maxRowsInMemory,
long maxBytesInMemory,
- boolean reportParseExceptions,
String dedupColumn
)
{
@@ -92,7 +91,6 @@
version,
maxRowsInMemory,
maxBytesInMemory,
- reportParseExceptions,
dedupColumn,
Collections.emptyList()
);
@@ -105,7 +103,6 @@
String version,
int maxRowsInMemory,
long maxBytesInMemory,
- boolean reportParseExceptions,
String dedupColumn,
List<FireHydrant> hydrants
)
@@ -116,7 +113,6 @@
this.version = version;
this.maxRowsInMemory = maxRowsInMemory;
this.maxBytesInMemory = maxBytesInMemory;
- this.reportParseExceptions = reportParseExceptions;
this.dedupColumn = dedupColumn;
int maxCount = -1;
@@ -328,7 +324,6 @@
.build();
final IncrementalIndex newIndex = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
- .setReportParseExceptions(reportParseExceptions)
.setMaxRowCount(maxRowsInMemory)
.setMaxBytesInMemory(maxBytesInMemory)
.buildOnheap();
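Sink drops reportParseExceptions because the corresponding IncrementalIndex.Builder option is gone (see the setReportParseExceptions removals throughout this patch); parse errors now surface through IncrementalIndexAddResult and are metered upstream by the ParseExceptionHandler. Every construction site loses one argument:

    // Before
    new Sink(interval, schema, shardSpec, version, maxRowsInMemory, maxBytesInMemory,
             reportParseExceptions, dedupColumn);
    // After
    new Sink(interval, schema, shardSpec, version, maxRowsInMemory, maxBytesInMemory,
             dedupColumn);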
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
index 607a3c9..19ee7ef 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
@@ -43,6 +43,8 @@
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.realtime.plumber.Committers;
import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -481,6 +483,23 @@
}
@Test
+ public void testVerifyRowIngestionMetrics() throws Exception
+ {
+ final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+ try (final AppenderatorTester tester = new AppenderatorTester(5, 1000L, null, false, rowIngestionMeters)) {
+ final Appenderator appenderator = tester.getAppenderator();
+ appenderator.startJob();
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"), Committers.nilSupplier());
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Committers.nilSupplier());
+
+ Assert.assertEquals(1, rowIngestionMeters.getProcessed());
+ Assert.assertEquals(1, rowIngestionMeters.getProcessedWithError());
+ Assert.assertEquals(0, rowIngestionMeters.getUnparseable());
+ Assert.assertEquals(0, rowIngestionMeters.getThrownAway());
+ }
+ }
+
+ @Test
public void testQueryByIntervals() throws Exception
{
try (final AppenderatorTester tester = new AppenderatorTester(2, true)) {
@@ -792,7 +811,7 @@
);
}
- static InputRow ir(String ts, String dim, long met)
+ static InputRow ir(String ts, String dim, Object met)
{
return new MapBasedInputRow(
DateTimes.of(ts).getMillis(),
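Widening ir()'s met parameter from long to Object is what lets testVerifyRowIngestionMetrics push the string "invalid_met" through: the row itself parses, but aggregating the non-numeric metric raises a ParseException, so the row lands in processedWithError rather than unparseable, exactly as the test asserts.

    // A non-numeric metric value fails at aggregation time, not at row-parse time:
    appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"), Committers.nilSupplier());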
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
index eaa7957..9e57d28 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -54,6 +54,9 @@
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -61,7 +64,7 @@
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.druid.server.coordination.DataSegmentAnnouncer;
+import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -116,11 +119,22 @@
public AppenderatorTester(
final int maxRowsInMemory,
- long maxSizeInBytes,
+ final long maxSizeInBytes,
final File basePersistDirectory,
final boolean enablePushFailure
)
{
+ this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, new SimpleRowIngestionMeters());
+ }
+
+ public AppenderatorTester(
+ final int maxRowsInMemory,
+ final long maxSizeInBytes,
+ final File basePersistDirectory,
+ final boolean enablePushFailure,
+ final RowIngestionMeters rowIngestionMeters
+ )
+ {
objectMapper = new DefaultObjectMapper();
objectMapper.registerSubtypes(LinearShardSpec.class);
@@ -147,10 +161,9 @@
null,
objectMapper
);
- maxSizeInBytes = maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes;
tuningConfig = new RealtimeTuningConfig(
maxRowsInMemory,
- maxSizeInBytes,
+ maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
null,
null,
basePersistDirectory,
@@ -255,38 +268,15 @@
)
)
),
- new DataSegmentAnnouncer()
- {
- @Override
- public void announceSegment(DataSegment segment)
- {
-
- }
-
- @Override
- public void unannounceSegment(DataSegment segment)
- {
-
- }
-
- @Override
- public void announceSegments(Iterable<DataSegment> segments)
- {
-
- }
-
- @Override
- public void unannounceSegments(Iterable<DataSegment> segments)
- {
-
- }
- },
+ new NoopDataSegmentAnnouncer(),
emitter,
queryExecutor,
NoopJoinableFactory.INSTANCE,
MapCache.create(2048),
new CacheConfig(),
- new CachePopulatorStats()
+ new CachePopulatorStats(),
+ rowIngestionMeters,
+ new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0)
);
}
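For reference, the argument shape here appears to be (rowIngestionMeters, logParseExceptions, maxAllowedParseExceptions, maxSavedParseExceptions), matching the offline and realtime factories above; the tester deliberately makes the handler inert:

    new ParseExceptionHandler(
        rowIngestionMeters,
        false,             // logParseExceptions: don't log each exception
        Integer.MAX_VALUE, // maxAllowedParseExceptions: never fail a test run
        0                  // maxSavedParseExceptions: retain none for reporting
    );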
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 16d3c90..4f9cd3c 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -418,7 +418,7 @@
{
rows.computeIfAbsent(identifier, k -> new ArrayList<>()).add(row);
numRows++;
- return new AppenderatorAddResult(identifier, numRows, false, null);
+ return new AppenderatorAddResult(identifier, numRows, false);
}
@Override
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index a98bb98..0eb62e8 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -280,7 +280,6 @@
DateTimes.of("2014-12-01T12:34:56.789").toString(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
- tuningConfig.isReportParseExceptions(),
tuningConfig.getDedupColumn()
);
plumber.getSinks().put(0L, sink);
@@ -326,7 +325,6 @@
DateTimes.of("2014-12-01T12:34:56.789").toString(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
- tuningConfig.isReportParseExceptions(),
tuningConfig.getDedupColumn()
);
plumber.getSinks().put(0L, sink);
@@ -377,7 +375,6 @@
DateTimes.of("2014-12-01T12:34:56.789").toString(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
- tuningConfig.isReportParseExceptions(),
tuningConfig.getDedupColumn()
);
plumber2.getSinks().put(0L, sink);
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
index df59fa7..bf94b2f 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
@@ -93,7 +93,6 @@
version,
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
- tuningConfig.isReportParseExceptions(),
tuningConfig.getDedupColumn()
);
@@ -248,7 +247,6 @@
version,
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
- tuningConfig.isReportParseExceptions(),
tuningConfig.getDedupColumn()
);
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index 19ed6bc..531746f 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -47,7 +47,6 @@
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
@@ -63,6 +62,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.lookup.LookupSerdeModule;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 8ef1eca..e08e5ea 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -60,7 +60,6 @@
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
@@ -100,6 +99,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.lookup.LookupSerdeModule;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 914c7f1..6f7cacd 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -74,7 +74,6 @@
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.HttpShuffleClient;
@@ -94,6 +93,7 @@
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;