Remove ParseSpec.toInputFormat() (#9815)

* Remove toInputFormat() from ParseSpec

* fix test
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java
index 51ffd38..81c8a26 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java
@@ -22,7 +22,6 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.java.util.common.parsers.CSVParser;
 import org.apache.druid.java.util.common.parsers.Parser;
 
@@ -98,12 +97,6 @@
   }
 
   @Override
-  public InputFormat toInputFormat()
-  {
-    return new CsvInputFormat(columns, listDelimiter, null, hasHeaderRow, skipHeaderRows);
-  }
-
-  @Override
   public ParseSpec withTimestampSpec(TimestampSpec spec)
   {
     return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns, hasHeaderRow, skipHeaderRows);
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java
index 3ee0f71..5940e70 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java
@@ -22,7 +22,6 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.java.util.common.parsers.DelimitedParser;
 import org.apache.druid.java.util.common.parsers.Parser;
 
@@ -125,12 +124,6 @@
   }
 
   @Override
-  public InputFormat toInputFormat()
-  {
-    return new DelimitedInputFormat(columns, listDelimiter, delimiter, hasHeaderRow, null, skipHeaderRows);
-  }
-
-  @Override
   public ParseSpec withTimestampSpec(TimestampSpec spec)
   {
     return new DelimitedParseSpec(
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java
index 870076d..3a7136b 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java
@@ -23,7 +23,6 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonParser.Feature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.java.util.common.parsers.JSONPathParser;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.common.parsers.Parser;
@@ -69,12 +68,6 @@
   }
 
   @Override
-  public InputFormat toInputFormat()
-  {
-    return new JsonInputFormat(getFlattenSpec(), getFeatureSpec());
-  }
-
-  @Override
   public ParseSpec withTimestampSpec(TimestampSpec spec)
   {
     return new JSONParseSpec(spec, getDimensionsSpec(), getFlattenSpec(), getFeatureSpec());
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java
index adc5299..33e9f44 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java
@@ -23,13 +23,10 @@
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.guice.annotations.ExtensionPoint;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.java.util.common.parsers.Parser;
 
-import javax.annotation.Nullable;
-
 @Deprecated
 @ExtensionPoint
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "format")
@@ -71,16 +68,6 @@
     return null;
   }
 
-  /**
-   * Returns null if it's not implemented yet.
-   * This method (and maybe this class) will be removed in favor of {@link InputFormat} in the future.
-   */
-  @Nullable
-  public InputFormat toInputFormat()
-  {
-    return null;
-  }
-
   @PublicApi
   public ParseSpec withTimestampSpec(TimestampSpec spec)
   {
diff --git a/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java b/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
index c163de1..088bed5 100644
--- a/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
@@ -52,7 +52,7 @@
     }
     final TestFirehoseFactory firehoseFactory = new TestFirehoseFactory(lines);
     final StringInputRowParser inputRowParser = new StringInputRowParser(
-        new UnimplementedInputFormatCsvParseSpec(
+        new CSVParseSpec(
             new TimestampSpec(null, "yyyyMMdd", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "name", "score"))),
             ",",
@@ -95,28 +95,6 @@
     }
   }
 
-  private static class UnimplementedInputFormatCsvParseSpec extends CSVParseSpec
-  {
-    private UnimplementedInputFormatCsvParseSpec(
-        TimestampSpec timestampSpec,
-        DimensionsSpec dimensionsSpec,
-        String listDelimiter,
-        List<String> columns,
-        boolean hasHeaderRow,
-        int skipHeaderRows
-    )
-    {
-      super(timestampSpec, dimensionsSpec, listDelimiter, columns, hasHeaderRow, skipHeaderRows);
-    }
-
-    @Nullable
-    @Override
-    public InputFormat toInputFormat()
-    {
-      return null;
-    }
-  }
-
   private static class TestFirehoseFactory implements FiniteFirehoseFactory<StringInputRowParser, Object>
   {
     private final List<String> lines;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index ba12128..6c88c5a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -194,9 +194,7 @@
         true,
         minimumMessageTime,
         maximumMessageTime,
-        ioConfig.getInputFormat(
-            spec.getDataSchema().getParser() == null ? null : spec.getDataSchema().getParser().getParseSpec()
-        )
+        ioConfig.getInputFormat()
     );
   }
 
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index c789fc7..0000ee6 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -140,9 +140,7 @@
         true,
         minimumMessageTime,
         maximumMessageTime,
-        ioConfig.getInputFormat(
-            spec.getDataSchema().getParser() == null ? null : spec.getDataSchema().getParser().getParseSpec()
-        ),
+        ioConfig.getInputFormat(),
         ioConfig.getEndpoint(),
         ioConfig.getRecordsPerFetch(),
         ioConfig.getFetchDelayMillis(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index be24995..712340f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -44,7 +44,6 @@
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.Rows;
 import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.hll.HyperLogLogCollector;
 import org.apache.druid.indexer.Checks;
 import org.apache.druid.indexer.IngestionState;
@@ -1046,10 +1045,7 @@
 
   private static InputFormat getInputFormat(IndexIngestionSpec ingestionSchema)
   {
-    final InputRowParser parser = ingestionSchema.getDataSchema().getParser();
-    return ingestionSchema.getIOConfig().getNonNullInputFormat(
-        parser == null ? null : parser.getParseSpec()
-    );
+    return ingestionSchema.getIOConfig().getNonNullInputFormat();
   }
 
   public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig>
@@ -1184,13 +1180,9 @@
       }
     }
 
-    public InputFormat getNonNullInputFormat(@Nullable ParseSpec parseSpec)
+    public InputFormat getNonNullInputFormat()
     {
-      if (inputFormat == null) {
-        return Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat();
-      } else {
-        return inputFormat;
-      }
+      return Preconditions.checkNotNull(inputFormat, "inputFormat");
     }
 
     @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
index cfbc0ca..57c117d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
@@ -30,7 +30,6 @@
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
-import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
@@ -196,19 +195,8 @@
               if (lastStatus != null) {
                 LOG.error("Failed because of the failed sub task[%s]", lastStatus.getId());
               } else {
-                final SinglePhaseSubTaskSpec spec =
-                    (SinglePhaseSubTaskSpec) taskCompleteEvent.getSpec();
-                final InputRowParser inputRowParser = spec.getIngestionSpec().getDataSchema().getParser();
-                LOG.error(
-                    "Failed to run sub tasks for inputSplits[%s]",
-                    getSplitsIfSplittable(
-                        spec.getIngestionSpec().getIOConfig().getNonNullInputSource(inputRowParser),
-                        spec.getIngestionSpec().getIOConfig().getNonNullInputFormat(
-                            inputRowParser == null ? null : inputRowParser.getParseSpec()
-                        ),
-                        tuningConfig.getSplitHintSpec()
-                    )
-                );
+                final SinglePhaseSubTaskSpec spec = (SinglePhaseSubTaskSpec) taskCompleteEvent.getSpec();
+                LOG.error("Failed to run sub tasks for inputSplits[%s]", spec.getInputSplit());
               }
               break;
             default:
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index e2106f8..2d2ec3a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -33,7 +33,6 @@
 import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -1001,10 +1000,7 @@
 
   static InputFormat getInputFormat(ParallelIndexIngestionSpec ingestionSchema)
   {
-    final InputRowParser parser = ingestionSchema.getDataSchema().getParser();
-    return ingestionSchema.getIOConfig().getNonNullInputFormat(
-        parser == null ? null : parser.getParseSpec()
-    );
+    return ingestionSchema.getIOConfig().getNonNullInputFormat();
   }
 
   /**
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
index ae2cd88..cf6a7f4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
@@ -23,7 +23,6 @@
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.segment.indexing.IOConfig;
 import org.joda.time.DateTime;
 
@@ -127,8 +126,8 @@
   }
 
   @Nullable
-  public InputFormat getInputFormat(ParseSpec parseSpec)
+  public InputFormat getInputFormat()
   {
-    return inputFormat == null ? Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat() : inputFormat;
+    return inputFormat;
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index bcdcecc..d8e89aa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -255,7 +255,7 @@
               .map(AggregatorFactory::getName)
               .collect(Collectors.toList())
     );
-    this.inputFormat = ioConfig.getInputFormat(parser == null ? null : parser.getParseSpec());
+    this.inputFormat = ioConfig.getInputFormat();
     this.parser = parser;
     this.authorizerMapper = authorizerMapper;
     this.chatHandlerProvider = chatHandlerProvider;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
index 3a9a92f..5c04477 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -98,7 +98,7 @@
           ioConfig.isUseEarliestSequenceNumber()
       );
       inputFormat = Preconditions.checkNotNull(
-          ioConfig.getInputFormat(null),
+          ioConfig.getInputFormat(),
           "[spec.ioConfig.inputFormat] is required"
       );
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 8693d68..723e22e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -23,7 +23,6 @@
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.java.util.common.IAE;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
@@ -102,22 +101,12 @@
   }
 
   @Nullable
-  @JsonProperty("inputFormat")
-  private InputFormat getGivenInputFormat()
+  @JsonProperty()
+  public InputFormat getInputFormat()
   {
     return inputFormat;
   }
 
-  @Nullable
-  public InputFormat getInputFormat(@Nullable ParseSpec parseSpec)
-  {
-    if (inputFormat == null) {
-      return Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat();
-    } else {
-      return inputFormat;
-    }
-  }
-
   @JsonProperty
   public Integer getReplicas()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 8ffa477..8c3c5ff 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -176,6 +176,7 @@
             getObjectMapper(),
             tmpDir,
             CompactionTaskRunTest.DEFAULT_PARSE_SPEC,
+            null,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 80ea86e8..cf2bb2a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -381,6 +381,7 @@
             getObjectMapper(),
             tmpDir,
             DEFAULT_PARSE_SPEC,
+            null,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
@@ -791,6 +792,7 @@
             getObjectMapper(),
             tmpDir,
             DEFAULT_PARSE_SPEC,
+            null,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 84fff31..071862e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -22,15 +22,18 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.FloatDimensionSchema;
 import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.ParseSpec;
@@ -134,7 +137,13 @@
       false,
       0
   );
-  private static final InputFormat DEFAULT_INPUT_FORMAT = DEFAULT_PARSE_SPEC.toInputFormat();
+  private static final InputFormat DEFAULT_INPUT_FORMAT = new CsvInputFormat(
+      Arrays.asList("ts", "dim", "val"),
+      null,
+      null,
+      false,
+      0
+  );
 
   @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
   public static Iterable<Object[]> constructorFeeder()
@@ -205,13 +214,11 @@
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
+        createDefaultIngestionSpec(
             jsonMapper,
             tmpDir,
             null,
             null,
-            null,
             createTuningConfigWithMaxRowsPerSegment(2, true),
             false
         ),
@@ -252,53 +259,65 @@
       writer.write("2014-01-01T02:00:30Z,c,and|another,0|1,1\n");
     }
 
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        DimensionsSpec.getDefaultSchemas(
+            Arrays.asList(
+                "ts",
+                "dim",
+                "dim_array",
+                "dim_num_array",
+                "dimt",
+                "dimtarray1",
+                "dimtarray2",
+                "dimtnum_array"
+            )
+        )
+    );
+    final List<String> columns = Arrays.asList("ts", "dim", "dim_array", "dim_num_array", "val");
+    final String listDelimiter = "|";
+    final TransformSpec transformSpec = new TransformSpec(
+        new SelectorDimFilter("dim", "b", null),
+        ImmutableList.of(
+            new ExpressionTransform("dimt", "concat(dim,dim)", ExprMacroTable.nil()),
+            new ExpressionTransform("dimtarray1", "array(dim, dim)", ExprMacroTable.nil()),
+            new ExpressionTransform(
+                "dimtarray2",
+                "map(d -> concat(d, 'foo'), dim_array)",
+                ExprMacroTable.nil()
+            ),
+            new ExpressionTransform("dimtnum_array", "map(d -> d + 3, dim_num_array)", ExprMacroTable.nil())
+        )
+    );
+    final IndexTuningConfig tuningConfig = createTuningConfigWithMaxRowsPerSegment(2, false);
+    final IndexIngestionSpec indexIngestionSpec;
+    if (useInputFormatApi) {
+      indexIngestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          DEFAULT_TIMESTAMP_SPEC,
+          dimensionsSpec,
+          new CsvInputFormat(columns, listDelimiter, null, false, 0),
+          transformSpec,
+          null,
+          tuningConfig,
+          false
+      );
+    } else {
+      indexIngestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, dimensionsSpec, listDelimiter, columns, false, 0),
+          transformSpec,
+          null,
+          tuningConfig,
+          false
+      );
+    }
+
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
-            jsonMapper,
-            tmpDir,
-            new CSVParseSpec(
-                new TimestampSpec(
-                    "ts",
-                    "auto",
-                    null
-                ),
-                new DimensionsSpec(
-                    DimensionsSpec.getDefaultSchemas(
-                        Arrays.asList(
-                            "ts",
-                            "dim",
-                            "dim_array",
-                            "dim_num_array",
-                            "dimt",
-                            "dimtarray1",
-                            "dimtarray2",
-                            "dimtnum_array"
-                        )
-                    ),
-                    new ArrayList<>(),
-                    new ArrayList<>()
-                ),
-                "|",
-                Arrays.asList("ts", "dim", "dim_array", "dim_num_array", "val"),
-                false,
-                0
-            ),
-            new TransformSpec(
-                new SelectorDimFilter("dim", "b", null),
-                ImmutableList.of(
-                    new ExpressionTransform("dimt", "concat(dim,dim)", ExprMacroTable.nil()),
-                    new ExpressionTransform("dimtarray1", "array(dim, dim)", ExprMacroTable.nil()),
-                    new ExpressionTransform("dimtarray2", "map(d -> concat(d, 'foo'), dim_array)", ExprMacroTable.nil()),
-                    new ExpressionTransform("dimtnum_array", "map(d -> d + 3, dim_num_array)", ExprMacroTable.nil())
-                )
-            ),
-            null,
-            createTuningConfigWithMaxRowsPerSegment(2, false),
-            false
-        ),
+        indexIngestionSpec,
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
@@ -329,13 +348,22 @@
     final List<Map<String, Object>> transforms = cursorSequence
         .map(cursor -> {
           final DimensionSelector selector1 = cursor.getColumnSelectorFactory()
-                                                   .makeDimensionSelector(new DefaultDimensionSpec("dimt", "dimt"));
+                                                    .makeDimensionSelector(new DefaultDimensionSpec("dimt", "dimt"));
           final DimensionSelector selector2 = cursor.getColumnSelectorFactory()
-                                                    .makeDimensionSelector(new DefaultDimensionSpec("dimtarray1", "dimtarray1"));
+                                                    .makeDimensionSelector(new DefaultDimensionSpec(
+                                                        "dimtarray1",
+                                                        "dimtarray1"
+                                                    ));
           final DimensionSelector selector3 = cursor.getColumnSelectorFactory()
-                                                   .makeDimensionSelector(new DefaultDimensionSpec("dimtarray2", "dimtarray2"));
+                                                    .makeDimensionSelector(new DefaultDimensionSpec(
+                                                        "dimtarray2",
+                                                        "dimtarray2"
+                                                    ));
           final DimensionSelector selector4 = cursor.getColumnSelectorFactory()
-                                                    .makeDimensionSelector(new DefaultDimensionSpec("dimtnum_array", "dimtnum_array"));
+                                                    .makeDimensionSelector(new DefaultDimensionSpec(
+                                                        "dimtnum_array",
+                                                        "dimtnum_array"
+                                                    ));
 
 
           Map<String, Object> row = new HashMap<>();
@@ -375,15 +403,14 @@
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
+        createDefaultIngestionSpec(
             jsonMapper,
             tmpDir,
-            null,
             new ArbitraryGranularitySpec(
                 Granularities.MINUTE,
                 Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
             ),
+            null,
             createTuningConfigWithMaxRowsPerSegment(10, true),
             false
         ),
@@ -414,16 +441,15 @@
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
+        createDefaultIngestionSpec(
             jsonMapper,
             tmpDir,
-            null,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.HOUR,
                 Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z"))
             ),
+            null,
             createTuningConfigWithMaxRowsPerSegment(50, true),
             false
         ),
@@ -454,13 +480,11 @@
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
+        createDefaultIngestionSpec(
             jsonMapper,
             tmpDir,
             null,
             null,
-            null,
             createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 1, null), true),
             false
         ),
@@ -496,13 +520,11 @@
     final IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
+        createDefaultIngestionSpec(
             jsonMapper,
             tmpDir,
             null,
             null,
-            null,
             createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 2, ImmutableList.of("dim")), true),
             false
         ),
@@ -574,13 +596,11 @@
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
+        createDefaultIngestionSpec(
             jsonMapper,
             tmpDir,
             null,
             null,
-            null,
             createTuningConfigWithMaxRowsPerSegment(2, false),
             true
         ),
@@ -624,16 +644,15 @@
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
+        createDefaultIngestionSpec(
             jsonMapper,
             tmpDir,
-            null,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
                 null
             ),
+            null,
             createTuningConfigWithMaxRowsPerSegment(2, true),
             false
         ),
@@ -676,25 +695,37 @@
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
+    final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+    final IndexTuningConfig tuningConfig = createTuningConfigWithMaxRowsPerSegment(2, true);
+    final IndexIngestionSpec ingestionSpec;
+    if (useInputFormatApi) {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, null, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    } else {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          timestampSpec,
+          DimensionsSpec.EMPTY,
+          new CsvInputFormat(null, null, null, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    }
+
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
-            jsonMapper,
-            tmpDir,
-            new CSVParseSpec(
-                new TimestampSpec("time", "auto", null),
-                DimensionsSpec.EMPTY,
-                null,
-                null,
-                true,
-                0
-            ),
-            null,
-            createTuningConfigWithMaxRowsPerSegment(2, true),
-            false
-        ),
+        ingestionSpec,
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
@@ -723,25 +754,38 @@
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
+    final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+    final List<String> columns = Arrays.asList("time", "dim", "val");
+    final IndexTuningConfig tuningConfig = createTuningConfigWithMaxRowsPerSegment(2, true);
+    final IndexIngestionSpec ingestionSpec;
+    if (useInputFormatApi) {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, columns, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    } else {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          timestampSpec,
+          DimensionsSpec.EMPTY,
+          new CsvInputFormat(columns, null, null, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    }
+
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
-            jsonMapper,
-            tmpDir,
-            new CSVParseSpec(
-                new TimestampSpec("time", "auto", null),
-                DimensionsSpec.EMPTY,
-                null,
-                Arrays.asList("time", "dim", "val"),
-                true,
-                0
-            ),
-            null,
-            createTuningConfigWithMaxRowsPerSegment(2, true),
-            false
-        ),
+        ingestionSpec,
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
@@ -779,16 +823,15 @@
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
+        createDefaultIngestionSpec(
             jsonMapper,
             tmpDir,
-            null,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
                 null
             ),
+            null,
             createTuningConfig(2, 2, null, 2L, null, false, true),
             false
         ),
@@ -826,17 +869,16 @@
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
+        createDefaultIngestionSpec(
             jsonMapper,
             tmpDir,
-            null,
             new UniformGranularitySpec(
                 Granularities.DAY,
                 Granularities.DAY,
                 true,
                 null
             ),
+            null,
             createTuningConfig(3, 2, null, 2L, null, true, true),
             false
         ),
@@ -873,17 +915,16 @@
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
+        createDefaultIngestionSpec(
             jsonMapper,
             tmpDir,
-            null,
             new UniformGranularitySpec(
                 Granularities.DAY,
                 Granularities.DAY,
                 true,
                 null
             ),
+            null,
             createTuningConfig(3, 2, null, 2L, null, false, true),
             false
         ),
@@ -937,24 +978,37 @@
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
+    final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+    final List<String> columns = Arrays.asList("time", "dim", "val");
+    // ignore parse exception
+    final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, false);
+
     // GranularitySpec.intervals and numShards must be null to verify reportParseException=false is respected both in
     // IndexTask.determineShardSpecs() and IndexTask.generateAndPublishSegments()
-    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
-        useInputFormatApi,
-        jsonMapper,
-        tmpDir,
-        new CSVParseSpec(
-            new TimestampSpec("time", "auto", null),
-            DimensionsSpec.EMPTY,
-            null,
-            Arrays.asList("time", "dim", "val"),
-            true,
-            0
-        ),
-        null,
-        createTuningConfig(2, null, null, null, null, false, false), // ignore parse exception,
-        false
-    );
+    final IndexIngestionSpec parseExceptionIgnoreSpec;
+    if (useInputFormatApi) {
+      parseExceptionIgnoreSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          timestampSpec,
+          DimensionsSpec.EMPTY,
+          new CsvInputFormat(columns, null, null, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    } else {
+      parseExceptionIgnoreSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, columns, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    }
 
     IndexTask indexTask = new IndexTask(
         null,
@@ -987,27 +1041,39 @@
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
-    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
-        useInputFormatApi,
-        jsonMapper,
-        tmpDir,
-        new CSVParseSpec(
-            new TimestampSpec("time", "auto", null),
-            DimensionsSpec.EMPTY,
-            null,
-            Arrays.asList("time", "dim", "val"),
-            true,
-            0
-        ),
-        null,
-        createTuningConfig(2, null, null, null, null, false, true), // report parse exception
-        false
-    );
+    final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+    final List<String> columns = Arrays.asList("time", "dim", "val");
+    // report parse exception
+    final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true);
+    final IndexIngestionSpec indexIngestionSpec;
+    if (useInputFormatApi) {
+      indexIngestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          timestampSpec,
+          DimensionsSpec.EMPTY,
+          new CsvInputFormat(columns, null, null, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    } else {
+      indexIngestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, columns, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    }
 
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        parseExceptionIgnoreSpec,
+        indexIngestionSpec,
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
@@ -1075,31 +1141,43 @@
         7
     );
 
-    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
-        useInputFormatApi,
-        jsonMapper,
-        tmpDir,
-        new JSONParseSpec(
-            new TimestampSpec("time", "auto", null),
-            new DimensionsSpec(
-                Arrays.asList(
-                    new StringDimensionSchema("dim"),
-                    new LongDimensionSchema("dimLong"),
-                    new FloatDimensionSchema("dimFloat")
-                )
-            ),
-            null,
-            null
-        ),
-        null,
-        tuningConfig,
-        false
+    final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("dim"),
+            new LongDimensionSchema("dimLong"),
+            new FloatDimensionSchema("dimFloat")
+        )
     );
+    final IndexIngestionSpec ingestionSpec;
+    if (useInputFormatApi) {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          timestampSpec,
+          dimensionsSpec,
+          new JsonInputFormat(null, null),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    } else {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          new JSONParseSpec(timestampSpec, dimensionsSpec, null, null),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    }
 
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        parseExceptionIgnoreSpec,
+        ingestionSpec,
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
@@ -1195,33 +1273,44 @@
         5
     );
 
-    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
-        useInputFormatApi,
-        jsonMapper,
-        tmpDir,
-        new CSVParseSpec(
-            new TimestampSpec("time", "auto", null),
-            new DimensionsSpec(
-                Arrays.asList(
-                    new StringDimensionSchema("dim"),
-                    new LongDimensionSchema("dimLong"),
-                    new FloatDimensionSchema("dimFloat")
-                )
-            ),
-            null,
-            Arrays.asList("time", "dim", "dimLong", "dimFloat", "val"),
-            true,
-            0
-        ),
-        null,
-        tuningConfig,
-        false
+    final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("dim"),
+            new LongDimensionSchema("dimLong"),
+            new FloatDimensionSchema("dimFloat")
+        )
     );
+    final List<String> columns = Arrays.asList("time", "dim", "dimLong", "dimFloat", "val");
+    final IndexIngestionSpec ingestionSpec;
+    if (useInputFormatApi) {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          timestampSpec,
+          dimensionsSpec,
+          new CsvInputFormat(columns, null, null, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    } else {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          new CSVParseSpec(timestampSpec, dimensionsSpec, null, columns, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    }
 
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        parseExceptionIgnoreSpec,
+        ingestionSpec,
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
@@ -1308,33 +1397,44 @@
         5
     );
 
-    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
-        useInputFormatApi,
-        jsonMapper,
-        tmpDir,
-        new CSVParseSpec(
-            new TimestampSpec("time", "auto", null),
-            new DimensionsSpec(
-                Arrays.asList(
-                    new StringDimensionSchema("dim"),
-                    new LongDimensionSchema("dimLong"),
-                    new FloatDimensionSchema("dimFloat")
-                )
-            ),
-            null,
-            Arrays.asList("time", "dim", "dimLong", "dimFloat", "val"),
-            true,
-            0
-        ),
-        null,
-        tuningConfig,
-        false
+    final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("dim"),
+            new LongDimensionSchema("dimLong"),
+            new FloatDimensionSchema("dimFloat")
+        )
     );
+    final List<String> columns = Arrays.asList("time", "dim", "dimLong", "dimFloat", "val");
+    final IndexIngestionSpec ingestionSpec;
+    if (useInputFormatApi) {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          timestampSpec,
+          dimensionsSpec,
+          new CsvInputFormat(columns, null, null, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    } else {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          new CSVParseSpec(timestampSpec, dimensionsSpec, null, columns, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    }
 
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        parseExceptionIgnoreSpec,
+        ingestionSpec,
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
@@ -1389,45 +1489,55 @@
     File tmpFile = File.createTempFile("druid", "index", tmpDir);
 
     try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
-      writer.write("time,,\n");
+      writer.write("ts,,\n");
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
     tmpFile = File.createTempFile("druid", "index", tmpDir);
 
     try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
-      writer.write("time,dim,\n");
+      writer.write("ts,dim,\n");
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
     tmpFile = File.createTempFile("druid", "index", tmpDir);
 
     try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
-      writer.write("time,,val\n");
+      writer.write("ts,,val\n");
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
-    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
-        useInputFormatApi,
-        jsonMapper,
-        tmpDir,
-        new CSVParseSpec(
-            new TimestampSpec("time", "auto", null),
-            DimensionsSpec.EMPTY,
-            null,
-            null,
-            true,
-            0
-        ),
-        null,
-        createTuningConfig(2, 1, null, null, null, true, true), // report parse exception
-        false
-    );
+    // report parse exception
+    final IndexTuningConfig tuningConfig = createTuningConfig(2, 1, null, null, null, true, true);
+    final IndexIngestionSpec ingestionSpec;
+    if (useInputFormatApi) {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          DEFAULT_TIMESTAMP_SPEC,
+          DimensionsSpec.EMPTY,
+          new CsvInputFormat(null, null, null, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    } else {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, DimensionsSpec.EMPTY, null, null, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    }
 
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        parseExceptionIgnoreSpec,
+        ingestionSpec,
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
@@ -1472,27 +1582,38 @@
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
-    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
-        useInputFormatApi,
-        jsonMapper,
-        tmpDir,
-        new CSVParseSpec(
-            new TimestampSpec("time", "auto", null),
-            DimensionsSpec.EMPTY,
-            null,
-            Arrays.asList("time", "", ""),
-            true,
-            0
-        ),
-        null,
-        createTuningConfig(2, null, null, null, null, false, true), // report parse exception
-        false
-    );
+    final List<String> columns = Arrays.asList("ts", "", "");
+    // report parse exception
+    final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true);
+    final IndexIngestionSpec ingestionSpec;
+    if (useInputFormatApi) {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          DEFAULT_TIMESTAMP_SPEC,
+          DimensionsSpec.EMPTY,
+          new CsvInputFormat(columns, null, null, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    } else {
+      ingestionSpec = createIngestionSpec(
+          jsonMapper,
+          tmpDir,
+          new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, DimensionsSpec.EMPTY, null, columns, true, 0),
+          null,
+          null,
+          tuningConfig,
+          false
+      );
+    }
 
     IndexTask indexTask = new IndexTask(
         null,
         null,
-        parseExceptionIgnoreSpec,
+        ingestionSpec,
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
@@ -1529,17 +1650,16 @@
       final IndexTask indexTask = new IndexTask(
           null,
           null,
-          createIngestionSpec(
-              useInputFormatApi,
+          createDefaultIngestionSpec(
               jsonMapper,
               tmpDir,
-              null,
               new UniformGranularitySpec(
                   Granularities.DAY,
                   Granularities.DAY,
                   true,
                   null
               ),
+              null,
               createTuningConfig(3, 2, null, 2L, null, false, true),
               false
           ),
@@ -1598,17 +1718,16 @@
       final IndexTask indexTask = new IndexTask(
           null,
           null,
-          createIngestionSpec(
-              useInputFormatApi,
+          createDefaultIngestionSpec(
               jsonMapper,
               tmpDir,
-              null,
               new UniformGranularitySpec(
                   segmentGranularity,
                   Granularities.DAY,
                   true,
                   null
               ),
+              null,
               createTuningConfig(3, 2, null, 2L, null, false, true),
               false
           ),
@@ -1643,13 +1762,11 @@
     final IndexTask task = new IndexTask(
         null,
         null,
-        createIngestionSpec(
-            useInputFormatApi,
+        createDefaultIngestionSpec(
             jsonMapper,
             tmpDir,
             null,
             null,
-            null,
             createTuningConfigWithPartitionsSpec(new SingleDimensionPartitionsSpec(null, 1, null, false), true),
             false
         ),
@@ -1762,20 +1879,58 @@
     );
   }
 
+  private IndexIngestionSpec createDefaultIngestionSpec(
+      ObjectMapper objectMapper,
+      File baseDir,
+      @Nullable GranularitySpec granularitySpec,
+      @Nullable TransformSpec transformSpec,
+      IndexTuningConfig tuningConfig,
+      boolean appendToExisting
+  )
+  {
+    if (useInputFormatApi) {
+      return createIngestionSpec(
+          objectMapper,
+          baseDir,
+          DEFAULT_TIMESTAMP_SPEC,
+          DEFAULT_DIMENSIONS_SPEC,
+          DEFAULT_INPUT_FORMAT,
+          transformSpec,
+          granularitySpec,
+          tuningConfig,
+          appendToExisting
+      );
+    } else {
+      return createIngestionSpec(
+          objectMapper,
+          baseDir,
+          DEFAULT_PARSE_SPEC,
+          transformSpec,
+          granularitySpec,
+          tuningConfig,
+          appendToExisting
+      );
+    }
+  }
+
   static IndexIngestionSpec createIngestionSpec(
       ObjectMapper objectMapper,
       File baseDir,
       @Nullable ParseSpec parseSpec,
-      GranularitySpec granularitySpec,
+      @Nullable TransformSpec transformSpec,
+      @Nullable GranularitySpec granularitySpec,
       IndexTuningConfig tuningConfig,
       boolean appendToExisting
   )
   {
     return createIngestionSpec(
-        false,
         objectMapper,
         baseDir,
         parseSpec,
+        null,
+        null,
+        null,
+        transformSpec,
         granularitySpec,
         tuningConfig,
         appendToExisting
@@ -1783,21 +1938,25 @@
   }
 
   static IndexIngestionSpec createIngestionSpec(
-      boolean useInputFormatApi,
       ObjectMapper objectMapper,
       File baseDir,
-      @Nullable ParseSpec parseSpec,
-      GranularitySpec granularitySpec,
+      TimestampSpec timestampSpec,
+      DimensionsSpec dimensionsSpec,
+      InputFormat inputFormat,
+      @Nullable TransformSpec transformSpec,
+      @Nullable GranularitySpec granularitySpec,
       IndexTuningConfig tuningConfig,
       boolean appendToExisting
   )
   {
     return createIngestionSpec(
-        useInputFormatApi,
         objectMapper,
         baseDir,
-        parseSpec,
-        TransformSpec.NONE,
+        null,
+        timestampSpec,
+        dimensionsSpec,
+        inputFormat,
+        transformSpec,
         granularitySpec,
         tuningConfig,
         appendToExisting
@@ -1805,22 +1964,25 @@
   }
 
   private static IndexIngestionSpec createIngestionSpec(
-      boolean useInputFormatApi,
       ObjectMapper objectMapper,
       File baseDir,
       @Nullable ParseSpec parseSpec,
-      TransformSpec transformSpec,
-      GranularitySpec granularitySpec,
+      @Nullable TimestampSpec timestampSpec,
+      @Nullable DimensionsSpec dimensionsSpec,
+      @Nullable InputFormat inputFormat,
+      @Nullable TransformSpec transformSpec,
+      @Nullable GranularitySpec granularitySpec,
       IndexTuningConfig tuningConfig,
       boolean appendToExisting
   )
   {
-    if (useInputFormatApi) {
+    if (inputFormat != null) {
+      Preconditions.checkArgument(parseSpec == null, "Can't use parseSpec");
       return new IndexIngestionSpec(
           new DataSchema(
               "test",
-              parseSpec == null ? DEFAULT_TIMESTAMP_SPEC : parseSpec.getTimestampSpec(),
-              parseSpec == null ? DEFAULT_DIMENSIONS_SPEC : parseSpec.getDimensionsSpec(),
+              Preconditions.checkNotNull(timestampSpec, "timestampSpec"),
+              Preconditions.checkNotNull(dimensionsSpec, "dimensionsSpec"),
               new AggregatorFactory[]{
                   new LongSumAggregatorFactory("val", "val")
               },
@@ -1834,7 +1996,7 @@
           new IndexIOConfig(
               null,
               new LocalInputSource(baseDir, "druid*"),
-              parseSpec == null ? DEFAULT_INPUT_FORMAT : parseSpec.toInputFormat(),
+              inputFormat,
               appendToExisting
           ),
           tuningConfig
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index 194a556..7970e1d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -19,10 +19,14 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
@@ -55,6 +59,7 @@
 import org.joda.time.Interval;
 import org.junit.Assert;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Collections;
 import java.util.List;
@@ -82,8 +87,16 @@
     this.useInputFormatApi = useInputFormatApi;
   }
 
+  boolean isUseInputFormatApi()
+  {
+    return useInputFormatApi;
+  }
+
   Set<DataSegment> runTestTask(
-      ParseSpec parseSpec,
+      @Nullable TimestampSpec timestampSpec,
+      @Nullable DimensionsSpec dimensionsSpec,
+      @Nullable InputFormat inputFormat,
+      @Nullable ParseSpec parseSpec,
       Interval interval,
       File inputDir,
       String filter,
@@ -93,6 +106,9 @@
   )
   {
     final ParallelIndexSupervisorTask task = newTask(
+        timestampSpec,
+        dimensionsSpec,
+        inputFormat,
         parseSpec,
         interval,
         inputDir,
@@ -108,7 +124,10 @@
   }
 
   private ParallelIndexSupervisorTask newTask(
-      ParseSpec parseSpec,
+      @Nullable TimestampSpec timestampSpec,
+      @Nullable DimensionsSpec dimensionsSpec,
+      @Nullable InputFormat inputFormat,
+      @Nullable ParseSpec parseSpec,
       Interval interval,
       File inputDir,
       String filter,
@@ -154,17 +173,18 @@
     final ParallelIndexIngestionSpec ingestionSpec;
 
     if (useInputFormatApi) {
+      Preconditions.checkArgument(parseSpec == null);
       ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
           null,
           new LocalInputSource(inputDir, filter),
-          parseSpec.toInputFormat(),
+          inputFormat,
           false
       );
       ingestionSpec = new ParallelIndexIngestionSpec(
           new DataSchema(
               "dataSource",
-              parseSpec.getTimestampSpec(),
-              parseSpec.getDimensionsSpec(),
+              timestampSpec,
+              dimensionsSpec,
               new AggregatorFactory[]{
                   new LongSumAggregatorFactory("val", "val")
               },
@@ -175,6 +195,7 @@
           tuningConfig
       );
     } else {
+      Preconditions.checkArgument(inputFormat == null);
       ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
           new LocalFirehoseFactory(inputDir, filter, null),
           false
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 4542ac0..ca33815 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -38,6 +38,7 @@
 import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -131,7 +132,13 @@
       false,
       0
   );
-  static final InputFormat DEFAULT_INPUT_FORMAT = DEFAULT_PARSE_SPEC.toInputFormat();
+  static final InputFormat DEFAULT_INPUT_FORMAT = new CsvInputFormat(
+      Arrays.asList("ts", "dim", "val"),
+      null,
+      false,
+      false,
+      0
+  );
   static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING = new ParallelIndexTuningConfig(
       null,
       null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
index c1165dd..bcd3cbe 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
@@ -20,7 +20,9 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -54,18 +56,25 @@
 @RunWith(Parameterized.class)
 public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPhaseParallelIndexingTest
 {
+  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
+  private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+      DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
+  );
   private static final ParseSpec PARSE_SPEC = new CSVParseSpec(
-      new TimestampSpec(
-          "ts",
-          "auto",
-          null
-      ),
-      new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))),
+      TIMESTAMP_SPEC,
+      DIMENSIONS_SPEC,
       null,
       Arrays.asList("ts", "dim1", "dim2", "val"),
       false,
       0
   );
+  private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+      Arrays.asList("ts", "dim1", "dim2", "val"),
+      null,
+      false,
+      false,
+      0
+  );
   private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2;
   private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
 
@@ -112,15 +121,34 @@
   @Test
   public void testRun() throws Exception
   {
-    final Set<DataSegment> publishedSegments = runTestTask(
-        PARSE_SPEC,
-        INTERVAL_TO_INDEX,
-        inputDir,
-        "test_*",
-        new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
-        MAX_NUM_CONCURRENT_SUB_TASKS,
-        TaskState.SUCCESS
-    );
+    final Set<DataSegment> publishedSegments;
+    if (isUseInputFormatApi()) {
+      publishedSegments = runTestTask(
+          TIMESTAMP_SPEC,
+          DIMENSIONS_SPEC,
+          INPUT_FORMAT,
+          null,
+          INTERVAL_TO_INDEX,
+          inputDir,
+          "test_*",
+          new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
+          MAX_NUM_CONCURRENT_SUB_TASKS,
+          TaskState.SUCCESS
+      );
+    } else {
+      publishedSegments = runTestTask(
+          null,
+          null,
+          null,
+          PARSE_SPEC,
+          INTERVAL_TO_INDEX,
+          inputDir,
+          "test_*",
+          new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
+          MAX_NUM_CONCURRENT_SUB_TASKS,
+          TaskState.SUCCESS
+      );
+    }
     assertHashedPartition(publishedSegments);
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
index da2f7a9..f62799b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
@@ -25,7 +25,9 @@
 import com.google.common.collect.Multimap;
 import com.google.common.collect.SetMultimap;
 import org.apache.druid.common.config.NullValueHandlingConfig;
+import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -79,18 +81,25 @@
   private static final String LIST_DELIMITER = "|";
   private static final List<String> DIMS = ImmutableList.of(DIM1, DIM2);
   private static final String TEST_FILE_NAME_PREFIX = "test_";
+  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec(TIME, "auto", null);
+  private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+      DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2))
+  );
   private static final ParseSpec PARSE_SPEC = new CSVParseSpec(
-      new TimestampSpec(
-          TIME,
-          "auto",
-          null
-      ),
-      new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2))),
+      TIMESTAMP_SPEC,
+      DIMENSIONS_SPEC,
       LIST_DELIMITER,
       Arrays.asList(TIME, DIM1, DIM2, "val"),
       false,
       0
   );
+  private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+      Arrays.asList(TIME, DIM1, DIM2, "val"),
+      LIST_DELIMITER,
+      false,
+      false,
+      0
+  );
 
   @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}")
   public static Iterable<Object[]> constructorFeeder()
@@ -192,20 +201,44 @@
   public void createsCorrectRangePartitions() throws Exception
   {
     int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION;
-    final Set<DataSegment> publishedSegments = runTestTask(
-        PARSE_SPEC,
-        INTERVAL_TO_INDEX,
-        inputDir,
-        TEST_FILE_NAME_PREFIX + "*",
-        new SingleDimensionPartitionsSpec(
-            targetRowsPerSegment,
-            null,
-            DIM1,
-            false
-        ),
-        maxNumConcurrentSubTasks,
-        useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
-    );
+    final Set<DataSegment> publishedSegments;
+    if (isUseInputFormatApi()) {
+      publishedSegments = runTestTask(
+          TIMESTAMP_SPEC,
+          DIMENSIONS_SPEC,
+          INPUT_FORMAT,
+          null,
+          INTERVAL_TO_INDEX,
+          inputDir,
+          TEST_FILE_NAME_PREFIX + "*",
+          new SingleDimensionPartitionsSpec(
+              targetRowsPerSegment,
+              null,
+              DIM1,
+              false
+          ),
+          maxNumConcurrentSubTasks,
+          useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
+      );
+    } else {
+      publishedSegments = runTestTask(
+          null,
+          null,
+          null,
+          PARSE_SPEC,
+          INTERVAL_TO_INDEX,
+          inputDir,
+          TEST_FILE_NAME_PREFIX + "*",
+          new SingleDimensionPartitionsSpec(
+              targetRowsPerSegment,
+              null,
+              DIM1,
+              false
+          ),
+          maxNumConcurrentSubTasks,
+          useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
+      );
+    }
 
     if (!useMultivalueDim) {
       assertRangePartitions(publishedSegments);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index d147691..03a1193 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -19,8 +19,6 @@
 
 package org.apache.druid.indexing.seekablestream;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
@@ -87,7 +85,6 @@
 import org.joda.time.Interval;
 import org.junit.Assert;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -149,12 +146,12 @@
 
   static {
     OBJECT_MAPPER = new TestUtils().getTestObjectMapper();
-    OBJECT_MAPPER.registerSubtypes(new NamedType(UnimplementedInputFormatJsonParseSpec.class, "json"));
+    OBJECT_MAPPER.registerSubtypes(new NamedType(JSONParseSpec.class, "json"));
     OLD_DATA_SCHEMA = new DataSchema(
         "test_ds",
         OBJECT_MAPPER.convertValue(
             new StringInputRowParser(
-                new UnimplementedInputFormatJsonParseSpec(
+                new JSONParseSpec(
                     new TimestampSpec("timestamp", "iso", null),
                     new DimensionsSpec(
                         Arrays.asList(
@@ -443,25 +440,4 @@
       return segmentDescriptor;
     }
   }
-
-  private static class UnimplementedInputFormatJsonParseSpec extends JSONParseSpec
-  {
-    @JsonCreator
-    private UnimplementedInputFormatJsonParseSpec(
-        @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
-        @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
-        @JsonProperty("flattenSpec") JSONPathSpec flattenSpec,
-        @JsonProperty("featureSpec") Map<String, Boolean> featureSpec
-    )
-    {
-      super(timestampSpec, dimensionsSpec, flattenSpec, featureSpec);
-    }
-
-    @Nullable
-    @Override
-    public InputFormat toInputFormat()
-    {
-      return null;
-    }
-  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
index 76a02db..c1814d6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
@@ -22,7 +22,6 @@
 import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
-import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -63,7 +62,12 @@
   public void testWithParserAndNullInputformatParseProperly() throws IOException
   {
     final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
-        new NotConvertibleToInputFormatParseSpec(),
+        new JSONParseSpec(
+            TIMESTAMP_SPEC,
+            DimensionsSpec.EMPTY,
+            JSONPathSpec.DEFAULT,
+            Collections.emptyMap()
+        ),
         StringUtils.UTF8_STRING
     );
     final StreamChunkParser chunkParser = new StreamChunkParser(
@@ -109,7 +113,12 @@
   public void testBothParserAndInputFormatParseProperlyUsingInputFormat() throws IOException
   {
     final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
-        new NotConvertibleToInputFormatParseSpec(),
+        new JSONParseSpec(
+            TIMESTAMP_SPEC,
+            DimensionsSpec.EMPTY,
+            JSONPathSpec.DEFAULT,
+            Collections.emptyMap()
+        ),
         StringUtils.UTF8_STRING
     );
     final TrackingJsonInputFormat inputFormat = new TrackingJsonInputFormat(
@@ -138,25 +147,6 @@
     Assert.assertEquals("val2", Iterables.getOnlyElement(row.getDimension("met")));
   }
 
-  private static class NotConvertibleToInputFormatParseSpec extends JSONParseSpec
-  {
-    private NotConvertibleToInputFormatParseSpec()
-    {
-      super(
-          TIMESTAMP_SPEC,
-          DimensionsSpec.EMPTY,
-          JSONPathSpec.DEFAULT,
-          Collections.emptyMap()
-      );
-    }
-
-    @Override
-    public InputFormat toInputFormat()
-    {
-      return null;
-    }
-  }
-
   private static class TrackingJsonInputFormat extends JsonInputFormat
   {
     private boolean used;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index d34bbbb..73bbaca 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -1027,9 +1027,7 @@
           true,
           minimumMessageTime,
           maximumMessageTime,
-          ioConfig.getInputFormat(
-              getDataSchema().getParser() == null ? null : getDataSchema().getParser().getParseSpec()
-          )
+          ioConfig.getInputFormat()
       )
       {
       };