diff --git a/docs/development/modules.md b/docs/development/modules.md
index d1080b6..0110288 100644
--- a/docs/development/modules.md
+++ b/docs/development/modules.md
@@ -137,7 +137,7 @@
 ip] to [/opt/druid/zk_druid/dde/2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z/2015-04-14T02:41:09.484Z/0]
 2015-04-14T02:49:08,276 INFO [ZkCoordinator-0] org.apache.druid.storage.azure.AzureDataSegmentPuller - Loaded 1196 bytes from [dde/2015-01-02T00:00:00.000Z_2015-01-03
 T00:00:00.000Z/2015-04-14T02:41:09.484Z/0/index.zip] to [/opt/druid/zk_druid/dde/2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z/2015-04-14T02:41:09.484Z/0]
-2015-04-14T02:49:08,277 WARN [ZkCoordinator-0] org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager - Segment [dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] is different than expected size. Expected [0] found [1196]
+2015-04-14T02:49:08,277 WARN [ZkCoordinator-0] org.apache.druid.segment.loading.SegmentLocalCacheManager - Segment [dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] is different than expected size. Expected [0] found [1196]
 2015-04-14T02:49:08,282 INFO [ZkCoordinator-0] org.apache.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] at path[/druid/dev/segments/192.168.33.104:8081/192.168.33.104:8081_historical__default_tier_2015-04-14T02:49:08.282Z_7bb87230ebf940188511dd4a53ffd7351]
 2015-04-14T02:49:08,292 INFO [ZkCoordinator-0] org.apache.druid.server.coordination.ZkCoordinator - Completed request [LOAD: dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z]
 ```
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index ca3630d..5424288 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -60,7 +60,7 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestUtils;
@@ -2887,7 +2887,7 @@
         DirectQueryProcessingPool.INSTANCE,
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
+        new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 5b3a77c..3d0f86d 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -48,7 +48,7 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestUtils;
@@ -2974,7 +2974,7 @@
         DirectQueryProcessingPool.INSTANCE,
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
+        new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentCacheManagerFactory.java
similarity index 77%
rename from indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
rename to indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentCacheManagerFactory.java
index 17b8dc1..6672cf0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentCacheManagerFactory.java
@@ -22,10 +22,9 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import org.apache.druid.guice.annotations.Json;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 
 import java.io.File;
@@ -34,25 +33,21 @@
 /**
  *
  */
-public class SegmentLoaderFactory
+public class SegmentCacheManagerFactory
 {
-  private final IndexIO indexIO;
   private final ObjectMapper jsonMapper;
 
   @Inject
-  public SegmentLoaderFactory(
-      IndexIO indexIO,
+  public SegmentCacheManagerFactory(
       @Json ObjectMapper mapper
   )
   {
-    this.indexIO = indexIO;
     this.jsonMapper = mapper;
   }
 
-  public SegmentLoader manufacturate(File storageDir)
+  public SegmentCacheManager manufacturate(File storageDir)
   {
-    return new SegmentLoaderLocalCacheManager(
-        indexIO,
+    return new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(
             Collections.singletonList(new StorageLocationConfig(storageDir, null, null))),
         jsonMapper
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index dbaec5a..979de5f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -57,7 +57,7 @@
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.loading.DataSegmentMover;
 import org.apache.druid.segment.loading.DataSegmentPusher;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -101,7 +101,7 @@
   private final Provider<MonitorScheduler> monitorSchedulerProvider;
   private final QueryProcessingPool queryProcessingPool;
   private final JoinableFactory joinableFactory;
-  private final SegmentLoader segmentLoader;
+  private final SegmentCacheManager segmentCacheManager;
   private final ObjectMapper jsonMapper;
   private final File taskWorkDir;
   private final IndexIO indexIO;
@@ -144,7 +144,7 @@
       QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       @Nullable Provider<MonitorScheduler> monitorSchedulerProvider,
-      SegmentLoader segmentLoader,
+      SegmentCacheManager segmentCacheManager,
       ObjectMapper jsonMapper,
       File taskWorkDir,
       IndexIO indexIO,
@@ -183,7 +183,7 @@
     this.queryProcessingPool = queryProcessingPool;
     this.joinableFactory = joinableFactory;
     this.monitorSchedulerProvider = monitorSchedulerProvider;
-    this.segmentLoader = segmentLoader;
+    this.segmentCacheManager = segmentCacheManager;
     this.jsonMapper = jsonMapper;
     this.taskWorkDir = taskWorkDir;
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
@@ -318,7 +318,7 @@
   {
     Map<DataSegment, File> retVal = Maps.newLinkedHashMap();
     for (DataSegment segment : segments) {
-      retVal.put(segment, segmentLoader.getSegmentFiles(segment));
+      retVal.put(segment, segmentCacheManager.getSegmentFiles(segment));
     }
 
     return retVal;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 5cd4eb5..cb1d3c1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -83,7 +83,7 @@
   private final QueryProcessingPool queryProcessingPool;
   private final JoinableFactory joinableFactory;
   private final Provider<MonitorScheduler> monitorSchedulerProvider;
-  private final SegmentLoaderFactory segmentLoaderFactory;
+  private final SegmentCacheManagerFactory segmentCacheManagerFactory;
   private final ObjectMapper jsonMapper;
   private final IndexIO indexIO;
   private final Cache cache;
@@ -124,7 +124,7 @@
       QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       Provider<MonitorScheduler> monitorSchedulerProvider,
-      SegmentLoaderFactory segmentLoaderFactory,
+      SegmentCacheManagerFactory segmentCacheManagerFactory,
       @Json ObjectMapper jsonMapper,
       IndexIO indexIO,
       Cache cache,
@@ -162,7 +162,7 @@
     this.queryProcessingPool = queryProcessingPool;
     this.joinableFactory = joinableFactory;
     this.monitorSchedulerProvider = monitorSchedulerProvider;
-    this.segmentLoaderFactory = segmentLoaderFactory;
+    this.segmentCacheManagerFactory = segmentCacheManagerFactory;
     this.jsonMapper = jsonMapper;
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
     this.cache = cache;
@@ -204,7 +204,7 @@
         queryProcessingPool,
         joinableFactory,
         monitorSchedulerProvider,
-        segmentLoaderFactory.manufacturate(taskWorkDir),
+        segmentCacheManagerFactory.manufacturate(taskWorkDir),
         jsonMapper,
         taskWorkDir,
         indexIO,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 879949d..3374e8c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -52,7 +52,7 @@
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -157,7 +157,7 @@
   private final PartitionConfigurationManager partitionConfigurationManager;
 
   @JsonIgnore
-  private final SegmentLoaderFactory segmentLoaderFactory;
+  private final SegmentCacheManagerFactory segmentCacheManagerFactory;
 
   @JsonIgnore
   private final RetryPolicyFactory retryPolicyFactory;
@@ -185,7 +185,7 @@
       @JsonProperty("granularitySpec") @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
       @JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
       @JsonProperty("context") @Nullable final Map<String, Object> context,
-      @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+      @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
       @JacksonInject RetryPolicyFactory retryPolicyFactory
   )
   {
@@ -233,7 +233,7 @@
     this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null;
     this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
     this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig);
-    this.segmentLoaderFactory = segmentLoaderFactory;
+    this.segmentCacheManagerFactory = segmentCacheManagerFactory;
     this.retryPolicyFactory = retryPolicyFactory;
   }
 
@@ -422,7 +422,7 @@
         metricsSpec,
         granularitySpec,
         toolbox.getCoordinatorClient(),
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         retryPolicyFactory,
         ioConfig.isDropExisting()
     );
@@ -521,7 +521,7 @@
       @Nullable final AggregatorFactory[] metricsSpec,
       @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
       final CoordinatorClient coordinatorClient,
-      final SegmentLoaderFactory segmentLoaderFactory,
+      final SegmentCacheManagerFactory segmentCacheManagerFactory,
       final RetryPolicyFactory retryPolicyFactory,
       final boolean dropExisting
   ) throws IOException, SegmentLoadingException
@@ -604,7 +604,7 @@
                     dataSchema,
                     interval,
                     coordinatorClient,
-                    segmentLoaderFactory,
+                    segmentCacheManagerFactory,
                     retryPolicyFactory,
                     dropExisting
                 ),
@@ -632,7 +632,7 @@
                   dataSchema,
                   segmentProvider.interval,
                   coordinatorClient,
-                  segmentLoaderFactory,
+                  segmentCacheManagerFactory,
                   retryPolicyFactory,
                   dropExisting
               ),
@@ -647,7 +647,7 @@
       DataSchema dataSchema,
       Interval interval,
       CoordinatorClient coordinatorClient,
-      SegmentLoaderFactory segmentLoaderFactory,
+      SegmentCacheManagerFactory segmentCacheManagerFactory,
       RetryPolicyFactory retryPolicyFactory,
       boolean dropExisting
   )
@@ -663,7 +663,7 @@
             null,
             toolbox.getIndexIO(),
             coordinatorClient,
-            segmentLoaderFactory,
+            segmentCacheManagerFactory,
             retryPolicyFactory,
             toolbox.getConfig()
         ),
@@ -1016,7 +1016,7 @@
   public static class Builder
   {
     private final String dataSource;
-    private final SegmentLoaderFactory segmentLoaderFactory;
+    private final SegmentCacheManagerFactory segmentCacheManagerFactory;
     private final RetryPolicyFactory retryPolicyFactory;
 
     private CompactionIOConfig ioConfig;
@@ -1035,12 +1035,12 @@
 
     public Builder(
         String dataSource,
-        SegmentLoaderFactory segmentLoaderFactory,
+        SegmentCacheManagerFactory segmentCacheManagerFactory,
         RetryPolicyFactory retryPolicyFactory
     )
     {
       this.dataSource = dataSource;
-      this.segmentLoaderFactory = segmentLoaderFactory;
+      this.segmentCacheManagerFactory = segmentCacheManagerFactory;
       this.retryPolicyFactory = retryPolicyFactory;
     }
 
@@ -1118,7 +1118,7 @@
           granularitySpec,
           tuningConfig,
           context,
-          segmentLoaderFactory,
+          segmentCacheManagerFactory,
           retryPolicyFactory
       );
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 01b3d95..f20a0dd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -36,7 +36,7 @@
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.indexing.common.ReingestionTimelineUtils;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.IAE;
@@ -45,7 +45,7 @@
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose;
 import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
@@ -84,7 +84,7 @@
   private final Long maxInputSegmentBytesPerTask;
   private final IndexIO indexIO;
   private final CoordinatorClient coordinatorClient;
-  private final SegmentLoaderFactory segmentLoaderFactory;
+  private final SegmentCacheManagerFactory segmentCacheManagerFactory;
   private final RetryPolicyFactory retryPolicyFactory;
 
   private List<InputSplit<List<WindowedSegmentId>>> splits;
@@ -102,7 +102,7 @@
       @JsonProperty("maxInputSegmentBytesPerTask") @Deprecated @Nullable Long maxInputSegmentBytesPerTask,
       @JacksonInject IndexIO indexIO,
       @JacksonInject CoordinatorClient coordinatorClient,
-      @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+      @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
       @JacksonInject RetryPolicyFactory retryPolicyFactory
   )
   {
@@ -119,7 +119,7 @@
     this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask;
     this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
     this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
-    this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
+    this.segmentCacheManagerFactory = Preconditions.checkNotNull(segmentCacheManagerFactory, "null segmentCacheManagerFactory");
     this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
   }
 
@@ -136,7 +136,7 @@
         maxInputSegmentBytesPerTask,
         indexIO,
         coordinatorClient,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         retryPolicyFactory
     );
   }
@@ -202,7 +202,7 @@
     // Note: this requires enough local storage space to fit all of the segments, even though
     // IngestSegmentFirehose iterates over the segments in series. We may want to change this
     // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
-    final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
+    final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(temporaryDirectory);
     Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
     for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
       for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
@@ -210,7 +210,7 @@
 
         segmentFileMap.computeIfAbsent(segment, k -> {
           try {
-            return segmentLoader.getSegmentFiles(segment);
+            return segmentCacheManager.getSegmentFiles(segment);
           }
           catch (SegmentLoadingException e) {
             throw new RuntimeException(e);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index c9d0f4e..9be3378 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -42,7 +42,7 @@
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexing.common.RetryPolicy;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
 import org.apache.druid.java.util.common.IAE;
@@ -52,7 +52,7 @@
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.column.ColumnHolder;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -129,7 +129,7 @@
   private final DimFilter dimFilter;
   private final IndexIO indexIO;
   private final CoordinatorClient coordinatorClient;
-  private final SegmentLoaderFactory segmentLoaderFactory;
+  private final SegmentCacheManagerFactory segmentCacheManagerFactory;
   private final RetryPolicyFactory retryPolicyFactory;
   private final TaskConfig taskConfig;
 
@@ -155,7 +155,7 @@
       @Nullable @JsonProperty("metrics") List<String> metrics,
       @JacksonInject IndexIO indexIO,
       @JacksonInject CoordinatorClient coordinatorClient,
-      @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+      @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
       @JacksonInject RetryPolicyFactory retryPolicyFactory,
       @JacksonInject TaskConfig taskConfig
   )
@@ -172,7 +172,7 @@
     this.metrics = metrics;
     this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
     this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
-    this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
+    this.segmentCacheManagerFactory = Preconditions.checkNotNull(segmentCacheManagerFactory, "null segmentCacheManagerFactory");
     this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
     this.taskConfig = Preconditions.checkNotNull(taskConfig, "null taskConfig");
   }
@@ -224,7 +224,7 @@
   @Override
   protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory)
   {
-    final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
+    final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(temporaryDirectory);
 
     final List<TimelineObjectHolder<String, DataSegment>> timeline = createTimeline();
     final Iterator<DruidSegmentInputEntity> entityIterator = FluentIterable
@@ -235,7 +235,7 @@
           //noinspection ConstantConditions
           return FluentIterable
               .from(partitionHolder)
-              .transform(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval()));
+              .transform(chunk -> new DruidSegmentInputEntity(segmentCacheManager, chunk.getObject(), holder.getInterval()));
         }).iterator();
 
     final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter);
@@ -339,7 +339,7 @@
         metrics,
         indexIO,
         coordinatorClient,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         retryPolicyFactory,
         taskConfig
     );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
index 7550863..0548208 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
@@ -23,7 +23,7 @@
 import com.google.common.base.Predicates;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
@@ -37,13 +37,13 @@
 {
   private static final EmittingLogger log = new EmittingLogger(DruidSegmentInputEntity.class);
 
-  private final SegmentLoader segmentLoader;
+  private final SegmentCacheManager segmentCacheManager;
   private final DataSegment segment;
   private final Interval intervalFilter;
 
-  DruidSegmentInputEntity(SegmentLoader segmentLoader, DataSegment segment, Interval intervalFilter)
+  DruidSegmentInputEntity(SegmentCacheManager segmentCacheManager, DataSegment segment, Interval intervalFilter)
   {
-    this.segmentLoader = segmentLoader;
+    this.segmentCacheManager = segmentCacheManager;
     this.segment = segment;
     this.intervalFilter = intervalFilter;
   }
@@ -71,7 +71,7 @@
   {
     final File segmentFile;
     try {
-      segmentFile = segmentLoader.getSegmentFiles(segment);
+      segmentFile = segmentCacheManager.getSegmentFiles(segment);
     }
     catch (SegmentLoadingException e) {
       throw new RuntimeException(e);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index ac0685a..3f75ce8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -44,8 +44,8 @@
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.loading.DataSegmentMover;
 import org.apache.druid.segment.loading.DataSegmentPusher;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
@@ -83,8 +83,8 @@
   private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class);
   private QueryProcessingPool mockQueryProcessingPool = EasyMock.createMock(QueryProcessingPool.class);
   private ObjectMapper ObjectMapper = new ObjectMapper();
-  private SegmentLoaderFactory mockSegmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class);
-  private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class);
+  private SegmentCacheManagerFactory mockSegmentCacheManagerFactory = EasyMock.createMock(SegmentCacheManagerFactory.class);
+  private SegmentLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLocalCacheManager.class);
   private Task task = EasyMock.createMock(Task.class);
   private IndexMergerV9 mockIndexMergerV9 = EasyMock.createMock(IndexMergerV9.class);
   private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class);
@@ -129,7 +129,7 @@
         mockQueryProcessingPool,
         NoopJoinableFactory.INSTANCE,
         () -> mockMonitorScheduler,
-        mockSegmentLoaderFactory,
+        mockSegmentCacheManagerFactory,
         ObjectMapper,
         mockIndexIO,
         mockCache,
@@ -194,12 +194,12 @@
   {
     File expectedFile = temporaryFolder.newFile();
     EasyMock
-        .expect(mockSegmentLoaderFactory.manufacturate(EasyMock.anyObject()))
+        .expect(mockSegmentCacheManagerFactory.manufacturate(EasyMock.anyObject()))
         .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes();
     EasyMock
         .expect(mockSegmentLoaderLocalCacheManager.getSegmentFiles(EasyMock.anyObject()))
         .andReturn(expectedFile).anyTimes();
-    EasyMock.replay(mockSegmentLoaderFactory, mockSegmentLoaderLocalCacheManager);
+    EasyMock.replay(mockSegmentCacheManagerFactory, mockSegmentLoaderLocalCacheManager);
     DataSegment dataSegment = DataSegment.builder().dataSource("source").interval(Intervals.of("2012-01-01/P1D")).version("1").size(1).build();
     List<DataSegment> segments = ImmutableList.of
         (
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index d7c5e32..0c6b1c4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -51,7 +51,7 @@
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -1590,7 +1590,7 @@
         DirectQueryProcessingPool.INSTANCE, // queryExecutorService
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
+        new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index 04b1b00..fd7b699 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -40,7 +40,7 @@
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -215,7 +215,7 @@
     final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
     final CompactionTask.Builder builder = new CompactionTask.Builder(
         "datasource",
-        new SegmentLoaderFactory(null, mapper),
+        new SegmentCacheManagerFactory(mapper),
         new RetryPolicyFactory(new RetryPolicyConfig())
     );
     final CompactionTask task = builder
@@ -338,7 +338,7 @@
                   binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
                   binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY);
                   binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
-                  binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
+                  binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper));
                   binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
                   binder.bind(IndexingServiceClient.class).toInstance(new NoopIndexingServiceClient());
                 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index e8b6b12..1732b3b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -139,7 +139,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -182,7 +182,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -222,7 +222,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -262,7 +262,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -300,7 +300,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -328,7 +328,7 @@
     runIndexTask(null, true);
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -373,7 +373,7 @@
     runIndexTask(null, true);
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -456,7 +456,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -490,7 +490,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index a958ee6..b6098ea 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -41,7 +41,7 @@
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
@@ -75,9 +75,9 @@
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.LocalLoadSpec;
 import org.apache.druid.segment.loading.NoopDataSegmentKiller;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
@@ -160,7 +160,7 @@
   private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig());
   private final IndexingServiceClient indexingServiceClient;
   private final CoordinatorClient coordinatorClient;
-  private final SegmentLoaderFactory segmentLoaderFactory;
+  private final SegmentCacheManagerFactory segmentCacheManagerFactory;
   private final LockGranularity lockGranularity;
   private final TestUtils testUtils;
 
@@ -182,7 +182,7 @@
         return getStorageCoordinator().retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE);
       }
     };
-    segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper());
+    segmentCacheManagerFactory = new SegmentCacheManagerFactory(getObjectMapper());
     this.lockGranularity = lockGranularity;
   }
 
@@ -230,7 +230,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -279,7 +279,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -368,7 +368,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -446,7 +446,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -543,7 +543,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -598,7 +598,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -653,7 +653,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -698,7 +698,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -731,7 +731,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -775,7 +775,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -837,7 +837,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -925,7 +925,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -994,7 +994,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -1046,7 +1046,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -1154,7 +1154,7 @@
                     null,
                     getIndexIO(),
                     coordinatorClient,
-                    segmentLoaderFactory,
+                    segmentCacheManagerFactory,
                     RETRY_POLICY_FACTORY
                 ),
                 false,
@@ -1285,8 +1285,7 @@
 
   private TaskToolbox createTaskToolbox(ObjectMapper objectMapper, Task task) throws IOException
   {
-    final SegmentLoader loader = new SegmentLoaderLocalCacheManager(
-        getIndexIO(),
+    final SegmentCacheManager loader = new SegmentLocalCacheManager(
         new SegmentLoaderConfig() {
           @Override
           public List<StorageLocationConfig> getLocations()
@@ -1342,11 +1341,11 @@
   {
 
     final File cacheDir = temporaryFolder.newFolder();
-    final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(cacheDir);
+    final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir);
 
     List<Cursor> cursors = new ArrayList<>();
     for (DataSegment segment : segments) {
-      final File segmentFile = segmentLoader.getSegmentFiles(segment);
+      final File segmentFile = segmentCacheManager.getSegmentFiles(segment);
 
       final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
           new QueryableIndexStorageAdapter(testUtils.getTestIndexIO().loadIndex(segmentFile)),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 330ccbf..ac9e3d0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -57,7 +57,7 @@
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
@@ -188,7 +188,7 @@
   private static List<DataSegment> SEGMENTS;
 
   private TaskToolbox toolbox;
-  private SegmentLoaderFactory segmentLoaderFactory;
+  private SegmentCacheManagerFactory segmentCacheManagerFactory;
 
   @BeforeClass
   public static void setupClass()
@@ -277,7 +277,7 @@
                   binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
                   binder.bind(RowIngestionMetersFactory.class).toInstance(TEST_UTILS.getRowIngestionMetersFactory());
                   binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
-                  binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
+                  binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper));
                   binder.bind(AppenderatorsManager.class).toInstance(new TestAppenderatorsManager());
                   binder.bind(IndexingServiceClient.class).toInstance(INDEXING_SERVICE_CLIENT);
                 }
@@ -361,7 +361,7 @@
         testIndexIO,
         SEGMENT_MAP
     );
-    segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, OBJECT_MAPPER);
+    segmentCacheManagerFactory = new SegmentCacheManagerFactory(OBJECT_MAPPER);
   }
 
   @Test
@@ -369,7 +369,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
@@ -379,7 +379,7 @@
 
     final Builder builder2 = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
@@ -397,7 +397,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
@@ -426,7 +426,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
@@ -455,7 +455,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
@@ -471,7 +471,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     final CompactionTask task = builder
@@ -492,7 +492,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     final CompactionTask task = builder
@@ -511,7 +511,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -586,14 +586,14 @@
         toolbox.getChatHandlerProvider(),
         toolbox.getRowIngestionMetersFactory(),
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         toolbox.getAppenderatorsManager()
     );
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -790,7 +790,7 @@
             null,
             toolbox.getRowIngestionMetersFactory(),
             COORDINATOR_CLIENT,
-            segmentLoaderFactory,
+            segmentCacheManagerFactory,
             RETRY_POLICY_FACTORY,
             toolbox.getAppenderatorsManager()
         );
@@ -848,7 +848,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -921,7 +921,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -995,7 +995,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1069,7 +1069,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1133,7 +1133,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1177,7 +1177,7 @@
         customMetricsSpec,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1214,7 +1214,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1257,7 +1257,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1281,7 +1281,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1295,7 +1295,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -1316,7 +1316,7 @@
         null,
         new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null),
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1354,7 +1354,7 @@
         null,
         new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null)),
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1393,7 +1393,7 @@
             new PeriodGranularity(Period.months(3), null, null)
         ),
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1431,7 +1431,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1468,7 +1468,7 @@
         null,
         new ClientCompactionTaskGranularitySpec(null, null),
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -2014,7 +2014,7 @@
         @JacksonInject ChatHandlerProvider chatHandlerProvider,
         @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
         @JacksonInject CoordinatorClient coordinatorClient,
-        @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+        @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
         @JacksonInject RetryPolicyFactory retryPolicyFactory,
         @JacksonInject AppenderatorsManager appenderatorsManager
     )
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 55c1f15..f805b2c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -80,9 +80,9 @@
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
@@ -172,7 +172,7 @@
   private final boolean useInputFormatApi;
 
   private AppenderatorsManager appenderatorsManager;
-  private SegmentLoader segmentLoader;
+  private SegmentCacheManager segmentCacheManager;
   private TestTaskRunner taskRunner;
 
   public IndexTaskTest(LockGranularity lockGranularity, boolean useInputFormatApi)
@@ -190,8 +190,7 @@
     appenderatorsManager = new TestAppenderatorsManager();
 
     final File cacheDir = temporaryFolder.newFolder();
-    segmentLoader = new SegmentLoaderLocalCacheManager(
-        indexIO,
+    segmentCacheManager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig()
         {
           @Override
@@ -345,7 +344,7 @@
 
     Assert.assertEquals(1, segments.size());
     DataSegment segment = segments.get(0);
-    final File segmentFile = segmentLoader.getSegmentFiles(segment);
+    final File segmentFile = segmentCacheManager.getSegmentFiles(segment);
 
     final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
         new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentFile)),
@@ -595,7 +594,7 @@
       final HashBasedNumberedShardSpec hashBasedNumberedShardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec();
       Assert.assertEquals(HashPartitionFunction.MURMUR3_32_ABS, hashBasedNumberedShardSpec.getPartitionFunction());
 
-      final File segmentFile = segmentLoader.getSegmentFiles(segment);
+      final File segmentFile = segmentCacheManager.getSegmentFiles(segment);
 
       final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
           new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentFile)),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index c42bf20..ab9b0d7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -25,7 +25,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
@@ -63,7 +63,7 @@
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.NoopDataSegmentKiller;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -95,7 +95,7 @@
 
   private final TestUtils testUtils = new TestUtils();
   private final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
-  private SegmentLoaderFactory segmentLoaderFactory;
+  private SegmentCacheManagerFactory segmentCacheManagerFactory;
   private TaskStorage taskStorage;
   private IndexerSQLMetadataStorageCoordinator storageCoordinator;
   private SegmentsMetadataManager segmentsMetadataManager;
@@ -123,7 +123,7 @@
         derbyConnectorRule.getConnector()
     );
     lockbox = new TaskLockbox(taskStorage, storageCoordinator);
-    segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper());
+    segmentCacheManagerFactory = new SegmentCacheManagerFactory(getObjectMapper());
   }
 
   @After
@@ -153,9 +153,9 @@
     lockbox.remove(task);
   }
 
-  public SegmentLoader newSegmentLoader(File storageDir)
+  public SegmentCacheManager newSegmentLoader(File storageDir)
   {
-    return segmentLoaderFactory.manufacturate(storageDir);
+    return segmentCacheManagerFactory.manufacturate(storageDir);
   }
 
   public ObjectMapper getObjectMapper()
@@ -168,9 +168,9 @@
     return taskStorage;
   }
 
-  public SegmentLoaderFactory getSegmentLoaderFactory()
+  public SegmentCacheManagerFactory getSegmentCacheManagerFactory()
   {
-    return segmentLoaderFactory;
+    return segmentCacheManagerFactory;
   }
 
   public IndexerMetadataStorageCoordinator getMetadataStorageCoordinator()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index f2b1503..94bf84d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -42,7 +42,7 @@
 import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestFirehose;
@@ -989,7 +989,7 @@
         DirectQueryProcessingPool.INSTANCE,
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
+        new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index a716217..f79406b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -30,7 +30,7 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.input.DruidInputSource;
@@ -55,8 +55,10 @@
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheLoader;
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
@@ -287,8 +289,9 @@
 
   private Segment loadSegment(DataSegment dataSegment, File tempSegmentDir)
   {
-    final SegmentLoader loader = new SegmentLoaderFactory(getIndexIO(), getObjectMapper())
+    final SegmentCacheManager cacheManager = new SegmentCacheManagerFactory(getObjectMapper())
         .manufacturate(tempSegmentDir);
+    final SegmentLoader loader = new SegmentLocalCacheLoader(cacheManager, getIndexIO(), getObjectMapper());
     try {
       return loader.getSegment(dataSegment, false, SegmentLazyLoadFailCallback.NOOP);
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index a0d5449..d10013e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -49,7 +49,7 @@
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
@@ -614,7 +614,7 @@
             .addValue(AppenderatorsManager.class, TestUtils.APPENDERATORS_MANAGER)
             .addValue(LocalDataSegmentPuller.class, new LocalDataSegmentPuller())
             .addValue(CoordinatorClient.class, coordinatorClient)
-            .addValue(SegmentLoaderFactory.class, new SegmentLoaderFactory(indexIO, objectMapper))
+            .addValue(SegmentCacheManagerFactory.class, new SegmentCacheManagerFactory(objectMapper))
             .addValue(RetryPolicyFactory.class, new RetryPolicyFactory(new RetryPolicyConfig()))
             .addValue(TaskConfig.class, taskConfig)
     );
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
index 7f001c2..c2021fcd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
@@ -227,7 +227,7 @@
   {
     return new Builder(
         DATASOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index 34ae0ab..fa13c8e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -44,7 +44,7 @@
 import org.apache.druid.indexing.common.ReingestionTimelineUtils;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.NoopTask;
@@ -226,7 +226,7 @@
     SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
     EasyMock.replay(notifierFactory);
 
-    final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, MAPPER);
+    final SegmentCacheManagerFactory slf = new SegmentCacheManagerFactory(MAPPER);
     final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
 
     Collection<Object[]> values = new ArrayList<>();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
index 1213f9e..d226dc2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
@@ -37,7 +37,7 @@
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
@@ -318,7 +318,7 @@
     for (final TestCase testCase : testCases) {
       SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
       EasyMock.replay(notifierFactory);
-      final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, MAPPER);
+      final SegmentCacheManagerFactory slf = new SegmentCacheManagerFactory(MAPPER);
       final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
       final CoordinatorClient cc = new CoordinatorClient(null, null)
       {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
index dcdc537..ebc2b94 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
@@ -27,7 +27,7 @@
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.guice.IndexingServiceInputSourceModule;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
 import org.apache.druid.java.util.common.Intervals;
@@ -45,7 +45,7 @@
 {
   private final IndexIO indexIO = EasyMock.createMock(IndexIO.class);
   private final CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
-  private final SegmentLoaderFactory segmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class);
+  private final SegmentCacheManagerFactory segmentCacheManagerFactory = EasyMock.createMock(SegmentCacheManagerFactory.class);
   private final RetryPolicyFactory retryPolicyFactory = EasyMock.createMock(RetryPolicyFactory.class);
   private final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
 
@@ -63,7 +63,7 @@
     final InjectableValues.Std injectableValues = (InjectableValues.Std) mapper.getInjectableValues();
     injectableValues.addValue(IndexIO.class, indexIO);
     injectableValues.addValue(CoordinatorClient.class, coordinatorClient);
-    injectableValues.addValue(SegmentLoaderFactory.class, segmentLoaderFactory);
+    injectableValues.addValue(SegmentCacheManagerFactory.class, segmentCacheManagerFactory);
     injectableValues.addValue(RetryPolicyFactory.class, retryPolicyFactory);
     injectableValues.addValue(TaskConfig.class, taskConfig);
   }
@@ -90,7 +90,7 @@
             null,
             indexIO,
             coordinatorClient,
-            segmentLoaderFactory,
+            segmentCacheManagerFactory,
             retryPolicyFactory,
             taskConfig
         ),
@@ -124,7 +124,7 @@
             ImmutableList.of("b"),
             indexIO,
             coordinatorClient,
-            segmentLoaderFactory,
+            segmentCacheManagerFactory,
             retryPolicyFactory,
             taskConfig
         ),
@@ -164,7 +164,7 @@
             null,
             indexIO,
             coordinatorClient,
-            segmentLoaderFactory,
+            segmentCacheManagerFactory,
             retryPolicyFactory,
             taskConfig
         ),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
index 9270f5f..3886f3d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
@@ -46,12 +46,10 @@
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.Segment;
-import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
@@ -589,16 +587,10 @@
   private DruidSegmentInputEntity makeInputEntity(final Interval interval)
   {
     return new DruidSegmentInputEntity(
-        new SegmentLoader()
+        new SegmentCacheManager()
         {
           @Override
-          public boolean isSegmentLoaded(DataSegment segment)
-          {
-            throw new UnsupportedOperationException("unused");
-          }
-
-          @Override
-          public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
+          public boolean isSegmentCached(DataSegment segment)
           {
             throw new UnsupportedOperationException("unused");
           }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index c05cdb8..f1ea2a0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -23,7 +23,7 @@
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
@@ -110,7 +110,7 @@
         null,
         NoopJoinableFactory.INSTANCE,
         null,
-        new SegmentLoaderFactory(null, utils.getTestObjectMapper()),
+        new SegmentCacheManagerFactory(utils.getTestObjectMapper()),
         utils.getTestObjectMapper(),
         utils.getTestIndexIO(),
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 1903902..032c2fd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -54,7 +54,7 @@
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -665,7 +665,7 @@
         DirectQueryProcessingPool.INSTANCE, // query executor service
         NoopJoinableFactory.INSTANCE,
         () -> monitorScheduler, // monitor scheduler
-        new SegmentLoaderFactory(null, new DefaultObjectMapper()),
+        new SegmentCacheManagerFactory(new DefaultObjectMapper()),
         MAPPER,
         INDEX_IO,
         MapCache.create(0),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 2dd94e8..b7a489f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -26,7 +26,7 @@
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestTasks;
 import org.apache.druid.indexing.common.TestUtils;
@@ -117,7 +117,7 @@
                 null,
                 NoopJoinableFactory.INSTANCE,
                 null,
-                new SegmentLoaderFactory(null, jsonMapper),
+                new SegmentCacheManagerFactory(jsonMapper),
                 jsonMapper,
                 indexIO,
                 null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index c184509..dbc44f0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -31,7 +31,7 @@
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexing.common.IndexingServiceCondition;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestRealtimeTask;
 import org.apache.druid.indexing.common.TestTasks;
@@ -190,7 +190,7 @@
                 null,
                 NoopJoinableFactory.INSTANCE,
                 null,
-                new SegmentLoaderFactory(null, jsonMapper),
+                new SegmentCacheManagerFactory(jsonMapper),
                 jsonMapper,
                 indexIO,
                 null,
diff --git a/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java b/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java
index 99e38e3..ed9d1fd 100644
--- a/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java
+++ b/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java
@@ -34,8 +34,10 @@
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.LocalFileTimestampVersionFinder;
 import org.apache.druid.segment.loading.LocalLoadSpec;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoader;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.SegmentLocalCacheLoader;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 
 import java.util.List;
 
@@ -48,7 +50,8 @@
   @Override
   public void configure(Binder binder)
   {
-    binder.bind(SegmentLoader.class).to(SegmentLoaderLocalCacheManager.class).in(LazySingleton.class);
+    binder.bind(SegmentCacheManager.class).to(SegmentLocalCacheManager.class).in(LazySingleton.class);
+    binder.bind(SegmentLoader.class).to(SegmentLocalCacheLoader.class).in(LazySingleton.class);
 
     bindDeepStorageLocal(binder);
 
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
new file mode 100644
index 0000000..9458574
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import org.apache.druid.timeline.DataSegment;
+
+import java.io.File;
+
+/**
+ * A class to fetch segment files to local disk and manage the local cache.
+ * Implementations must be thread-safe.
+ */
+public interface SegmentCacheManager
+{
+  /**
+   * Checks whether a segment is already cached.
+   */
+  boolean isSegmentCached(DataSegment segment);
+
+  /**
+   * This method fetches the files for the given segment if the segment is not downloaded already.
+   * @throws SegmentLoadingException if there is an error in downloading files
+   */
+  File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
+
+  /**
+   * Cleanup the cache space used by the segment
+   */
+  void cleanup(DataSegment segment);
+}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
index 741cfa1..8fe38a3 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
@@ -23,16 +23,23 @@
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.timeline.DataSegment;
 
-import java.io.File;
-
 /**
- * Loading segments from deep storage to local storage.
- * Implementations must be thread-safe.
+ * Loading segments from deep storage to local storage. Internally, this class can delegate the download to
+ * {@link SegmentCacheManager}. Implementations must be thread-safe.
  */
 public interface SegmentLoader
 {
-  boolean isSegmentLoaded(DataSegment segment);
+  /**
+   * Builds a {@link Segment} by downloading if necessary
+   * @param segment - Segment to load
+   * @param lazy - Whether column metadata de-serialization is to be deferred to access time. Setting this flag to true can speed up segment loading
+   * @param loadFailed - Callback to invoke if lazy loading fails during column access.
+   * @throws SegmentLoadingException - If there is an error in loading the segment
+   */
   Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
-  File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
+
+  /**
+   * cleanup any state used by this segment
+   */
   void cleanup(DataSegment segment);
 }
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java
new file mode 100644
index 0000000..6970f7b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.inject.Inject;
+
+import java.io.File;
+import java.io.IOException;
+
+public class SegmentLocalCacheLoader implements SegmentLoader
+{
+  private final SegmentCacheManager cacheManager;
+  private final IndexIO indexIO;
+  private final ObjectMapper jsonMapper;
+
+  @Inject
+  public SegmentLocalCacheLoader(SegmentCacheManager cacheManager, IndexIO indexIO, @Json ObjectMapper mapper)
+  {
+    this.cacheManager = cacheManager;
+    this.indexIO = indexIO;
+    this.jsonMapper = mapper;
+  }
+
+  @Override
+  public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
+  {
+    final File segmentFiles = cacheManager.getSegmentFiles(segment);
+    File factoryJson = new File(segmentFiles, "factory.json");
+    final SegmentizerFactory factory;
+
+    if (factoryJson.exists()) {
+      try {
+        factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class);
+      }
+      catch (IOException e) {
+        throw new SegmentLoadingException(e, "%s", e.getMessage());
+      }
+    } else {
+      factory = new MMappedQueryableSegmentizerFactory(indexIO);
+    }
+
+    return factory.factorize(segment, segmentFiles, lazy, loadFailed);
+  }
+
+  @Override
+  public void cleanup(DataSegment segment)
+  {
+    cacheManager.cleanup(segment);
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
similarity index 88%
rename from server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
rename to server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index eb32ea4..412cfe9 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -26,9 +26,6 @@
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.Segment;
-import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nonnull;
@@ -41,14 +38,13 @@
 
 /**
  */
-public class SegmentLoaderLocalCacheManager implements SegmentLoader
+public class SegmentLocalCacheManager implements SegmentCacheManager
 {
   @VisibleForTesting
   static final String DOWNLOAD_START_MARKER_FILE_NAME = "downloadStartMarker";
 
-  private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
+  private static final EmittingLogger log = new EmittingLogger(SegmentLocalCacheManager.class);
 
-  private final IndexIO indexIO;
   private final SegmentLoaderConfig config;
   private final ObjectMapper jsonMapper;
 
@@ -82,18 +78,16 @@
   private final StorageLocationSelectorStrategy strategy;
 
   // Note that we only create this via injection in historical and realtime nodes. Peons create these
-  // objects via SegmentLoaderFactory objects, so that they can store segments in task-specific
+  // objects via SegmentCacheManagerFactory objects, so that they can store segments in task-specific
   // directories rather than statically configured directories.
   @Inject
-  public SegmentLoaderLocalCacheManager(
-      IndexIO indexIO,
+  public SegmentLocalCacheManager(
       List<StorageLocation> locations,
       SegmentLoaderConfig config,
       @Nonnull StorageLocationSelectorStrategy strategy,
       @Json ObjectMapper mapper
   )
   {
-    this.indexIO = indexIO;
     this.config = config;
     this.jsonMapper = mapper;
     this.locations = locations;
@@ -102,14 +96,13 @@
   }
 
   @VisibleForTesting
-  SegmentLoaderLocalCacheManager(
-      IndexIO indexIO,
+  SegmentLocalCacheManager(
       SegmentLoaderConfig config,
       @Nonnull StorageLocationSelectorStrategy strategy,
       @Json ObjectMapper mapper
   )
   {
-    this(indexIO, config.toStorageLocations(), config, strategy, mapper);
+    this(config.toStorageLocations(), config, strategy, mapper);
   }
 
   /**
@@ -117,13 +110,11 @@
    *
    * This ctor is mainly for test cases, including test cases in other modules
    */
-  public SegmentLoaderLocalCacheManager(
-      IndexIO indexIO,
+  public SegmentLocalCacheManager(
       SegmentLoaderConfig config,
       @Json ObjectMapper mapper
   )
   {
-    this.indexIO = indexIO;
     this.config = config;
     this.jsonMapper = mapper;
     this.locations = config.toStorageLocations();
@@ -132,7 +123,7 @@
   }
 
   @Override
-  public boolean isSegmentLoaded(final DataSegment segment)
+  public boolean isSegmentCached(final DataSegment segment)
   {
     return findStorageLocationIfLoaded(segment) != null;
   }
@@ -177,36 +168,6 @@
     return downloadStartMarker.exists();
   }
 
-  @Override
-  public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
-  {
-    final ReferenceCountingLock lock = createOrGetLock(segment);
-    final File segmentFiles;
-    synchronized (lock) {
-      try {
-        segmentFiles = getSegmentFiles(segment);
-      }
-      finally {
-        unlock(segment, lock);
-      }
-    }
-    File factoryJson = new File(segmentFiles, "factory.json");
-    final SegmentizerFactory factory;
-
-    if (factoryJson.exists()) {
-      try {
-        factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class);
-      }
-      catch (IOException e) {
-        throw new SegmentLoadingException(e, "%s", e.getMessage());
-      }
-    } else {
-      factory = new MMappedQueryableSegmentizerFactory(indexIO);
-    }
-
-    return factory.factorize(segment, segmentFiles, lazy, loadFailed);
-  }
-
   /**
    * Make sure segments files in loc is intact, otherwise function like loadSegments will failed because of segment files is damaged.
    * @param segment
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index 93b16a3..486ad46 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -154,11 +154,6 @@
     return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getNumSegments);
   }
 
-  public boolean isSegmentCached(final DataSegment segment)
-  {
-    return segmentLoader.isSegmentLoaded(segment);
-  }
-
   /**
    * Returns the timeline for a datasource, if it exists. The analysis object passed in must represent a scan-based
    * datasource of a single table.
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index bc0da93..a9d972f 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -40,6 +40,7 @@
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.server.SegmentManager;
@@ -88,6 +89,7 @@
   private final ScheduledExecutorService exec;
   private final ServerTypeConfig serverTypeConfig;
   private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
+  private final SegmentCacheManager segmentCacheManager;
 
   private volatile boolean started = false;
 
@@ -108,6 +110,7 @@
       DataSegmentAnnouncer announcer,
       DataSegmentServerAnnouncer serverAnnouncer,
       SegmentManager segmentManager,
+      SegmentCacheManager segmentCacheManager,
       ServerTypeConfig serverTypeConfig
   )
   {
@@ -117,6 +120,7 @@
         announcer,
         serverAnnouncer,
         segmentManager,
+        segmentCacheManager,
         Executors.newScheduledThreadPool(
             config.getNumLoadingThreads(),
             Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
@@ -132,6 +136,7 @@
       DataSegmentAnnouncer announcer,
       DataSegmentServerAnnouncer serverAnnouncer,
       SegmentManager segmentManager,
+      SegmentCacheManager segmentCacheManager,
       ScheduledExecutorService exec,
       ServerTypeConfig serverTypeConfig
   )
@@ -141,6 +146,7 @@
     this.announcer = announcer;
     this.serverAnnouncer = serverAnnouncer;
     this.segmentManager = segmentManager;
+    this.segmentCacheManager = segmentCacheManager;
     this.exec = exec;
     this.serverTypeConfig = serverTypeConfig;
 
@@ -228,7 +234,7 @@
         if (!segment.getId().toString().equals(file.getName())) {
           log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), segment.getId());
           ignored++;
-        } else if (segmentManager.isSegmentCached(segment)) {
+        } else if (segmentCacheManager.isSegmentCached(segment)) {
           cachedSegments.add(segment);
         } else {
           log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getId());
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java
new file mode 100644
index 0000000..a268681
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import org.apache.druid.java.util.common.MapUtils;
+import org.apache.druid.timeline.DataSegment;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ *
+ */
+public class CacheTestSegmentCacheManager implements SegmentCacheManager
+{
+  private final Set<DataSegment> segmentsInTrash = new HashSet<>();
+
+  @Override
+  public boolean isSegmentCached(DataSegment segment)
+  {
+    Map<String, Object> loadSpec = segment.getLoadSpec();
+    return new File(MapUtils.getString(loadSpec, "cacheDir")).exists();
+  }
+
+  @Override
+  public File getSegmentFiles(DataSegment segment)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void cleanup(DataSegment segment)
+  {
+    segmentsInTrash.add(segment);
+  }
+
+  public Set<DataSegment> getSegmentsInTrash()
+  {
+    return segmentsInTrash;
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
index 557537c..cf47755 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.segment.loading;
 
-import org.apache.druid.java.util.common.MapUtils;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
@@ -28,23 +27,10 @@
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.Interval;
 
-import java.io.File;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 /**
 */
 public class CacheTestSegmentLoader implements SegmentLoader
 {
-  private final Set<DataSegment> segmentsInTrash = new HashSet<>();
-
-  @Override
-  public boolean isSegmentLoaded(DataSegment segment)
-  {
-    Map<String, Object> loadSpec = segment.getLoadSpec();
-    return new File(MapUtils.getString(loadSpec, "cacheDir")).exists();
-  }
 
   @Override
   public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
@@ -83,19 +69,8 @@
   }
 
   @Override
-  public File getSegmentFiles(DataSegment segment)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public void cleanup(DataSegment segment)
   {
-    segmentsInTrash.add(segment);
-  }
 
-  public Set<DataSegment> getSegmentsInTrash()
-  {
-    return segmentsInTrash;
   }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
similarity index 95%
rename from server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
rename to server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index f617261..e812ff7 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -30,7 +30,6 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -53,7 +52,7 @@
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
-public class SegmentLoaderLocalCacheManagerConcurrencyTest
+public class SegmentLocalCacheManagerConcurrencyTest
 {
   @Rule
   public final TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -66,10 +65,10 @@
   private final String segmentVersion;
 
   private File localSegmentCacheFolder;
-  private SegmentLoaderLocalCacheManager manager;
+  private SegmentLocalCacheManager manager;
   private ExecutorService executorService;
 
-  public SegmentLoaderLocalCacheManagerConcurrencyTest()
+  public SegmentLocalCacheManagerConcurrencyTest()
   {
     jsonMapper = new DefaultObjectMapper();
     jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
@@ -93,8 +92,7 @@
     final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 2000L, null);
     locations.add(locationConfig);
 
-    manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
similarity index 90%
rename from server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
rename to server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
index 75bbeff..26c9cbd 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
@@ -27,7 +27,6 @@
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -41,7 +40,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
-public class SegmentLoaderLocalCacheManagerTest
+public class SegmentLocalCacheManagerTest
 {
   @Rule
   public final TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -49,9 +48,9 @@
   private final ObjectMapper jsonMapper;
 
   private File localSegmentCacheFolder;
-  private SegmentLoaderLocalCacheManager manager;
+  private SegmentLocalCacheManager manager;
 
-  public SegmentLoaderLocalCacheManagerTest()
+  public SegmentLocalCacheManagerTest()
   {
     jsonMapper = new DefaultObjectMapper();
     jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
@@ -73,8 +72,7 @@
     final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 10000000000L, null);
     locations.add(locationConfig);
 
-    manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -90,10 +88,10 @@
     );
     cachedSegmentFile.mkdirs();
 
-    Assert.assertTrue("Expect cache hit", manager.isSegmentLoaded(cachedSegment));
+    Assert.assertTrue("Expect cache hit", manager.isSegmentCached(cachedSegment));
 
     final DataSegment uncachedSegment = dataSegmentWithInterval("2014-10-21T00:00:00Z/P1D");
-    Assert.assertFalse("Expect cache miss", manager.isSegmentLoaded(uncachedSegment));
+    Assert.assertFalse("Expect cache miss", manager.isSegmentCached(uncachedSegment));
   }
 
   @Test
@@ -122,13 +120,13 @@
     final File indexZip = new File(localSegmentFile, "index.zip");
     indexZip.createNewFile();
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
 
     manager.getSegmentFiles(segmentToDownload);
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     manager.cleanup(segmentToDownload);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload));
   }
 
   @Test
@@ -143,8 +141,7 @@
     final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 1000000000L, null);
     locations.add(locationConfig2);
 
-    manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -169,14 +166,14 @@
     final File indexZip = new File(localSegmentFile, "index.zip");
     indexZip.createNewFile();
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
 
     File segmentFile = manager.getSegmentFiles(segmentToDownload);
     Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     manager.cleanup(segmentToDownload);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload));
   }
 
   @Test
@@ -192,8 +189,7 @@
     final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null);
     locations.add(locationConfig2);
 
-    manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -218,14 +214,14 @@
     final File indexZip = new File(localSegmentFile, "index.zip");
     indexZip.createNewFile();
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
 
     File segmentFile = manager.getSegmentFiles(segmentToDownload);
     Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder2/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     manager.cleanup(segmentToDownload);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload));
   }
 
   @Test
@@ -243,8 +239,7 @@
     final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null);
     locations.add(locationConfig2);
 
-    manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -276,7 +271,7 @@
     }
     catch (SegmentLoadingException e) {
     }
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload));
     manager.cleanup(segmentToDownload);
   }
 
@@ -293,8 +288,7 @@
     final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10L, null);
     locations.add(locationConfig2);
 
-    manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -319,11 +313,11 @@
     final File indexZip = new File(localSegmentFile, "index.zip");
     indexZip.createNewFile();
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
 
     File segmentFile = manager.getSegmentFiles(segmentToDownload);
     Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec(
         ImmutableMap.of(
@@ -347,10 +341,10 @@
 
     File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
     Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload2));
 
     manager.cleanup(segmentToDownload2);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload2));
   }
 
   private DataSegment dataSegmentWithInterval(String intervalStr)
@@ -402,8 +396,7 @@
       );
     }
 
-    manager = new SegmentLoaderLocalCacheManager(
-      TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
       new SegmentLoaderConfig().withLocations(locationConfigs),
       new RoundRobinStorageLocationSelectorStrategy(locations),
       jsonMapper
@@ -425,14 +418,14 @@
     // manually create a local segment under segmentSrcFolder
     createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload1));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload1));
 
     File segmentFile = manager.getSegmentFiles(segmentToDownload1);
     Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload1));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload1));
 
     manager.cleanup(segmentToDownload1);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload1));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload1));
 
     // Segment 2 should be downloaded in local_storage_folder2
     final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec(
@@ -449,14 +442,14 @@
     // manually create a local segment under segmentSrcFolder
     createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload2));
 
     File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
     Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload2));
 
     manager.cleanup(segmentToDownload2);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload2));
 
     // Segment 3 should be downloaded in local_storage_folder3
     final DataSegment segmentToDownload3 = dataSegmentWithInterval("2014-12-20T00:00:00Z/P1D").withLoadSpec(
@@ -476,10 +469,10 @@
 
     File segmentFile3 = manager.getSegmentFiles(segmentToDownload3);
     Assert.assertTrue(segmentFile3.getAbsolutePath().contains("/local_storage_folder3/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload3));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload3));
 
     manager.cleanup(segmentToDownload3);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload3));
 
     // Segment 4 should be downloaded in local_storage_folder again, asserting round robin distribution of segments
     final DataSegment segmentToDownload4 = dataSegmentWithInterval("2014-08-20T00:00:00Z/P1D").withLoadSpec(
@@ -497,13 +490,13 @@
     createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00" +
         ".000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload4));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload4));
 
     File segmentFile1 = manager.getSegmentFiles(segmentToDownload4);
     Assert.assertTrue(segmentFile1.getAbsolutePath().contains("/local_storage_folder/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload4));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload4));
     manager.cleanup(segmentToDownload4);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload4));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload4));
   }
 
   private void createLocalSegmentFile(File segmentSrcFolder, String localSegmentPath) throws Exception
@@ -538,8 +531,7 @@
     locations.add(locationConfig2);
     locations.add(locationConfig3);
 
-    manager = new SegmentLoaderLocalCacheManager(
-      TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
       new SegmentLoaderConfig().withLocations(locations),
       jsonMapper
     );
@@ -561,11 +553,11 @@
     createLocalSegmentFile(segmentSrcFolder,
         "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
 
     File segmentFile = manager.getSegmentFiles(segmentToDownload);
     Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     // Segment 2 should be downloaded in local_storage_folder2, segment2 size 5L
     final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D", 5L).withLoadSpec(
@@ -583,11 +575,11 @@
     createLocalSegmentFile(segmentSrcFolder,
         "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload2));
 
     File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
     Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload2));
 
 
     // Segment 3 should be downloaded in local_storage_folder3, segment3 size 20L
@@ -608,7 +600,7 @@
 
     File segmentFile3 = manager.getSegmentFiles(segmentToDownload3);
     Assert.assertTrue(segmentFile3.getAbsolutePath().contains("/local_storage_folder3/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload3));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload3));
 
     // Now the storage locations local_storage_folder1, local_storage_folder2 and local_storage_folder3 have 10, 5 and
     // 20 bytes occupied respectively. The default strategy should pick location2 (as it has least bytes used) for the
@@ -628,11 +620,11 @@
     createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00" +
         ".000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload4));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload4));
 
     File segmentFile1 = manager.getSegmentFiles(segmentToDownload4);
     Assert.assertTrue(segmentFile1.getAbsolutePath().contains("/local_storage_folder2/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload4));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload4));
 
   }
 
@@ -652,8 +644,7 @@
 
     SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig().withLocations(locationConfigs);
 
-    manager = new SegmentLoaderLocalCacheManager(
-            TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
             new SegmentLoaderConfig().withLocations(locationConfigs),
             new RandomStorageLocationSelectorStrategy(segmentLoaderConfig.toStorageLocations()),
             jsonMapper
@@ -677,11 +668,11 @@
     createLocalSegmentFile(segmentSrcFolder,
             "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
 
     File segmentFile = manager.getSegmentFiles(segmentToDownload);
     Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     // Segment 2 should be downloaded in local_storage_folder3, segment2 size 9L
     final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D", 9L).withLoadSpec(
@@ -699,11 +690,11 @@
     createLocalSegmentFile(segmentSrcFolder,
             "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload2));
 
     File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
     Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder3/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload2));
 
 
     // Segment 3 should not be downloaded, segment3 size 20L
@@ -729,7 +720,7 @@
     }
     catch (SegmentLoadingException e) {
     }
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload3));
   }
 
   @Test
@@ -759,16 +750,16 @@
     Assert.assertTrue(indexZip.createNewFile());
 
     final File cachedSegmentDir = manager.getSegmentFiles(segmentToDownload);
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     // Emulate a corrupted segment file
     final File downloadMarker = new File(
         cachedSegmentDir,
-        SegmentLoaderLocalCacheManager.DOWNLOAD_START_MARKER_FILE_NAME
+        SegmentLocalCacheManager.DOWNLOAD_START_MARKER_FILE_NAME
     );
     Assert.assertTrue(downloadMarker.createNewFile());
 
-    Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentCached(segmentToDownload));
     Assert.assertFalse(cachedSegmentDir.exists());
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java
index 38c4100..b62d453 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java
@@ -52,8 +52,9 @@
 import org.apache.druid.segment.loading.LocalDataSegmentPuller;
 import org.apache.druid.segment.loading.LocalLoadSpec;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheLoader;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentizerFactory;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -102,7 +103,7 @@
   private IndexIO indexIO;
   private File segmentCacheDir;
   private File segmentDeepStorageDir;
-  private SegmentLoaderLocalCacheManager segmentLoader;
+  private SegmentLocalCacheManager segmentCacheManager;
   private SegmentManager segmentManager;
   private BroadcastTableJoinableFactory joinableFactory;
 
@@ -125,8 +126,7 @@
     );
     segmentCacheDir = temporaryFolder.newFolder();
     segmentDeepStorageDir = temporaryFolder.newFolder();
-    segmentLoader = new SegmentLoaderLocalCacheManager(
-        indexIO,
+    segmentCacheManager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig()
         {
           @Override
@@ -139,7 +139,7 @@
         },
         objectMapper
     );
-    segmentManager = new SegmentManager(segmentLoader);
+    segmentManager = new SegmentManager(new SegmentLocalCacheLoader(segmentCacheManager, indexIO, objectMapper));
     joinableFactory = new BroadcastTableJoinableFactory(segmentManager);
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
   }
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index 762339f..8698146 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -46,7 +46,6 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -61,15 +60,10 @@
 
 public class SegmentManagerTest
 {
+
   private static final SegmentLoader SEGMENT_LOADER = new SegmentLoader()
   {
     @Override
-    public boolean isSegmentLoaded(DataSegment segment)
-    {
-      return false;
-    }
-
-    @Override
     public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
     {
       return new SegmentForTesting(
@@ -79,12 +73,6 @@
     }
 
     @Override
-    public File getSegmentFiles(DataSegment segment)
-    {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
     public void cleanup(DataSegment segment)
     {
 
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
index 87587ce..5295b9d 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
@@ -40,8 +40,9 @@
 import org.apache.druid.segment.loading.LocalDataSegmentPuller;
 import org.apache.druid.segment.loading.LocalLoadSpec;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheLoader;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentizerFactory;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -57,6 +58,7 @@
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -82,7 +84,7 @@
   private IndexIO indexIO;
   private File segmentCacheDir;
   private File segmentDeepStorageDir;
-  private SegmentLoaderLocalCacheManager segmentLoader;
+  private SegmentLocalCacheManager segmentCacheManager;
   private SegmentManager segmentManager;
   private ExecutorService exec;
 
@@ -98,8 +100,7 @@
     indexIO = new IndexIO(objectMapper, () -> 0);
     segmentCacheDir = temporaryFolder.newFolder();
     segmentDeepStorageDir = temporaryFolder.newFolder();
-    segmentLoader = new SegmentLoaderLocalCacheManager(
-        indexIO,
+    segmentCacheManager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig()
         {
           @Override
@@ -112,7 +113,7 @@
         },
         objectMapper
     );
-    segmentManager = new SegmentManager(segmentLoader);
+    segmentManager = new SegmentManager(new SegmentLocalCacheLoader(segmentCacheManager, indexIO, objectMapper));
     exec = Execs.multiThreaded(NUM_THREAD, "SegmentManagerThreadSafetyTest-%d");
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
   }
@@ -137,7 +138,7 @@
     }
     Assert.assertEquals(1, segmentPuller.numFileLoaded.size());
     Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue());
-    Assert.assertEquals(0, segmentLoader.getSegmentLocks().size());
+    Assert.assertEquals(0, segmentCacheManager.getSegmentLocks().size());
   }
 
   @Test(timeout = 6000L)
@@ -168,7 +169,7 @@
     }
     Assert.assertEquals(11, segmentPuller.numFileLoaded.size());
     Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue());
-    Assert.assertEquals(0, segmentLoader.getSegmentLocks().size());
+    Assert.assertEquals(0, segmentCacheManager.getSegmentLocks().size());
   }
 
   private DataSegment createSegment(String interval) throws IOException
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
index 37cc2606..00164fc 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
@@ -35,9 +35,11 @@
 import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheLoader;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentizerFactory;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -87,9 +89,10 @@
     objectMapper = TestHelper.makeJsonMapper();
     objectMapper.registerSubtypes(TestLoadSpec.class);
     objectMapper.registerSubtypes(TestSegmentizerFactory.class);
-    SegmentManager segmentManager = new SegmentManager(new SegmentLoaderLocalCacheManager(
+    SegmentCacheManager cacheManager = new SegmentLocalCacheManager(config, objectMapper);
+    SegmentManager segmentManager = new SegmentManager(new SegmentLocalCacheLoader(
+        cacheManager,
         TestIndex.INDEX_IO,
-        config,
         objectMapper
     ));
     segmentAnnouncer = Mockito.mock(DataSegmentAnnouncer.class);
@@ -99,6 +102,7 @@
         segmentAnnouncer,
         Mockito.mock(DataSegmentServerAnnouncer.class),
         segmentManager,
+        cacheManager,
         new ServerTypeConfig(ServerType.HISTORICAL)
     );
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 3930706..02a6db1 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -31,7 +31,9 @@
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.loading.CacheTestSegmentCacheManager;
 import org.apache.druid.segment.loading.CacheTestSegmentLoader;
+import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.SegmentManager;
@@ -80,7 +82,8 @@
   private TestStorageLocation testStorageLocation;
   private AtomicInteger announceCount;
   private ConcurrentSkipListSet<DataSegment> segmentsAnnouncedByMe;
-  private CacheTestSegmentLoader segmentLoader;
+  private CacheTestSegmentCacheManager segmentCacheManager;
+  private SegmentLoader segmentLoader;
   private SegmentManager segmentManager;
   private List<Runnable> scheduledRunnable;
   private SegmentLoaderConfig segmentLoaderConfig;
@@ -116,6 +119,7 @@
 
     scheduledRunnable = new ArrayList<>();
 
+    segmentCacheManager = new CacheTestSegmentCacheManager();
     segmentLoader = new CacheTestSegmentLoader();
     segmentManager = new SegmentManager(segmentLoader);
     segmentsAnnouncedByMe = new ConcurrentSkipListSet<>();
@@ -239,6 +243,7 @@
         announcer,
         Mockito.mock(DataSegmentServerAnnouncer.class),
         segmentManager,
+        segmentCacheManager,
         scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
         new ServerTypeConfig(ServerType.HISTORICAL)
     );
@@ -273,7 +278,7 @@
     }
 
     Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
-    Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
+    Assert.assertFalse("segment files shouldn't be deleted", segmentCacheManager.getSegmentsInTrash().contains(segment));
 
     segmentLoadDropHandler.stop();
   }
@@ -312,7 +317,7 @@
     }
 
     Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
-    Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
+    Assert.assertFalse("segment files shouldn't be deleted", segmentCacheManager.getSegmentsInTrash().contains(segment));
 
     segmentLoadDropHandler.stop();
   }
@@ -409,6 +414,7 @@
         announcer,
         Mockito.mock(DataSegmentServerAnnouncer.class),
         segmentManager,
+        segmentCacheManager,
         new ServerTypeConfig(ServerType.HISTORICAL)
     );
 
@@ -495,6 +501,7 @@
         announcer,
         Mockito.mock(DataSegmentServerAnnouncer.class),
         segmentManager,
+        segmentCacheManager,
         scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
         new ServerTypeConfig(ServerType.HISTORICAL)
     );
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
index 902acd8..16832b3 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
@@ -104,7 +104,6 @@
 
 import javax.annotation.Nullable;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -149,12 +148,6 @@
         new SegmentLoader()
         {
           @Override
-          public boolean isSegmentLoaded(DataSegment segment)
-          {
-            return false;
-          }
-
-          @Override
           public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
           {
             return new SegmentForTesting(
@@ -164,12 +157,6 @@
           }
 
           @Override
-          public File getSegmentFiles(DataSegment segment)
-          {
-            throw new UnsupportedOperationException();
-          }
-
-          @Override
           public void cleanup(DataSegment segment)
           {
 
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
index c30a37c..8356a26 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
@@ -28,6 +28,7 @@
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.SegmentManager;
@@ -169,6 +170,7 @@
         EasyMock.createNiceMock(DataSegmentAnnouncer.class),
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         EasyMock.createNiceMock(SegmentManager.class),
+        EasyMock.createNiceMock(SegmentCacheManager.class),
         EasyMock.createNiceMock(ScheduledExecutorService.class),
         new ServerTypeConfig(ServerType.HISTORICAL)
     )
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 5a78f0f..569e831 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -131,6 +131,7 @@
 import org.joda.time.chrono.ISOChronology;
 
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
