Make intermediate store for shuffle tasks an extension point (#11492)

* add interface

* add docs

* fix errors

* fix injection

* fix injection

* update javadoc
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 9b840f0..b1869ac 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1298,6 +1298,7 @@
 |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
 |`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
 |`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
+|`druid.processing.intermediaryData.storage.type`|Storage type for storing intermediary segments of data shuffle between native parallel index tasks. Current choice are only "local" which stores segment files in local storage of Middle Managers (or Indexer).|local|
 
 The amount of direct memory needed by Druid is at least
 `druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
index b6ea7aa..2b33f0f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import org.apache.druid.guice.annotations.ExtensionPoint;
 import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 
 import java.io.File;
@@ -26,12 +27,13 @@
 
 /**
  * An interface for intermediate data shuffle during the parallel indexing.
- * The only available implementation for production code is {@link HttpShuffleClient} and
- * this interface is more for easier testing.
+ *
+ * Extension can implement this interface to fetch intermediary data at custom location such as various cloud storages.
  *
  * @see IntermediaryDataManager
  * @see PartialSegmentMergeTask
  */
+@ExtensionPoint
 public interface ShuffleClient
 {
   /**
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
index 584ef51..a903bfc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
@@ -19,413 +19,59 @@
 
 package org.apache.druid.indexing.worker.shuffle;
 
-import com.google.common.collect.Iterators;
-import com.google.common.io.Files;
-import com.google.inject.Inject;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.druid.client.indexing.IndexingServiceClient;
-import org.apache.druid.client.indexing.TaskStatus;
-import org.apache.druid.common.utils.IdUtils;
-import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.IOE;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.segment.loading.StorageLocation;
+import com.google.common.io.ByteSource;
+import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
-import org.apache.druid.utils.CompressionUtils;
-import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
-import org.joda.time.DateTime;
 import org.joda.time.Interval;
-import org.joda.time.Period;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
+import java.util.Optional;
 
 /**
- * This class manages intermediary segments for data shuffle between native parallel index tasks.
- * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer)
- * and phase 2 tasks read those files over HTTP.
+ * This interface manages intermediary segments for data shuffle between native parallel index tasks.
+ * In native parallel indexing, phase 1 tasks store segment files using the implementation of this interface
+ * and phase 2 tasks read those files using {@link ShuffleClient}.
  *
- * The directory where segment files are placed is structured as
- * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment.
+ * This interface provides methods to store, find, and remove segment files.
+ * Note that the implementation should also implement a self-cleanup mechanism to clean up stale segment files for
+ * supervisorTask that is not running anymore.
  *
- * This class provides interfaces to store, find, and remove segment files.
- * It also has a self-cleanup mechanism to clean up stale segment files. It periodically checks the last access time
- * per supervisorTask and removes its all segment files if the supervisorTask is not running anymore.
+ * Extension can implement this interface to store intermediary data at custom location such as various cloud storages.
  */
-@ManageLifecycle
-public class IntermediaryDataManager
+@ExtensionPoint
+public interface IntermediaryDataManager
 {
-  private static final Logger LOG = new Logger(IntermediaryDataManager.class);
-
-  private final long intermediaryPartitionDiscoveryPeriodSec;
-  private final long intermediaryPartitionCleanupPeriodSec;
-  private final Period intermediaryPartitionTimeout;
-  private final TaskConfig taskConfig;
-  private final List<StorageLocation> shuffleDataLocations;
-  private final IndexingServiceClient indexingServiceClient;
-
-  // supervisorTaskId -> time to check supervisorTask status
-  // This time is initialized when a new supervisorTask is found and updated whenever a partition is accessed for
-  // the supervisor.
-  private final ConcurrentHashMap<String, DateTime> supervisorTaskCheckTimes = new ConcurrentHashMap<>();
-
-  // supervisorTaskId -> cyclic iterator of storage locations
-  private final Map<String, Iterator<StorageLocation>> locationIterators = new HashMap<>();
-
-  // The overlord is supposed to send a cleanup request as soon as the supervisorTask is finished in parallel indexing,
-  // but middleManager or indexer could miss the request. This executor is to automatically clean up unused intermediary
-  // partitions.
-  // This can be null until IntermediaryDataManager is started.
-  @MonotonicNonNull
-  private ScheduledExecutorService supervisorTaskChecker;
-
-  @Inject
-  public IntermediaryDataManager(
-      WorkerConfig workerConfig,
-      TaskConfig taskConfig,
-      IndexingServiceClient indexingServiceClient
-  )
-  {
-    this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec();
-    this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec();
-    this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout();
-    this.taskConfig = taskConfig;
-    this.shuffleDataLocations = taskConfig
-        .getShuffleDataLocations()
-        .stream()
-        .map(config -> new StorageLocation(config.getPath(), config.getMaxSize(), config.getFreeSpacePercent()))
-        .collect(Collectors.toList());
-    this.indexingServiceClient = indexingServiceClient;
-  }
-
-  @LifecycleStart
-  public void start()
-  {
-    discoverSupervisorTaskPartitions();
-    supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d");
-    // Discover partitions for new supervisorTasks
-    supervisorTaskChecker.scheduleAtFixedRate(
-        () -> {
-          try {
-            discoverSupervisorTaskPartitions();
-          }
-          catch (Exception e) {
-            LOG.warn(e, "Error while discovering supervisorTasks");
-          }
-        },
-        intermediaryPartitionDiscoveryPeriodSec,
-        intermediaryPartitionDiscoveryPeriodSec,
-        TimeUnit.SECONDS
-    );
-
-    supervisorTaskChecker.scheduleAtFixedRate(
-        () -> {
-          try {
-            deleteExpiredSuprevisorTaskPartitionsIfNotRunning();
-          }
-          catch (InterruptedException e) {
-            LOG.error(e, "Error while cleaning up partitions for expired supervisors");
-          }
-          catch (Exception e) {
-            LOG.warn(e, "Error while cleaning up partitions for expired supervisors");
-          }
-        },
-        intermediaryPartitionCleanupPeriodSec,
-        intermediaryPartitionCleanupPeriodSec,
-        TimeUnit.SECONDS
-    );
-  }
-
-  @LifecycleStop
-  public void stop() throws InterruptedException
-  {
-    if (supervisorTaskChecker != null) {
-      supervisorTaskChecker.shutdownNow();
-      supervisorTaskChecker.awaitTermination(10, TimeUnit.SECONDS);
-    }
-    supervisorTaskCheckTimes.clear();
-  }
-
   /**
-   * IntermediaryDataManager periodically calls this method after it starts up to search for unknown intermediary data.
-   */
-  private void discoverSupervisorTaskPartitions()
-  {
-    for (StorageLocation location : shuffleDataLocations) {
-      final Path locationPath = location.getPath().toPath().toAbsolutePath();
-      final MutableInt numDiscovered = new MutableInt(0);
-      final File[] dirsPerSupervisorTask = location.getPath().listFiles();
-      if (dirsPerSupervisorTask != null) {
-        for (File supervisorTaskDir : dirsPerSupervisorTask) {
-          final String supervisorTaskId = supervisorTaskDir.getName();
-          supervisorTaskCheckTimes.computeIfAbsent(
-              supervisorTaskId,
-              k -> {
-                for (File eachFile : FileUtils.listFiles(supervisorTaskDir, null, true)) {
-                  final String relativeSegmentPath = locationPath
-                      .relativize(eachFile.toPath().toAbsolutePath())
-                      .toString();
-                  // StorageLocation keeps track of how much storage capacity is being used.
-                  // Newly found files should be known to the StorageLocation to keep it up to date.
-                  final File reservedFile = location.reserve(
-                      relativeSegmentPath,
-                      eachFile.getName(),
-                      eachFile.length()
-                  );
-                  if (reservedFile == null) {
-                    LOG.warn("Can't add a discovered partition[%s]", eachFile.getAbsolutePath());
-                  }
-                }
-                numDiscovered.increment();
-                return getExpiryTimeFromNow();
-              }
-          );
-        }
-      }
-
-      if (numDiscovered.getValue() > 0) {
-        LOG.info(
-            "Discovered partitions for [%s] new supervisor tasks under location[%s]",
-            numDiscovered.getValue(),
-            location.getPath()
-        );
-      }
-    }
-  }
-
-  /**
-   * Check supervisorTask status if its partitions have not been accessed in timeout and
-   * delete all partitions for the supervisorTask if it is already finished.
+   * Write a segment into one of configured locations
    *
-   * Note that the overlord sends a cleanup request when a supervisorTask is finished. The below check is to trigger
-   * the self-cleanup for when the cleanup request is missing.
+   * @param supervisorTaskId - Id of the supervisor task writing the segment
+   * @param subTaskId - Id of the sub task writing the segment
+   * @param segment - Segment to write
+   * @param segmentDir - Directory of the segment to write
+   *
+   * @return size of the writen segment
    */
-  private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws InterruptedException
-  {
-    final DateTime now = DateTimes.nowUtc();
-    final Set<String> expiredSupervisorTasks = new HashSet<>();
-    for (Entry<String, DateTime> entry : supervisorTaskCheckTimes.entrySet()) {
-      final String supervisorTaskId = entry.getKey();
-      final DateTime checkTime = entry.getValue();
-      if (checkTime.isAfter(now)) {
-        expiredSupervisorTasks.add(supervisorTaskId);
-      }
-    }
-
-    if (!expiredSupervisorTasks.isEmpty()) {
-      LOG.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
-    }
-
-    if (!expiredSupervisorTasks.isEmpty()) {
-      final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks);
-      for (Entry<String, TaskStatus> entry : taskStatuses.entrySet()) {
-        final String supervisorTaskId = entry.getKey();
-        final TaskStatus status = entry.getValue();
-        if (status.getStatusCode().isComplete()) {
-          // If it's finished, clean up all partitions for the supervisor task.
-          try {
-            deletePartitions(supervisorTaskId);
-          }
-          catch (IOException e) {
-            LOG.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
-          }
-        } else {
-          // If it's still running, update last access time.
-          supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow());
-        }
-      }
-    }
-  }
+  long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir) throws IOException;
 
   /**
-   * Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per
-   * supervisorTaskId.
+   * Find the partition file. Note that the returned ByteSource method size() should be fast.
+   *
+   * @param supervisorTaskId - Supervisor task id of the partition file to find
+   * @param subTaskId - Sub task id of the partition file to find
+   * @param interval - Interval of the partition file to find
+   * @param bucketId - Bucket id of the partition file to find
+   *
+   * @return ByteSource wrapped in {@link Optional} if the file is found, otherwise return {@link Optional#empty()}
    */
-  long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
-      throws IOException
-  {
-    // Get or create the location iterator for supervisorTask.
-    final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent(
-        supervisorTaskId,
-        k -> {
-          final Iterator<StorageLocation> cyclicIterator = Iterators.cycle(shuffleDataLocations);
-          // Random start of the iterator
-          final int random = ThreadLocalRandom.current().nextInt(shuffleDataLocations.size());
-          IntStream.range(0, random).forEach(i -> cyclicIterator.next());
-          return cyclicIterator;
-        }
-    );
+  Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId);
 
-    // Create a zipped segment in a temp directory.
-    final File taskTempDir = taskConfig.getTaskTempDir(subTaskId);
-    final Closer closer = Closer.create();
-    closer.register(() -> {
-      try {
-        FileUtils.forceDelete(taskTempDir);
-      }
-      catch (IOException e) {
-        LOG.warn(e, "Failed to delete directory[%s]", taskTempDir.getAbsolutePath());
-      }
-    });
-
-    if (!(segment.getShardSpec() instanceof BucketNumberedShardSpec)) {
-      throw new IAE(
-          "Invalid shardSpec type. Expected [%s] but got [%s]",
-          BucketNumberedShardSpec.class.getName(),
-          segment.getShardSpec().getClass().getName()
-      );
-    }
-    final BucketNumberedShardSpec<?> bucketNumberedShardSpec = (BucketNumberedShardSpec<?>) segment.getShardSpec();
-
-    //noinspection unused
-    try (final Closer resourceCloser = closer) {
-      FileUtils.forceMkdir(taskTempDir);
-
-      // Tempary compressed file. Will be removed when taskTempDir is deleted.
-      final File tempZippedFile = new File(taskTempDir, segment.getId().toString());
-      final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile);
-      if (unzippedSizeBytes == 0) {
-        throw new IOE(
-            "Read 0 bytes from segmentDir[%s]",
-            segmentDir.getAbsolutePath()
-        );
-      }
-
-      // Try copying the zipped segment to one of storage locations
-      for (int i = 0; i < shuffleDataLocations.size(); i++) {
-        final StorageLocation location = iterator.next();
-        final String partitionFilePath = getPartitionFilePath(
-            supervisorTaskId,
-            subTaskId,
-            segment.getInterval(),
-            bucketNumberedShardSpec.getBucketId() // we must use the bucket ID instead of partition ID
-        );
-        final File destFile = location.reserve(partitionFilePath, segment.getId().toString(), tempZippedFile.length());
-        if (destFile != null) {
-          try {
-            FileUtils.forceMkdirParent(destFile);
-            org.apache.druid.java.util.common.FileUtils.writeAtomically(
-                destFile,
-                out -> Files.asByteSource(tempZippedFile).copyTo(out)
-            );
-            LOG.info(
-                "Wrote intermediary segment[%s] for subtask[%s] at [%s]",
-                segment.getId(),
-                subTaskId,
-                destFile
-            );
-            return unzippedSizeBytes;
-          }
-          catch (Exception e) {
-            location.release(partitionFilePath, tempZippedFile.length());
-            FileUtils.deleteQuietly(destFile);
-            LOG.warn(
-                e,
-                "Failed to write segment[%s] at [%s]. Trying again with the next location",
-                segment.getId(),
-                destFile
-            );
-          }
-        }
-      }
-      throw new ISE("Can't find location to handle segment[%s]", segment);
-    }
-  }
-
-  @Nullable
-  public File findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId)
-  {
-    IdUtils.validateId("supervisorTaskId", supervisorTaskId);
-    for (StorageLocation location : shuffleDataLocations) {
-      final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, bucketId));
-      if (partitionDir.exists()) {
-        supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow());
-        final File[] segmentFiles = partitionDir.listFiles();
-        if (segmentFiles == null) {
-          return null;
-        } else {
-          for (File segmentFile : segmentFiles) {
-            if (segmentFile.getName().equals(subTaskId)) {
-              return segmentFile;
-            }
-          }
-          return null;
-        }
-      }
-    }
-
-    return null;
-  }
-
-  private DateTime getExpiryTimeFromNow()
-  {
-    return DateTimes.nowUtc().plus(intermediaryPartitionTimeout);
-  }
-
-  public void deletePartitions(String supervisorTaskId) throws IOException
-  {
-    IdUtils.validateId("supervisorTaskId", supervisorTaskId);
-    for (StorageLocation location : shuffleDataLocations) {
-      final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
-      if (supervisorTaskPath.exists()) {
-        LOG.info("Cleaning up [%s]", supervisorTaskPath);
-        for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) {
-          location.removeFile(eachFile);
-        }
-        FileUtils.forceDelete(supervisorTaskPath);
-      }
-    }
-    supervisorTaskCheckTimes.remove(supervisorTaskId);
-  }
-
-  private static String getPartitionFilePath(
-      String supervisorTaskId,
-      String subTaskId,
-      Interval interval,
-      int bucketId
-  )
-  {
-    return Paths.get(getPartitionDir(supervisorTaskId, interval, bucketId), subTaskId).toString();
-  }
-
-  private static String getPartitionDir(
-      String supervisorTaskId,
-      Interval interval,
-      int bucketId
-  )
-  {
-    return Paths.get(
-        supervisorTaskId,
-        interval.getStart().toString(),
-        interval.getEnd().toString(),
-        String.valueOf(bucketId)
-    ).toString();
-  }
+  /**
+   * Delete the partitions
+   *
+   * @param supervisorTaskId - Supervisor task id of the partitions to delete
+   *
+   */
+  void deletePartitions(String supervisorTaskId) throws IOException;
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
new file mode 100644
index 0000000..7331cf6
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.collect.Iterators;
+import com.google.common.io.ByteSource;
+import com.google.common.io.Files;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.TaskStatus;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.loading.StorageLocation;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.apache.druid.utils.CompressionUtils;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * In native parallel indexing, this class store segment files of phase 1 tasks in local storage of middleManagers (or indexer)
+ * and phase 2 tasks read those files over HTTP.
+ *
+ * The directory where segment files are placed is structured as
+ * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment.
+ *
+ * This class also has a self-cleanup mechanism to clean up stale segment files. It periodically checks the last access time
+ * per supervisorTask and removes its all segment files if the supervisorTask is not running anymore.
+ */
+@ManageLifecycle
+public class LocalIntermediaryDataManager implements IntermediaryDataManager
+{
+  private static final Logger LOG = new Logger(LocalIntermediaryDataManager.class);
+
+  private final long intermediaryPartitionDiscoveryPeriodSec;
+  private final long intermediaryPartitionCleanupPeriodSec;
+  private final Period intermediaryPartitionTimeout;
+  private final TaskConfig taskConfig;
+  private final List<StorageLocation> shuffleDataLocations;
+  private final IndexingServiceClient indexingServiceClient;
+
+  // supervisorTaskId -> time to check supervisorTask status
+  // This time is initialized when a new supervisorTask is found and updated whenever a partition is accessed for
+  // the supervisor.
+  private final ConcurrentHashMap<String, DateTime> supervisorTaskCheckTimes = new ConcurrentHashMap<>();
+
+  // supervisorTaskId -> cyclic iterator of storage locations
+  private final Map<String, Iterator<StorageLocation>> locationIterators = new HashMap<>();
+
+  // The overlord is supposed to send a cleanup request as soon as the supervisorTask is finished in parallel indexing,
+  // but middleManager or indexer could miss the request. This executor is to automatically clean up unused intermediary
+  // partitions.
+  // This can be null until LocalIntermediaryDataManager is started.
+  @MonotonicNonNull
+  private ScheduledExecutorService supervisorTaskChecker;
+
+  @Inject
+  public LocalIntermediaryDataManager(
+      WorkerConfig workerConfig,
+      TaskConfig taskConfig,
+      IndexingServiceClient indexingServiceClient
+  )
+  {
+    this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec();
+    this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec();
+    this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout();
+    this.taskConfig = taskConfig;
+    this.shuffleDataLocations = taskConfig
+        .getShuffleDataLocations()
+        .stream()
+        .map(config -> new StorageLocation(config.getPath(), config.getMaxSize(), config.getFreeSpacePercent()))
+        .collect(Collectors.toList());
+    this.indexingServiceClient = indexingServiceClient;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    discoverSupervisorTaskPartitions();
+    supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d");
+    // Discover partitions for new supervisorTasks
+    supervisorTaskChecker.scheduleAtFixedRate(
+        () -> {
+          try {
+            discoverSupervisorTaskPartitions();
+          }
+          catch (Exception e) {
+            LOG.warn(e, "Error while discovering supervisorTasks");
+          }
+        },
+        intermediaryPartitionDiscoveryPeriodSec,
+        intermediaryPartitionDiscoveryPeriodSec,
+        TimeUnit.SECONDS
+    );
+
+    supervisorTaskChecker.scheduleAtFixedRate(
+        () -> {
+          try {
+            deleteExpiredSuprevisorTaskPartitionsIfNotRunning();
+          }
+          catch (InterruptedException e) {
+            LOG.error(e, "Error while cleaning up partitions for expired supervisors");
+          }
+          catch (Exception e) {
+            LOG.warn(e, "Error while cleaning up partitions for expired supervisors");
+          }
+        },
+        intermediaryPartitionCleanupPeriodSec,
+        intermediaryPartitionCleanupPeriodSec,
+        TimeUnit.SECONDS
+    );
+  }
+
+  @LifecycleStop
+  public void stop() throws InterruptedException
+  {
+    if (supervisorTaskChecker != null) {
+      supervisorTaskChecker.shutdownNow();
+      supervisorTaskChecker.awaitTermination(10, TimeUnit.SECONDS);
+    }
+    supervisorTaskCheckTimes.clear();
+  }
+
+  /**
+   * LocalIntermediaryDataManager periodically calls this method after it starts up to search for unknown intermediary data.
+   */
+  private void discoverSupervisorTaskPartitions()
+  {
+    for (StorageLocation location : shuffleDataLocations) {
+      final Path locationPath = location.getPath().toPath().toAbsolutePath();
+      final MutableInt numDiscovered = new MutableInt(0);
+      final File[] dirsPerSupervisorTask = location.getPath().listFiles();
+      if (dirsPerSupervisorTask != null) {
+        for (File supervisorTaskDir : dirsPerSupervisorTask) {
+          final String supervisorTaskId = supervisorTaskDir.getName();
+          supervisorTaskCheckTimes.computeIfAbsent(
+              supervisorTaskId,
+              k -> {
+                for (File eachFile : FileUtils.listFiles(supervisorTaskDir, null, true)) {
+                  final String relativeSegmentPath = locationPath
+                      .relativize(eachFile.toPath().toAbsolutePath())
+                      .toString();
+                  // StorageLocation keeps track of how much storage capacity is being used.
+                  // Newly found files should be known to the StorageLocation to keep it up to date.
+                  final File reservedFile = location.reserve(
+                      relativeSegmentPath,
+                      eachFile.getName(),
+                      eachFile.length()
+                  );
+                  if (reservedFile == null) {
+                    LOG.warn("Can't add a discovered partition[%s]", eachFile.getAbsolutePath());
+                  }
+                }
+                numDiscovered.increment();
+                return getExpiryTimeFromNow();
+              }
+          );
+        }
+      }
+
+      if (numDiscovered.getValue() > 0) {
+        LOG.info(
+            "Discovered partitions for [%s] new supervisor tasks under location[%s]",
+            numDiscovered.getValue(),
+            location.getPath()
+        );
+      }
+    }
+  }
+
+  /**
+   * Check supervisorTask status if its partitions have not been accessed in timeout and
+   * delete all partitions for the supervisorTask if it is already finished.
+   *
+   * Note that the overlord sends a cleanup request when a supervisorTask is finished. The below check is to trigger
+   * the self-cleanup for when the cleanup request is missing.
+   */
+  private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws InterruptedException
+  {
+    final DateTime now = DateTimes.nowUtc();
+    final Set<String> expiredSupervisorTasks = new HashSet<>();
+    for (Entry<String, DateTime> entry : supervisorTaskCheckTimes.entrySet()) {
+      final String supervisorTaskId = entry.getKey();
+      final DateTime checkTime = entry.getValue();
+      if (checkTime.isAfter(now)) {
+        expiredSupervisorTasks.add(supervisorTaskId);
+      }
+    }
+
+    if (!expiredSupervisorTasks.isEmpty()) {
+      LOG.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
+    }
+
+    if (!expiredSupervisorTasks.isEmpty()) {
+      final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks);
+      for (Entry<String, TaskStatus> entry : taskStatuses.entrySet()) {
+        final String supervisorTaskId = entry.getKey();
+        final TaskStatus status = entry.getValue();
+        if (status.getStatusCode().isComplete()) {
+          // If it's finished, clean up all partitions for the supervisor task.
+          try {
+            deletePartitions(supervisorTaskId);
+          }
+          catch (IOException e) {
+            LOG.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
+          }
+        } else {
+          // If it's still running, update last access time.
+          supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow());
+        }
+      }
+    }
+  }
+
+  /**
+   * Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per
+   * supervisorTaskId.
+   */
+  @Override
+  public long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
+      throws IOException
+  {
+    // Get or create the location iterator for supervisorTask.
+    final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent(
+        supervisorTaskId,
+        k -> {
+          final Iterator<StorageLocation> cyclicIterator = Iterators.cycle(shuffleDataLocations);
+          // Random start of the iterator
+          final int random = ThreadLocalRandom.current().nextInt(shuffleDataLocations.size());
+          IntStream.range(0, random).forEach(i -> cyclicIterator.next());
+          return cyclicIterator;
+        }
+    );
+
+    // Create a zipped segment in a temp directory.
+    final File taskTempDir = taskConfig.getTaskTempDir(subTaskId);
+    final Closer closer = Closer.create();
+    closer.register(() -> {
+      try {
+        FileUtils.forceDelete(taskTempDir);
+      }
+      catch (IOException e) {
+        LOG.warn(e, "Failed to delete directory[%s]", taskTempDir.getAbsolutePath());
+      }
+    });
+
+    if (!(segment.getShardSpec() instanceof BucketNumberedShardSpec)) {
+      throw new IAE(
+          "Invalid shardSpec type. Expected [%s] but got [%s]",
+          BucketNumberedShardSpec.class.getName(),
+          segment.getShardSpec().getClass().getName()
+      );
+    }
+    final BucketNumberedShardSpec<?> bucketNumberedShardSpec = (BucketNumberedShardSpec<?>) segment.getShardSpec();
+
+    //noinspection unused
+    try (final Closer resourceCloser = closer) {
+      FileUtils.forceMkdir(taskTempDir);
+
+      // Tempary compressed file. Will be removed when taskTempDir is deleted.
+      final File tempZippedFile = new File(taskTempDir, segment.getId().toString());
+      final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile);
+      if (unzippedSizeBytes == 0) {
+        throw new IOE(
+            "Read 0 bytes from segmentDir[%s]",
+            segmentDir.getAbsolutePath()
+        );
+      }
+
+      // Try copying the zipped segment to one of storage locations
+      for (int i = 0; i < shuffleDataLocations.size(); i++) {
+        final StorageLocation location = iterator.next();
+        final String partitionFilePath = getPartitionFilePath(
+            supervisorTaskId,
+            subTaskId,
+            segment.getInterval(),
+            bucketNumberedShardSpec.getBucketId() // we must use the bucket ID instead of partition ID
+        );
+        final File destFile = location.reserve(partitionFilePath, segment.getId().toString(), tempZippedFile.length());
+        if (destFile != null) {
+          try {
+            FileUtils.forceMkdirParent(destFile);
+            org.apache.druid.java.util.common.FileUtils.writeAtomically(
+                destFile,
+                out -> Files.asByteSource(tempZippedFile).copyTo(out)
+            );
+            LOG.info(
+                "Wrote intermediary segment[%s] for subtask[%s] at [%s]",
+                segment.getId(),
+                subTaskId,
+                destFile
+            );
+            return unzippedSizeBytes;
+          }
+          catch (Exception e) {
+            location.release(partitionFilePath, tempZippedFile.length());
+            FileUtils.deleteQuietly(destFile);
+            LOG.warn(
+                e,
+                "Failed to write segment[%s] at [%s]. Trying again with the next location",
+                segment.getId(),
+                destFile
+            );
+          }
+        }
+      }
+      throw new ISE("Can't find location to handle segment[%s]", segment);
+    }
+  }
+
+  @Override
+  public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId)
+  {
+    IdUtils.validateId("supervisorTaskId", supervisorTaskId);
+    for (StorageLocation location : shuffleDataLocations) {
+      final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, bucketId));
+      if (partitionDir.exists()) {
+        supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow());
+        final File[] segmentFiles = partitionDir.listFiles();
+        if (segmentFiles == null) {
+          return Optional.empty();
+        } else {
+          for (File segmentFile : segmentFiles) {
+            if (segmentFile.getName().equals(subTaskId)) {
+              return Optional.of(Files.asByteSource(segmentFile));
+            }
+          }
+          return Optional.empty();
+        }
+      }
+    }
+
+    return Optional.empty();
+  }
+
+  private DateTime getExpiryTimeFromNow()
+  {
+    return DateTimes.nowUtc().plus(intermediaryPartitionTimeout);
+  }
+
+  @Override
+  public void deletePartitions(String supervisorTaskId) throws IOException
+  {
+    IdUtils.validateId("supervisorTaskId", supervisorTaskId);
+    for (StorageLocation location : shuffleDataLocations) {
+      final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
+      if (supervisorTaskPath.exists()) {
+        LOG.info("Cleaning up [%s]", supervisorTaskPath);
+        for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) {
+          location.removeFile(eachFile);
+        }
+        FileUtils.forceDelete(supervisorTaskPath);
+      }
+    }
+    supervisorTaskCheckTimes.remove(supervisorTaskId);
+  }
+
+  private static String getPartitionFilePath(
+      String supervisorTaskId,
+      String subTaskId,
+      Interval interval,
+      int bucketId
+  )
+  {
+    return Paths.get(getPartitionDir(supervisorTaskId, interval, bucketId), subTaskId).toString();
+  }
+
+  private static String getPartitionDir(
+      String supervisorTaskId,
+      Interval interval,
+      int bucketId
+  )
+  {
+    return Paths.get(
+        supervisorTaskId,
+        interval.getStart().toString(),
+        interval.getEnd().toString(),
+        String.valueOf(bucketId)
+    ).toString();
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java
index dd885a2..d37fc88 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java
@@ -19,7 +19,7 @@
 
 package org.apache.druid.indexing.worker.shuffle;
 
-import com.google.common.io.ByteStreams;
+import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
 import com.sun.jersey.spi.container.ResourceFilters;
 import org.apache.druid.java.util.common.DateTimes;
@@ -38,8 +38,6 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.Optional;
 
@@ -81,14 +79,14 @@
   )
   {
     final Interval interval = new Interval(DateTimes.of(startTime), DateTimes.of(endTime));
-    final File partitionFile = intermediaryDataManager.findPartitionFile(
+    final Optional<ByteSource> partitionFile = intermediaryDataManager.findPartitionFile(
         supervisorTaskId,
         subTaskId,
         interval,
         bucketId
     );
 
-    if (partitionFile == null) {
+    if (!partitionFile.isPresent()) {
       final String errorMessage = StringUtils.format(
           "Can't find the partition for supervisorTask[%s], subTask[%s], interval[%s], and bucketId[%s]",
           supervisorTaskId,
@@ -98,14 +96,19 @@
       );
       return Response.status(Status.NOT_FOUND).entity(errorMessage).build();
     } else {
-      shuffleMetrics.ifPresent(metrics -> metrics.shuffleRequested(supervisorTaskId, partitionFile.length()));
-      return Response.ok(
-          (StreamingOutput) output -> {
-            try (final FileInputStream fileInputStream = new FileInputStream(partitionFile)) {
-              ByteStreams.copy(fileInputStream, output);
-            }
-          }
-      ).build();
+      try {
+        long size = partitionFile.get().size();
+        shuffleMetrics.ifPresent(metrics -> metrics.shuffleRequested(supervisorTaskId, size));
+      }
+      catch (IOException ioException) {
+        log.error("Failed to get length of file for supervisorTask[%s], subTask[%s], interval[%s], and bucketId[%s]",
+                  supervisorTaskId,
+                  subTaskId,
+                  interval,
+                  bucketId
+        );
+      }
+      return Response.ok((StreamingOutput) output -> partitionFile.get().copyTo(output)).build();
     }
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index d10013e..f1566bc 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -26,7 +26,7 @@
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.io.Files;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -66,6 +66,7 @@
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.IAE;
@@ -229,7 +230,7 @@
     taskRunner = new SimpleThreadingTaskRunner();
     objectMapper = getObjectMapper();
     indexingServiceClient = new LocalIndexingServiceClient(objectMapper, taskRunner);
-    intermediaryDataManager = new IntermediaryDataManager(
+    intermediaryDataManager = new LocalIntermediaryDataManager(
         new WorkerConfig(),
         new TaskConfig(
             null,
@@ -717,19 +718,19 @@
         P location
     ) throws IOException
     {
-      final File zippedFile = intermediaryDataManager.findPartitionFile(
+      final java.util.Optional<ByteSource> zippedFile = intermediaryDataManager.findPartitionFile(
           supervisorTaskId,
           location.getSubTaskId(),
           location.getInterval(),
           location.getBucketId()
       );
-      if (zippedFile == null) {
+      if (!zippedFile.isPresent()) {
         throw new ISE("Can't find segment file for location[%s] at path[%s]", location);
       }
       final File fetchedFile = new File(partitionDir, StringUtils.format("temp_%s", location.getSubTaskId()));
       FileUtils.writeAtomically(
           fetchedFile,
-          out -> Files.asByteSource(zippedFile).copyTo(out)
+          out -> zippedFile.get().copyTo(out)
       );
       return fetchedFile;
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
similarity index 93%
rename from indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
rename to indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
index 9736fc7..632b1ef 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
@@ -51,12 +51,12 @@
 import java.util.Map;
 import java.util.Set;
 
-public class IntermediaryDataManagerAutoCleanupTest
+public class LocalIntermediaryDataManagerAutoCleanupTest
 {
   @Rule
   public TemporaryFolder tempDir = new TemporaryFolder();
 
-  private IntermediaryDataManager intermediaryDataManager;
+  private LocalIntermediaryDataManager intermediaryDataManager;
 
   @Before
   public void setup() throws IOException
@@ -107,7 +107,7 @@
         return result;
       }
     };
-    intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+    intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
     intermediaryDataManager.start();
   }
 
@@ -128,7 +128,7 @@
     intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, segmentFile);
 
     Thread.sleep(3000);
-    Assert.assertNull(intermediaryDataManager.findPartitionFile(supervisorTaskId, subTaskId, interval, 0));
+    Assert.assertFalse(intermediaryDataManager.findPartitionFile(supervisorTaskId, subTaskId, interval, 0).isPresent());
   }
 
   private File generateSegmentDir(String fileName) throws IOException
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerManualAddAndDeleteTest.java
similarity index 93%
rename from indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
rename to indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerManualAddAndDeleteTest.java
index 6e52b30..5ad391a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerManualAddAndDeleteTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteSource;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
@@ -46,8 +47,9 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Optional;
 
-public class IntermediaryDataManagerManualAddAndDeleteTest
+public class LocalIntermediaryDataManagerManualAddAndDeleteTest
 {
   @Rule
   public TemporaryFolder tempDir = new TemporaryFolder();
@@ -55,7 +57,7 @@
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
-  private IntermediaryDataManager intermediaryDataManager;
+  private LocalIntermediaryDataManager intermediaryDataManager;
   private File intermediarySegmentsLocation;
   private File siblingLocation;
 
@@ -79,7 +81,7 @@
         false
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
-    intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+    intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
     intermediaryDataManager.start();
   }
 
@@ -117,13 +119,13 @@
       intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + i, segment, segmentFile);
     }
     for (int i = 0; i < 4; i++) {
-      final File file = intermediaryDataManager.findPartitionFile(
+      final Optional<ByteSource> file = intermediaryDataManager.findPartitionFile(
           supervisorTaskId,
           "subTaskId_" + i,
           interval,
           partitionId
       );
-      Assert.assertNotNull(file);
+      Assert.assertTrue(file.isPresent());
     }
   }
 
@@ -144,8 +146,8 @@
 
     for (int partitionId = 0; partitionId < 2; partitionId++) {
       for (int subTaskId = 0; subTaskId < 2; subTaskId++) {
-        Assert.assertNull(
-            intermediaryDataManager.findPartitionFile(supervisorTaskId, "subTaskId_" + subTaskId, interval, partitionId)
+        Assert.assertFalse(
+            intermediaryDataManager.findPartitionFile(supervisorTaskId, "subTaskId_" + subTaskId, interval, partitionId).isPresent()
         );
       }
     }
@@ -216,13 +218,13 @@
     Assert.assertTrue(
         new File(intermediarySegmentsLocation, supervisorTaskId + "/" + someFilePath).exists());
 
-    final File foundFile1 = intermediaryDataManager.findPartitionFile(
+    final Optional<ByteSource> foundFile1 = intermediaryDataManager.findPartitionFile(
         supervisorTaskId,
         someFile,
         interval,
         partitionId
     );
-    Assert.assertNull(foundFile1);
+    Assert.assertFalse(foundFile1.isPresent());
   }
 
   private File generateSegmentDir(String fileName) throws IOException
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
index f110cc2..f5fbb7f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteSource;
 import com.google.common.io.Files;
 import com.google.common.primitives.Ints;
 import org.apache.commons.io.FileUtils;
@@ -48,13 +49,14 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 
 public class ShuffleDataSegmentPusherTest
 {
   @Rule
   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-  private IntermediaryDataManager intermediaryDataManager;
+  private LocalIntermediaryDataManager intermediaryDataManager;
   private ShuffleDataSegmentPusher segmentPusher;
 
   @Before
@@ -75,7 +77,7 @@
         false
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
-    intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+    intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
     intermediaryDataManager.start();
     segmentPusher = new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", intermediaryDataManager);
   }
@@ -96,15 +98,20 @@
     Assert.assertEquals(9, pushed.getBinaryVersion().intValue());
     Assert.assertEquals(14, pushed.getSize()); // 10 bytes data + 4 bytes version
 
-    final File zippedSegment = intermediaryDataManager.findPartitionFile(
+    final Optional<ByteSource> zippedSegment = intermediaryDataManager.findPartitionFile(
         "supervisorTaskId",
         "subTaskId",
         segment.getInterval(),
         segment.getShardSpec().getPartitionNum()
     );
-    Assert.assertNotNull(zippedSegment);
+    Assert.assertTrue(zippedSegment.isPresent());
     final File tempDir = temporaryFolder.newFolder();
-    final FileCopyResult result = CompressionUtils.unzip(zippedSegment, tempDir);
+    final FileCopyResult result = CompressionUtils.unzip(
+        zippedSegment.get(),
+        tempDir,
+        org.apache.druid.java.util.common.FileUtils.IS_EXCEPTION,
+        false
+    );
     final List<File> unzippedFiles = new ArrayList<>(result.getFiles());
     unzippedFiles.sort(Comparator.comparing(File::getName));
     final File dataFile = unzippedFiles.get(0);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
index 54a6b02..798b05f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
@@ -59,7 +59,7 @@
   @Rule
   public TemporaryFolder tempDir = new TemporaryFolder();
 
-  private IntermediaryDataManager intermediaryDataManager;
+  private LocalIntermediaryDataManager intermediaryDataManager;
   private ShuffleMetrics shuffleMetrics;
   private ShuffleResource shuffleResource;
 
@@ -112,7 +112,7 @@
         return result;
       }
     };
-    intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+    intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
     shuffleMetrics = new ShuffleMetrics();
     shuffleResource = new ShuffleResource(intermediaryDataManager, Optional.of(shuffleMetrics));
   }
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index 0bb52a8..c11d1a4 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -130,6 +130,7 @@
             JsonConfigProvider.bind(binder, "druid", DruidNode.class, Parent.class);
             JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class);
 
+            CliPeon.configureIntermediaryData(binder);
             CliPeon.bindTaskConfigAndClients(binder);
 
             binder.bind(TaskReportFileWriter.class).toInstance(new MultipleFileTaskReportFileWriter());
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index c6f7d45..40a0a5a 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -61,6 +61,8 @@
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.indexing.worker.http.TaskManagementResource;
 import org.apache.druid.indexing.worker.http.WorkerResource;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
 import org.apache.druid.indexing.worker.shuffle.ShuffleModule;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.input.InputSourceModule;
@@ -166,6 +168,23 @@
 
             Jerseys.addResource(binder, SelfDiscoveryResource.class);
             LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
+
+            configureIntermediaryData(binder);
+          }
+
+          private void configureIntermediaryData(Binder binder)
+          {
+            PolyBind.createChoice(
+                binder,
+                "druid.processing.intermediaryData.storage.type",
+                Key.get(IntermediaryDataManager.class),
+                Key.get(LocalIntermediaryDataManager.class)
+            );
+            final MapBinder<String, IntermediaryDataManager> biddy = PolyBind.optionBinder(
+                binder,
+                Key.get(IntermediaryDataManager.class)
+            );
+            biddy.addBinding("local").to(LocalIntermediaryDataManager.class);
           }
 
           @Provides
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index b6b4336..8c1fb00 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -88,6 +88,8 @@
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.worker.executor.ExecutorLifecycle;
 import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
@@ -209,6 +211,7 @@
 
             bindRowIngestionMeters(binder);
             bindChatHandler(binder);
+            configureIntermediaryData(binder);
             bindTaskConfigAndClients(binder);
             bindPeonDataSegmentHandlers(binder);
 
@@ -423,7 +426,6 @@
 
     configureTaskActionClient(binder);
     binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
-    binder.bind(ShuffleClient.class).to(HttpShuffleClient.class).in(LazySingleton.class);
 
     binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>>(){})
           .to(ParallelIndexTaskClientFactory.class)
@@ -452,4 +454,30 @@
     binder.bind(CoordinatorClient.class).in(LazySingleton.class);
   }
 
+  static void configureIntermediaryData(Binder binder)
+  {
+    PolyBind.createChoice(
+        binder,
+        "druid.processing.intermediaryData.storage.type",
+        Key.get(IntermediaryDataManager.class),
+        Key.get(LocalIntermediaryDataManager.class)
+    );
+    final MapBinder<String, IntermediaryDataManager> intermediaryDataManagerBiddy = PolyBind.optionBinder(
+        binder,
+        Key.get(IntermediaryDataManager.class)
+    );
+    intermediaryDataManagerBiddy.addBinding("local").to(LocalIntermediaryDataManager.class).in(LazySingleton.class);
+
+    PolyBind.createChoice(
+        binder,
+        "druid.processing.intermediaryData.storage.type",
+        Key.get(ShuffleClient.class),
+        Key.get(HttpShuffleClient.class)
+    );
+    final MapBinder<String, ShuffleClient> shuffleClientBiddy = PolyBind.optionBinder(
+        binder,
+        Key.get(ShuffleClient.class)
+    );
+    shuffleClientBiddy.addBinding("local").to(HttpShuffleClient.class).in(LazySingleton.class);
+  }
 }