Support combining input source for parallel ingestion (#10387)

* Add combining input source

* Fix documentation

Co-authored-by: Atul Mohan <atulmohan@yahoo-inc.com>
diff --git a/core/src/main/java/org/apache/druid/data/input/InputSource.java b/core/src/main/java/org/apache/druid/data/input/InputSource.java
index b0144c5..ba0224e 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputSource.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputSource.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.impl.CombiningInputSource;
 import org.apache.druid.data.input.impl.HttpInputSource;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.data.input.impl.LocalInputSource;
@@ -50,7 +51,8 @@
 @JsonSubTypes(value = {
     @Type(name = "local", value = LocalInputSource.class),
     @Type(name = "http", value = HttpInputSource.class),
-    @Type(name = "inline", value = InlineInputSource.class)
+    @Type(name = "inline", value = InlineInputSource.class),
+    @Type(name = "combining", value = CombiningInputSource.class)
 })
 public interface InputSource
 {
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java
new file mode 100644
index 0000000..0b8201e
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.Pair;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+/**
+ * InputSource that combines data from multiple inputSources. The delegate inputSources must be splittable.
+ * The splits for this inputSource are created by calling {@link SplittableInputSource#createSplits} on each delegate inputSource.
+ * Each inputSplit is paired with its respective delegate inputSource so that, when a split is applied,
+ * {@link SplittableInputSource#withSplit} is called against the correct inputSource for that inputSplit.
+ * This inputSource presently only supports a single {@link InputFormat}.
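+ * <p>
+ * For illustration only (a sketch of the wiring, not additional API): each split wraps a delegate
+ * together with one of that delegate's own splits, and {@code withSplit} unwraps the pair and
+ * routes back to the originating delegate:
+ * <pre>{@code
+ *   Pair<SplittableInputSource, InputSplit> pair = (Pair) split.get();
+ *   InputSource forSubTask = pair.lhs.withSplit(pair.rhs);
+ * }</pre>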
+ */
+public class CombiningInputSource extends AbstractInputSource implements SplittableInputSource
+{
+  private final List<SplittableInputSource> delegates;
+
+  @JsonCreator
+  public CombiningInputSource(
+      @JsonProperty("delegates") List<SplittableInputSource> delegates
+  )
+  {
+    Preconditions.checkArgument(
+        delegates != null && !delegates.isEmpty(),
+        "Must specify atleast one delegate inputSource"
+    );
+    this.delegates = delegates;
+  }
+
+  @JsonProperty
+  public List<SplittableInputSource> getDelegates()
+  {
+    return delegates;
+  }
+
+  @Override
+  public Stream<InputSplit> createSplits(
+      InputFormat inputFormat,
+      @Nullable SplitHintSpec splitHintSpec
+  )
+  {
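+    // Pair each delegate split with its delegate inputSource so that withSplit() can route back to the right source.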
+    return delegates.stream().flatMap(inputSource -> {
+      try {
+        return inputSource.createSplits(inputFormat, splitHintSpec)
+                          .map(inputsplit -> new InputSplit(Pair.of(inputSource, inputsplit)));
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Override
+  public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+  {
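+    // The combined estimate is the sum of the delegates' individual estimates.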
+    return delegates.stream().mapToInt(inputSource -> {
+      try {
+        return inputSource.estimateNumSplits(inputFormat, splitHintSpec);
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).sum();
+  }
+
+  @Override
+  public InputSource withSplit(InputSplit split)
+  {
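+    // Unwrap the (delegate, delegateSplit) pair built in createSplits() and apply the split to its originating delegate.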
+    Pair<SplittableInputSource, InputSplit> inputSourceWithSplit = (Pair) split.get();
+    return inputSourceWithSplit.lhs.withSplit(inputSourceWithSplit.rhs);
+  }
+
+  @Override
+  public boolean needsFormat()
+  {
+    // This is called only when ParallelIndexIngestionSpec needs to decide whether an inputFormat or a parserSpec is required.
+    // So if at least one of the delegate inputSources needs a format, this returns true.
+    // All other needsFormat calls are made against the delegate inputSources directly.
+    return delegates.stream().anyMatch(SplittableInputSource::needsFormat);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CombiningInputSource that = (CombiningInputSource) o;
+    return delegates.equals(that.delegates);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(delegates);
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java
new file mode 100644
index 0000000..1db194b
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.Pair;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class CombiningInputSourceTest
+{
+  @Test
+  public void testSerde() throws IOException
+  {
+    final ObjectMapper mapper = new ObjectMapper();
+    mapper.registerModule(new SimpleModule("test-module").registerSubtypes(TestFileInputSource.class, TestUriInputSource.class));
+    final TestFileInputSource fileSource = new TestFileInputSource(ImmutableList.of(new File("myFile").getAbsoluteFile()));
+    final TestUriInputSource uriInputSource = new TestUriInputSource(
+        ImmutableList.of(URI.create("http://test.com/http-test")));
+    final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
+        fileSource,
+        uriInputSource
+    ));
+    final byte[] json = mapper.writeValueAsBytes(combiningInputSource);
+    final CombiningInputSource fromJson = (CombiningInputSource) mapper.readValue(json, InputSource.class);
+    Assert.assertEquals(combiningInputSource, fromJson);
+  }
+
+  @Test
+  public void testEstimateNumSplits()
+  {
+    final TestFileInputSource fileSource = new TestFileInputSource(generateFiles(3));
+    final TestUriInputSource uriInputSource = new TestUriInputSource(
+        ImmutableList.of(
+            URI.create("http://test.com/http-test1"),
+            URI.create("http://test.com/http-test2"),
+            URI.create("http://test.com/http-test3")
+        )
+    );
+    final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
+        fileSource,
+        uriInputSource
+    ));
+    Assert.assertEquals(
+        6,
+        combiningInputSource.estimateNumSplits(
+            new NoopInputFormat(),
+            new MaxSizeSplitHintSpec(
+                new HumanReadableBytes(5L),
+                null
+            )
+        )
+    );
+  }
+
+  @Test
+  public void testCreateSplits()
+  {
+    final TestFileInputSource fileSource = new TestFileInputSource(generateFiles(3));
+    final TestUriInputSource uriInputSource = new TestUriInputSource(
+        ImmutableList.of(
+            URI.create("http://test.com/http-test3"),
+            URI.create("http://test.com/http-test4"),
+            URI.create("http://test.com/http-test5")
+        )
+    );
+    final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
+        fileSource,
+        uriInputSource
+    ));
+    List<InputSplit> combinedInputSplits = combiningInputSource.createSplits(
+        new NoopInputFormat(),
+        new MaxSizeSplitHintSpec(
+            new HumanReadableBytes(5L),
+            null
+        )
+    ).collect(Collectors.toList());
+    Assert.assertEquals(6, combinedInputSplits.size());
+    for (int i = 0; i < 3; i++) {
+      Pair<SplittableInputSource, InputSplit> splitPair = (Pair) combinedInputSplits.get(i).get();
+      InputSplit<File> fileSplit = splitPair.rhs;
+      Assert.assertTrue(splitPair.lhs instanceof TestFileInputSource);
+      Assert.assertEquals(5, fileSplit.get().length());
+    }
+    for (int i = 3; i < combinedInputSplits.size(); i++) {
+      Pair<SplittableInputSource, InputSplit> splitPair = (Pair) combinedInputSplits.get(i).get();
+      InputSplit<URI> uriSplit = splitPair.rhs;
+      Assert.assertTrue(splitPair.lhs instanceof TestUriInputSource);
+      Assert.assertEquals(URI.create("http://test.com/http-test" + i), uriSplit.get());
+    }
+  }
+
+  @Test
+  public void testWithSplits()
+  {
+    final TestUriInputSource uriInputSource = new TestUriInputSource(
+        ImmutableList.of(
+            URI.create("http://test.com/http-test1"))
+    );
+    final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
+        uriInputSource
+    ));
+    InputSplit<URI> testUriSplit = new InputSplit<>(URI.create("http://test.com/http-test1"));
+    TestUriInputSource uriInputSourceWithSplit = (TestUriInputSource) combiningInputSource.withSplit(new InputSplit(Pair.of(
+        uriInputSource,
+        testUriSplit
+    )));
+    Assert.assertEquals(uriInputSource, uriInputSourceWithSplit);
+  }
+
+  @Test
+  public void testNeedsFormat()
+  {
+    final TestUriInputSource uriInputSource = new TestUriInputSource(
+        ImmutableList.of(
+            URI.create("http://test.com/http-test1")
+        )
+    );
+    final TestFileInputSource fileSource = new TestFileInputSource(generateFiles(3));
+
+    final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
+        uriInputSource,
+        fileSource
+    ));
+    Assert.assertTrue(combiningInputSource.needsFormat());
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(CombiningInputSource.class)
+                  .withNonnullFields("delegates")
+                  .usingGetClass()
+                  .verify();
+  }
+
+  private static List<File> generateFiles(int numFiles)
+  {
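+    // Each mocked file reports a fixed 5-byte length so the split-size arithmetic in these tests is deterministic.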
+    final List<File> files = new ArrayList<>();
+    for (int i = 0; i < numFiles; i++) {
+      final File file = EasyMock.niceMock(File.class);
+      EasyMock.expect(file.length()).andReturn(5L).anyTimes();
+      EasyMock.replay(file);
+      files.add(file);
+    }
+    return files;
+  }
+
+  private static class TestFileInputSource extends AbstractInputSource implements SplittableInputSource<File>
+  {
+    private final List<File> files;
+
+    @JsonCreator
+    private TestFileInputSource(@JsonProperty("files") List<File> fileList)
+    {
+      files = fileList;
+    }
+
+    @JsonProperty
+    public List<File> getFiles()
+    {
+      return files;
+    }
+
+    @Override
+    public Stream<InputSplit<File>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+    {
+      return files.stream().map(InputSplit::new);
+    }
+
+    @Override
+    public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+    {
+      return files.size();
+    }
+
+    @Override
+    public SplittableInputSource<File> withSplit(InputSplit<File> split)
+    {
+      return new TestFileInputSource(ImmutableList.of(split.get()));
+    }
+
+    @Override
+    public boolean needsFormat()
+    {
+      return true;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TestFileInputSource that = (TestFileInputSource) o;
+      return Objects.equals(files, that.files);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(files);
+    }
+  }
+
+  private static class TestUriInputSource extends AbstractInputSource implements SplittableInputSource<URI>
+  {
+    private final List<URI> uris;
+
+    @JsonCreator
+    private TestUriInputSource(@JsonProperty("uris") List<URI> uriList)
+    {
+      uris = uriList;
+    }
+
+    @JsonProperty
+    public List<URI> getUris()
+    {
+      return uris;
+    }
+
+    @Override
+    public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+    {
+      return uris.stream().map(InputSplit::new);
+    }
+
+    @Override
+    public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+    {
+      return uris.size();
+    }
+
+    @Override
+    public SplittableInputSource<URI> withSplit(InputSplit<URI> split)
+    {
+      return new TestUriInputSource(ImmutableList.of(split.get()));
+    }
+
+    @Override
+    public boolean needsFormat()
+    {
+      return false;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TestUriInputSource that = (TestUriInputSource) o;
+      return Objects.equals(uris, that.uris);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(uris);
+    }
+  }
+}
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 7181efc..9b71825 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -1368,6 +1368,48 @@
 * Similar to file-based input formats, any updates to existing data will replace the data in segments specific to the intervals specified in the `granularitySpec`.
 
 
+### Combining Input Source
+
+The Combining input source reads data from multiple input sources. Use this input source only if all of its delegate input sources are
+_splittable_ and can be used by the [Parallel task](#parallel-task). This input source identifies the splits from its delegates, and each split is processed by a worker task. Like other input sources, the Combining input source supports a single `inputFormat`. Delegate input sources that require an `inputFormat` must therefore all use the same format for their input data.
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|This should be "combining".|Yes|
+|delegates|List of _splittable_ InputSources to read data from.|Yes|
+
+Sample spec:
+
+```json
+...
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "combining",
+        "delegates" : [
+         {
+          "type": "local",
+          "filter" : "*.csv",
+          "baseDir": "/data/directory",
+          "files": ["/bar/foo", "/foo/bar"]
+         },
+         {
+          "type": "druid",
+          "dataSource": "wikipedia",
+          "interval": "2013-01-01/2013-01-02"
+         }
+        ]
+      },
+      "inputFormat": {
+        "type": "csv"
+      },
+      ...
+    },
+...
+```
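+
+Each split produced by the delegates is processed by its own worker task, so the degree of parallelism is governed by the
+Parallel task's `tuningConfig`. As a rough sketch (the values below are illustrative only), you can cap the number of
+concurrent subtasks and the size of each split like so:
+
+```json
+...
+    "tuningConfig": {
+      "type": "index_parallel",
+      "maxNumConcurrentSubTasks": 4,
+      "splitHintSpec": {
+        "type": "maxSize",
+        "maxNumFiles": 1
+      }
+    },
+...
+```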
+
+
 ###
 
 ## Firehoses (Deprecated)
diff --git a/examples/quickstart/tutorial/updates-append-index.json b/examples/quickstart/tutorial/updates-append-index.json
index 75d2335..9ba53a0 100644
--- a/examples/quickstart/tutorial/updates-append-index.json
+++ b/examples/quickstart/tutorial/updates-append-index.json
@@ -1,58 +1,61 @@
 {
-  "type" : "index",
-  "spec" : {
-    "dataSchema" : {
-      "dataSource" : "updates-tutorial",
-      "parser" : {
-        "type" : "string",
-        "parseSpec" : {
-          "format" : "json",
-          "dimensionsSpec" : {
-            "dimensions" : [
-              "animal"
-            ]
-          },
-          "timestampSpec": {
-            "column": "timestamp",
-            "format": "iso"
-          }
-        }
+  "type": "index_parallel",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "updates-tutorial",
+      "dimensionsSpec": {
+        "dimensions": [
+          "animal"
+        ]
       },
-      "metricsSpec" : [
-        { "type" : "count", "name" : "count" },
-        { "type" : "longSum", "name" : "number", "fieldName" : "number" }
+      "timestampSpec": {
+        "column": "timestamp",
+        "format": "iso"
+      },
+      "metricsSpec": [
+        { "type": "count", "name": "count"},
+        { "type": "longSum", "name": "number", "fieldName": "number"}
       ],
-      "granularitySpec" : {
-        "type" : "uniform",
-        "segmentGranularity" : "week",
-        "queryGranularity" : "minute",
-        "intervals" : ["2018-01-01/2018-01-03"],
-        "rollup" : true
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "week",
+        "queryGranularity": "minute",
+        "intervals": ["2018-01-01/2018-01-03"],
+        "rollup": true
       }
     },
-    "ioConfig" : {
-      "type" : "index",
-      "firehose" : {
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
         "type": "combining",
         "delegates": [
           {
-            "type"    : "ingestSegment",
-            "dataSource"   : "updates-tutorial",
-            "interval" : "2018-01-01/2018-01-03"
+            "type": "druid",
+            "dataSource": "updates-tutorial",
+            "interval": "2018-01-01/2018-01-03"
           },
           {
-            "type" : "local",
-            "baseDir" : "quickstart/tutorial",
-            "filter" : "updates-data3.json"
+            "type": "local",
+            "baseDir": "quickstart/tutorial",
+            "filter": "updates-data3.json"
           }
         ]
       },
-      "appendToExisting" : false
+      "inputFormat": {
+        "type": "json"
+      },
+      "appendToExisting": false
     },
-    "tuningConfig" : {
-      "type" : "index",
-      "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000
+    "tuningConfig": {
+      "type": "index_parallel",
+      "maxRowsPerSegment": 5000000,
+      "maxRowsInMemory": 25000,
+      "maxNumConcurrentSubTasks": 2,
+      "forceGuaranteedRollup": true,
+      "partitionsSpec": {
+        "type": "hashed",
+        "numShards": 1
+      }
     }
   }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java
new file mode 100644
index 0000000..dd46d9b
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.function.Function;
+
+@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITCombiningInputSourceParallelIndexTest extends AbstractITBatchIndexTest
+{
+  private static final String INDEX_TASK = "/indexer/wikipedia_local_input_source_index_task.json";
+  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final String INDEX_DATASOURCE = "wikipedia_index_test";
+
+  private static final String COMBINING_INDEX_TASK = "/indexer/wikipedia_combining_input_source_index_parallel_task.json";
+  private static final String COMBINING_QUERIES_RESOURCE = "/indexer/wikipedia_combining_firehose_index_queries.json";
+  private static final String COMBINING_INDEX_DATASOURCE = "wikipedia_comb_index_test";
+
+  @Test
+  public void testIndexData() throws Exception
+  {
+    Map<String, Object> inputFormatMap = new ImmutableMap.Builder<String, Object>()
+        .put("type", "json")
+        .build();
+    try (
+        final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
+        final Closeable ignored2 = unloader(COMBINING_INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
+    ) {
+      final Function<String, String> combiningInputSourceSpecTransform = spec -> {
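+        // Fill in the %%...%% placeholders shared by both task specs; the combining spec additionally
+        // references the first datasource through %%COMBINING_DATASOURCE%%.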
+        try {
+          spec = StringUtils.replace(
+              spec,
+              "%%PARTITIONS_SPEC%%",
+              jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null))
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_FILTER%%",
+              "wikipedia_index_data*"
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_BASE_DIR%%",
+              "/resources/data/batch_index/json"
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_FORMAT%%",
+              jsonMapper.writeValueAsString(inputFormatMap)
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%APPEND_TO_EXISTING%%",
+              jsonMapper.writeValueAsString(false)
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%FORCE_GUARANTEED_ROLLUP%%",
+              jsonMapper.writeValueAsString(false)
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%COMBINING_DATASOURCE%%",
+              INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()
+          );
+          return spec;
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+      doIndexTest(
+          INDEX_DATASOURCE,
+          INDEX_TASK,
+          combiningInputSourceSpecTransform,
+          INDEX_QUERIES_RESOURCE,
+          false,
+          true,
+          true
+      );
+      doIndexTest(
+          COMBINING_INDEX_DATASOURCE,
+          COMBINING_INDEX_TASK,
+          combiningInputSourceSpecTransform,
+          COMBINING_QUERIES_RESOURCE,
+          false,
+          true,
+          true
+      );
+    }
+  }
+
+}
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_combining_input_source_index_parallel_task.json b/integration-tests/src/test/resources/indexer/wikipedia_combining_input_source_index_parallel_task.json
new file mode 100644
index 0000000..8e1d094
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_combining_input_source_index_parallel_task.json
@@ -0,0 +1,98 @@
+{
+  "type": "index_parallel",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "%%DATASOURCE%%",
+      "timestampSpec": {
+        "column": "timestamp"
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "added",
+          "fieldName": "added"
+        },
+        {
+          "type": "doubleSum",
+          "name": "deleted",
+          "fieldName": "deleted"
+        },
+        {
+          "type": "doubleSum",
+          "name": "delta",
+          "fieldName": "delta"
+        },
+        {
+          "name": "thetaSketch",
+          "type": "thetaSketch",
+          "fieldName": "user"
+        },
+        {
+          "name": "quantilesDoublesSketch",
+          "type": "quantilesDoublesSketch",
+          "fieldName": "delta"
+        },
+        {
+          "name": "HLLSketchBuild",
+          "type": "HLLSketchBuild",
+          "fieldName": "user"
+        }
+      ],
+      "granularitySpec": {
+        "segmentGranularity": "DAY",
+        "queryGranularity": "second",
+        "intervals" : [ "2013-08-31/2013-09-02" ]
+      },
+      "dimensionsSpec": {
+        "dimensions": [
+          "page",
+          {"type": "string", "name": "language", "createBitmapIndex": false},
+          "user",
+          "unpatrolled",
+          "newPage",
+          "robot",
+          "anonymous",
+          "namespace",
+          "continent",
+          "country",
+          "region",
+          "city"
+        ]
+      }
+    },
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "combining",
+        "delegates": [
+          {
+            "type": "local",
+            "baseDir": "/resources/indexer",
+            "filter": "wikipedia_combining_index_data.json"
+          },
+          {
+            "type": "druid",
+            "dataSource": "%%COMBINING_DATASOURCE%%",
+            "interval": "2013-08-31/2013-09-02"
+          }
+        ]
+      },
+      "appendToExisting": %%APPEND_TO_EXISTING%%,
+      "inputFormat": %%INPUT_FORMAT%%
+    },
+    "tuningConfig": {
+      "type": "index_parallel",
+      "maxNumConcurrentSubTasks": 4,
+      "splitHintSpec": {
+        "type": "maxSize",
+        "maxNumFiles": 1
+      },
+      "forceGuaranteedRollup": %%FORCE_GUARANTEED_ROLLUP%%,
+      "partitionsSpec": %%PARTITIONS_SPEC%%
+    }
+  }
+}