Split SegmentLoader into SegmentLoader and SegmentCacheManager (#11466)

This PR splits current SegmentLoader into SegmentLoader and SegmentCacheManager.

SegmentLoader - this class is responsible for building the segment object but does not expose any methods for downloading, cache space management, etc. Default implementation delegates the download operations to SegmentCacheManager and only contains the logic for building segments once downloaded. . This class will be used in SegmentManager to construct Segment objects.

SegmentCacheManager - this class manages the segment cache on the local disk. It fetches the segment files to the local disk, can clean up the cache, and in the future, support reserve and release on cache space. [See https://github.com/Make SegmentLoader extensible and customizable #11398]. This class will be used in ingestion tasks such as compaction, re-indexing where segment files need to be downloaded locally.
diff --git a/docs/development/modules.md b/docs/development/modules.md
index d1080b6..0110288 100644
--- a/docs/development/modules.md
+++ b/docs/development/modules.md
@@ -137,7 +137,7 @@
 ip] to [/opt/druid/zk_druid/dde/2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z/2015-04-14T02:41:09.484Z/0]
 2015-04-14T02:49:08,276 INFO [ZkCoordinator-0] org.apache.druid.storage.azure.AzureDataSegmentPuller - Loaded 1196 bytes from [dde/2015-01-02T00:00:00.000Z_2015-01-03
 T00:00:00.000Z/2015-04-14T02:41:09.484Z/0/index.zip] to [/opt/druid/zk_druid/dde/2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z/2015-04-14T02:41:09.484Z/0]
-2015-04-14T02:49:08,277 WARN [ZkCoordinator-0] org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager - Segment [dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] is different than expected size. Expected [0] found [1196]
+2015-04-14T02:49:08,277 WARN [ZkCoordinator-0] org.apache.druid.segment.loading.SegmentLocalCacheManager - Segment [dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] is different than expected size. Expected [0] found [1196]
 2015-04-14T02:49:08,282 INFO [ZkCoordinator-0] org.apache.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] at path[/druid/dev/segments/192.168.33.104:8081/192.168.33.104:8081_historical__default_tier_2015-04-14T02:49:08.282Z_7bb87230ebf940188511dd4a53ffd7351]
 2015-04-14T02:49:08,292 INFO [ZkCoordinator-0] org.apache.druid.server.coordination.ZkCoordinator - Completed request [LOAD: dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z]
 ```
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index ca3630d..5424288 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -60,7 +60,7 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestUtils;
@@ -2887,7 +2887,7 @@
         DirectQueryProcessingPool.INSTANCE,
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
+        new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 5b3a77c..3d0f86d 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -48,7 +48,7 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestUtils;
@@ -2974,7 +2974,7 @@
         DirectQueryProcessingPool.INSTANCE,
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
+        new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentCacheManagerFactory.java
similarity index 77%
rename from indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
rename to indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentCacheManagerFactory.java
index 17b8dc1..6672cf0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentCacheManagerFactory.java
@@ -22,10 +22,9 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import org.apache.druid.guice.annotations.Json;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 
 import java.io.File;
@@ -34,25 +33,21 @@
 /**
  *
  */
-public class SegmentLoaderFactory
+public class SegmentCacheManagerFactory
 {
-  private final IndexIO indexIO;
   private final ObjectMapper jsonMapper;
 
   @Inject
-  public SegmentLoaderFactory(
-      IndexIO indexIO,
+  public SegmentCacheManagerFactory(
       @Json ObjectMapper mapper
   )
   {
-    this.indexIO = indexIO;
     this.jsonMapper = mapper;
   }
 
-  public SegmentLoader manufacturate(File storageDir)
+  public SegmentCacheManager manufacturate(File storageDir)
   {
-    return new SegmentLoaderLocalCacheManager(
-        indexIO,
+    return new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(
             Collections.singletonList(new StorageLocationConfig(storageDir, null, null))),
         jsonMapper
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index dbaec5a..979de5f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -57,7 +57,7 @@
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.loading.DataSegmentMover;
 import org.apache.druid.segment.loading.DataSegmentPusher;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -101,7 +101,7 @@
   private final Provider<MonitorScheduler> monitorSchedulerProvider;
   private final QueryProcessingPool queryProcessingPool;
   private final JoinableFactory joinableFactory;
-  private final SegmentLoader segmentLoader;
+  private final SegmentCacheManager segmentCacheManager;
   private final ObjectMapper jsonMapper;
   private final File taskWorkDir;
   private final IndexIO indexIO;
@@ -144,7 +144,7 @@
       QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       @Nullable Provider<MonitorScheduler> monitorSchedulerProvider,
-      SegmentLoader segmentLoader,
+      SegmentCacheManager segmentCacheManager,
       ObjectMapper jsonMapper,
       File taskWorkDir,
       IndexIO indexIO,
@@ -183,7 +183,7 @@
     this.queryProcessingPool = queryProcessingPool;
     this.joinableFactory = joinableFactory;
     this.monitorSchedulerProvider = monitorSchedulerProvider;
-    this.segmentLoader = segmentLoader;
+    this.segmentCacheManager = segmentCacheManager;
     this.jsonMapper = jsonMapper;
     this.taskWorkDir = taskWorkDir;
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
@@ -318,7 +318,7 @@
   {
     Map<DataSegment, File> retVal = Maps.newLinkedHashMap();
     for (DataSegment segment : segments) {
-      retVal.put(segment, segmentLoader.getSegmentFiles(segment));
+      retVal.put(segment, segmentCacheManager.getSegmentFiles(segment));
     }
 
     return retVal;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 5cd4eb5..cb1d3c1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -83,7 +83,7 @@
   private final QueryProcessingPool queryProcessingPool;
   private final JoinableFactory joinableFactory;
   private final Provider<MonitorScheduler> monitorSchedulerProvider;
-  private final SegmentLoaderFactory segmentLoaderFactory;
+  private final SegmentCacheManagerFactory segmentCacheManagerFactory;
   private final ObjectMapper jsonMapper;
   private final IndexIO indexIO;
   private final Cache cache;
@@ -124,7 +124,7 @@
       QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       Provider<MonitorScheduler> monitorSchedulerProvider,
-      SegmentLoaderFactory segmentLoaderFactory,
+      SegmentCacheManagerFactory segmentCacheManagerFactory,
       @Json ObjectMapper jsonMapper,
       IndexIO indexIO,
       Cache cache,
@@ -162,7 +162,7 @@
     this.queryProcessingPool = queryProcessingPool;
     this.joinableFactory = joinableFactory;
     this.monitorSchedulerProvider = monitorSchedulerProvider;
-    this.segmentLoaderFactory = segmentLoaderFactory;
+    this.segmentCacheManagerFactory = segmentCacheManagerFactory;
     this.jsonMapper = jsonMapper;
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
     this.cache = cache;
@@ -204,7 +204,7 @@
         queryProcessingPool,
         joinableFactory,
         monitorSchedulerProvider,
-        segmentLoaderFactory.manufacturate(taskWorkDir),
+        segmentCacheManagerFactory.manufacturate(taskWorkDir),
         jsonMapper,
         taskWorkDir,
         indexIO,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 879949d..3374e8c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -52,7 +52,7 @@
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -157,7 +157,7 @@
   private final PartitionConfigurationManager partitionConfigurationManager;
 
   @JsonIgnore
-  private final SegmentLoaderFactory segmentLoaderFactory;
+  private final SegmentCacheManagerFactory segmentCacheManagerFactory;
 
   @JsonIgnore
   private final RetryPolicyFactory retryPolicyFactory;
@@ -185,7 +185,7 @@
       @JsonProperty("granularitySpec") @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
       @JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
       @JsonProperty("context") @Nullable final Map<String, Object> context,
-      @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+      @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
       @JacksonInject RetryPolicyFactory retryPolicyFactory
   )
   {
@@ -233,7 +233,7 @@
     this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null;
     this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
     this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig);
-    this.segmentLoaderFactory = segmentLoaderFactory;
+    this.segmentCacheManagerFactory = segmentCacheManagerFactory;
     this.retryPolicyFactory = retryPolicyFactory;
   }
 
@@ -422,7 +422,7 @@
         metricsSpec,
         granularitySpec,
         toolbox.getCoordinatorClient(),
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         retryPolicyFactory,
         ioConfig.isDropExisting()
     );
@@ -521,7 +521,7 @@
       @Nullable final AggregatorFactory[] metricsSpec,
       @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
       final CoordinatorClient coordinatorClient,
-      final SegmentLoaderFactory segmentLoaderFactory,
+      final SegmentCacheManagerFactory segmentCacheManagerFactory,
       final RetryPolicyFactory retryPolicyFactory,
       final boolean dropExisting
   ) throws IOException, SegmentLoadingException
@@ -604,7 +604,7 @@
                     dataSchema,
                     interval,
                     coordinatorClient,
-                    segmentLoaderFactory,
+                    segmentCacheManagerFactory,
                     retryPolicyFactory,
                     dropExisting
                 ),
@@ -632,7 +632,7 @@
                   dataSchema,
                   segmentProvider.interval,
                   coordinatorClient,
-                  segmentLoaderFactory,
+                  segmentCacheManagerFactory,
                   retryPolicyFactory,
                   dropExisting
               ),
@@ -647,7 +647,7 @@
       DataSchema dataSchema,
       Interval interval,
       CoordinatorClient coordinatorClient,
-      SegmentLoaderFactory segmentLoaderFactory,
+      SegmentCacheManagerFactory segmentCacheManagerFactory,
       RetryPolicyFactory retryPolicyFactory,
       boolean dropExisting
   )
@@ -663,7 +663,7 @@
             null,
             toolbox.getIndexIO(),
             coordinatorClient,
-            segmentLoaderFactory,
+            segmentCacheManagerFactory,
             retryPolicyFactory,
             toolbox.getConfig()
         ),
@@ -1016,7 +1016,7 @@
   public static class Builder
   {
     private final String dataSource;
-    private final SegmentLoaderFactory segmentLoaderFactory;
+    private final SegmentCacheManagerFactory segmentCacheManagerFactory;
     private final RetryPolicyFactory retryPolicyFactory;
 
     private CompactionIOConfig ioConfig;
@@ -1035,12 +1035,12 @@
 
     public Builder(
         String dataSource,
-        SegmentLoaderFactory segmentLoaderFactory,
+        SegmentCacheManagerFactory segmentCacheManagerFactory,
         RetryPolicyFactory retryPolicyFactory
     )
     {
       this.dataSource = dataSource;
-      this.segmentLoaderFactory = segmentLoaderFactory;
+      this.segmentCacheManagerFactory = segmentCacheManagerFactory;
       this.retryPolicyFactory = retryPolicyFactory;
     }
 
@@ -1118,7 +1118,7 @@
           granularitySpec,
           tuningConfig,
           context,
-          segmentLoaderFactory,
+          segmentCacheManagerFactory,
           retryPolicyFactory
       );
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 01b3d95..f20a0dd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -36,7 +36,7 @@
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.indexing.common.ReingestionTimelineUtils;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.IAE;
@@ -45,7 +45,7 @@
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose;
 import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
@@ -84,7 +84,7 @@
   private final Long maxInputSegmentBytesPerTask;
   private final IndexIO indexIO;
   private final CoordinatorClient coordinatorClient;
-  private final SegmentLoaderFactory segmentLoaderFactory;
+  private final SegmentCacheManagerFactory segmentCacheManagerFactory;
   private final RetryPolicyFactory retryPolicyFactory;
 
   private List<InputSplit<List<WindowedSegmentId>>> splits;
@@ -102,7 +102,7 @@
       @JsonProperty("maxInputSegmentBytesPerTask") @Deprecated @Nullable Long maxInputSegmentBytesPerTask,
       @JacksonInject IndexIO indexIO,
       @JacksonInject CoordinatorClient coordinatorClient,
-      @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+      @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
       @JacksonInject RetryPolicyFactory retryPolicyFactory
   )
   {
@@ -119,7 +119,7 @@
     this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask;
     this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
     this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
-    this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
+    this.segmentCacheManagerFactory = Preconditions.checkNotNull(segmentCacheManagerFactory, "null segmentCacheManagerFactory");
     this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
   }
 
@@ -136,7 +136,7 @@
         maxInputSegmentBytesPerTask,
         indexIO,
         coordinatorClient,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         retryPolicyFactory
     );
   }
@@ -202,7 +202,7 @@
     // Note: this requires enough local storage space to fit all of the segments, even though
     // IngestSegmentFirehose iterates over the segments in series. We may want to change this
     // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
-    final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
+    final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(temporaryDirectory);
     Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
     for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
       for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
@@ -210,7 +210,7 @@
 
         segmentFileMap.computeIfAbsent(segment, k -> {
           try {
-            return segmentLoader.getSegmentFiles(segment);
+            return segmentCacheManager.getSegmentFiles(segment);
           }
           catch (SegmentLoadingException e) {
             throw new RuntimeException(e);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index c9d0f4e..9be3378 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -42,7 +42,7 @@
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexing.common.RetryPolicy;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
 import org.apache.druid.java.util.common.IAE;
@@ -52,7 +52,7 @@
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.column.ColumnHolder;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -129,7 +129,7 @@
   private final DimFilter dimFilter;
   private final IndexIO indexIO;
   private final CoordinatorClient coordinatorClient;
-  private final SegmentLoaderFactory segmentLoaderFactory;
+  private final SegmentCacheManagerFactory segmentCacheManagerFactory;
   private final RetryPolicyFactory retryPolicyFactory;
   private final TaskConfig taskConfig;
 
@@ -155,7 +155,7 @@
       @Nullable @JsonProperty("metrics") List<String> metrics,
       @JacksonInject IndexIO indexIO,
       @JacksonInject CoordinatorClient coordinatorClient,
-      @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+      @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
       @JacksonInject RetryPolicyFactory retryPolicyFactory,
       @JacksonInject TaskConfig taskConfig
   )
@@ -172,7 +172,7 @@
     this.metrics = metrics;
     this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
     this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
-    this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
+    this.segmentCacheManagerFactory = Preconditions.checkNotNull(segmentCacheManagerFactory, "null segmentCacheManagerFactory");
     this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
     this.taskConfig = Preconditions.checkNotNull(taskConfig, "null taskConfig");
   }
@@ -224,7 +224,7 @@
   @Override
   protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory)
   {
-    final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
+    final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(temporaryDirectory);
 
     final List<TimelineObjectHolder<String, DataSegment>> timeline = createTimeline();
     final Iterator<DruidSegmentInputEntity> entityIterator = FluentIterable
@@ -235,7 +235,7 @@
           //noinspection ConstantConditions
           return FluentIterable
               .from(partitionHolder)
-              .transform(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval()));
+              .transform(chunk -> new DruidSegmentInputEntity(segmentCacheManager, chunk.getObject(), holder.getInterval()));
         }).iterator();
 
     final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter);
@@ -339,7 +339,7 @@
         metrics,
         indexIO,
         coordinatorClient,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         retryPolicyFactory,
         taskConfig
     );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
index 7550863..0548208 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
@@ -23,7 +23,7 @@
 import com.google.common.base.Predicates;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
@@ -37,13 +37,13 @@
 {
   private static final EmittingLogger log = new EmittingLogger(DruidSegmentInputEntity.class);
 
-  private final SegmentLoader segmentLoader;
+  private final SegmentCacheManager segmentCacheManager;
   private final DataSegment segment;
   private final Interval intervalFilter;
 
-  DruidSegmentInputEntity(SegmentLoader segmentLoader, DataSegment segment, Interval intervalFilter)
+  DruidSegmentInputEntity(SegmentCacheManager segmentCacheManager, DataSegment segment, Interval intervalFilter)
   {
-    this.segmentLoader = segmentLoader;
+    this.segmentCacheManager = segmentCacheManager;
     this.segment = segment;
     this.intervalFilter = intervalFilter;
   }
@@ -71,7 +71,7 @@
   {
     final File segmentFile;
     try {
-      segmentFile = segmentLoader.getSegmentFiles(segment);
+      segmentFile = segmentCacheManager.getSegmentFiles(segment);
     }
     catch (SegmentLoadingException e) {
       throw new RuntimeException(e);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index ac0685a..3f75ce8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -44,8 +44,8 @@
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.loading.DataSegmentMover;
 import org.apache.druid.segment.loading.DataSegmentPusher;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
@@ -83,8 +83,8 @@
   private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class);
   private QueryProcessingPool mockQueryProcessingPool = EasyMock.createMock(QueryProcessingPool.class);
   private ObjectMapper ObjectMapper = new ObjectMapper();
-  private SegmentLoaderFactory mockSegmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class);
-  private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class);
+  private SegmentCacheManagerFactory mockSegmentCacheManagerFactory = EasyMock.createMock(SegmentCacheManagerFactory.class);
+  private SegmentLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLocalCacheManager.class);
   private Task task = EasyMock.createMock(Task.class);
   private IndexMergerV9 mockIndexMergerV9 = EasyMock.createMock(IndexMergerV9.class);
   private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class);
@@ -129,7 +129,7 @@
         mockQueryProcessingPool,
         NoopJoinableFactory.INSTANCE,
         () -> mockMonitorScheduler,
-        mockSegmentLoaderFactory,
+        mockSegmentCacheManagerFactory,
         ObjectMapper,
         mockIndexIO,
         mockCache,
@@ -194,12 +194,12 @@
   {
     File expectedFile = temporaryFolder.newFile();
     EasyMock
-        .expect(mockSegmentLoaderFactory.manufacturate(EasyMock.anyObject()))
+        .expect(mockSegmentCacheManagerFactory.manufacturate(EasyMock.anyObject()))
         .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes();
     EasyMock
         .expect(mockSegmentLoaderLocalCacheManager.getSegmentFiles(EasyMock.anyObject()))
         .andReturn(expectedFile).anyTimes();
-    EasyMock.replay(mockSegmentLoaderFactory, mockSegmentLoaderLocalCacheManager);
+    EasyMock.replay(mockSegmentCacheManagerFactory, mockSegmentLoaderLocalCacheManager);
     DataSegment dataSegment = DataSegment.builder().dataSource("source").interval(Intervals.of("2012-01-01/P1D")).version("1").size(1).build();
     List<DataSegment> segments = ImmutableList.of
         (
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index d7c5e32..0c6b1c4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -51,7 +51,7 @@
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -1590,7 +1590,7 @@
         DirectQueryProcessingPool.INSTANCE, // queryExecutorService
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
+        new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index 04b1b00..fd7b699 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -40,7 +40,7 @@
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -215,7 +215,7 @@
     final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
     final CompactionTask.Builder builder = new CompactionTask.Builder(
         "datasource",
-        new SegmentLoaderFactory(null, mapper),
+        new SegmentCacheManagerFactory(mapper),
         new RetryPolicyFactory(new RetryPolicyConfig())
     );
     final CompactionTask task = builder
@@ -338,7 +338,7 @@
                   binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
                   binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY);
                   binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
-                  binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
+                  binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper));
                   binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
                   binder.bind(IndexingServiceClient.class).toInstance(new NoopIndexingServiceClient());
                 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index e8b6b12..1732b3b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -139,7 +139,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -182,7 +182,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -222,7 +222,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -262,7 +262,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -300,7 +300,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -328,7 +328,7 @@
     runIndexTask(null, true);
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -373,7 +373,7 @@
     runIndexTask(null, true);
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -456,7 +456,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
@@ -490,7 +490,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index a958ee6..b6098ea 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -41,7 +41,7 @@
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
@@ -75,9 +75,9 @@
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.LocalLoadSpec;
 import org.apache.druid.segment.loading.NoopDataSegmentKiller;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
@@ -160,7 +160,7 @@
   private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig());
   private final IndexingServiceClient indexingServiceClient;
   private final CoordinatorClient coordinatorClient;
-  private final SegmentLoaderFactory segmentLoaderFactory;
+  private final SegmentCacheManagerFactory segmentCacheManagerFactory;
   private final LockGranularity lockGranularity;
   private final TestUtils testUtils;
 
@@ -182,7 +182,7 @@
         return getStorageCoordinator().retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE);
       }
     };
-    segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper());
+    segmentCacheManagerFactory = new SegmentCacheManagerFactory(getObjectMapper());
     this.lockGranularity = lockGranularity;
   }
 
@@ -230,7 +230,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -279,7 +279,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -368,7 +368,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -446,7 +446,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -543,7 +543,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -598,7 +598,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -653,7 +653,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -698,7 +698,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -731,7 +731,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -775,7 +775,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -837,7 +837,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -925,7 +925,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -994,7 +994,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -1046,7 +1046,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -1154,7 +1154,7 @@
                     null,
                     getIndexIO(),
                     coordinatorClient,
-                    segmentLoaderFactory,
+                    segmentCacheManagerFactory,
                     RETRY_POLICY_FACTORY
                 ),
                 false,
@@ -1285,8 +1285,7 @@
 
   private TaskToolbox createTaskToolbox(ObjectMapper objectMapper, Task task) throws IOException
   {
-    final SegmentLoader loader = new SegmentLoaderLocalCacheManager(
-        getIndexIO(),
+    final SegmentCacheManager loader = new SegmentLocalCacheManager(
         new SegmentLoaderConfig() {
           @Override
           public List<StorageLocationConfig> getLocations()
@@ -1342,11 +1341,11 @@
   {
 
     final File cacheDir = temporaryFolder.newFolder();
-    final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(cacheDir);
+    final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir);
 
     List<Cursor> cursors = new ArrayList<>();
     for (DataSegment segment : segments) {
-      final File segmentFile = segmentLoader.getSegmentFiles(segment);
+      final File segmentFile = segmentCacheManager.getSegmentFiles(segment);
 
       final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
           new QueryableIndexStorageAdapter(testUtils.getTestIndexIO().loadIndex(segmentFile)),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 330ccbf..ac9e3d0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -57,7 +57,7 @@
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
@@ -188,7 +188,7 @@
   private static List<DataSegment> SEGMENTS;
 
   private TaskToolbox toolbox;
-  private SegmentLoaderFactory segmentLoaderFactory;
+  private SegmentCacheManagerFactory segmentCacheManagerFactory;
 
   @BeforeClass
   public static void setupClass()
@@ -277,7 +277,7 @@
                   binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
                   binder.bind(RowIngestionMetersFactory.class).toInstance(TEST_UTILS.getRowIngestionMetersFactory());
                   binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
-                  binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
+                  binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper));
                   binder.bind(AppenderatorsManager.class).toInstance(new TestAppenderatorsManager());
                   binder.bind(IndexingServiceClient.class).toInstance(INDEXING_SERVICE_CLIENT);
                 }
@@ -361,7 +361,7 @@
         testIndexIO,
         SEGMENT_MAP
     );
-    segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, OBJECT_MAPPER);
+    segmentCacheManagerFactory = new SegmentCacheManagerFactory(OBJECT_MAPPER);
   }
 
   @Test
@@ -369,7 +369,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
@@ -379,7 +379,7 @@
 
     final Builder builder2 = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
@@ -397,7 +397,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
@@ -426,7 +426,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
@@ -455,7 +455,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
@@ -471,7 +471,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     final CompactionTask task = builder
@@ -492,7 +492,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
     final CompactionTask task = builder
@@ -511,7 +511,7 @@
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -586,14 +586,14 @@
         toolbox.getChatHandlerProvider(),
         toolbox.getRowIngestionMetersFactory(),
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         toolbox.getAppenderatorsManager()
     );
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -790,7 +790,7 @@
             null,
             toolbox.getRowIngestionMetersFactory(),
             COORDINATOR_CLIENT,
-            segmentLoaderFactory,
+            segmentCacheManagerFactory,
             RETRY_POLICY_FACTORY,
             toolbox.getAppenderatorsManager()
         );
@@ -848,7 +848,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -921,7 +921,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -995,7 +995,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1069,7 +1069,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1133,7 +1133,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1177,7 +1177,7 @@
         customMetricsSpec,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1214,7 +1214,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1257,7 +1257,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1281,7 +1281,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1295,7 +1295,7 @@
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY
     );
 
@@ -1316,7 +1316,7 @@
         null,
         new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null),
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1354,7 +1354,7 @@
         null,
         new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null)),
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1393,7 +1393,7 @@
             new PeriodGranularity(Period.months(3), null, null)
         ),
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1431,7 +1431,7 @@
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -1468,7 +1468,7 @@
         null,
         new ClientCompactionTaskGranularitySpec(null, null),
         COORDINATOR_CLIENT,
-        segmentLoaderFactory,
+        segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
         IOConfig.DEFAULT_DROP_EXISTING
     );
@@ -2014,7 +2014,7 @@
         @JacksonInject ChatHandlerProvider chatHandlerProvider,
         @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
         @JacksonInject CoordinatorClient coordinatorClient,
-        @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+        @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
         @JacksonInject RetryPolicyFactory retryPolicyFactory,
         @JacksonInject AppenderatorsManager appenderatorsManager
     )
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 55c1f15..f805b2c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -80,9 +80,9 @@
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
@@ -172,7 +172,7 @@
   private final boolean useInputFormatApi;
 
   private AppenderatorsManager appenderatorsManager;
-  private SegmentLoader segmentLoader;
+  private SegmentCacheManager segmentCacheManager;
   private TestTaskRunner taskRunner;
 
   public IndexTaskTest(LockGranularity lockGranularity, boolean useInputFormatApi)
@@ -190,8 +190,7 @@
     appenderatorsManager = new TestAppenderatorsManager();
 
     final File cacheDir = temporaryFolder.newFolder();
-    segmentLoader = new SegmentLoaderLocalCacheManager(
-        indexIO,
+    segmentCacheManager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig()
         {
           @Override
@@ -345,7 +344,7 @@
 
     Assert.assertEquals(1, segments.size());
     DataSegment segment = segments.get(0);
-    final File segmentFile = segmentLoader.getSegmentFiles(segment);
+    final File segmentFile = segmentCacheManager.getSegmentFiles(segment);
 
     final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
         new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentFile)),
@@ -595,7 +594,7 @@
       final HashBasedNumberedShardSpec hashBasedNumberedShardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec();
       Assert.assertEquals(HashPartitionFunction.MURMUR3_32_ABS, hashBasedNumberedShardSpec.getPartitionFunction());
 
-      final File segmentFile = segmentLoader.getSegmentFiles(segment);
+      final File segmentFile = segmentCacheManager.getSegmentFiles(segment);
 
       final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
           new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentFile)),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index c42bf20..ab9b0d7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -25,7 +25,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
@@ -63,7 +63,7 @@
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.NoopDataSegmentKiller;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -95,7 +95,7 @@
 
   private final TestUtils testUtils = new TestUtils();
   private final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
-  private SegmentLoaderFactory segmentLoaderFactory;
+  private SegmentCacheManagerFactory segmentCacheManagerFactory;
   private TaskStorage taskStorage;
   private IndexerSQLMetadataStorageCoordinator storageCoordinator;
   private SegmentsMetadataManager segmentsMetadataManager;
@@ -123,7 +123,7 @@
         derbyConnectorRule.getConnector()
     );
     lockbox = new TaskLockbox(taskStorage, storageCoordinator);
-    segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper());
+    segmentCacheManagerFactory = new SegmentCacheManagerFactory(getObjectMapper());
   }
 
   @After
@@ -153,9 +153,9 @@
     lockbox.remove(task);
   }
 
-  public SegmentLoader newSegmentLoader(File storageDir)
+  public SegmentCacheManager newSegmentLoader(File storageDir)
   {
-    return segmentLoaderFactory.manufacturate(storageDir);
+    return segmentCacheManagerFactory.manufacturate(storageDir);
   }
 
   public ObjectMapper getObjectMapper()
@@ -168,9 +168,9 @@
     return taskStorage;
   }
 
-  public SegmentLoaderFactory getSegmentLoaderFactory()
+  public SegmentCacheManagerFactory getSegmentCacheManagerFactory()
   {
-    return segmentLoaderFactory;
+    return segmentCacheManagerFactory;
   }
 
   public IndexerMetadataStorageCoordinator getMetadataStorageCoordinator()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index f2b1503..94bf84d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -42,7 +42,7 @@
 import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestFirehose;
@@ -989,7 +989,7 @@
         DirectQueryProcessingPool.INSTANCE,
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
+        new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index a716217..f79406b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -30,7 +30,7 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.input.DruidInputSource;
@@ -55,8 +55,10 @@
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheLoader;
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
@@ -287,8 +289,9 @@
 
   private Segment loadSegment(DataSegment dataSegment, File tempSegmentDir)
   {
-    final SegmentLoader loader = new SegmentLoaderFactory(getIndexIO(), getObjectMapper())
+    final SegmentCacheManager cacheManager = new SegmentCacheManagerFactory(getObjectMapper())
         .manufacturate(tempSegmentDir);
+    final SegmentLoader loader = new SegmentLocalCacheLoader(cacheManager, getIndexIO(), getObjectMapper());
     try {
       return loader.getSegment(dataSegment, false, SegmentLazyLoadFailCallback.NOOP);
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index a0d5449..d10013e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -49,7 +49,7 @@
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
@@ -614,7 +614,7 @@
             .addValue(AppenderatorsManager.class, TestUtils.APPENDERATORS_MANAGER)
             .addValue(LocalDataSegmentPuller.class, new LocalDataSegmentPuller())
             .addValue(CoordinatorClient.class, coordinatorClient)
-            .addValue(SegmentLoaderFactory.class, new SegmentLoaderFactory(indexIO, objectMapper))
+            .addValue(SegmentCacheManagerFactory.class, new SegmentCacheManagerFactory(objectMapper))
             .addValue(RetryPolicyFactory.class, new RetryPolicyFactory(new RetryPolicyConfig()))
             .addValue(TaskConfig.class, taskConfig)
     );
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
index 7f001c2..c2021fcd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
@@ -227,7 +227,7 @@
   {
     return new Builder(
         DATASOURCE,
-        getSegmentLoaderFactory(),
+        getSegmentCacheManagerFactory(),
         RETRY_POLICY_FACTORY
     );
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index 34ae0ab..fa13c8e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -44,7 +44,7 @@
 import org.apache.druid.indexing.common.ReingestionTimelineUtils;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.NoopTask;
@@ -226,7 +226,7 @@
     SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
     EasyMock.replay(notifierFactory);
 
-    final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, MAPPER);
+    final SegmentCacheManagerFactory slf = new SegmentCacheManagerFactory(MAPPER);
     final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
 
     Collection<Object[]> values = new ArrayList<>();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
index 1213f9e..d226dc2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
@@ -37,7 +37,7 @@
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
@@ -318,7 +318,7 @@
     for (final TestCase testCase : testCases) {
       SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
       EasyMock.replay(notifierFactory);
-      final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, MAPPER);
+      final SegmentCacheManagerFactory slf = new SegmentCacheManagerFactory(MAPPER);
       final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
       final CoordinatorClient cc = new CoordinatorClient(null, null)
       {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
index dcdc537..ebc2b94 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
@@ -27,7 +27,7 @@
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.guice.IndexingServiceInputSourceModule;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
 import org.apache.druid.java.util.common.Intervals;
@@ -45,7 +45,7 @@
 {
   private final IndexIO indexIO = EasyMock.createMock(IndexIO.class);
   private final CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
-  private final SegmentLoaderFactory segmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class);
+  private final SegmentCacheManagerFactory segmentCacheManagerFactory = EasyMock.createMock(SegmentCacheManagerFactory.class);
   private final RetryPolicyFactory retryPolicyFactory = EasyMock.createMock(RetryPolicyFactory.class);
   private final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
 
@@ -63,7 +63,7 @@
     final InjectableValues.Std injectableValues = (InjectableValues.Std) mapper.getInjectableValues();
     injectableValues.addValue(IndexIO.class, indexIO);
     injectableValues.addValue(CoordinatorClient.class, coordinatorClient);
-    injectableValues.addValue(SegmentLoaderFactory.class, segmentLoaderFactory);
+    injectableValues.addValue(SegmentCacheManagerFactory.class, segmentCacheManagerFactory);
     injectableValues.addValue(RetryPolicyFactory.class, retryPolicyFactory);
     injectableValues.addValue(TaskConfig.class, taskConfig);
   }
@@ -90,7 +90,7 @@
             null,
             indexIO,
             coordinatorClient,
-            segmentLoaderFactory,
+            segmentCacheManagerFactory,
             retryPolicyFactory,
             taskConfig
         ),
@@ -124,7 +124,7 @@
             ImmutableList.of("b"),
             indexIO,
             coordinatorClient,
-            segmentLoaderFactory,
+            segmentCacheManagerFactory,
             retryPolicyFactory,
             taskConfig
         ),
@@ -164,7 +164,7 @@
             null,
             indexIO,
             coordinatorClient,
-            segmentLoaderFactory,
+            segmentCacheManagerFactory,
             retryPolicyFactory,
             taskConfig
         ),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
index 9270f5f..3886f3d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
@@ -46,12 +46,10 @@
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.Segment;
-import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
-import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
@@ -589,16 +587,10 @@
   private DruidSegmentInputEntity makeInputEntity(final Interval interval)
   {
     return new DruidSegmentInputEntity(
-        new SegmentLoader()
+        new SegmentCacheManager()
         {
           @Override
-          public boolean isSegmentLoaded(DataSegment segment)
-          {
-            throw new UnsupportedOperationException("unused");
-          }
-
-          @Override
-          public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
+          public boolean isSegmentCached(DataSegment segment)
           {
             throw new UnsupportedOperationException("unused");
           }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index c05cdb8..f1ea2a0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -23,7 +23,7 @@
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
@@ -110,7 +110,7 @@
         null,
         NoopJoinableFactory.INSTANCE,
         null,
-        new SegmentLoaderFactory(null, utils.getTestObjectMapper()),
+        new SegmentCacheManagerFactory(utils.getTestObjectMapper()),
         utils.getTestObjectMapper(),
         utils.getTestIndexIO(),
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 1903902..032c2fd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -54,7 +54,7 @@
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -665,7 +665,7 @@
         DirectQueryProcessingPool.INSTANCE, // query executor service
         NoopJoinableFactory.INSTANCE,
         () -> monitorScheduler, // monitor scheduler
-        new SegmentLoaderFactory(null, new DefaultObjectMapper()),
+        new SegmentCacheManagerFactory(new DefaultObjectMapper()),
         MAPPER,
         INDEX_IO,
         MapCache.create(0),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 2dd94e8..b7a489f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -26,7 +26,7 @@
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestTasks;
 import org.apache.druid.indexing.common.TestUtils;
@@ -117,7 +117,7 @@
                 null,
                 NoopJoinableFactory.INSTANCE,
                 null,
-                new SegmentLoaderFactory(null, jsonMapper),
+                new SegmentCacheManagerFactory(jsonMapper),
                 jsonMapper,
                 indexIO,
                 null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index c184509..dbc44f0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -31,7 +31,7 @@
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexing.common.IndexingServiceCondition;
-import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestRealtimeTask;
 import org.apache.druid.indexing.common.TestTasks;
@@ -190,7 +190,7 @@
                 null,
                 NoopJoinableFactory.INSTANCE,
                 null,
-                new SegmentLoaderFactory(null, jsonMapper),
+                new SegmentCacheManagerFactory(jsonMapper),
                 jsonMapper,
                 indexIO,
                 null,
diff --git a/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java b/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java
index 99e38e3..ed9d1fd 100644
--- a/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java
+++ b/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java
@@ -34,8 +34,10 @@
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.LocalFileTimestampVersionFinder;
 import org.apache.druid.segment.loading.LocalLoadSpec;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoader;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.SegmentLocalCacheLoader;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 
 import java.util.List;
 
@@ -48,7 +50,8 @@
   @Override
   public void configure(Binder binder)
   {
-    binder.bind(SegmentLoader.class).to(SegmentLoaderLocalCacheManager.class).in(LazySingleton.class);
+    binder.bind(SegmentCacheManager.class).to(SegmentLocalCacheManager.class).in(LazySingleton.class);
+    binder.bind(SegmentLoader.class).to(SegmentLocalCacheLoader.class).in(LazySingleton.class);
 
     bindDeepStorageLocal(binder);
 
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
new file mode 100644
index 0000000..9458574
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import org.apache.druid.timeline.DataSegment;
+
+import java.io.File;
+
+/**
+ * A class to fetch segment files to local disk and manage the local cache.
+ * Implementations must be thread-safe.
+ */
+public interface SegmentCacheManager
+{
+  /**
+   * Checks whether a segment is already cached.
+   */
+  boolean isSegmentCached(DataSegment segment);
+
+  /**
+   * This method fetches the files for the given segment if the segment is not downloaded already.
+   * @throws SegmentLoadingException if there is an error in downloading files
+   */
+  File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
+
+  /**
+   * Cleanup the cache space used by the segment
+   */
+  void cleanup(DataSegment segment);
+}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
index 741cfa1..8fe38a3 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
@@ -23,16 +23,23 @@
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.timeline.DataSegment;
 
-import java.io.File;
-
 /**
- * Loading segments from deep storage to local storage.
- * Implementations must be thread-safe.
+ * Loading segments from deep storage to local storage. Internally, this class can delegate the download to
+ * {@link SegmentCacheManager}. Implementations must be thread-safe.
  */
 public interface SegmentLoader
 {
-  boolean isSegmentLoaded(DataSegment segment);
+  /**
+   * Builds a {@link Segment} by downloading if necessary
+   * @param segment - Segment to load
+   * @param lazy - Whether column metadata de-serialization is to be deferred to access time. Setting this flag to true can speed up segment loading
+   * @param loadFailed - Callback to invoke if lazy loading fails during column access.
+   * @throws SegmentLoadingException - If there is an error in loading the segment
+   */
   Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
-  File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
+
+  /**
+   * cleanup any state used by this segment
+   */
   void cleanup(DataSegment segment);
 }
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java
new file mode 100644
index 0000000..6970f7b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.inject.Inject;
+
+import java.io.File;
+import java.io.IOException;
+
+public class SegmentLocalCacheLoader implements SegmentLoader
+{
+  private final SegmentCacheManager cacheManager;
+  private final IndexIO indexIO;
+  private final ObjectMapper jsonMapper;
+
+  @Inject
+  public SegmentLocalCacheLoader(SegmentCacheManager cacheManager, IndexIO indexIO, @Json ObjectMapper mapper)
+  {
+    this.cacheManager = cacheManager;
+    this.indexIO = indexIO;
+    this.jsonMapper = mapper;
+  }
+
+  @Override
+  public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
+  {
+    final File segmentFiles = cacheManager.getSegmentFiles(segment);
+    File factoryJson = new File(segmentFiles, "factory.json");
+    final SegmentizerFactory factory;
+
+    if (factoryJson.exists()) {
+      try {
+        factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class);
+      }
+      catch (IOException e) {
+        throw new SegmentLoadingException(e, "%s", e.getMessage());
+      }
+    } else {
+      factory = new MMappedQueryableSegmentizerFactory(indexIO);
+    }
+
+    return factory.factorize(segment, segmentFiles, lazy, loadFailed);
+  }
+
+  @Override
+  public void cleanup(DataSegment segment)
+  {
+    cacheManager.cleanup(segment);
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
similarity index 88%
rename from server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
rename to server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index eb32ea4..412cfe9 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -26,9 +26,6 @@
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.Segment;
-import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nonnull;
@@ -41,14 +38,13 @@
 
 /**
  */
-public class SegmentLoaderLocalCacheManager implements SegmentLoader
+public class SegmentLocalCacheManager implements SegmentCacheManager
 {
   @VisibleForTesting
   static final String DOWNLOAD_START_MARKER_FILE_NAME = "downloadStartMarker";
 
-  private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
+  private static final EmittingLogger log = new EmittingLogger(SegmentLocalCacheManager.class);
 
-  private final IndexIO indexIO;
   private final SegmentLoaderConfig config;
   private final ObjectMapper jsonMapper;
 
@@ -82,18 +78,16 @@
   private final StorageLocationSelectorStrategy strategy;
 
   // Note that we only create this via injection in historical and realtime nodes. Peons create these
-  // objects via SegmentLoaderFactory objects, so that they can store segments in task-specific
+  // objects via SegmentCacheManagerFactory objects, so that they can store segments in task-specific
   // directories rather than statically configured directories.
   @Inject
-  public SegmentLoaderLocalCacheManager(
-      IndexIO indexIO,
+  public SegmentLocalCacheManager(
       List<StorageLocation> locations,
       SegmentLoaderConfig config,
       @Nonnull StorageLocationSelectorStrategy strategy,
       @Json ObjectMapper mapper
   )
   {
-    this.indexIO = indexIO;
     this.config = config;
     this.jsonMapper = mapper;
     this.locations = locations;
@@ -102,14 +96,13 @@
   }
 
   @VisibleForTesting
-  SegmentLoaderLocalCacheManager(
-      IndexIO indexIO,
+  SegmentLocalCacheManager(
       SegmentLoaderConfig config,
       @Nonnull StorageLocationSelectorStrategy strategy,
       @Json ObjectMapper mapper
   )
   {
-    this(indexIO, config.toStorageLocations(), config, strategy, mapper);
+    this(config.toStorageLocations(), config, strategy, mapper);
   }
 
   /**
@@ -117,13 +110,11 @@
    *
    * This ctor is mainly for test cases, including test cases in other modules
    */
-  public SegmentLoaderLocalCacheManager(
-      IndexIO indexIO,
+  public SegmentLocalCacheManager(
       SegmentLoaderConfig config,
       @Json ObjectMapper mapper
   )
   {
-    this.indexIO = indexIO;
     this.config = config;
     this.jsonMapper = mapper;
     this.locations = config.toStorageLocations();
@@ -132,7 +123,7 @@
   }
 
   @Override
-  public boolean isSegmentLoaded(final DataSegment segment)
+  public boolean isSegmentCached(final DataSegment segment)
   {
     return findStorageLocationIfLoaded(segment) != null;
   }
@@ -177,36 +168,6 @@
     return downloadStartMarker.exists();
   }
 
-  @Override
-  public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
-  {
-    final ReferenceCountingLock lock = createOrGetLock(segment);
-    final File segmentFiles;
-    synchronized (lock) {
-      try {
-        segmentFiles = getSegmentFiles(segment);
-      }
-      finally {
-        unlock(segment, lock);
-      }
-    }
-    File factoryJson = new File(segmentFiles, "factory.json");
-    final SegmentizerFactory factory;
-
-    if (factoryJson.exists()) {
-      try {
-        factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class);
-      }
-      catch (IOException e) {
-        throw new SegmentLoadingException(e, "%s", e.getMessage());
-      }
-    } else {
-      factory = new MMappedQueryableSegmentizerFactory(indexIO);
-    }
-
-    return factory.factorize(segment, segmentFiles, lazy, loadFailed);
-  }
-
   /**
    * Make sure segments files in loc is intact, otherwise function like loadSegments will failed because of segment files is damaged.
    * @param segment
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index 93b16a3..486ad46 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -154,11 +154,6 @@
     return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getNumSegments);
   }
 
-  public boolean isSegmentCached(final DataSegment segment)
-  {
-    return segmentLoader.isSegmentLoaded(segment);
-  }
-
   /**
    * Returns the timeline for a datasource, if it exists. The analysis object passed in must represent a scan-based
    * datasource of a single table.
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index bc0da93..a9d972f 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -40,6 +40,7 @@
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.server.SegmentManager;
@@ -88,6 +89,7 @@
   private final ScheduledExecutorService exec;
   private final ServerTypeConfig serverTypeConfig;
   private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
+  private final SegmentCacheManager segmentCacheManager;
 
   private volatile boolean started = false;
 
@@ -108,6 +110,7 @@
       DataSegmentAnnouncer announcer,
       DataSegmentServerAnnouncer serverAnnouncer,
       SegmentManager segmentManager,
+      SegmentCacheManager segmentCacheManager,
       ServerTypeConfig serverTypeConfig
   )
   {
@@ -117,6 +120,7 @@
         announcer,
         serverAnnouncer,
         segmentManager,
+        segmentCacheManager,
         Executors.newScheduledThreadPool(
             config.getNumLoadingThreads(),
             Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
@@ -132,6 +136,7 @@
       DataSegmentAnnouncer announcer,
       DataSegmentServerAnnouncer serverAnnouncer,
       SegmentManager segmentManager,
+      SegmentCacheManager segmentCacheManager,
       ScheduledExecutorService exec,
       ServerTypeConfig serverTypeConfig
   )
@@ -141,6 +146,7 @@
     this.announcer = announcer;
     this.serverAnnouncer = serverAnnouncer;
     this.segmentManager = segmentManager;
+    this.segmentCacheManager = segmentCacheManager;
     this.exec = exec;
     this.serverTypeConfig = serverTypeConfig;
 
@@ -228,7 +234,7 @@
         if (!segment.getId().toString().equals(file.getName())) {
           log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), segment.getId());
           ignored++;
-        } else if (segmentManager.isSegmentCached(segment)) {
+        } else if (segmentCacheManager.isSegmentCached(segment)) {
           cachedSegments.add(segment);
         } else {
           log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getId());
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java
new file mode 100644
index 0000000..a268681
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import org.apache.druid.java.util.common.MapUtils;
+import org.apache.druid.timeline.DataSegment;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ *
+ */
+public class CacheTestSegmentCacheManager implements SegmentCacheManager
+{
+  private final Set<DataSegment> segmentsInTrash = new HashSet<>();
+
+  @Override
+  public boolean isSegmentCached(DataSegment segment)
+  {
+    Map<String, Object> loadSpec = segment.getLoadSpec();
+    return new File(MapUtils.getString(loadSpec, "cacheDir")).exists();
+  }
+
+  @Override
+  public File getSegmentFiles(DataSegment segment)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void cleanup(DataSegment segment)
+  {
+    segmentsInTrash.add(segment);
+  }
+
+  public Set<DataSegment> getSegmentsInTrash()
+  {
+    return segmentsInTrash;
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
index 557537c..cf47755 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.segment.loading;
 
-import org.apache.druid.java.util.common.MapUtils;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
@@ -28,23 +27,10 @@
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.Interval;
 
-import java.io.File;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 /**
 */
 public class CacheTestSegmentLoader implements SegmentLoader
 {
-  private final Set<DataSegment> segmentsInTrash = new HashSet<>();
-
-  @Override
-  public boolean isSegmentLoaded(DataSegment segment)
-  {
-    Map<String, Object> loadSpec = segment.getLoadSpec();
-    return new File(MapUtils.getString(loadSpec, "cacheDir")).exists();
-  }
 
   @Override
   public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
@@ -83,19 +69,8 @@
   }
 
   @Override
-  public File getSegmentFiles(DataSegment segment)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public void cleanup(DataSegment segment)
   {
-    segmentsInTrash.add(segment);
-  }
 
-  public Set<DataSegment> getSegmentsInTrash()
-  {
-    return segmentsInTrash;
   }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
similarity index 95%
rename from server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
rename to server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index f617261..e812ff7 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -30,7 +30,6 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -53,7 +52,7 @@
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
-public class SegmentLoaderLocalCacheManagerConcurrencyTest
+public class SegmentLocalCacheManagerConcurrencyTest
 {
   @Rule
   public final TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -66,10 +65,10 @@
   private final String segmentVersion;
 
   private File localSegmentCacheFolder;
-  private SegmentLoaderLocalCacheManager manager;
+  private SegmentLocalCacheManager manager;
   private ExecutorService executorService;
 
-  public SegmentLoaderLocalCacheManagerConcurrencyTest()
+  public SegmentLocalCacheManagerConcurrencyTest()
   {
     jsonMapper = new DefaultObjectMapper();
     jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
@@ -93,8 +92,7 @@
     final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 2000L, null);
     locations.add(locationConfig);
 
-    manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
similarity index 90%
rename from server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
rename to server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
index 75bbeff..26c9cbd 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
@@ -27,7 +27,6 @@
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -41,7 +40,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
-public class SegmentLoaderLocalCacheManagerTest
+public class SegmentLocalCacheManagerTest
 {
   @Rule
   public final TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -49,9 +48,9 @@
   private final ObjectMapper jsonMapper;
 
   private File localSegmentCacheFolder;
-  private SegmentLoaderLocalCacheManager manager;
+  private SegmentLocalCacheManager manager;
 
-  public SegmentLoaderLocalCacheManagerTest()
+  public SegmentLocalCacheManagerTest()
   {
     jsonMapper = new DefaultObjectMapper();
     jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
@@ -73,8 +72,7 @@
     final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 10000000000L, null);
     locations.add(locationConfig);
 
-    manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -90,10 +88,10 @@
     );
     cachedSegmentFile.mkdirs();
 
-    Assert.assertTrue("Expect cache hit", manager.isSegmentLoaded(cachedSegment));
+    Assert.assertTrue("Expect cache hit", manager.isSegmentCached(cachedSegment));
 
     final DataSegment uncachedSegment = dataSegmentWithInterval("2014-10-21T00:00:00Z/P1D");
-    Assert.assertFalse("Expect cache miss", manager.isSegmentLoaded(uncachedSegment));
+    Assert.assertFalse("Expect cache miss", manager.isSegmentCached(uncachedSegment));
   }
 
   @Test
@@ -122,13 +120,13 @@
     final File indexZip = new File(localSegmentFile, "index.zip");
     indexZip.createNewFile();
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
 
     manager.getSegmentFiles(segmentToDownload);
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     manager.cleanup(segmentToDownload);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload));
   }
 
   @Test
@@ -143,8 +141,7 @@
     final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 1000000000L, null);
     locations.add(locationConfig2);
 
-    manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -169,14 +166,14 @@
     final File indexZip = new File(localSegmentFile, "index.zip");
     indexZip.createNewFile();
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
 
     File segmentFile = manager.getSegmentFiles(segmentToDownload);
     Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     manager.cleanup(segmentToDownload);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload));
   }
 
   @Test
@@ -192,8 +189,7 @@
     final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null);
     locations.add(locationConfig2);
 
-    manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -218,14 +214,14 @@
     final File indexZip = new File(localSegmentFile, "index.zip");
     indexZip.createNewFile();
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
 
     File segmentFile = manager.getSegmentFiles(segmentToDownload);
     Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder2/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     manager.cleanup(segmentToDownload);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload));
   }
 
   @Test
@@ -243,8 +239,7 @@
     final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null);
     locations.add(locationConfig2);
 
-    manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -276,7 +271,7 @@
     }
     catch (SegmentLoadingException e) {
     }
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload));
     manager.cleanup(segmentToDownload);
   }
 
@@ -293,8 +288,7 @@
     final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10L, null);
     locations.add(locationConfig2);
 
-    manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -319,11 +313,11 @@
     final File indexZip = new File(localSegmentFile, "index.zip");
     indexZip.createNewFile();
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
 
     File segmentFile = manager.getSegmentFiles(segmentToDownload);
     Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec(
         ImmutableMap.of(
@@ -347,10 +341,10 @@
 
     File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
     Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload2));
 
     manager.cleanup(segmentToDownload2);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload2));
   }
 
   private DataSegment dataSegmentWithInterval(String intervalStr)
@@ -402,8 +396,7 @@
       );
     }
 
-    manager = new SegmentLoaderLocalCacheManager(
-      TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
       new SegmentLoaderConfig().withLocations(locationConfigs),
       new RoundRobinStorageLocationSelectorStrategy(locations),
       jsonMapper
@@ -425,14 +418,14 @@
     // manually create a local segment under segmentSrcFolder
     createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload1));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload1));
 
     File segmentFile = manager.getSegmentFiles(segmentToDownload1);
     Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload1));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload1));
 
     manager.cleanup(segmentToDownload1);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload1));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload1));
 
     // Segment 2 should be downloaded in local_storage_folder2
     final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec(
@@ -449,14 +442,14 @@
     // manually create a local segment under segmentSrcFolder
     createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload2));
 
     File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
     Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload2));
 
     manager.cleanup(segmentToDownload2);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload2));
 
     // Segment 3 should be downloaded in local_storage_folder3
     final DataSegment segmentToDownload3 = dataSegmentWithInterval("2014-12-20T00:00:00Z/P1D").withLoadSpec(
@@ -476,10 +469,10 @@
 
     File segmentFile3 = manager.getSegmentFiles(segmentToDownload3);
     Assert.assertTrue(segmentFile3.getAbsolutePath().contains("/local_storage_folder3/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload3));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload3));
 
     manager.cleanup(segmentToDownload3);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload3));
 
     // Segment 4 should be downloaded in local_storage_folder again, asserting round robin distribution of segments
     final DataSegment segmentToDownload4 = dataSegmentWithInterval("2014-08-20T00:00:00Z/P1D").withLoadSpec(
@@ -497,13 +490,13 @@
     createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00" +
         ".000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload4));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload4));
 
     File segmentFile1 = manager.getSegmentFiles(segmentToDownload4);
     Assert.assertTrue(segmentFile1.getAbsolutePath().contains("/local_storage_folder/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload4));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload4));
     manager.cleanup(segmentToDownload4);
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload4));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload4));
   }
 
   private void createLocalSegmentFile(File segmentSrcFolder, String localSegmentPath) throws Exception
@@ -538,8 +531,7 @@
     locations.add(locationConfig2);
     locations.add(locationConfig3);
 
-    manager = new SegmentLoaderLocalCacheManager(
-      TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
       new SegmentLoaderConfig().withLocations(locations),
       jsonMapper
     );
@@ -561,11 +553,11 @@
     createLocalSegmentFile(segmentSrcFolder,
         "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
 
     File segmentFile = manager.getSegmentFiles(segmentToDownload);
     Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     // Segment 2 should be downloaded in local_storage_folder2, segment2 size 5L
     final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D", 5L).withLoadSpec(
@@ -583,11 +575,11 @@
     createLocalSegmentFile(segmentSrcFolder,
         "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload2));
 
     File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
     Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload2));
 
 
     // Segment 3 should be downloaded in local_storage_folder3, segment3 size 20L
@@ -608,7 +600,7 @@
 
     File segmentFile3 = manager.getSegmentFiles(segmentToDownload3);
     Assert.assertTrue(segmentFile3.getAbsolutePath().contains("/local_storage_folder3/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload3));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload3));
 
     // Now the storage locations local_storage_folder1, local_storage_folder2 and local_storage_folder3 have 10, 5 and
     // 20 bytes occupied respectively. The default strategy should pick location2 (as it has least bytes used) for the
@@ -628,11 +620,11 @@
     createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00" +
         ".000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload4));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload4));
 
     File segmentFile1 = manager.getSegmentFiles(segmentToDownload4);
     Assert.assertTrue(segmentFile1.getAbsolutePath().contains("/local_storage_folder2/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload4));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload4));
 
   }
 
@@ -652,8 +644,7 @@
 
     SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig().withLocations(locationConfigs);
 
-    manager = new SegmentLoaderLocalCacheManager(
-            TestHelper.getTestIndexIO(),
+    manager = new SegmentLocalCacheManager(
             new SegmentLoaderConfig().withLocations(locationConfigs),
             new RandomStorageLocationSelectorStrategy(segmentLoaderConfig.toStorageLocations()),
             jsonMapper
@@ -677,11 +668,11 @@
     createLocalSegmentFile(segmentSrcFolder,
             "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload));
 
     File segmentFile = manager.getSegmentFiles(segmentToDownload);
     Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     // Segment 2 should be downloaded in local_storage_folder3, segment2 size 9L
     final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D", 9L).withLoadSpec(
@@ -699,11 +690,11 @@
     createLocalSegmentFile(segmentSrcFolder,
             "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0");
 
-    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload2));
 
     File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
     Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder3/"));
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload2));
 
 
     // Segment 3 should not be downloaded, segment3 size 20L
@@ -729,7 +720,7 @@
     }
     catch (SegmentLoadingException e) {
     }
-    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3));
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload3));
   }
 
   @Test
@@ -759,16 +750,16 @@
     Assert.assertTrue(indexZip.createNewFile());
 
     final File cachedSegmentDir = manager.getSegmentFiles(segmentToDownload);
-    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload));
 
     // Emulate a corrupted segment file
     final File downloadMarker = new File(
         cachedSegmentDir,
-        SegmentLoaderLocalCacheManager.DOWNLOAD_START_MARKER_FILE_NAME
+        SegmentLocalCacheManager.DOWNLOAD_START_MARKER_FILE_NAME
     );
     Assert.assertTrue(downloadMarker.createNewFile());
 
-    Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentLoaded(segmentToDownload));
+    Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentCached(segmentToDownload));
     Assert.assertFalse(cachedSegmentDir.exists());
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java
index 38c4100..b62d453 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java
@@ -52,8 +52,9 @@
 import org.apache.druid.segment.loading.LocalDataSegmentPuller;
 import org.apache.druid.segment.loading.LocalLoadSpec;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheLoader;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentizerFactory;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -102,7 +103,7 @@
   private IndexIO indexIO;
   private File segmentCacheDir;
   private File segmentDeepStorageDir;
-  private SegmentLoaderLocalCacheManager segmentLoader;
+  private SegmentLocalCacheManager segmentCacheManager;
   private SegmentManager segmentManager;
   private BroadcastTableJoinableFactory joinableFactory;
 
@@ -125,8 +126,7 @@
     );
     segmentCacheDir = temporaryFolder.newFolder();
     segmentDeepStorageDir = temporaryFolder.newFolder();
-    segmentLoader = new SegmentLoaderLocalCacheManager(
-        indexIO,
+    segmentCacheManager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig()
         {
           @Override
@@ -139,7 +139,7 @@
         },
         objectMapper
     );
-    segmentManager = new SegmentManager(segmentLoader);
+    segmentManager = new SegmentManager(new SegmentLocalCacheLoader(segmentCacheManager, indexIO, objectMapper));
     joinableFactory = new BroadcastTableJoinableFactory(segmentManager);
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
   }
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index 762339f..8698146 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -46,7 +46,6 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -61,15 +60,10 @@
 
 public class SegmentManagerTest
 {
+
   private static final SegmentLoader SEGMENT_LOADER = new SegmentLoader()
   {
     @Override
-    public boolean isSegmentLoaded(DataSegment segment)
-    {
-      return false;
-    }
-
-    @Override
     public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
     {
       return new SegmentForTesting(
@@ -79,12 +73,6 @@
     }
 
     @Override
-    public File getSegmentFiles(DataSegment segment)
-    {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
     public void cleanup(DataSegment segment)
     {
 
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
index 87587ce..5295b9d 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
@@ -40,8 +40,9 @@
 import org.apache.druid.segment.loading.LocalDataSegmentPuller;
 import org.apache.druid.segment.loading.LocalLoadSpec;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheLoader;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentizerFactory;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -57,6 +58,7 @@
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -82,7 +84,7 @@
   private IndexIO indexIO;
   private File segmentCacheDir;
   private File segmentDeepStorageDir;
-  private SegmentLoaderLocalCacheManager segmentLoader;
+  private SegmentLocalCacheManager segmentCacheManager;
   private SegmentManager segmentManager;
   private ExecutorService exec;
 
@@ -98,8 +100,7 @@
     indexIO = new IndexIO(objectMapper, () -> 0);
     segmentCacheDir = temporaryFolder.newFolder();
     segmentDeepStorageDir = temporaryFolder.newFolder();
-    segmentLoader = new SegmentLoaderLocalCacheManager(
-        indexIO,
+    segmentCacheManager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig()
         {
           @Override
@@ -112,7 +113,7 @@
         },
         objectMapper
     );
-    segmentManager = new SegmentManager(segmentLoader);
+    segmentManager = new SegmentManager(new SegmentLocalCacheLoader(segmentCacheManager, indexIO, objectMapper));
     exec = Execs.multiThreaded(NUM_THREAD, "SegmentManagerThreadSafetyTest-%d");
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
   }
@@ -137,7 +138,7 @@
     }
     Assert.assertEquals(1, segmentPuller.numFileLoaded.size());
     Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue());
-    Assert.assertEquals(0, segmentLoader.getSegmentLocks().size());
+    Assert.assertEquals(0, segmentCacheManager.getSegmentLocks().size());
   }
 
   @Test(timeout = 6000L)
@@ -168,7 +169,7 @@
     }
     Assert.assertEquals(11, segmentPuller.numFileLoaded.size());
     Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue());
-    Assert.assertEquals(0, segmentLoader.getSegmentLocks().size());
+    Assert.assertEquals(0, segmentCacheManager.getSegmentLocks().size());
   }
 
   private DataSegment createSegment(String interval) throws IOException
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
index 37cc2606..00164fc 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
@@ -35,9 +35,11 @@
 import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheLoader;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.SegmentizerFactory;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -87,9 +89,10 @@
     objectMapper = TestHelper.makeJsonMapper();
     objectMapper.registerSubtypes(TestLoadSpec.class);
     objectMapper.registerSubtypes(TestSegmentizerFactory.class);
-    SegmentManager segmentManager = new SegmentManager(new SegmentLoaderLocalCacheManager(
+    SegmentCacheManager cacheManager = new SegmentLocalCacheManager(config, objectMapper);
+    SegmentManager segmentManager = new SegmentManager(new SegmentLocalCacheLoader(
+        cacheManager,
         TestIndex.INDEX_IO,
-        config,
         objectMapper
     ));
     segmentAnnouncer = Mockito.mock(DataSegmentAnnouncer.class);
@@ -99,6 +102,7 @@
         segmentAnnouncer,
         Mockito.mock(DataSegmentServerAnnouncer.class),
         segmentManager,
+        cacheManager,
         new ServerTypeConfig(ServerType.HISTORICAL)
     );
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 3930706..02a6db1 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -31,7 +31,9 @@
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.loading.CacheTestSegmentCacheManager;
 import org.apache.druid.segment.loading.CacheTestSegmentLoader;
+import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.SegmentManager;
@@ -80,7 +82,8 @@
   private TestStorageLocation testStorageLocation;
   private AtomicInteger announceCount;
   private ConcurrentSkipListSet<DataSegment> segmentsAnnouncedByMe;
-  private CacheTestSegmentLoader segmentLoader;
+  private CacheTestSegmentCacheManager segmentCacheManager;
+  private SegmentLoader segmentLoader;
   private SegmentManager segmentManager;
   private List<Runnable> scheduledRunnable;
   private SegmentLoaderConfig segmentLoaderConfig;
@@ -116,6 +119,7 @@
 
     scheduledRunnable = new ArrayList<>();
 
+    segmentCacheManager = new CacheTestSegmentCacheManager();
     segmentLoader = new CacheTestSegmentLoader();
     segmentManager = new SegmentManager(segmentLoader);
     segmentsAnnouncedByMe = new ConcurrentSkipListSet<>();
@@ -239,6 +243,7 @@
         announcer,
         Mockito.mock(DataSegmentServerAnnouncer.class),
         segmentManager,
+        segmentCacheManager,
         scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
         new ServerTypeConfig(ServerType.HISTORICAL)
     );
@@ -273,7 +278,7 @@
     }
 
     Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
-    Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
+    Assert.assertFalse("segment files shouldn't be deleted", segmentCacheManager.getSegmentsInTrash().contains(segment));
 
     segmentLoadDropHandler.stop();
   }
@@ -312,7 +317,7 @@
     }
 
     Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
-    Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
+    Assert.assertFalse("segment files shouldn't be deleted", segmentCacheManager.getSegmentsInTrash().contains(segment));
 
     segmentLoadDropHandler.stop();
   }
@@ -409,6 +414,7 @@
         announcer,
         Mockito.mock(DataSegmentServerAnnouncer.class),
         segmentManager,
+        segmentCacheManager,
         new ServerTypeConfig(ServerType.HISTORICAL)
     );
 
@@ -495,6 +501,7 @@
         announcer,
         Mockito.mock(DataSegmentServerAnnouncer.class),
         segmentManager,
+        segmentCacheManager,
         scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
         new ServerTypeConfig(ServerType.HISTORICAL)
     );
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
index 902acd8..16832b3 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
@@ -104,7 +104,6 @@
 
 import javax.annotation.Nullable;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -149,12 +148,6 @@
         new SegmentLoader()
         {
           @Override
-          public boolean isSegmentLoaded(DataSegment segment)
-          {
-            return false;
-          }
-
-          @Override
           public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback)
           {
             return new SegmentForTesting(
@@ -164,12 +157,6 @@
           }
 
           @Override
-          public File getSegmentFiles(DataSegment segment)
-          {
-            throw new UnsupportedOperationException();
-          }
-
-          @Override
           public void cleanup(DataSegment segment)
           {
 
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
index c30a37c..8356a26 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
@@ -28,6 +28,7 @@
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.SegmentManager;
@@ -169,6 +170,7 @@
         EasyMock.createNiceMock(DataSegmentAnnouncer.class),
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         EasyMock.createNiceMock(SegmentManager.class),
+        EasyMock.createNiceMock(SegmentCacheManager.class),
         EasyMock.createNiceMock(ScheduledExecutorService.class),
         new ServerTypeConfig(ServerType.HISTORICAL)
     )
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 5a78f0f..569e831 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -131,6 +131,7 @@
 import org.joda.time.chrono.ISOChronology;
 
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;