Make SegmentLoader extensible and customizable (#11398)

This PR refactors the code related to segment loading specifically SegmentLoader and SegmentLoaderLocalCacheManager. SegmentLoader is marked UnstableAPI which means, it can be extended outside core druid in custom extensions. Here is a summary of changes

SegmentLoader returns an instance of ReferenceCountingSegment instead of Segment. Earlier, SegmentManager was wrapping Segment objects inside ReferenceCountingSegment. That is now moved to SegmentLoader. With this, a custom implementation can track the references of segments. It also allows them to create custom ReferenceCountingSegment implementations. For this reason, the constructor visibility in ReferenceCountingSegment is changed from private to protected.
SegmentCacheManager has two additional methods called - reserve(DataSegment) and release(DataSegment). These methods let the caller reserve or release space without calling SegmentLoader#getSegment. We already had similar methods in StorageLocation and now they are available in SegmentCacheManager too which wraps multiple locations.
Refactoring to simplify the code in SegmentCacheManager wherever possible. There is no change in the functionality.
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
index 3886f3d..4e3e8b2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
@@ -606,6 +606,18 @@
           {
             throw new UnsupportedOperationException("unused");
           }
+
+          @Override
+          public boolean reserve(DataSegment segment)
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean release(DataSegment segment)
+          {
+            throw new UnsupportedOperationException();
+          }
         },
         DataSegment.builder()
                    .dataSource("ds")
diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
index 5b53131..55e3904 100644
--- a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
+++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
@@ -33,6 +33,9 @@
  * {@link Segment} that is also a {@link ReferenceCountingSegment}, allowing query engines that operate directly on
  * segments to track references so that dropping a {@link Segment} can be done safely to ensure there are no in-flight
  * queries.
+ *
+ * Extensions can extend this class for populating {@link org.apache.druid.timeline.VersionedIntervalTimeline} with
+ * a custom implementation through SegmentLoader.
  */
 public class ReferenceCountingSegment extends ReferenceCountingCloseableObject<Segment>
     implements SegmentReference, Overshadowable<ReferenceCountingSegment>
@@ -67,7 +70,7 @@
     );
   }
 
-  private ReferenceCountingSegment(
+  protected ReferenceCountingSegment(
       Segment baseSegment,
       int startRootPartitionId,
       int endRootPartitionId,
@@ -172,4 +175,13 @@
   {
     return incrementReferenceAndDecrementOnceCloseable();
   }
+
+  @Override
+  public <T> T as(Class<T> clazz)
+  {
+    if (isClosed()) {
+      return null;
+    }
+    return baseObject.as(clazz);
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java b/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java
index 6566592..b1f65cf 100644
--- a/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment;
 
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.segment.join.table.IndexedTable;
 import org.apache.druid.timeline.SegmentId;
 import org.easymock.EasyMock;
 import org.joda.time.Days;
@@ -43,6 +44,7 @@
   private final Interval dataInterval = new Interval(DateTimes.nowUtc().minus(Days.days(1)), DateTimes.nowUtc());
   private QueryableIndex index;
   private StorageAdapter adapter;
+  private IndexedTable indexedTable;
   private int underlyingSegmentClosedCount;
 
   @Before
@@ -51,6 +53,7 @@
     underlyingSegmentClosedCount = 0;
     index = EasyMock.createNiceMock(QueryableIndex.class);
     adapter = EasyMock.createNiceMock(StorageAdapter.class);
+    indexedTable = EasyMock.createNiceMock(IndexedTable.class);
 
     segment = ReferenceCountingSegment.wrapRootGenerationSegment(
         new Segment()
@@ -80,6 +83,19 @@
           }
 
           @Override
+          public <T> T as(Class<T> clazz)
+          {
+            if (clazz.equals(QueryableIndex.class)) {
+              return (T) asQueryableIndex();
+            } else if (clazz.equals(StorageAdapter.class)) {
+              return (T) asStorageAdapter();
+            } else if (clazz.equals(IndexedTable.class)) {
+              return (T) indexedTable;
+            }
+            return null;
+          }
+
+          @Override
           public void close()
           {
             underlyingSegmentClosedCount++;
@@ -159,4 +175,13 @@
     Assert.assertEquals(adapter, segment.asStorageAdapter());
   }
 
+  @Test
+  public void testSegmentAs()
+  {
+    Assert.assertSame(index, segment.as(QueryableIndex.class));
+    Assert.assertSame(adapter, segment.as(StorageAdapter.class));
+    Assert.assertSame(indexedTable, segment.as(IndexedTable.class));
+    Assert.assertNull(segment.as(String.class));
+  }
+
 }
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
index 9458574..39ea785 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
@@ -30,18 +30,58 @@
 public interface SegmentCacheManager
 {
   /**
-   * Checks whether a segment is already cached.
+   * Checks whether a segment is already cached. It can return false even if {@link #reserve(DataSegment)}
+   * has been successful for a segment but is not downloaded yet.
    */
   boolean isSegmentCached(DataSegment segment);
 
   /**
-   * This method fetches the files for the given segment if the segment is not downloaded already.
+   * This method fetches the files for the given segment if the segment is not downloaded already. It
+   * is not required to {@link #reserve(DataSegment)} before calling this method. If caller has not reserved
+   * the space explicitly via {@link #reserve(DataSegment)}, the implementation should reserve space on caller's
+   * behalf.
+   * If the space has been explicitly reserved already
+   *    - implementation should use only the reserved space to store segment files.
+   *    - implementation should not release the location in case of download erros and leave it to the caller.
    * @throws SegmentLoadingException if there is an error in downloading files
    */
   File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
 
   /**
-   * Cleanup the cache space used by the segment
+   * Tries to reserve the space for a segment on any location. When the space has been reserved,
+   * {@link #getSegmentFiles(DataSegment)} should download the segment on the reserved location or
+   * fail otherwise.
+   *
+   * This function is useful for custom extensions. Extensions can try to reserve the space first and
+   * if not successful, make some space by cleaning up other segments, etc. There is also improved
+   * concurrency for extensions with this function. Since reserve is a cheaper operation to invoke
+   * till the space has been reserved. Hence it can be put inside a lock if required by the extensions. getSegment
+   * can't be put inside a lock since it is a time-consuming operation, on account of downloading the files.
+   *
+   * @param segment - Segment to reserve
+   * @return True if enough space found to store the segment, false otherwise
+   */
+  /*
+   * We only return a boolean result instead of a pointer to
+   * {@link StorageLocation} since we don't want callers to operate on {@code StorageLocation} directly outside {@code SegmentLoader}.
+   * {@link SegmentLoader} operates on the {@code StorageLocation} objects in a thread-safe manner.
+   */
+  boolean reserve(DataSegment segment);
+
+  /**
+   * Reverts the effects of {@link #reserve(DataSegment)} (DataSegment)} by releasing the location reserved for this segment.
+   * Callers, that explicitly reserve the space via {@link #reserve(DataSegment)}, should use this method to release the space.
+   *
+   * Implementation can throw error if the space is being released but there is data present. Callers
+   * are supposed to ensure that any data is removed via {@link #cleanup(DataSegment)}
+   * @param segment - Segment to release the location for.
+   * @return - True if any location was reserved and released, false otherwise.
+   */
+  boolean release(DataSegment segment);
+
+  /**
+   * Cleanup the cache space used by the segment. It will not release the space if the space has been
+   * explicitly reserved via {@link #reserve(DataSegment)}
    */
   void cleanup(DataSegment segment);
 }
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
index 8fe38a3..03bc050 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
@@ -19,7 +19,8 @@
 
 package org.apache.druid.segment.loading;
 
-import org.apache.druid.segment.Segment;
+import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.timeline.DataSegment;
 
@@ -27,16 +28,25 @@
  * Loading segments from deep storage to local storage. Internally, this class can delegate the download to
  * {@link SegmentCacheManager}. Implementations must be thread-safe.
  */
+@UnstableApi
 public interface SegmentLoader
 {
+
   /**
-   * Builds a {@link Segment} by downloading if necessary
+   * Returns a {@link ReferenceCountingSegment} that will be added by the {@link org.apache.druid.server.SegmentManager}
+   * to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This method can be called multiple times
+   * by the {@link org.apache.druid.server.SegmentManager} and implementation can either return same {@link ReferenceCountingSegment}
+   * or a different {@link ReferenceCountingSegment}. Caller should not assume any particular behavior.
+   *
+   * Returning a {@code ReferenceCountingSegment} will let custom implementations keep track of reference count for
+   * segments that the custom implementations are creating. That way, custom implementations can know when the segment
+   * is in use or not.
    * @param segment - Segment to load
    * @param lazy - Whether column metadata de-serialization is to be deferred to access time. Setting this flag to true can speed up segment loading
    * @param loadFailed - Callback to invoke if lazy loading fails during column access.
    * @throws SegmentLoadingException - If there is an error in loading the segment
    */
-  Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
+  ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
 
   /**
    * cleanup any state used by this segment
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java
index 6970f7b..7fbf425 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.timeline.DataSegment;
@@ -46,7 +47,7 @@
   }
 
   @Override
-  public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
+  public ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
   {
     final File segmentFiles = cacheManager.getSegmentFiles(segment);
     File factoryJson = new File(segmentFiles, "factory.json");
@@ -63,7 +64,8 @@
       factory = new MMappedQueryableSegmentizerFactory(indexIO);
     }
 
-    return factory.factorize(segment, segmentFiles, lazy, loadFailed);
+    Segment segmentObject = factory.factorize(segment, segmentFiles, lazy, loadFailed);
+    return ReferenceCountingSegment.wrapSegment(segmentObject, segment.getShardSpec());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 412cfe9..25c9cae 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -30,6 +30,7 @@
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
@@ -37,6 +38,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
+ *
  */
 public class SegmentLocalCacheManager implements SegmentCacheManager
 {
@@ -110,6 +112,7 @@
    *
    * This ctor is mainly for test cases, including test cases in other modules
    */
+  @VisibleForTesting
   public SegmentLocalCacheManager(
       SegmentLoaderConfig config,
       @Json ObjectMapper mapper
@@ -122,25 +125,45 @@
     log.info("Using storage location strategy: [%s]", this.strategy.getClass().getSimpleName());
   }
 
+
+  static String getSegmentDir(DataSegment segment)
+  {
+    return DataSegmentPusher.getDefaultStorageDir(segment, false);
+  }
+
   @Override
   public boolean isSegmentCached(final DataSegment segment)
   {
-    return findStorageLocationIfLoaded(segment) != null;
+    return findStoragePathIfCached(segment) != null;
   }
 
+  /**
+   * This method will try to find if the segment is already downloaded on any location. If so, the segment path
+   * is returned. Along with that, location state is also updated with the segment location. Refer to
+   * {@link StorageLocation#maybeReserve(String, DataSegment)} for more details.
+   * If the segment files are damaged in any location, they are removed from the location.
+   * @param segment - Segment to check
+   * @return - Path corresponding to segment directory if found, null otherwise.
+   */
   @Nullable
-  private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
+  private File findStoragePathIfCached(final DataSegment segment)
   {
     for (StorageLocation location : locations) {
-      File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
+      String storageDir = getSegmentDir(segment);
+      File localStorageDir = location.segmentDirectoryAsFile(storageDir);
       if (localStorageDir.exists()) {
         if (checkSegmentFilesIntact(localStorageDir)) {
-          log.warn("[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.", localStorageDir.getAbsolutePath());
+          log.warn(
+              "[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.",
+              localStorageDir.getAbsolutePath()
+          );
           cleanupCacheFiles(location.getPath(), localStorageDir);
           location.removeSegmentDir(localStorageDir, segment);
           break;
         } else {
-          return location;
+          // Before returning, we also reserve the space. Refer to the StorageLocation#maybeReserve documentation for details.
+          location.maybeReserve(storageDir, segment);
+          return localStorageDir;
         }
       }
     }
@@ -180,16 +203,12 @@
     final ReferenceCountingLock lock = createOrGetLock(segment);
     synchronized (lock) {
       try {
-        StorageLocation loc = findStorageLocationIfLoaded(segment);
-        String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
-
-        if (loc == null) {
-          loc = loadSegmentWithRetry(segment, storageDir);
-        } else {
-          // If the segment is already downloaded on disk, we just update the current usage
-          loc.maybeReserve(storageDir, segment);
+        File segmentDir = findStoragePathIfCached(segment);
+        if (segmentDir != null) {
+          return segmentDir;
         }
-        return new File(loc.getPath(), storageDir);
+
+        return loadSegmentWithRetry(segment);
       }
       finally {
         unlock(segment, lock);
@@ -198,44 +217,83 @@
   }
 
   /**
-   * location may fail because of IO failure, most likely in two cases:<p>
+   * If we have already reserved a location before, probably via {@link #reserve(DataSegment)}, then only that location
+   * should be tried. Otherwise, we would fetch locations using {@link StorageLocationSelectorStrategy} and try all
+   * of them one by one till there is success.
+   * Location may fail because of IO failure, most likely in two cases:<p>
    * 1. druid don't have the write access to this location, most likely the administrator doesn't config it correctly<p>
    * 2. disk failure, druid can't read/write to this disk anymore
-   *
+   * <p>
    * Locations are fetched using {@link StorageLocationSelectorStrategy}.
    */
-  private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
+  private File loadSegmentWithRetry(DataSegment segment) throws SegmentLoadingException
   {
-    Iterator<StorageLocation> locationsIterator = strategy.getLocations();
+    String segmentDir = getSegmentDir(segment);
 
+    // Try the already reserved location. If location has been reserved outside, then we do not release the location
+    // here and simply delete any downloaded files. That is, we revert anything we do in this function and nothing else.
+    for (StorageLocation loc : locations) {
+      if (loc.isReserved(segmentDir)) {
+        File storageDir = loc.segmentDirectoryAsFile(segmentDir);
+        boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, false);
+        if (!success) {
+          throw new SegmentLoadingException("Failed to load segment %s in reserved location [%s]", segment.getId(), loc.getPath().getAbsolutePath());
+        }
+        return storageDir;
+      }
+    }
+
+    // No location was reserved so we try all the locations
+    Iterator<StorageLocation> locationsIterator = strategy.getLocations();
     while (locationsIterator.hasNext()) {
 
       StorageLocation loc = locationsIterator.next();
 
-      File storageDir = loc.reserve(storageDirStr, segment);
+      // storageDir is the file path corresponding to segment dir
+      File storageDir = loc.reserve(segmentDir, segment);
       if (storageDir != null) {
-        try {
-          loadInLocationWithStartMarker(segment, storageDir);
-          return loc;
-        }
-        catch (SegmentLoadingException e) {
-          try {
-            log.makeAlert(
-                e,
-                "Failed to load segment in current location [%s], try next location if any",
-                loc.getPath().getAbsolutePath()
-            ).addData("location", loc.getPath().getAbsolutePath()).emit();
-          }
-          finally {
-            loc.removeSegmentDir(storageDir, segment);
-            cleanupCacheFiles(loc.getPath(), storageDir);
-          }
+        boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, true);
+        if (success) {
+          return storageDir;
         }
       }
     }
     throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getId());
   }
 
+  /**
+   * A helper method over {@link #loadInLocationWithStartMarker(DataSegment, File)} that catches the {@link SegmentLoadingException}
+   * and emits alerts.
+   * @param loc - {@link StorageLocation} where segment is to be downloaded in.
+   * @param segment - {@link DataSegment} to download
+   * @param storageDir - {@link File} pointing to segment directory
+   * @param releaseLocation - Whether to release the location in case of failures
+   * @return - True if segment was downloaded successfully, false otherwise.
+   */
+  private boolean loadInLocationWithStartMarkerQuietly(StorageLocation loc, DataSegment segment, File storageDir, boolean releaseLocation)
+  {
+    try {
+      loadInLocationWithStartMarker(segment, storageDir);
+      return true;
+    }
+    catch (SegmentLoadingException e) {
+      try {
+        log.makeAlert(
+            e,
+            "Failed to load segment in current location [%s], try next location if any",
+            loc.getPath().getAbsolutePath()
+        ).addData("location", loc.getPath().getAbsolutePath()).emit();
+      }
+      finally {
+        if (releaseLocation) {
+          loc.removeSegmentDir(storageDir, segment);
+        }
+        cleanupCacheFiles(loc.getPath(), storageDir);
+      }
+    }
+    return false;
+  }
+
   private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) throws SegmentLoadingException
   {
     // We use a marker to prevent the case where a segment is downloaded, but before the download completes,
@@ -278,6 +336,73 @@
   }
 
   @Override
+  public boolean reserve(final DataSegment segment)
+  {
+    final ReferenceCountingLock lock = createOrGetLock(segment);
+    synchronized (lock) {
+      try {
+        // May be the segment was already loaded [This check is required to account for restart scenarios]
+        if (null != findStoragePathIfCached(segment)) {
+          return true;
+        }
+
+        String storageDirStr = getSegmentDir(segment);
+
+        // check if we already reserved the segment
+        for (StorageLocation location : locations) {
+          if (location.isReserved(storageDirStr)) {
+            return true;
+          }
+        }
+
+        // Not found in any location, reserve now
+        for (Iterator<StorageLocation> it = strategy.getLocations(); it.hasNext(); ) {
+          StorageLocation location = it.next();
+          if (null != location.reserve(storageDirStr, segment)) {
+            return true;
+          }
+        }
+      }
+      finally {
+        unlock(segment, lock);
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public boolean release(final DataSegment segment)
+  {
+    final ReferenceCountingLock lock = createOrGetLock(segment);
+    synchronized (lock) {
+      try {
+        String storageDir = getSegmentDir(segment);
+
+        // Release the first location encountered
+        for (StorageLocation location : locations) {
+          if (location.isReserved(storageDir)) {
+            File localStorageDir = location.segmentDirectoryAsFile(storageDir);
+            if (localStorageDir.exists()) {
+              throw new ISE(
+                  "Asking to release a location '%s' while the segment directory '%s' is present on disk. Any state on disk must be deleted before releasing",
+                  location.getPath().getAbsolutePath(),
+                  localStorageDir.getAbsolutePath()
+              );
+            }
+            return location.release(storageDir, segment.getSize());
+          }
+        }
+      }
+      finally {
+        unlock(segment, lock);
+      }
+    }
+
+    return false;
+  }
+
+  @Override
   public void cleanup(DataSegment segment)
   {
     if (!config.isDeleteOnRemove()) {
@@ -287,18 +412,17 @@
     final ReferenceCountingLock lock = createOrGetLock(segment);
     synchronized (lock) {
       try {
-        StorageLocation loc = findStorageLocationIfLoaded(segment);
+        File loc = findStoragePathIfCached(segment);
 
         if (loc == null) {
           log.warn("Asked to cleanup something[%s] that didn't exist.  Skipping.", segment.getId());
           return;
         }
-
         // If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed,
         // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not.
         // So we should always clean all possible locations here
         for (StorageLocation location : locations) {
-          File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
+          File localStorageDir = new File(location.getPath(), getSegmentDir(segment));
           if (localStorageDir.exists()) {
             // Druid creates folders of the form dataSource/interval/version/partitionNum.
             // We need to clean up all these directories if they are all empty.
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index efaefe9..60f1831 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -26,6 +26,7 @@
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.util.HashSet;
 import java.util.Set;
@@ -117,6 +118,16 @@
     return reserve(segmentDir, segment.getId().toString(), segment.getSize());
   }
 
+  public synchronized boolean isReserved(String segmentDir)
+  {
+    return files.contains(segmentDirectoryAsFile(segmentDir));
+  }
+
+  public File segmentDirectoryAsFile(String segmentDir)
+  {
+    return new File(path, segmentDir);  //lgtm [java/path-injection]
+  }
+
   /**
    * Reserves space to store the given segment, only if it has not been done already. This can be used
    * when segment is already downloaded on the disk. Unlike {@link #reserve(String, DataSegment)}, this function
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index 486ad46..dcd5a1d 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -30,7 +30,6 @@
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.ReferenceCountingSegment;
-import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.segment.join.table.IndexedTable;
 import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable;
@@ -217,7 +216,7 @@
    */
   public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
   {
-    final Segment adapter = getAdapter(segment, lazy, loadFailed);
+    final ReferenceCountingSegment adapter = getSegmentReference(segment, lazy, loadFailed);
 
     final SettableSupplier<Boolean> resultSupplier = new SettableSupplier<>();
 
@@ -252,9 +251,7 @@
             loadedIntervals.add(
                 segment.getInterval(),
                 segment.getVersion(),
-                segment.getShardSpec().createChunk(
-                    ReferenceCountingSegment.wrapSegment(adapter, segment.getShardSpec())
-                )
+                segment.getShardSpec().createChunk(adapter)
             );
             dataSourceState.addSegment(segment);
             resultSupplier.set(true);
@@ -268,21 +265,21 @@
     return resultSupplier.get();
   }
 
-  private Segment getAdapter(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
+  private ReferenceCountingSegment getSegmentReference(final DataSegment dataSegment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
   {
-    final Segment adapter;
+    final ReferenceCountingSegment segment;
     try {
-      adapter = segmentLoader.getSegment(segment, lazy, loadFailed);
+      segment = segmentLoader.getSegment(dataSegment, lazy, loadFailed);
     }
     catch (SegmentLoadingException e) {
-      segmentLoader.cleanup(segment);
+      segmentLoader.cleanup(dataSegment);
       throw e;
     }
 
-    if (adapter == null) {
-      throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
+    if (segment == null) {
+      throw new SegmentLoadingException("Null adapter from loadSpec[%s]", dataSegment.getLoadSpec());
     }
-    return adapter;
+    return segment;
   }
 
   public void dropSegment(final DataSegment segment)
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java
index a268681..ca314b9 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java
@@ -48,6 +48,18 @@
   }
 
   @Override
+  public boolean reserve(DataSegment segment)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean release(DataSegment segment)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void cleanup(DataSegment segment)
   {
     segmentsInTrash.add(segment);
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
index cf47755..831a1a4 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.loading;
 
 import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.segment.StorageAdapter;
@@ -33,9 +34,9 @@
 {
 
   @Override
-  public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
+  public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
   {
-    return new Segment()
+    Segment baseSegment = new Segment()
     {
       @Override
       public SegmentId getId()
@@ -66,6 +67,7 @@
       {
       }
     };
+    return ReferenceCountingSegment.wrapSegment(baseSegment, segment.getShardSpec());
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
index 26c9cbd..5d116d1 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
@@ -38,6 +38,7 @@
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 public class SegmentLocalCacheManagerTest
@@ -762,4 +763,151 @@
     Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentCached(segmentToDownload));
     Assert.assertFalse(cachedSegmentDir.exists());
   }
+
+  @Test
+  public void testReserveSegment()
+  {
+    final DataSegment dataSegment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withSize(100L);
+    final StorageLocation firstLocation = new StorageLocation(localSegmentCacheFolder, 200L, 0.0d);
+    final StorageLocation secondLocation = new StorageLocation(localSegmentCacheFolder, 150L, 0.0d);
+
+    manager = new SegmentLocalCacheManager(
+        Arrays.asList(secondLocation, firstLocation),
+        new SegmentLoaderConfig(),
+        new RoundRobinStorageLocationSelectorStrategy(Arrays.asList(firstLocation, secondLocation)),
+        jsonMapper
+    );
+    Assert.assertTrue(manager.reserve(dataSegment));
+    Assert.assertTrue(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false)));
+    Assert.assertEquals(100L, firstLocation.availableSizeBytes());
+    Assert.assertEquals(150L, secondLocation.availableSizeBytes());
+
+    // Reserving again should be no-op
+    Assert.assertTrue(manager.reserve(dataSegment));
+    Assert.assertTrue(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false)));
+    Assert.assertEquals(100L, firstLocation.availableSizeBytes());
+    Assert.assertEquals(150L, secondLocation.availableSizeBytes());
+
+    // Reserving a second segment should now go to a different location
+    final DataSegment otherSegment = dataSegmentWithInterval("2014-10-21T00:00:00Z/P1D").withSize(100L);
+    Assert.assertTrue(manager.reserve(otherSegment));
+    Assert.assertTrue(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false)));
+    Assert.assertFalse(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(otherSegment, false)));
+    Assert.assertTrue(secondLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(otherSegment, false)));
+    Assert.assertEquals(100L, firstLocation.availableSizeBytes());
+    Assert.assertEquals(50L, secondLocation.availableSizeBytes());
+  }
+
+  @Test
+  public void testReserveNotEnoughSpace()
+  {
+    final DataSegment dataSegment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withSize(100L);
+    final StorageLocation firstLocation = new StorageLocation(localSegmentCacheFolder, 50L, 0.0d);
+    final StorageLocation secondLocation = new StorageLocation(localSegmentCacheFolder, 150L, 0.0d);
+
+    manager = new SegmentLocalCacheManager(
+        Arrays.asList(secondLocation, firstLocation),
+        new SegmentLoaderConfig(),
+        new RoundRobinStorageLocationSelectorStrategy(Arrays.asList(firstLocation, secondLocation)),
+        jsonMapper
+    );
+
+    // should go to second location if first one doesn't have enough space
+    Assert.assertTrue(manager.reserve(dataSegment));
+    Assert.assertTrue(secondLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false)));
+    Assert.assertEquals(50L, firstLocation.availableSizeBytes());
+    Assert.assertEquals(50L, secondLocation.availableSizeBytes());
+
+    final DataSegment otherSegment = dataSegmentWithInterval("2014-10-21T00:00:00Z/P1D").withSize(100L);
+    Assert.assertFalse(manager.reserve(otherSegment));
+    Assert.assertEquals(50L, firstLocation.availableSizeBytes());
+    Assert.assertEquals(50L, secondLocation.availableSizeBytes());
+  }
+
+  @Test
+  public void testSegmentDownloadWhenLocationReserved() throws Exception
+  {
+    final List<StorageLocationConfig> locationConfigs = new ArrayList<>();
+    final StorageLocationConfig locationConfig = createStorageLocationConfig("local_storage_folder", 10000000000L, true);
+    final StorageLocationConfig locationConfig2 = createStorageLocationConfig("local_storage_folder2", 1000000000L, true);
+    final StorageLocationConfig locationConfig3 = createStorageLocationConfig("local_storage_folder3", 1000000000L, true);
+    locationConfigs.add(locationConfig);
+    locationConfigs.add(locationConfig2);
+    locationConfigs.add(locationConfig3);
+
+    List<StorageLocation> locations = new ArrayList<>();
+    for (StorageLocationConfig locConfig : locationConfigs) {
+      locations.add(
+          new StorageLocation(
+              locConfig.getPath(),
+              locConfig.getMaxSize(),
+              locConfig.getFreeSpacePercent()
+          )
+      );
+    }
+
+    manager = new SegmentLocalCacheManager(
+        new SegmentLoaderConfig().withLocations(locationConfigs),
+        new RoundRobinStorageLocationSelectorStrategy(locations),
+        jsonMapper
+    );
+
+    StorageLocation location3 = manager.getLocations().get(2);
+    Assert.assertEquals(locationConfig3.getPath(), location3.getPath());
+    final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder");
+
+    // Segment should be downloaded in local_storage_folder3 even if that is the third location
+    final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec(
+        ImmutableMap.of(
+            "type",
+            "local",
+            "path",
+            segmentSrcFolder.getCanonicalPath()
+                + "/test_segment_loader"
+                + "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+                + "/0/index.zip"
+        )
+    );
+    String segmentDir = DataSegmentPusher.getDefaultStorageDir(segmentToDownload, false);
+    location3.reserve(segmentDir, segmentToDownload);
+    // manually create a local segment under segmentSrcFolder
+    createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
+
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
+
+    File segmentFile = manager.getSegmentFiles(segmentToDownload);
+    Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder3/"));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
+
+    manager.cleanup(segmentToDownload);
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload));
+    Assert.assertFalse(location3.isReserved(segmentDir));
+  }
+
+  @Test
+  public void testRelease()
+  {
+    final DataSegment dataSegment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withSize(100L);
+    final StorageLocation firstLocation = new StorageLocation(localSegmentCacheFolder, 50L, 0.0d);
+    final StorageLocation secondLocation = new StorageLocation(localSegmentCacheFolder, 150L, 0.0d);
+
+    manager = new SegmentLocalCacheManager(
+        Arrays.asList(secondLocation, firstLocation),
+        new SegmentLoaderConfig(),
+        new RoundRobinStorageLocationSelectorStrategy(Arrays.asList(firstLocation, secondLocation)),
+        jsonMapper
+    );
+
+    manager.reserve(dataSegment);
+    manager.release(dataSegment);
+    Assert.assertEquals(50L, firstLocation.availableSizeBytes());
+    Assert.assertEquals(150L, secondLocation.availableSizeBytes());
+    Assert.assertFalse(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false)));
+    Assert.assertFalse(secondLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false)));
+
+    // calling release again should have no effect
+    manager.release(dataSegment);
+    Assert.assertEquals(50L, firstLocation.availableSizeBytes());
+    Assert.assertEquals(150L, secondLocation.availableSizeBytes());
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index 8698146..83f0a8f 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -64,12 +64,12 @@
   private static final SegmentLoader SEGMENT_LOADER = new SegmentLoader()
   {
     @Override
-    public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
+    public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
     {
-      return new SegmentForTesting(
+      return ReferenceCountingSegment.wrapSegment(new SegmentForTesting(
           MapUtils.getString(segment.getLoadSpec(), "version"),
           (Interval) segment.getLoadSpec().get("interval")
-      );
+      ), segment.getShardSpec());
     }
 
     @Override
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
index 16832b3..a76cc5b 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
@@ -148,12 +148,12 @@
         new SegmentLoader()
         {
           @Override
-          public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
+          public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
           {
-            return new SegmentForTesting(
+            return ReferenceCountingSegment.wrapSegment(new SegmentForTesting(
                 MapUtils.getString(segment.getLoadSpec(), "version"),
                 (Interval) segment.getLoadSpec().get("interval")
-            );
+            ), segment.getShardSpec());
           }
 
           @Override