Remove ParseSpec.toInputFormat() (#9815)
* Remove toInputFormat() from ParseSpec
* fix test
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java
index 51ffd38..81c8a26 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.InputFormat;
import org.apache.druid.java.util.common.parsers.CSVParser;
import org.apache.druid.java.util.common.parsers.Parser;
@@ -98,12 +97,6 @@
}
@Override
- public InputFormat toInputFormat()
- {
- return new CsvInputFormat(columns, listDelimiter, null, hasHeaderRow, skipHeaderRows);
- }
-
- @Override
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns, hasHeaderRow, skipHeaderRows);
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java
index 3ee0f71..5940e70 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.InputFormat;
import org.apache.druid.java.util.common.parsers.DelimitedParser;
import org.apache.druid.java.util.common.parsers.Parser;
@@ -125,12 +124,6 @@
}
@Override
- public InputFormat toInputFormat()
- {
- return new DelimitedInputFormat(columns, listDelimiter, delimiter, hasHeaderRow, null, skipHeaderRows);
- }
-
- @Override
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
return new DelimitedParseSpec(
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java
index 870076d..3a7136b 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java
@@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParser.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.data.input.InputFormat;
import org.apache.druid.java.util.common.parsers.JSONPathParser;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.Parser;
@@ -69,12 +68,6 @@
}
@Override
- public InputFormat toInputFormat()
- {
- return new JsonInputFormat(getFlattenSpec(), getFeatureSpec());
- }
-
- @Override
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
return new JSONParseSpec(spec, getDimensionsSpec(), getFlattenSpec(), getFeatureSpec());
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java
index adc5299..33e9f44 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java
@@ -23,13 +23,10 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.InputFormat;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.parsers.Parser;
-import javax.annotation.Nullable;
-
@Deprecated
@ExtensionPoint
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "format")
@@ -71,16 +68,6 @@
return null;
}
- /**
- * Returns null if it's not implemented yet.
- * This method (and maybe this class) will be removed in favor of {@link InputFormat} in the future.
- */
- @Nullable
- public InputFormat toInputFormat()
- {
- return null;
- }
-
@PublicApi
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
diff --git a/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java b/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
index c163de1..088bed5 100644
--- a/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
@@ -52,7 +52,7 @@
}
final TestFirehoseFactory firehoseFactory = new TestFirehoseFactory(lines);
final StringInputRowParser inputRowParser = new StringInputRowParser(
- new UnimplementedInputFormatCsvParseSpec(
+ new CSVParseSpec(
new TimestampSpec(null, "yyyyMMdd", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "name", "score"))),
",",
@@ -95,28 +95,6 @@
}
}
- private static class UnimplementedInputFormatCsvParseSpec extends CSVParseSpec
- {
- private UnimplementedInputFormatCsvParseSpec(
- TimestampSpec timestampSpec,
- DimensionsSpec dimensionsSpec,
- String listDelimiter,
- List<String> columns,
- boolean hasHeaderRow,
- int skipHeaderRows
- )
- {
- super(timestampSpec, dimensionsSpec, listDelimiter, columns, hasHeaderRow, skipHeaderRows);
- }
-
- @Nullable
- @Override
- public InputFormat toInputFormat()
- {
- return null;
- }
- }
-
private static class TestFirehoseFactory implements FiniteFirehoseFactory<StringInputRowParser, Object>
{
private final List<String> lines;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index ba12128..6c88c5a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -194,9 +194,7 @@
true,
minimumMessageTime,
maximumMessageTime,
- ioConfig.getInputFormat(
- spec.getDataSchema().getParser() == null ? null : spec.getDataSchema().getParser().getParseSpec()
- )
+ ioConfig.getInputFormat()
);
}
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index c789fc7..0000ee6 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -140,9 +140,7 @@
true,
minimumMessageTime,
maximumMessageTime,
- ioConfig.getInputFormat(
- spec.getDataSchema().getParser() == null ? null : spec.getDataSchema().getParser().getParseSpec()
- ),
+ ioConfig.getInputFormat(),
ioConfig.getEndpoint(),
ioConfig.getRecordsPerFetch(),
ioConfig.getFetchDelayMillis(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index be24995..712340f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -44,7 +44,6 @@
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.Rows;
import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.IngestionState;
@@ -1046,10 +1045,7 @@
private static InputFormat getInputFormat(IndexIngestionSpec ingestionSchema)
{
- final InputRowParser parser = ingestionSchema.getDataSchema().getParser();
- return ingestionSchema.getIOConfig().getNonNullInputFormat(
- parser == null ? null : parser.getParseSpec()
- );
+ return ingestionSchema.getIOConfig().getNonNullInputFormat();
}
public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig>
@@ -1184,13 +1180,9 @@
}
}
- public InputFormat getNonNullInputFormat(@Nullable ParseSpec parseSpec)
+ public InputFormat getNonNullInputFormat()
{
- if (inputFormat == null) {
- return Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat();
- } else {
- return inputFormat;
- }
+ return Preconditions.checkNotNull(inputFormat, "inputFormat");
}
@Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
index cfbc0ca..57c117d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
@@ -30,7 +30,6 @@
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
-import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
@@ -196,19 +195,8 @@
if (lastStatus != null) {
LOG.error("Failed because of the failed sub task[%s]", lastStatus.getId());
} else {
- final SinglePhaseSubTaskSpec spec =
- (SinglePhaseSubTaskSpec) taskCompleteEvent.getSpec();
- final InputRowParser inputRowParser = spec.getIngestionSpec().getDataSchema().getParser();
- LOG.error(
- "Failed to run sub tasks for inputSplits[%s]",
- getSplitsIfSplittable(
- spec.getIngestionSpec().getIOConfig().getNonNullInputSource(inputRowParser),
- spec.getIngestionSpec().getIOConfig().getNonNullInputFormat(
- inputRowParser == null ? null : inputRowParser.getParseSpec()
- ),
- tuningConfig.getSplitHintSpec()
- )
- );
+ final SinglePhaseSubTaskSpec spec = (SinglePhaseSubTaskSpec) taskCompleteEvent.getSpec();
+ LOG.error("Failed to run sub tasks for inputSplits[%s]", spec.getInputSplit());
}
break;
default:
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index e2106f8..2d2ec3a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -33,7 +33,6 @@
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -1001,10 +1000,7 @@
static InputFormat getInputFormat(ParallelIndexIngestionSpec ingestionSchema)
{
- final InputRowParser parser = ingestionSchema.getDataSchema().getParser();
- return ingestionSchema.getIOConfig().getNonNullInputFormat(
- parser == null ? null : parser.getParseSpec()
- );
+ return ingestionSchema.getIOConfig().getNonNullInputFormat();
}
/**
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
index ae2cd88..cf6a7f4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
@@ -23,7 +23,6 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.segment.indexing.IOConfig;
import org.joda.time.DateTime;
@@ -127,8 +126,8 @@
}
@Nullable
- public InputFormat getInputFormat(ParseSpec parseSpec)
+ public InputFormat getInputFormat()
{
- return inputFormat == null ? Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat() : inputFormat;
+ return inputFormat;
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index bcdcecc..d8e89aa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -255,7 +255,7 @@
.map(AggregatorFactory::getName)
.collect(Collectors.toList())
);
- this.inputFormat = ioConfig.getInputFormat(parser == null ? null : parser.getParseSpec());
+ this.inputFormat = ioConfig.getInputFormat();
this.parser = parser;
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
index 3a9a92f..5c04477 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -98,7 +98,7 @@
ioConfig.isUseEarliestSequenceNumber()
);
inputFormat = Preconditions.checkNotNull(
- ioConfig.getInputFormat(null),
+ ioConfig.getInputFormat(),
"[spec.ioConfig.inputFormat] is required"
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 8693d68..723e22e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -23,7 +23,6 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.java.util.common.IAE;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@@ -102,22 +101,12 @@
}
@Nullable
- @JsonProperty("inputFormat")
- private InputFormat getGivenInputFormat()
+ @JsonProperty()
+ public InputFormat getInputFormat()
{
return inputFormat;
}
- @Nullable
- public InputFormat getInputFormat(@Nullable ParseSpec parseSpec)
- {
- if (inputFormat == null) {
- return Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat();
- } else {
- return inputFormat;
- }
- }
-
@JsonProperty
public Integer getReplicas()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 8ffa477..8c3c5ff 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -176,6 +176,7 @@
getObjectMapper(),
tmpDir,
CompactionTaskRunTest.DEFAULT_PARSE_SPEC,
+ null,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 80ea86e8..cf2bb2a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -381,6 +381,7 @@
getObjectMapper(),
tmpDir,
DEFAULT_PARSE_SPEC,
+ null,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
@@ -791,6 +792,7 @@
getObjectMapper(),
tmpDir,
DEFAULT_PARSE_SPEC,
+ null,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 84fff31..071862e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -22,15 +22,18 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.ParseSpec;
@@ -134,7 +137,13 @@
false,
0
);
- private static final InputFormat DEFAULT_INPUT_FORMAT = DEFAULT_PARSE_SPEC.toInputFormat();
+ private static final InputFormat DEFAULT_INPUT_FORMAT = new CsvInputFormat(
+ Arrays.asList("ts", "dim", "val"),
+ null,
+ null,
+ false,
+ 0
+ );
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
public static Iterable<Object[]> constructorFeeder()
@@ -205,13 +214,11 @@
IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
null,
null,
- null,
createTuningConfigWithMaxRowsPerSegment(2, true),
false
),
@@ -252,53 +259,65 @@
writer.write("2014-01-01T02:00:30Z,c,and|another,0|1,1\n");
}
+ final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(
+ Arrays.asList(
+ "ts",
+ "dim",
+ "dim_array",
+ "dim_num_array",
+ "dimt",
+ "dimtarray1",
+ "dimtarray2",
+ "dimtnum_array"
+ )
+ )
+ );
+ final List<String> columns = Arrays.asList("ts", "dim", "dim_array", "dim_num_array", "val");
+ final String listDelimiter = "|";
+ final TransformSpec transformSpec = new TransformSpec(
+ new SelectorDimFilter("dim", "b", null),
+ ImmutableList.of(
+ new ExpressionTransform("dimt", "concat(dim,dim)", ExprMacroTable.nil()),
+ new ExpressionTransform("dimtarray1", "array(dim, dim)", ExprMacroTable.nil()),
+ new ExpressionTransform(
+ "dimtarray2",
+ "map(d -> concat(d, 'foo'), dim_array)",
+ ExprMacroTable.nil()
+ ),
+ new ExpressionTransform("dimtnum_array", "map(d -> d + 3, dim_num_array)", ExprMacroTable.nil())
+ )
+ );
+ final IndexTuningConfig tuningConfig = createTuningConfigWithMaxRowsPerSegment(2, false);
+ final IndexIngestionSpec indexIngestionSpec;
+ if (useInputFormatApi) {
+ indexIngestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ DEFAULT_TIMESTAMP_SPEC,
+ dimensionsSpec,
+ new CsvInputFormat(columns, listDelimiter, null, false, 0),
+ transformSpec,
+ null,
+ tuningConfig,
+ false
+ );
+ } else {
+ indexIngestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, dimensionsSpec, listDelimiter, columns, false, 0),
+ transformSpec,
+ null,
+ tuningConfig,
+ false
+ );
+ }
+
IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
- jsonMapper,
- tmpDir,
- new CSVParseSpec(
- new TimestampSpec(
- "ts",
- "auto",
- null
- ),
- new DimensionsSpec(
- DimensionsSpec.getDefaultSchemas(
- Arrays.asList(
- "ts",
- "dim",
- "dim_array",
- "dim_num_array",
- "dimt",
- "dimtarray1",
- "dimtarray2",
- "dimtnum_array"
- )
- ),
- new ArrayList<>(),
- new ArrayList<>()
- ),
- "|",
- Arrays.asList("ts", "dim", "dim_array", "dim_num_array", "val"),
- false,
- 0
- ),
- new TransformSpec(
- new SelectorDimFilter("dim", "b", null),
- ImmutableList.of(
- new ExpressionTransform("dimt", "concat(dim,dim)", ExprMacroTable.nil()),
- new ExpressionTransform("dimtarray1", "array(dim, dim)", ExprMacroTable.nil()),
- new ExpressionTransform("dimtarray2", "map(d -> concat(d, 'foo'), dim_array)", ExprMacroTable.nil()),
- new ExpressionTransform("dimtnum_array", "map(d -> d + 3, dim_num_array)", ExprMacroTable.nil())
- )
- ),
- null,
- createTuningConfigWithMaxRowsPerSegment(2, false),
- false
- ),
+ indexIngestionSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
@@ -329,13 +348,22 @@
final List<Map<String, Object>> transforms = cursorSequence
.map(cursor -> {
final DimensionSelector selector1 = cursor.getColumnSelectorFactory()
- .makeDimensionSelector(new DefaultDimensionSpec("dimt", "dimt"));
+ .makeDimensionSelector(new DefaultDimensionSpec("dimt", "dimt"));
final DimensionSelector selector2 = cursor.getColumnSelectorFactory()
- .makeDimensionSelector(new DefaultDimensionSpec("dimtarray1", "dimtarray1"));
+ .makeDimensionSelector(new DefaultDimensionSpec(
+ "dimtarray1",
+ "dimtarray1"
+ ));
final DimensionSelector selector3 = cursor.getColumnSelectorFactory()
- .makeDimensionSelector(new DefaultDimensionSpec("dimtarray2", "dimtarray2"));
+ .makeDimensionSelector(new DefaultDimensionSpec(
+ "dimtarray2",
+ "dimtarray2"
+ ));
final DimensionSelector selector4 = cursor.getColumnSelectorFactory()
- .makeDimensionSelector(new DefaultDimensionSpec("dimtnum_array", "dimtnum_array"));
+ .makeDimensionSelector(new DefaultDimensionSpec(
+ "dimtnum_array",
+ "dimtnum_array"
+ ));
Map<String, Object> row = new HashMap<>();
@@ -375,15 +403,14 @@
IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
- null,
new ArbitraryGranularitySpec(
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
),
+ null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false
),
@@ -414,16 +441,15 @@
IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
- null,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.HOUR,
Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z"))
),
+ null,
createTuningConfigWithMaxRowsPerSegment(50, true),
false
),
@@ -454,13 +480,11 @@
IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
null,
null,
- null,
createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 1, null), true),
false
),
@@ -496,13 +520,11 @@
final IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
null,
null,
- null,
createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 2, ImmutableList.of("dim")), true),
false
),
@@ -574,13 +596,11 @@
IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
null,
null,
- null,
createTuningConfigWithMaxRowsPerSegment(2, false),
true
),
@@ -624,16 +644,15 @@
IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
- null,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
+ null,
createTuningConfigWithMaxRowsPerSegment(2, true),
false
),
@@ -676,25 +695,37 @@
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
+ final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+ final IndexTuningConfig tuningConfig = createTuningConfigWithMaxRowsPerSegment(2, true);
+ final IndexIngestionSpec ingestionSpec;
+ if (useInputFormatApi) {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, null, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ } else {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ timestampSpec,
+ DimensionsSpec.EMPTY,
+ new CsvInputFormat(null, null, null, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ }
+
IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
- jsonMapper,
- tmpDir,
- new CSVParseSpec(
- new TimestampSpec("time", "auto", null),
- DimensionsSpec.EMPTY,
- null,
- null,
- true,
- 0
- ),
- null,
- createTuningConfigWithMaxRowsPerSegment(2, true),
- false
- ),
+ ingestionSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
@@ -723,25 +754,38 @@
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
+ final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+ final List<String> columns = Arrays.asList("time", "dim", "val");
+ final IndexTuningConfig tuningConfig = createTuningConfigWithMaxRowsPerSegment(2, true);
+ final IndexIngestionSpec ingestionSpec;
+ if (useInputFormatApi) {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, columns, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ } else {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ timestampSpec,
+ DimensionsSpec.EMPTY,
+ new CsvInputFormat(columns, null, null, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ }
+
IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
- jsonMapper,
- tmpDir,
- new CSVParseSpec(
- new TimestampSpec("time", "auto", null),
- DimensionsSpec.EMPTY,
- null,
- Arrays.asList("time", "dim", "val"),
- true,
- 0
- ),
- null,
- createTuningConfigWithMaxRowsPerSegment(2, true),
- false
- ),
+ ingestionSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
@@ -779,16 +823,15 @@
IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
- null,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
+ null,
createTuningConfig(2, 2, null, 2L, null, false, true),
false
),
@@ -826,17 +869,16 @@
IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
- null,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.DAY,
true,
null
),
+ null,
createTuningConfig(3, 2, null, 2L, null, true, true),
false
),
@@ -873,17 +915,16 @@
IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
- null,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.DAY,
true,
null
),
+ null,
createTuningConfig(3, 2, null, 2L, null, false, true),
false
),
@@ -937,24 +978,37 @@
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
+ final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+ final List<String> columns = Arrays.asList("time", "dim", "val");
+ // ignore parse exception
+ final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, false);
+
// GranularitySpec.intervals and numShards must be null to verify reportParseException=false is respected both in
// IndexTask.determineShardSpecs() and IndexTask.generateAndPublishSegments()
- final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
- useInputFormatApi,
- jsonMapper,
- tmpDir,
- new CSVParseSpec(
- new TimestampSpec("time", "auto", null),
- DimensionsSpec.EMPTY,
- null,
- Arrays.asList("time", "dim", "val"),
- true,
- 0
- ),
- null,
- createTuningConfig(2, null, null, null, null, false, false), // ignore parse exception,
- false
- );
+ final IndexIngestionSpec parseExceptionIgnoreSpec;
+ if (useInputFormatApi) {
+ parseExceptionIgnoreSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ timestampSpec,
+ DimensionsSpec.EMPTY,
+ new CsvInputFormat(columns, null, null, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ } else {
+ parseExceptionIgnoreSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, columns, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ }
IndexTask indexTask = new IndexTask(
null,
@@ -987,27 +1041,39 @@
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
- final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
- useInputFormatApi,
- jsonMapper,
- tmpDir,
- new CSVParseSpec(
- new TimestampSpec("time", "auto", null),
- DimensionsSpec.EMPTY,
- null,
- Arrays.asList("time", "dim", "val"),
- true,
- 0
- ),
- null,
- createTuningConfig(2, null, null, null, null, false, true), // report parse exception
- false
- );
+ final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+ final List<String> columns = Arrays.asList("time", "dim", "val");
+ // report parse exception
+ final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true);
+ final IndexIngestionSpec indexIngestionSpec;
+ if (useInputFormatApi) {
+ indexIngestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ timestampSpec,
+ DimensionsSpec.EMPTY,
+ new CsvInputFormat(columns, null, null, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ } else {
+ indexIngestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, columns, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ }
IndexTask indexTask = new IndexTask(
null,
null,
- parseExceptionIgnoreSpec,
+ indexIngestionSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
@@ -1075,31 +1141,43 @@
7
);
- final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
- useInputFormatApi,
- jsonMapper,
- tmpDir,
- new JSONParseSpec(
- new TimestampSpec("time", "auto", null),
- new DimensionsSpec(
- Arrays.asList(
- new StringDimensionSchema("dim"),
- new LongDimensionSchema("dimLong"),
- new FloatDimensionSchema("dimFloat")
- )
- ),
- null,
- null
- ),
- null,
- tuningConfig,
- false
+ final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+ final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dim"),
+ new LongDimensionSchema("dimLong"),
+ new FloatDimensionSchema("dimFloat")
+ )
);
+ final IndexIngestionSpec ingestionSpec;
+ if (useInputFormatApi) {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ timestampSpec,
+ dimensionsSpec,
+ new JsonInputFormat(null, null),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ } else {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ new JSONParseSpec(timestampSpec, dimensionsSpec, null, null),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ }
IndexTask indexTask = new IndexTask(
null,
null,
- parseExceptionIgnoreSpec,
+ ingestionSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
@@ -1195,33 +1273,44 @@
5
);
- final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
- useInputFormatApi,
- jsonMapper,
- tmpDir,
- new CSVParseSpec(
- new TimestampSpec("time", "auto", null),
- new DimensionsSpec(
- Arrays.asList(
- new StringDimensionSchema("dim"),
- new LongDimensionSchema("dimLong"),
- new FloatDimensionSchema("dimFloat")
- )
- ),
- null,
- Arrays.asList("time", "dim", "dimLong", "dimFloat", "val"),
- true,
- 0
- ),
- null,
- tuningConfig,
- false
+ final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+ final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dim"),
+ new LongDimensionSchema("dimLong"),
+ new FloatDimensionSchema("dimFloat")
+ )
);
+ final List<String> columns = Arrays.asList("time", "dim", "dimLong", "dimFloat", "val");
+ final IndexIngestionSpec ingestionSpec;
+ if (useInputFormatApi) {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ timestampSpec,
+ dimensionsSpec,
+ new CsvInputFormat(columns, null, null, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ } else {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ new CSVParseSpec(timestampSpec, dimensionsSpec, null, columns, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ }
IndexTask indexTask = new IndexTask(
null,
null,
- parseExceptionIgnoreSpec,
+ ingestionSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
@@ -1308,33 +1397,44 @@
5
);
- final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
- useInputFormatApi,
- jsonMapper,
- tmpDir,
- new CSVParseSpec(
- new TimestampSpec("time", "auto", null),
- new DimensionsSpec(
- Arrays.asList(
- new StringDimensionSchema("dim"),
- new LongDimensionSchema("dimLong"),
- new FloatDimensionSchema("dimFloat")
- )
- ),
- null,
- Arrays.asList("time", "dim", "dimLong", "dimFloat", "val"),
- true,
- 0
- ),
- null,
- tuningConfig,
- false
+ final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+ final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dim"),
+ new LongDimensionSchema("dimLong"),
+ new FloatDimensionSchema("dimFloat")
+ )
);
+ final List<String> columns = Arrays.asList("time", "dim", "dimLong", "dimFloat", "val");
+ final IndexIngestionSpec ingestionSpec;
+ if (useInputFormatApi) {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ timestampSpec,
+ dimensionsSpec,
+ new CsvInputFormat(columns, null, null, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ } else {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ new CSVParseSpec(timestampSpec, dimensionsSpec, null, columns, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ }
IndexTask indexTask = new IndexTask(
null,
null,
- parseExceptionIgnoreSpec,
+ ingestionSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
@@ -1389,45 +1489,55 @@
File tmpFile = File.createTempFile("druid", "index", tmpDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
- writer.write("time,,\n");
+ writer.write("ts,,\n");
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
tmpFile = File.createTempFile("druid", "index", tmpDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
- writer.write("time,dim,\n");
+ writer.write("ts,dim,\n");
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
tmpFile = File.createTempFile("druid", "index", tmpDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
- writer.write("time,,val\n");
+ writer.write("ts,,val\n");
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
- final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
- useInputFormatApi,
- jsonMapper,
- tmpDir,
- new CSVParseSpec(
- new TimestampSpec("time", "auto", null),
- DimensionsSpec.EMPTY,
- null,
- null,
- true,
- 0
- ),
- null,
- createTuningConfig(2, 1, null, null, null, true, true), // report parse exception
- false
- );
+ // report parse exception
+ final IndexTuningConfig tuningConfig = createTuningConfig(2, 1, null, null, null, true, true);
+ final IndexIngestionSpec ingestionSpec;
+ if (useInputFormatApi) {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ DEFAULT_TIMESTAMP_SPEC,
+ DimensionsSpec.EMPTY,
+ new CsvInputFormat(null, null, null, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ } else {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, DimensionsSpec.EMPTY, null, null, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ }
IndexTask indexTask = new IndexTask(
null,
null,
- parseExceptionIgnoreSpec,
+ ingestionSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
@@ -1472,27 +1582,38 @@
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
- final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
- useInputFormatApi,
- jsonMapper,
- tmpDir,
- new CSVParseSpec(
- new TimestampSpec("time", "auto", null),
- DimensionsSpec.EMPTY,
- null,
- Arrays.asList("time", "", ""),
- true,
- 0
- ),
- null,
- createTuningConfig(2, null, null, null, null, false, true), // report parse exception
- false
- );
+ final List<String> columns = Arrays.asList("ts", "", "");
+ // report parse exception
+ final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true);
+ final IndexIngestionSpec ingestionSpec;
+ if (useInputFormatApi) {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ DEFAULT_TIMESTAMP_SPEC,
+ DimensionsSpec.EMPTY,
+ new CsvInputFormat(columns, null, null, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ } else {
+ ingestionSpec = createIngestionSpec(
+ jsonMapper,
+ tmpDir,
+ new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, DimensionsSpec.EMPTY, null, columns, true, 0),
+ null,
+ null,
+ tuningConfig,
+ false
+ );
+ }
IndexTask indexTask = new IndexTask(
null,
null,
- parseExceptionIgnoreSpec,
+ ingestionSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
@@ -1529,17 +1650,16 @@
final IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
- null,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.DAY,
true,
null
),
+ null,
createTuningConfig(3, 2, null, 2L, null, false, true),
false
),
@@ -1598,17 +1718,16 @@
final IndexTask indexTask = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
- null,
new UniformGranularitySpec(
segmentGranularity,
Granularities.DAY,
true,
null
),
+ null,
createTuningConfig(3, 2, null, 2L, null, false, true),
false
),
@@ -1643,13 +1762,11 @@
final IndexTask task = new IndexTask(
null,
null,
- createIngestionSpec(
- useInputFormatApi,
+ createDefaultIngestionSpec(
jsonMapper,
tmpDir,
null,
null,
- null,
createTuningConfigWithPartitionsSpec(new SingleDimensionPartitionsSpec(null, 1, null, false), true),
false
),
@@ -1762,20 +1879,58 @@
);
}
+ private IndexIngestionSpec createDefaultIngestionSpec(
+ ObjectMapper objectMapper,
+ File baseDir,
+ @Nullable GranularitySpec granularitySpec,
+ @Nullable TransformSpec transformSpec,
+ IndexTuningConfig tuningConfig,
+ boolean appendToExisting
+ )
+ {
+ if (useInputFormatApi) {
+ return createIngestionSpec(
+ objectMapper,
+ baseDir,
+ DEFAULT_TIMESTAMP_SPEC,
+ DEFAULT_DIMENSIONS_SPEC,
+ DEFAULT_INPUT_FORMAT,
+ transformSpec,
+ granularitySpec,
+ tuningConfig,
+ appendToExisting
+ );
+ } else {
+ return createIngestionSpec(
+ objectMapper,
+ baseDir,
+ DEFAULT_PARSE_SPEC,
+ transformSpec,
+ granularitySpec,
+ tuningConfig,
+ appendToExisting
+ );
+ }
+ }
+
static IndexIngestionSpec createIngestionSpec(
ObjectMapper objectMapper,
File baseDir,
@Nullable ParseSpec parseSpec,
- GranularitySpec granularitySpec,
+ @Nullable TransformSpec transformSpec,
+ @Nullable GranularitySpec granularitySpec,
IndexTuningConfig tuningConfig,
boolean appendToExisting
)
{
return createIngestionSpec(
- false,
objectMapper,
baseDir,
parseSpec,
+ null,
+ null,
+ null,
+ transformSpec,
granularitySpec,
tuningConfig,
appendToExisting
@@ -1783,21 +1938,25 @@
}
static IndexIngestionSpec createIngestionSpec(
- boolean useInputFormatApi,
ObjectMapper objectMapper,
File baseDir,
- @Nullable ParseSpec parseSpec,
- GranularitySpec granularitySpec,
+ TimestampSpec timestampSpec,
+ DimensionsSpec dimensionsSpec,
+ InputFormat inputFormat,
+ @Nullable TransformSpec transformSpec,
+ @Nullable GranularitySpec granularitySpec,
IndexTuningConfig tuningConfig,
boolean appendToExisting
)
{
return createIngestionSpec(
- useInputFormatApi,
objectMapper,
baseDir,
- parseSpec,
- TransformSpec.NONE,
+ null,
+ timestampSpec,
+ dimensionsSpec,
+ inputFormat,
+ transformSpec,
granularitySpec,
tuningConfig,
appendToExisting
@@ -1805,22 +1964,25 @@
}
private static IndexIngestionSpec createIngestionSpec(
- boolean useInputFormatApi,
ObjectMapper objectMapper,
File baseDir,
@Nullable ParseSpec parseSpec,
- TransformSpec transformSpec,
- GranularitySpec granularitySpec,
+ @Nullable TimestampSpec timestampSpec,
+ @Nullable DimensionsSpec dimensionsSpec,
+ @Nullable InputFormat inputFormat,
+ @Nullable TransformSpec transformSpec,
+ @Nullable GranularitySpec granularitySpec,
IndexTuningConfig tuningConfig,
boolean appendToExisting
)
{
- if (useInputFormatApi) {
+ if (inputFormat != null) {
+ Preconditions.checkArgument(parseSpec == null, "Can't use parseSpec");
return new IndexIngestionSpec(
new DataSchema(
"test",
- parseSpec == null ? DEFAULT_TIMESTAMP_SPEC : parseSpec.getTimestampSpec(),
- parseSpec == null ? DEFAULT_DIMENSIONS_SPEC : parseSpec.getDimensionsSpec(),
+ Preconditions.checkNotNull(timestampSpec, "timestampSpec"),
+ Preconditions.checkNotNull(dimensionsSpec, "dimensionsSpec"),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
@@ -1834,7 +1996,7 @@
new IndexIOConfig(
null,
new LocalInputSource(baseDir, "druid*"),
- parseSpec == null ? DEFAULT_INPUT_FORMAT : parseSpec.toInputFormat(),
+ inputFormat,
appendToExisting
),
tuningConfig
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index 194a556..7970e1d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -19,10 +19,14 @@
package org.apache.druid.indexing.common.task.batch.parallel;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
@@ -55,6 +59,7 @@
import org.joda.time.Interval;
import org.junit.Assert;
+import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.List;
@@ -82,8 +87,16 @@
this.useInputFormatApi = useInputFormatApi;
}
+ boolean isUseInputFormatApi()
+ {
+ return useInputFormatApi;
+ }
+
Set<DataSegment> runTestTask(
- ParseSpec parseSpec,
+ @Nullable TimestampSpec timestampSpec,
+ @Nullable DimensionsSpec dimensionsSpec,
+ @Nullable InputFormat inputFormat,
+ @Nullable ParseSpec parseSpec,
Interval interval,
File inputDir,
String filter,
@@ -93,6 +106,9 @@
)
{
final ParallelIndexSupervisorTask task = newTask(
+ timestampSpec,
+ dimensionsSpec,
+ inputFormat,
parseSpec,
interval,
inputDir,
@@ -108,7 +124,10 @@
}
private ParallelIndexSupervisorTask newTask(
- ParseSpec parseSpec,
+ @Nullable TimestampSpec timestampSpec,
+ @Nullable DimensionsSpec dimensionsSpec,
+ @Nullable InputFormat inputFormat,
+ @Nullable ParseSpec parseSpec,
Interval interval,
File inputDir,
String filter,
@@ -154,17 +173,18 @@
final ParallelIndexIngestionSpec ingestionSpec;
if (useInputFormatApi) {
+ Preconditions.checkArgument(parseSpec == null);
ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
null,
new LocalInputSource(inputDir, filter),
- parseSpec.toInputFormat(),
+ inputFormat,
false
);
ingestionSpec = new ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
- parseSpec.getTimestampSpec(),
- parseSpec.getDimensionsSpec(),
+ timestampSpec,
+ dimensionsSpec,
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
@@ -175,6 +195,7 @@
tuningConfig
);
} else {
+ Preconditions.checkArgument(inputFormat == null);
ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
new LocalFirehoseFactory(inputDir, filter, null),
false
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 4542ac0..ca33815 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -38,6 +38,7 @@
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
@@ -131,7 +132,13 @@
false,
0
);
- static final InputFormat DEFAULT_INPUT_FORMAT = DEFAULT_PARSE_SPEC.toInputFormat();
+ static final InputFormat DEFAULT_INPUT_FORMAT = new CsvInputFormat(
+ Arrays.asList("ts", "dim", "val"),
+ null,
+ false,
+ false,
+ 0
+ );
static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING = new ParallelIndexTuningConfig(
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
index c1165dd..bcd3cbe 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
@@ -20,7 +20,9 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
@@ -54,18 +56,25 @@
@RunWith(Parameterized.class)
public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPhaseParallelIndexingTest
{
+ private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
+ private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
+ );
private static final ParseSpec PARSE_SPEC = new CSVParseSpec(
- new TimestampSpec(
- "ts",
- "auto",
- null
- ),
- new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))),
+ TIMESTAMP_SPEC,
+ DIMENSIONS_SPEC,
null,
Arrays.asList("ts", "dim1", "dim2", "val"),
false,
0
);
+ private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+ Arrays.asList("ts", "dim1", "dim2", "val"),
+ null,
+ false,
+ false,
+ 0
+ );
private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2;
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
@@ -112,15 +121,34 @@
@Test
public void testRun() throws Exception
{
- final Set<DataSegment> publishedSegments = runTestTask(
- PARSE_SPEC,
- INTERVAL_TO_INDEX,
- inputDir,
- "test_*",
- new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
- MAX_NUM_CONCURRENT_SUB_TASKS,
- TaskState.SUCCESS
- );
+ final Set<DataSegment> publishedSegments;
+ if (isUseInputFormatApi()) {
+ publishedSegments = runTestTask(
+ TIMESTAMP_SPEC,
+ DIMENSIONS_SPEC,
+ INPUT_FORMAT,
+ null,
+ INTERVAL_TO_INDEX,
+ inputDir,
+ "test_*",
+ new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
+ MAX_NUM_CONCURRENT_SUB_TASKS,
+ TaskState.SUCCESS
+ );
+ } else {
+ publishedSegments = runTestTask(
+ null,
+ null,
+ null,
+ PARSE_SPEC,
+ INTERVAL_TO_INDEX,
+ inputDir,
+ "test_*",
+ new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
+ MAX_NUM_CONCURRENT_SUB_TASKS,
+ TaskState.SUCCESS
+ );
+ }
assertHashedPartition(publishedSegments);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
index da2f7a9..f62799b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
@@ -25,7 +25,9 @@
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import org.apache.druid.common.config.NullValueHandlingConfig;
+import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
@@ -79,18 +81,25 @@
private static final String LIST_DELIMITER = "|";
private static final List<String> DIMS = ImmutableList.of(DIM1, DIM2);
private static final String TEST_FILE_NAME_PREFIX = "test_";
+ private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec(TIME, "auto", null);
+ private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2))
+ );
private static final ParseSpec PARSE_SPEC = new CSVParseSpec(
- new TimestampSpec(
- TIME,
- "auto",
- null
- ),
- new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2))),
+ TIMESTAMP_SPEC,
+ DIMENSIONS_SPEC,
LIST_DELIMITER,
Arrays.asList(TIME, DIM1, DIM2, "val"),
false,
0
);
+ private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+ Arrays.asList(TIME, DIM1, DIM2, "val"),
+ LIST_DELIMITER,
+ false,
+ false,
+ 0
+ );
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}")
public static Iterable<Object[]> constructorFeeder()
@@ -192,20 +201,44 @@
public void createsCorrectRangePartitions() throws Exception
{
int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION;
- final Set<DataSegment> publishedSegments = runTestTask(
- PARSE_SPEC,
- INTERVAL_TO_INDEX,
- inputDir,
- TEST_FILE_NAME_PREFIX + "*",
- new SingleDimensionPartitionsSpec(
- targetRowsPerSegment,
- null,
- DIM1,
- false
- ),
- maxNumConcurrentSubTasks,
- useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
- );
+ final Set<DataSegment> publishedSegments;
+ if (isUseInputFormatApi()) {
+ publishedSegments = runTestTask(
+ TIMESTAMP_SPEC,
+ DIMENSIONS_SPEC,
+ INPUT_FORMAT,
+ null,
+ INTERVAL_TO_INDEX,
+ inputDir,
+ TEST_FILE_NAME_PREFIX + "*",
+ new SingleDimensionPartitionsSpec(
+ targetRowsPerSegment,
+ null,
+ DIM1,
+ false
+ ),
+ maxNumConcurrentSubTasks,
+ useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
+ );
+ } else {
+ publishedSegments = runTestTask(
+ null,
+ null,
+ null,
+ PARSE_SPEC,
+ INTERVAL_TO_INDEX,
+ inputDir,
+ TEST_FILE_NAME_PREFIX + "*",
+ new SingleDimensionPartitionsSpec(
+ targetRowsPerSegment,
+ null,
+ DIM1,
+ false
+ ),
+ maxNumConcurrentSubTasks,
+ useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
+ );
+ }
if (!useMultivalueDim) {
assertRangePartitions(publishedSegments);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index d147691..03a1193 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -19,8 +19,6 @@
package org.apache.druid.indexing.seekablestream;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
@@ -87,7 +85,6 @@
import org.joda.time.Interval;
import org.junit.Assert;
-import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@@ -149,12 +146,12 @@
static {
OBJECT_MAPPER = new TestUtils().getTestObjectMapper();
- OBJECT_MAPPER.registerSubtypes(new NamedType(UnimplementedInputFormatJsonParseSpec.class, "json"));
+ OBJECT_MAPPER.registerSubtypes(new NamedType(JSONParseSpec.class, "json"));
OLD_DATA_SCHEMA = new DataSchema(
"test_ds",
OBJECT_MAPPER.convertValue(
new StringInputRowParser(
- new UnimplementedInputFormatJsonParseSpec(
+ new JSONParseSpec(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
@@ -443,25 +440,4 @@
return segmentDescriptor;
}
}
-
- private static class UnimplementedInputFormatJsonParseSpec extends JSONParseSpec
- {
- @JsonCreator
- private UnimplementedInputFormatJsonParseSpec(
- @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
- @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
- @JsonProperty("flattenSpec") JSONPathSpec flattenSpec,
- @JsonProperty("featureSpec") Map<String, Boolean> featureSpec
- )
- {
- super(timestampSpec, dimensionsSpec, flattenSpec, featureSpec);
- }
-
- @Nullable
- @Override
- public InputFormat toInputFormat()
- {
- return null;
- }
- }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
index 76a02db..c1814d6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
@@ -22,7 +22,6 @@
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
-import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -63,7 +62,12 @@
public void testWithParserAndNullInputformatParseProperly() throws IOException
{
final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
- new NotConvertibleToInputFormatParseSpec(),
+ new JSONParseSpec(
+ TIMESTAMP_SPEC,
+ DimensionsSpec.EMPTY,
+ JSONPathSpec.DEFAULT,
+ Collections.emptyMap()
+ ),
StringUtils.UTF8_STRING
);
final StreamChunkParser chunkParser = new StreamChunkParser(
@@ -109,7 +113,12 @@
public void testBothParserAndInputFormatParseProperlyUsingInputFormat() throws IOException
{
final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
- new NotConvertibleToInputFormatParseSpec(),
+ new JSONParseSpec(
+ TIMESTAMP_SPEC,
+ DimensionsSpec.EMPTY,
+ JSONPathSpec.DEFAULT,
+ Collections.emptyMap()
+ ),
StringUtils.UTF8_STRING
);
final TrackingJsonInputFormat inputFormat = new TrackingJsonInputFormat(
@@ -138,25 +147,6 @@
Assert.assertEquals("val2", Iterables.getOnlyElement(row.getDimension("met")));
}
- private static class NotConvertibleToInputFormatParseSpec extends JSONParseSpec
- {
- private NotConvertibleToInputFormatParseSpec()
- {
- super(
- TIMESTAMP_SPEC,
- DimensionsSpec.EMPTY,
- JSONPathSpec.DEFAULT,
- Collections.emptyMap()
- );
- }
-
- @Override
- public InputFormat toInputFormat()
- {
- return null;
- }
- }
-
private static class TrackingJsonInputFormat extends JsonInputFormat
{
private boolean used;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index d34bbbb..73bbaca 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -1027,9 +1027,7 @@
true,
minimumMessageTime,
maximumMessageTime,
- ioConfig.getInputFormat(
- getDataSchema().getParser() == null ? null : getDataSchema().getParser().getParseSpec()
- )
+ ioConfig.getInputFormat()
)
{
};