Replace Processing ExecutorService with QueryProcessingPool (#11382)

This PR refactors QueryRunnerFactory#mergeRunners to accept a new interface, QueryProcessingPool, instead of ExecutorService for concurrent execution of query runners. The interface lets custom extensions inject their own implementation for deciding which query runner to prioritize first. The default implementation behaves the same as today and takes the query's priority into account. QueryProcessingPool can also be used as a regular executor service. It has a dedicated method for accepting query execution work, so implementations can differentiate regular async tasks from query execution tasks. This dedicated method also passes the QueryRunner object as part of the task information, a hook that lets custom extensions carry state from QuerySegmentWalker to the QueryProcessingPool used in QueryRunnerFactory#mergeRunners, which is not possible today.
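
For extension authors, here is a minimal sketch of what such an implementation could look like. The package, class name, and the reprioritize() hook are hypothetical; only the Druid types come from this PR. The pool extends the new ForwardingQueryProcessingPool and inspects the submitted task's runner before delegating:

```java
// Hypothetical extension code; package, class, and reprioritize() are illustrative only.
package org.example.druid.ext;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.PrioritizedQueryRunnerCallable;
import org.apache.druid.query.QueryRunner;

import java.util.concurrent.ExecutorService;

public class SegmentAwareQueryProcessingPool extends ForwardingQueryProcessingPool
{
  public SegmentAwareQueryProcessingPool(ExecutorService executorService)
  {
    super(executorService);
  }

  @Override
  public <T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task)
  {
    // The task carries its QueryRunner, so state attached to the runner by a custom
    // QuerySegmentWalker is visible here and can drive the scheduling decision.
    QueryRunner<V> runner = task.getRunner();
    reprioritize(runner, task.getPriority());
    return super.submitRunnerTask(task);
  }

  private <V> void reprioritize(QueryRunner<V> runner, int priority)
  {
    // Illustrative no-op; a real extension would apply its own policy here.
  }
}
```

Wiring such a pool into the server (for example through an extension's Guice bindings) is outside the scope of this sketch.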
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 9f15091..ca3630d 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -104,6 +104,7 @@
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
+import org.apache.druid.query.DirectQueryProcessingPool;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
@@ -2883,7 +2884,7 @@
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         handoffNotifierFactory,
         this::makeTimeseriesAndScanConglomerate,
-        Execs.directExecutor(), // queryExecutorService
+        DirectQueryProcessingPool.INSTANCE,
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
         new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index ebc4464..5b3a77c 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -89,6 +89,7 @@
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
+import org.apache.druid.query.DirectQueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.filter.SelectorDimFilter;
@@ -2970,7 +2971,7 @@
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         handoffNotifierFactory,
         this::makeTimeseriesOnlyConglomerate,
-        Execs.directExecutor(), // queryExecutorService
+        DirectQueryProcessingPool.INSTANCE,
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
         new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index bc1e00b..dbaec5a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -46,6 +46,7 @@
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.Monitor;
 import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
@@ -73,7 +74,6 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 
 /**
  * Stuff that may be needed by a Task in order to conduct its business.
@@ -99,7 +99,7 @@
   private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
   @Nullable
   private final Provider<MonitorScheduler> monitorSchedulerProvider;
-  private final ExecutorService queryExecutorService;
+  private final QueryProcessingPool queryProcessingPool;
   private final JoinableFactory joinableFactory;
   private final SegmentLoader segmentLoader;
   private final ObjectMapper jsonMapper;
@@ -141,7 +141,7 @@
       DataSegmentServerAnnouncer serverAnnouncer,
       SegmentHandoffNotifierFactory handoffNotifierFactory,
       Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
-      ExecutorService queryExecutorService,
+      QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       @Nullable Provider<MonitorScheduler> monitorSchedulerProvider,
       SegmentLoader segmentLoader,
@@ -180,7 +180,7 @@
     this.serverAnnouncer = serverAnnouncer;
     this.handoffNotifierFactory = handoffNotifierFactory;
     this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
-    this.queryExecutorService = queryExecutorService;
+    this.queryProcessingPool = queryProcessingPool;
     this.joinableFactory = joinableFactory;
     this.monitorSchedulerProvider = monitorSchedulerProvider;
     this.segmentLoader = segmentLoader;
@@ -268,9 +268,9 @@
     return queryRunnerFactoryConglomerateProvider.get();
   }
 
-  public ExecutorService getQueryExecutorService()
+  public QueryProcessingPool getQueryProcessingPool()
   {
-    return queryExecutorService;
+    return queryProcessingPool;
   }
 
   public JoinableFactory getJoinableFactory()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 5112fa9..5cd4eb5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -33,7 +33,6 @@
 import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.guice.annotations.Parent;
-import org.apache.druid.guice.annotations.Processing;
 import org.apache.druid.guice.annotations.RemoteChatHandler;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.config.TaskConfig;
@@ -44,6 +43,7 @@
 import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
@@ -62,7 +62,6 @@
 import org.apache.druid.server.security.AuthorizerMapper;
 
 import java.io.File;
-import java.util.concurrent.ExecutorService;
 
 /**
  * Stuff that may be needed by a Task in order to conduct its business.
@@ -81,7 +80,7 @@
   private final DataSegmentServerAnnouncer serverAnnouncer;
   private final SegmentHandoffNotifierFactory handoffNotifierFactory;
   private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
-  private final ExecutorService queryExecutorService;
+  private final QueryProcessingPool queryProcessingPool;
   private final JoinableFactory joinableFactory;
   private final Provider<MonitorScheduler> monitorSchedulerProvider;
   private final SegmentLoaderFactory segmentLoaderFactory;
@@ -122,7 +121,7 @@
       DataSegmentServerAnnouncer serverAnnouncer,
       SegmentHandoffNotifierFactory handoffNotifierFactory,
       Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
-      @Processing ExecutorService queryExecutorService,
+      QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       Provider<MonitorScheduler> monitorSchedulerProvider,
       SegmentLoaderFactory segmentLoaderFactory,
@@ -160,7 +159,7 @@
     this.serverAnnouncer = serverAnnouncer;
     this.handoffNotifierFactory = handoffNotifierFactory;
     this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
-    this.queryExecutorService = queryExecutorService;
+    this.queryProcessingPool = queryProcessingPool;
     this.joinableFactory = joinableFactory;
     this.monitorSchedulerProvider = monitorSchedulerProvider;
     this.segmentLoaderFactory = segmentLoaderFactory;
@@ -202,7 +201,7 @@
         serverAnnouncer,
         handoffNotifierFactory,
         queryRunnerFactoryConglomerateProvider,
-        queryExecutorService,
+        queryProcessingPool,
         joinableFactory,
         monitorSchedulerProvider,
         segmentLoaderFactory.manufacturate(taskWorkDir),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 550f818..edc1d18 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -776,7 +776,7 @@
         toolbox.getQueryRunnerFactoryConglomerate(),
         toolbox.getSegmentAnnouncer(),
         toolbox.getEmitter(),
-        toolbox.getQueryExecutorService(),
+        toolbox.getQueryProcessingPool(),
         toolbox.getJoinableFactory(),
         toolbox.getCache(),
         toolbox.getCacheConfig(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
index b4c7520..6a5fc69 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
@@ -344,7 +344,7 @@
         lockingSegmentAnnouncer,
         segmentPublisher,
         toolbox.getSegmentHandoffNotifierFactory(),
-        toolbox.getQueryExecutorService(),
+        toolbox.getQueryProcessingPool(),
         toolbox.getJoinableFactory(),
         toolbox.getIndexMergerV9(),
         toolbox.getIndexIO(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index d3436ea..3f91d11 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -192,7 +192,7 @@
         toolbox.getQueryRunnerFactoryConglomerate(),
         toolbox.getSegmentAnnouncer(),
         toolbox.getEmitter(),
-        toolbox.getQueryExecutorService(),
+        toolbox.getQueryProcessingPool(),
         toolbox.getJoinableFactory(),
         toolbox.getCache(),
         toolbox.getCacheConfig(),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index 6715569..ac0685a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -34,6 +34,7 @@
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
@@ -62,7 +63,6 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 
 public class TaskToolboxTest
 {
@@ -81,7 +81,7 @@
   private QueryRunnerFactoryConglomerate mockQueryRunnerFactoryConglomerate
       = EasyMock.createMock(QueryRunnerFactoryConglomerate.class);
   private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class);
-  private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class);
+  private QueryProcessingPool mockQueryProcessingPool = EasyMock.createMock(QueryProcessingPool.class);
   private ObjectMapper ObjectMapper = new ObjectMapper();
   private SegmentLoaderFactory mockSegmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class);
   private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class);
@@ -126,7 +126,7 @@
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         mockHandoffNotifierFactory,
         () -> mockQueryRunnerFactoryConglomerate,
-        mockQueryExecutorService,
+        mockQueryProcessingPool,
         NoopJoinableFactory.INSTANCE,
         () -> mockMonitorScheduler,
         mockSegmentLoaderFactory,
@@ -172,9 +172,9 @@
   }
 
   @Test
-  public void testGetQueryExecutorService()
+  public void testGetQueryProcessingPool()
   {
-    Assert.assertEquals(mockQueryExecutorService, taskToolbox.build(task).getQueryExecutorService());
+    Assert.assertEquals(mockQueryProcessingPool, taskToolbox.build(task).getQueryProcessingPool());
   }
 
   @Test
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 113d41a..d7c5e32 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -93,6 +93,7 @@
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
+import org.apache.druid.query.DirectQueryProcessingPool;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -1586,7 +1587,7 @@
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         handoffNotifierFactory,
         () -> conglomerate,
-        Execs.directExecutor(), // queryExecutorService
+        DirectQueryProcessingPool.INSTANCE, // queryExecutorService
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
         new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 965658d..f2b1503 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -79,6 +79,7 @@
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
+import org.apache.druid.query.DirectQueryProcessingPool;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
@@ -985,7 +986,7 @@
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         handoffNotifierFactory,
         () -> conglomerate,
-        Execs.directExecutor(), // queryExecutorService
+        DirectQueryProcessingPool.INSTANCE,
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
         new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index 6e963e6..a205751 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -25,6 +25,7 @@
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.SegmentDescriptor;
@@ -43,8 +44,6 @@
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.joda.time.Interval;
 
-import java.util.concurrent.ExecutorService;
-
 public class TestAppenderatorsManager implements AppenderatorsManager
 {
   private Appenderator realtimeAppenderator;
@@ -62,7 +61,7 @@
       QueryRunnerFactoryConglomerate conglomerate,
       DataSegmentAnnouncer segmentAnnouncer,
       ServiceEmitter emitter,
-      ExecutorService queryExecutorService,
+      QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
@@ -83,7 +82,7 @@
         conglomerate,
         segmentAnnouncer,
         emitter,
-        queryExecutorService,
+        queryProcessingPool,
         joinableFactory,
         cache,
         cacheConfig,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index fc99375..1903902 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -94,7 +94,6 @@
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
@@ -105,6 +104,8 @@
 import org.apache.druid.java.util.metrics.MonitorScheduler;
 import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.query.DirectQueryProcessingPool;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -156,6 +157,7 @@
 import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -660,7 +662,7 @@
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         handoffNotifierFactory,
         () -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
-        Execs.directExecutor(), // query executor service
+        DirectQueryProcessingPool.INSTANCE, // query executor service
         NoopJoinableFactory.INSTANCE,
         () -> monitorScheduler, // monitor scheduler
         new SegmentLoaderFactory(null, new DefaultObjectMapper()),
@@ -1336,7 +1338,7 @@
     final ExecutorService exec = Executors.newFixedThreadPool(8);
 
     UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager(
-        exec,
+        new ForwardingQueryProcessingPool(exec),
         NoopJoinableFactory.INSTANCE,
         new WorkerConfig(),
         MapCache.create(2048),
diff --git a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java
index eaf244f..98c7f96 100644
--- a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java
+++ b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java
@@ -25,7 +25,6 @@
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulator;
-import org.apache.druid.guice.annotations.Processing;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.guava.Accumulator;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -35,6 +34,7 @@
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryCapacityExceededException;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -55,7 +55,6 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
@@ -91,7 +90,7 @@
   public ServerManagerForQueryErrorTest(
       QueryRunnerFactoryConglomerate conglomerate,
       ServiceEmitter emitter,
-      @Processing ExecutorService exec,
+      QueryProcessingPool queryProcessingPool,
       CachePopulator cachePopulator,
       @Smile ObjectMapper objectMapper,
       Cache cache,
@@ -104,7 +103,7 @@
     super(
         conglomerate,
         emitter,
-        exec,
+        queryProcessingPool,
         cachePopulator,
         objectMapper,
         cache,
@@ -116,7 +115,7 @@
   }
 
   @Override
-  <T> QueryRunner<T> buildQueryRunnerForSegment(
+  protected <T> QueryRunner<T> buildQueryRunnerForSegment(
       Query<T> query,
       SegmentDescriptor descriptor,
       QueryRunnerFactory<T, Query<T>> factory,
diff --git a/processing/src/main/java/org/apache/druid/guice/annotations/Processing.java b/processing/src/main/java/org/apache/druid/guice/annotations/Processing.java
deleted file mode 100644
index f82691c..0000000
--- a/processing/src/main/java/org/apache/druid/guice/annotations/Processing.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.guice.annotations;
-
-import com.google.inject.BindingAnnotation;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- */
-@BindingAnnotation
-@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
-@Retention(RetentionPolicy.RUNTIME)
-public @interface Processing
-{
-}
diff --git a/processing/src/main/java/org/apache/druid/query/AbstractPrioritizedQueryRunnerCallable.java b/processing/src/main/java/org/apache/druid/query/AbstractPrioritizedQueryRunnerCallable.java
new file mode 100644
index 0000000..51db7f4
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/AbstractPrioritizedQueryRunnerCallable.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+/**
+ * A helper class to avoid boilerplate for creating {@link PrioritizedQueryRunnerCallable} objects.
+ * @param <T> - Type of result of {@link #call()} method
+ * @param <V> - Type of {@link org.apache.druid.java.util.common.guava.Sequence} of rows returned by {@link QueryRunner}
+ */
+public abstract class AbstractPrioritizedQueryRunnerCallable<T, V> implements PrioritizedQueryRunnerCallable<T, V>
+{
+  private final QueryRunner<V> runner;
+  private final int priority;
+
+  public AbstractPrioritizedQueryRunnerCallable(int priority, QueryRunner<V> runner)
+  {
+    this.priority = priority;
+    this.runner = runner;
+  }
+
+  @Override
+  public QueryRunner<V> getRunner()
+  {
+    return runner;
+  }
+
+  @Override
+  public int getPriority()
+  {
+    return priority;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java
index b9c0e25..bd21034 100644
--- a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java
@@ -27,7 +27,6 @@
 
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 
 /**
  * Handles caching-related tasks for a particular query type.
@@ -42,7 +41,7 @@
    * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node.
    *
    * @param query            the query to be cached
-   * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
+   * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be
    *                         called on the cached by-segment results
    *
    * @return true if the query is cacheable, otherwise false.
diff --git a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
index 6269493..cc393e2 100644
--- a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
+++ b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
@@ -25,8 +25,6 @@
 import com.google.common.collect.Ordering;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.common.guava.GuavaUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -36,12 +34,10 @@
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.context.ResponseContext;
 
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -62,28 +58,18 @@
 {
   private static final Logger log = new Logger(ChainedExecutionQueryRunner.class);
 
+  private final QueryProcessingPool queryProcessingPool;
   private final Iterable<QueryRunner<T>> queryables;
-  private final ListeningExecutorService exec;
   private final QueryWatcher queryWatcher;
 
-  public ChainedExecutionQueryRunner(
-      ExecutorService exec,
-      QueryWatcher queryWatcher,
-      QueryRunner<T>... queryables
-  )
-  {
-    this(exec, queryWatcher, Arrays.asList(queryables));
-  }
 
   public ChainedExecutionQueryRunner(
-      ExecutorService exec,
+      QueryProcessingPool queryProcessingPool,
       QueryWatcher queryWatcher,
       Iterable<QueryRunner<T>> queryables
   )
   {
-    // listeningDecorator will leave PrioritizedExecutorService unchanged,
-    // since it already implements ListeningExecutorService
-    this.exec = MoreExecutors.listeningDecorator(exec);
+    this.queryProcessingPool = queryProcessingPool;
     this.queryables = Iterables.unmodifiableIterable(queryables);
     this.queryWatcher = queryWatcher;
   }
@@ -111,8 +97,8 @@
                             throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
                           }
 
-                          return exec.submit(
-                              new AbstractPrioritizedCallable<Iterable<T>>(priority)
+                          return queryProcessingPool.submitRunnerTask(
+                              new AbstractPrioritizedQueryRunnerCallable<Iterable<T>, T>(priority, input)
                               {
                                 @Override
                                 public Iterable<T> call()
@@ -142,8 +128,7 @@
                                     throw new RuntimeException(e);
                                   }
                                 }
-                              }
-                          );
+                              });
                         }
                     )
                 );
diff --git a/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java b/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java
new file mode 100644
index 0000000..c24b015
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.druid.java.util.common.concurrent.Execs;
+
+/**
+ * {@link QueryProcessingPool} wrapper over {@link Execs#directExecutor()}
+ */
+public class DirectQueryProcessingPool extends ForwardingListeningExecutorService implements QueryProcessingPool
+{
+  public static DirectQueryProcessingPool INSTANCE = new DirectQueryProcessingPool();
+
+  private DirectQueryProcessingPool()
+  {
+
+  }
+
+  @Override
+  public <T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task)
+  {
+    return delegate().submit(task);
+  }
+
+  @Override
+  public ListeningExecutorService delegate()
+  {
+    return Execs.directExecutor();
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java b/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java
new file mode 100644
index 0000000..fdb9875
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Default implementation of {@link QueryProcessingPool} that just forwards operations, including query execution tasks,
+ * to an underlying {@link ExecutorService}
+ */
+public class ForwardingQueryProcessingPool extends ForwardingListeningExecutorService implements QueryProcessingPool
+{
+  private final ListeningExecutorService delegate;
+
+  public ForwardingQueryProcessingPool(ExecutorService executorService)
+  {
+    this.delegate = MoreExecutors.listeningDecorator(executorService);
+  }
+
+  @Override
+  public <T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task)
+  {
+    return delegate().submit(task);
+  }
+
+  @Override
+  protected ListeningExecutorService delegate()
+  {
+    return delegate;
+  }
+
+}
diff --git a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java
index 85a9601..5b7d550 100644
--- a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java
+++ b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java
@@ -28,8 +28,6 @@
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.common.guava.GuavaUtils;
 import org.apache.druid.data.input.Row;
@@ -51,7 +49,6 @@
 import java.util.Queue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -60,20 +57,20 @@
 {
   private static final Logger log = new Logger(GroupByMergedQueryRunner.class);
   private final Iterable<QueryRunner<T>> queryables;
-  private final ListeningExecutorService exec;
   private final Supplier<GroupByQueryConfig> configSupplier;
   private final QueryWatcher queryWatcher;
   private final NonBlockingPool<ByteBuffer> bufferPool;
+  private final QueryProcessingPool queryProcessingPool;
 
   public GroupByMergedQueryRunner(
-      ExecutorService exec,
+      QueryProcessingPool queryProcessingPool,
       Supplier<GroupByQueryConfig> configSupplier,
       QueryWatcher queryWatcher,
       NonBlockingPool<ByteBuffer> bufferPool,
       Iterable<QueryRunner<T>> queryables
   )
   {
-    this.exec = MoreExecutors.listeningDecorator(exec);
+    this.queryProcessingPool = queryProcessingPool;
     this.queryWatcher = queryWatcher;
     this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
     this.configSupplier = configSupplier;
@@ -109,8 +106,8 @@
                       throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
                     }
 
-                    ListenableFuture<Void> future = exec.submit(
-                        new AbstractPrioritizedCallable<Void>(priority)
+                    ListenableFuture<Void> future = queryProcessingPool.submitRunnerTask(
+                        new AbstractPrioritizedQueryRunnerCallable<Void, T>(priority, input)
                         {
                           @Override
                           public Void call()
@@ -118,10 +115,10 @@
                             try {
                               if (bySegment) {
                                 input.run(threadSafeQueryPlus, responseContext)
-                                     .accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
+                                    .accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
                               } else {
                                 input.run(threadSafeQueryPlus, responseContext)
-                                     .accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
+                                    .accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
                               }
 
                               return null;
diff --git a/processing/src/main/java/org/apache/druid/query/MetricsEmittingExecutorService.java b/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java
similarity index 62%
rename from processing/src/main/java/org/apache/druid/query/MetricsEmittingExecutorService.java
rename to processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java
index 14b3560..4047d79 100644
--- a/processing/src/main/java/org/apache/druid/query/MetricsEmittingExecutorService.java
+++ b/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java
@@ -19,53 +19,28 @@
 
 package org.apache.druid.query;
 
-import com.google.common.util.concurrent.ForwardingListeningExecutorService;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 
-import java.util.concurrent.Callable;
-
-public class MetricsEmittingExecutorService extends ForwardingListeningExecutorService
+public class MetricsEmittingQueryProcessingPool extends ForwardingQueryProcessingPool
     implements ExecutorServiceMonitor.MetricEmitter
 {
-  private final ListeningExecutorService delegate;
 
-  public MetricsEmittingExecutorService(
+  public MetricsEmittingQueryProcessingPool(
       ListeningExecutorService delegate,
       ExecutorServiceMonitor executorServiceMonitor
   )
   {
-    super();
-    this.delegate = delegate;
+    super(delegate);
     executorServiceMonitor.add(this);
   }
 
   @Override
-  protected ListeningExecutorService delegate()
-  {
-    return delegate;
-  }
-
-  @SuppressWarnings("ParameterPackage")
-  @Override
-  public <T> ListenableFuture<T> submit(Callable<T> tCallable)
-  {
-    return delegate.submit(tCallable);
-  }
-
-  @Override
-  public void execute(Runnable runnable)
-  {
-    delegate.execute(runnable);
-  }
-
-  @Override
   public void emitMetrics(ServiceEmitter emitter, ServiceMetricEvent.Builder metricBuilder)
   {
-    if (delegate instanceof PrioritizedExecutorService) {
-      emitter.emit(metricBuilder.build("segment/scan/pending", ((PrioritizedExecutorService) delegate).getQueueSize()));
+    if (delegate() instanceof PrioritizedExecutorService) {
+      emitter.emit(metricBuilder.build("segment/scan/pending", ((PrioritizedExecutorService) delegate()).getQueueSize()));
     }
   }
 
diff --git a/processing/src/main/java/org/apache/druid/query/PrioritizedQueryRunnerCallable.java b/processing/src/main/java/org/apache/druid/query/PrioritizedQueryRunnerCallable.java
new file mode 100644
index 0000000..6b1b554
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/PrioritizedQueryRunnerCallable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+/**
+ * An implementation of {@link PrioritizedCallable} that also lets the caller access the associated {@link QueryRunner}.
+ * It is used in implementations of {@link QueryRunnerFactory}.
+ * @param <T> - Type of result of {@link #call()} method
+ * @param <V> - Type of {@link org.apache.druid.java.util.common.guava.Sequence} of rows returned by {@link QueryRunner}
+ */
+public interface PrioritizedQueryRunnerCallable<T, V> extends PrioritizedCallable<T>
+{
+  /**
+   * This method can be used by extensions to get the runner that the given query execution task corresponds to.
+   * That, in turn, can be used to fetch any state associated with the QueryRunner, such as segment info.
+   * Extensions can carry state from a custom implementation of QuerySegmentWalker to a
+   * custom implementation of {@link QueryProcessingPool#submitRunnerTask(PrioritizedQueryRunnerCallable)}.
+   */
+  QueryRunner<V> getRunner();
+}
diff --git a/processing/src/main/java/org/apache/druid/query/Query.java b/processing/src/main/java/org/apache/druid/query/Query.java
index fc12d5e..2a232cf 100644
--- a/processing/src/main/java/org/apache/druid/query/Query.java
+++ b/processing/src/main/java/org/apache/druid/query/Query.java
@@ -48,7 +48,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
 
 @ExtensionPoint
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "queryType")
@@ -109,7 +108,7 @@
   /**
    * Comparator that represents the order in which results are generated from the
    * {@link QueryRunnerFactory#createRunner(Segment)} and
-   * {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} calls. This is used to combine streams of
+   * {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} calls. This is used to combine streams of
    * results from different sources; for example, it's used by historicals to combine streams from different segments,
    * and it's used by the broker to combine streams from different historicals.
    *
diff --git a/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java b/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java
new file mode 100644
index 0000000..785c31c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.druid.guice.annotations.ExtensionPoint;
+
+/**
+ * This class implements the logic of how units of query execution run concurrently. It is used in {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}.
+ * In the most straightforward implementation, each unit will be submitted to a {@link PrioritizedExecutorService}. Extensions,
+ * however, can implement their own logic for deciding which unit to execute first.
+ * <p>
+ * This interface extends {@link ListeningExecutorService} as well. It has a separate
+ * method to submit query execution tasks so that implementations can differentiate those tasks from any regular async
+ * tasks. One example is {@link org.apache.druid.query.groupby.strategy.GroupByStrategyV2#mergeRunners(QueryProcessingPool, Iterable)}
+ * where different kinds of tasks are submitted to the same processing pool.
+ * <p>
+ * The query execution task also includes a reference to the {@link QueryRunner} so that any state required to decide the priority
+ * of a unit can be carried forward with the corresponding {@link QueryRunner}.
+ */
+@ExtensionPoint
+public interface QueryProcessingPool extends ListeningExecutorService
+{
+  /**
+   * Submits the query execution unit task for asynchronous execution.
+   *
+   * @param task - Task to be submitted.
+   * @param <T>  - Task result type
+   * @param <V>  - Query runner sequence type
+   * @return - Future object for tracking the task completion.
+   */
+  <T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task);
+}
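
For reference (not part of the patch), the following is a self-contained sketch of the new submission path, modeled on the ChainedExecutionQueryRunner change in this PR; the example class and the trivial lambda runner are illustrative only:

```java
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.QueryRunner;

import java.util.Collections;
import java.util.List;

public class SubmitRunnerTaskExample
{
  public static void main(String[] args) throws Exception
  {
    // A toy runner that ignores its arguments and returns a fixed result.
    QueryRunner<String> runner = (queryPlus, responseContext) ->
        Sequences.simple(Collections.singletonList("row-1"));

    // Submit the work through the pool; the runner travels with the task, so a custom
    // QueryProcessingPool could inspect getRunner() before the task ever executes.
    ListenableFuture<List<String>> future = DirectQueryProcessingPool.INSTANCE.submitRunnerTask(
        new AbstractPrioritizedQueryRunnerCallable<List<String>, String>(0, runner)
        {
          @Override
          public List<String> call()
          {
            // Nulls are acceptable here only because the toy runner ignores its arguments.
            return getRunner().run(null, null).toList();
          }
        }
    );

    System.out.println(future.get()); // [row-1]
  }
}
```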
diff --git a/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
index 0832fb1..add1a0a 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
@@ -44,6 +44,7 @@
   QueryRunner<T> createRunner(Segment segment);
 
   /**
+   * @deprecated Use {@link #mergeRunners(QueryProcessingPool, Iterable)} instead.
    * Runners generated with createRunner() and combined into an Iterable in (time,shardId) order are passed
    * along to this method with an {@link ExecutorService}.  The method should then return a {@link QueryRunner} that,
    * when asked, will use the {@link ExecutorService} to run the base QueryRunners in some fashion.
@@ -56,10 +57,38 @@
    *
    * @param queryExecutor   {@link ExecutorService} to be used for parallel processing
    * @param queryRunners    Individual {@link QueryRunner} objects that produce some results
-   * @return                a {@link QueryRunner} that, when asked, will use the {@link ExecutorService} to run the base
+   * @return a {@link QueryRunner} that, when asked, will use the {@link ExecutorService} to run the base
    *                        {@link QueryRunner} collection.
    */
-  QueryRunner<T> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<T>> queryRunners);
+  @Deprecated
+  default QueryRunner<T> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<T>> queryRunners)
+  {
+    return mergeRunners(new ForwardingQueryProcessingPool(queryExecutor), queryRunners);
+  }
+
+  /**
+   * Runners generated with createRunner() and combined into an Iterable in (time,shardId) order are passed
+   * along to this method with a {@link QueryProcessingPool}.  The method should then return a {@link QueryRunner} that,
+   * when asked, will use the {@link QueryProcessingPool} to run the base QueryRunners in some fashion.
+   *
+   * The vast majority of the time, this should be implemented with {@link ChainedExecutionQueryRunner}:
+   *
+   *     return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
+   *
+   * Which will allow for parallel execution up to the maximum number of processing threads allowed.
+   *
+   * Unlike {@link #mergeRunners(ExecutorService, Iterable)}, this method takes a {@link QueryProcessingPool} instead,
+   * which allows custom implementations to prioritize query execution on segments.
+   *
+   * @param queryProcessingPool   {@link QueryProcessingPool} to be used for parallel processing
+   * @param queryRunners    Individual {@link QueryRunner} objects that produce some results
+   * @return a {@link QueryRunner} that, when asked, will use the {@link QueryProcessingPool} to run the base
+   *                        {@link QueryRunner} collection.
+   */
+  QueryRunner<T> mergeRunners(
+      QueryProcessingPool queryProcessingPool,
+      Iterable<QueryRunner<T>> queryRunners
+  );
 
   /**
    * Provides access to the {@link QueryToolChest} for this specific {@link Query} type.
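
To sketch the migration path for extension-provided factories (a hypothetical example class, not from this patch): only the new overload does real work, following the javadoc's ChainedExecutionQueryRunner suggestion, while callers that still use the deprecated ExecutorService overload are routed here by the new default method.

```java
// Hypothetical extension factory; names other than the Druid types are illustrative only.
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.segment.Segment;

public class ExampleQueryRunnerFactory<T, QueryType extends Query<T>>
    implements QueryRunnerFactory<T, QueryType>
{
  private final QueryWatcher queryWatcher;
  private final QueryToolChest<T, QueryType> toolChest;

  public ExampleQueryRunnerFactory(QueryWatcher queryWatcher, QueryToolChest<T, QueryType> toolChest)
  {
    this.queryWatcher = queryWatcher;
    this.toolChest = toolChest;
  }

  @Override
  public QueryRunner<T> createRunner(Segment segment)
  {
    throw new UnsupportedOperationException("out of scope for this sketch");
  }

  @Override
  public QueryRunner<T> mergeRunners(QueryProcessingPool queryProcessingPool, Iterable<QueryRunner<T>> queryRunners)
  {
    // The deprecated ExecutorService overload is now a default method that wraps the
    // executor in a ForwardingQueryProcessingPool and ends up here.
    return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
  }

  @Override
  public QueryToolChest<T, QueryType> getToolchest()
  {
    return toolChest;
  }
}
```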
diff --git a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java
index 73e0fab..31751fb 100644
--- a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java
@@ -26,6 +26,7 @@
 import org.apache.druid.query.ChainedExecutionQueryRunner;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
@@ -36,7 +37,6 @@
 import org.apache.druid.segment.StorageAdapter;
 
 import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
 
 /**
  */
@@ -64,11 +64,11 @@
 
   @Override
   public QueryRunner<Result<DataSourceMetadataResultValue>> mergeRunners(
-      ExecutorService queryExecutor,
+      QueryProcessingPool queryProcessingPool,
       Iterable<QueryRunner<Result<DataSourceMetadataResultValue>>> queryRunners
   )
   {
-    return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners);
+    return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java
index 3fbeb25..51a7d1a 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java
@@ -20,13 +20,12 @@
 package org.apache.druid.query.groupby;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
@@ -35,8 +34,6 @@
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.StorageAdapter;
 
-import java.util.concurrent.ExecutorService;
-
 /**
  *
  */
@@ -63,13 +60,10 @@
 
   @Override
   public QueryRunner<ResultRow> mergeRunners(
-      final ExecutorService exec,
+      final QueryProcessingPool queryProcessingPool,
       final Iterable<QueryRunner<ResultRow>> queryRunners
   )
   {
-    // mergeRunners should take ListeningExecutorService at some point
-    final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec);
-
     return new QueryRunner<ResultRow>()
     {
       @Override
@@ -77,7 +71,7 @@
       {
         QueryRunner<ResultRow> rowQueryRunner = strategySelector
             .strategize((GroupByQuery) queryPlus.getQuery())
-            .mergeRunners(queryExecutor, queryRunners);
+            .mergeRunners(queryProcessingPool, queryRunners);
         return rowQueryRunner.run(queryPlus, responseContext);
       }
     };
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
index 7e8c49c..7c2eaa6 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
@@ -29,8 +29,6 @@
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.collections.BlockingPool;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
 import org.apache.druid.collections.Releaser;
@@ -43,11 +41,12 @@
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.AbstractPrioritizedCallable;
+import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable;
 import org.apache.druid.query.ChainedExecutionQueryRunner;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryTimeoutException;
 import org.apache.druid.query.QueryWatcher;
@@ -64,7 +63,6 @@
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -80,7 +78,7 @@
  * similarities and differences.
  *
  * Used by
- * {@link org.apache.druid.query.groupby.strategy.GroupByStrategyV2#mergeRunners(ListeningExecutorService, Iterable)}.
+ * {@link org.apache.druid.query.groupby.strategy.GroupByStrategyV2#mergeRunners(QueryProcessingPool, Iterable)}
  */
 public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
 {
@@ -89,7 +87,7 @@
 
   private final GroupByQueryConfig config;
   private final Iterable<QueryRunner<ResultRow>> queryables;
-  private final ListeningExecutorService exec;
+  private final QueryProcessingPool queryProcessingPool;
   private final QueryWatcher queryWatcher;
   private final int concurrencyHint;
   private final BlockingPool<ByteBuffer> mergeBufferPool;
@@ -99,7 +97,7 @@
 
   public GroupByMergingQueryRunnerV2(
       GroupByQueryConfig config,
-      ExecutorService exec,
+      QueryProcessingPool queryProcessingPool,
       QueryWatcher queryWatcher,
       Iterable<QueryRunner<ResultRow>> queryables,
       int concurrencyHint,
@@ -110,7 +108,7 @@
   )
   {
     this.config = config;
-    this.exec = MoreExecutors.listeningDecorator(exec);
+    this.queryProcessingPool = queryProcessingPool;
     this.queryWatcher = queryWatcher;
     this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
     this.concurrencyHint = concurrencyHint;
@@ -142,7 +140,7 @@
         .withoutThreadUnsafeState();
 
     if (QueryContexts.isBySegment(query) || forceChainedExecution) {
-      ChainedExecutionQueryRunner<ResultRow> runner = new ChainedExecutionQueryRunner<>(exec, queryWatcher, queryables);
+      ChainedExecutionQueryRunner<ResultRow> runner = new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryables);
       return runner.run(queryPlusForRunners, responseContext);
     }
 
@@ -203,7 +201,7 @@
                       concurrencyHint,
                       temporaryStorage,
                       spillMapper,
-                      exec,
+                      queryProcessingPool, // Passed as executor service
                       priority,
                       hasTimeout,
                       timeoutAt,
@@ -229,8 +227,8 @@
                                 throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
                               }
 
-                              ListenableFuture<AggregateResult> future = exec.submit(
-                                  new AbstractPrioritizedCallable<AggregateResult>(priority)
+                              ListenableFuture<AggregateResult> future = queryProcessingPool.submitRunnerTask(
+                                  new AbstractPrioritizedQueryRunnerCallable<AggregateResult, ResultRow>(priority, input)
                                   {
                                     @Override
                                     public AggregateResult call()
@@ -244,7 +242,7 @@
                                       ) {
                                         // Return true if OK, false if resources were exhausted.
                                         return input.run(queryPlusForRunners, responseContext)
-                                                    .accumulate(AggregateResult.ok(), accumulator);
+                                            .accumulate(AggregateResult.ok(), accumulator);
                                       }
                                       catch (QueryInterruptedException | QueryTimeoutException e) {
                                         throw e;
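The hunk above shows the core pattern change on data servers: instead of submitting an anonymous AbstractPrioritizedCallable that only carries a priority, callers now submit an AbstractPrioritizedQueryRunnerCallable that also carries the QueryRunner being executed. A condensed before/after sketch of that pattern, using the types from the surrounding hunk; runSegment(...) is a hypothetical stand-in for the per-segment aggregation body, which this PR leaves unchanged:

// Before: the executor only knows the task's priority.
ListenableFuture<AggregateResult> futureBefore = exec.submit(
    new AbstractPrioritizedCallable<AggregateResult>(priority)
    {
      @Override
      public AggregateResult call()
      {
        return runSegment(input); // hypothetical helper standing in for the real body
      }
    }
);

// After: the pool also receives the QueryRunner (retrievable via getRunner()), so custom
// QueryProcessingPool implementations can make per-runner scheduling decisions.
ListenableFuture<AggregateResult> futureAfter = queryProcessingPool.submitRunnerTask(
    new AbstractPrioritizedQueryRunnerCallable<AggregateResult, ResultRow>(priority, input)
    {
      @Override
      public AggregateResult call()
      {
        return runSegment(input); // hypothetical helper standing in for the real body
      }
    }
);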
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java
index 4a892a4..df48c96 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java
@@ -19,10 +19,10 @@
 
 package org.apache.druid.query.groupby.strategy;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.context.ResponseContext;
@@ -33,8 +33,8 @@
 import org.apache.druid.segment.StorageAdapter;
 
 import javax.annotation.Nullable;
+
 import java.util.Comparator;
-import java.util.concurrent.ExecutorService;
 import java.util.function.BinaryOperator;
 
 public interface GroupByStrategy
@@ -56,7 +56,7 @@
    *
    * Used by {@link GroupByQueryQueryToolChest#getCacheStrategy(GroupByQuery)}.
    *
-   * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
+   * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be
    *                         called on the cached by-segment results. Can be used to distinguish if we are running on
    *                         a broker or data node.
    *
@@ -163,18 +163,17 @@
 
   /**
    * Merge a variety of single-segment query runners into a combined runner. Used by
-   * {@link org.apache.druid.query.groupby.GroupByQueryRunnerFactory#mergeRunners(ExecutorService, Iterable)}. In
+   * {@link org.apache.druid.query.groupby.GroupByQueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}. In
    * that sense, it is intended to go along with {@link #process(GroupByQuery, StorageAdapter)} (the runners created
    * by that method will be fed into this method).
-   *
+   * <p>
    * This method is only called on data servers, like Historicals (not the Broker).
    *
-   * @param exec         executor service used for parallel execution of the query runners
-   * @param queryRunners collection of query runners to merge
-   *
+   * @param queryProcessingPool {@link QueryProcessingPool} service used for parallel execution of the query runners
+   * @param queryRunners  collection of query runners to merge
    * @return merged query runner
    */
-  QueryRunner<ResultRow> mergeRunners(ListeningExecutorService exec, Iterable<QueryRunner<ResultRow>> queryRunners);
+  QueryRunner<ResultRow> mergeRunners(QueryProcessingPool queryProcessingPool, Iterable<QueryRunner<ResultRow>> queryRunners);
 
   /**
    * Process a groupBy query on a single {@link StorageAdapter}. This is used by
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java
index 18eaab2..72eafed 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java
@@ -25,7 +25,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.inject.Inject;
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.guice.annotations.Global;
@@ -34,6 +33,7 @@
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.GroupByMergedQueryRunner;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryWatcher;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -270,11 +270,11 @@
 
   @Override
   public QueryRunner<ResultRow> mergeRunners(
-      final ListeningExecutorService exec,
+      final QueryProcessingPool queryProcessingPool,
       final Iterable<QueryRunner<ResultRow>> queryRunners
   )
   {
-    return new GroupByMergedQueryRunner<>(exec, configSupplier, queryWatcher, bufferPool, queryRunners);
+    return new GroupByMergedQueryRunner<>(queryProcessingPool, configSupplier, queryWatcher, bufferPool, queryRunners);
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
index 23d904a..c9d598e 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -25,7 +25,6 @@
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.inject.Inject;
 import org.apache.druid.collections.BlockingPool;
 import org.apache.druid.collections.NonBlockingPool;
@@ -47,6 +46,7 @@
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryWatcher;
 import org.apache.druid.query.ResourceLimitExceededException;
@@ -557,13 +557,13 @@
 
   @Override
   public QueryRunner<ResultRow> mergeRunners(
-      final ListeningExecutorService exec,
+      final QueryProcessingPool queryProcessingPool,
       final Iterable<QueryRunner<ResultRow>> queryRunners
   )
   {
     return new GroupByMergingQueryRunnerV2(
         configSupplier.get(),
-        exec,
+        queryProcessingPool,
         queryWatcher,
         queryRunners,
         processingConfig.getNumThreads(),
diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
index 24980fe..c07ab5d 100644
--- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
@@ -21,8 +21,6 @@
 
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.StringUtils;
@@ -30,12 +28,13 @@
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.AbstractPrioritizedCallable;
+import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable;
 import org.apache.druid.query.ConcatQueryRunner;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryTimeoutException;
@@ -58,7 +57,6 @@
 import java.util.TreeMap;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -186,11 +184,10 @@
 
   @Override
   public QueryRunner<SegmentAnalysis> mergeRunners(
-      ExecutorService exec,
+      QueryProcessingPool queryProcessingPool,
       Iterable<QueryRunner<SegmentAnalysis>> queryRunners
   )
   {
-    final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec);
     return new ConcatQueryRunner<SegmentAnalysis>(
         Sequences.map(
             Sequences.simple(queryRunners),
@@ -210,8 +207,8 @@
                     final Query<SegmentAnalysis> query = queryPlus.getQuery();
                     final int priority = QueryContexts.getPriority(query);
                     final QueryPlus<SegmentAnalysis> threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
-                    final ListenableFuture<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
-                        new AbstractPrioritizedCallable<Sequence<SegmentAnalysis>>(priority)
+                    ListenableFuture<Sequence<SegmentAnalysis>> future = queryProcessingPool.submitRunnerTask(
+                        new AbstractPrioritizedQueryRunnerCallable<Sequence<SegmentAnalysis>, SegmentAnalysis>(priority, input)
                         {
                           @Override
                           public Sequence<SegmentAnalysis> call()
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index a5bab7d..e58973e 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -35,6 +35,7 @@
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
@@ -54,7 +55,6 @@
 import java.util.Comparator;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
@@ -83,7 +83,7 @@
 
   @Override
   public QueryRunner<ScanResultValue> mergeRunners(
-      final ExecutorService queryExecutor,
+      final QueryProcessingPool queryProcessingPool,
       final Iterable<QueryRunner<ScanResultValue>> queryRunners
   )
   {
diff --git a/processing/src/main/java/org/apache/druid/query/search/SearchQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/search/SearchQueryRunnerFactory.java
index 060ed78..2f71e55 100644
--- a/processing/src/main/java/org/apache/druid/query/search/SearchQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/search/SearchQueryRunnerFactory.java
@@ -21,6 +21,7 @@
 
 import com.google.inject.Inject;
 import org.apache.druid.query.ChainedExecutionQueryRunner;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
@@ -28,8 +29,6 @@
 import org.apache.druid.query.Result;
 import org.apache.druid.segment.Segment;
 
-import java.util.concurrent.ExecutorService;
-
 /**
  */
 public class SearchQueryRunnerFactory implements QueryRunnerFactory<Result<SearchResultValue>, SearchQuery>
@@ -58,11 +57,11 @@
 
   @Override
   public QueryRunner<Result<SearchResultValue>> mergeRunners(
-      ExecutorService queryExecutor,
+      QueryProcessingPool queryProcessingPool,
       Iterable<QueryRunner<Result<SearchResultValue>>> queryRunners
   )
   {
-    return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners);
+    return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
index eabd80b..07775d4 100644
--- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
@@ -29,6 +29,7 @@
 import org.apache.druid.query.ChainedExecutionQueryRunner;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerHelper;
@@ -47,7 +48,6 @@
 
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 
 /**
  */
@@ -71,11 +71,11 @@
 
   @Override
   public QueryRunner<Result<TimeBoundaryResultValue>> mergeRunners(
-      ExecutorService queryExecutor,
+      QueryProcessingPool queryProcessingPool,
       Iterable<QueryRunner<Result<TimeBoundaryResultValue>>> queryRunners
   )
   {
-    return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners);
+    return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java
index fd6651c..2d04905 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java
@@ -25,6 +25,7 @@
 import org.apache.druid.query.ChainedExecutionQueryRunner;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
@@ -34,8 +35,6 @@
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.StorageAdapter;
 
-import java.util.concurrent.ExecutorService;
-
 /**
  */
 public class TimeseriesQueryRunnerFactory
@@ -65,11 +64,11 @@
 
   @Override
   public QueryRunner<Result<TimeseriesResultValue>> mergeRunners(
-      ExecutorService queryExecutor,
+      QueryProcessingPool queryProcessingPool,
       Iterable<QueryRunner<Result<TimeseriesResultValue>>> queryRunners
   )
   {
-    return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners);
+    return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryRunnerFactory.java
index 7044afc..821d37e 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryRunnerFactory.java
@@ -26,6 +26,7 @@
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.query.ChainedExecutionQueryRunner;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
@@ -35,7 +36,6 @@
 import org.apache.druid.segment.Segment;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutorService;
 
 /**
  */
@@ -82,11 +82,11 @@
 
   @Override
   public QueryRunner<Result<TopNResultValue>> mergeRunners(
-      ExecutorService queryExecutor,
+      QueryProcessingPool queryProcessingPool,
       Iterable<QueryRunner<Result<TopNResultValue>>> queryRunners
   )
   {
-    return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners);
+    return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
   }
 
   @Override
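The simple factories (search, time boundary, timeseries, topN) all change the same way: mergeRunners now accepts a QueryProcessingPool and forwards it to ChainedExecutionQueryRunner. A minimal sketch of what a custom QueryRunnerFactory's mergeRunners looks like under the new signature; MyResultType and the enclosing class are hypothetical, while queryWatcher mirrors the field used by the factories in this diff:

@Override
public QueryRunner<MyResultType> mergeRunners(
    QueryProcessingPool queryProcessingPool,
    Iterable<QueryRunner<MyResultType>> queryRunners
)
{
  // ChainedExecutionQueryRunner submits one prioritized task per runner to the pool,
  // so a factory picks up the new per-runner scheduling hook without further changes.
  return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
}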
diff --git a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
index cf82292..b1b5084 100644
--- a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -29,14 +30,20 @@
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -47,6 +54,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 public class ChainedExecutionQueryRunnerTest
 {
@@ -112,10 +120,10 @@
     );
 
     ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
-        exec,
+        new ForwardingQueryProcessingPool(exec),
         watcher,
         Lists.newArrayList(
-         runners
+            runners
         )
     );
     TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
@@ -236,7 +244,7 @@
     );
 
     ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
-        exec,
+        new ForwardingQueryProcessingPool(exec),
         watcher,
         Lists.newArrayList(
             runners
@@ -306,6 +314,35 @@
     EasyMock.verify(watcher);
   }
 
+  @Test
+  public void testSubmittedTaskType()
+  {
+    QueryProcessingPool queryProcessingPool = Mockito.mock(QueryProcessingPool.class);
+    QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+        .dataSource("test")
+        .intervals("2014/2015")
+        .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
+        .context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 100, "queryId", "test"))
+        .build();
+    List<QueryRunner<Result<TimeseriesResultValue>>> runners = Arrays.asList(
+        Mockito.mock(QueryRunner.class),
+        Mockito.mock(QueryRunner.class)
+    );
+    ChainedExecutionQueryRunner<Result<TimeseriesResultValue>> chainedRunner = new ChainedExecutionQueryRunner<>(
+        queryProcessingPool,
+        watcher,
+        runners
+    );
+
+    Mockito.when(queryProcessingPool.submitRunnerTask(ArgumentMatchers.any())).thenReturn(Futures.immediateFuture(Collections.singletonList(123)));
+    chainedRunner.run(QueryPlus.wrap(query)).toList();
+    ArgumentCaptor<PrioritizedQueryRunnerCallable> captor = ArgumentCaptor.forClass(PrioritizedQueryRunnerCallable.class);
+    Mockito.verify(queryProcessingPool, Mockito.times(2)).submitRunnerTask(captor.capture());
+    List<QueryRunner> actual = captor.getAllValues().stream().map(PrioritizedQueryRunnerCallable::getRunner).collect(Collectors.toList());
+    Assert.assertEquals(runners, actual);
+  }
+
   private class DyingQueryRunner implements QueryRunner<Integer>
   {
     private final CountDownLatch start;
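For tests and synchronous call sites, the PR introduces two adapters that appear throughout this diff. A short sketch of how they slot in; their behavior is inferred from the call sites they replace rather than from their sources:

// DirectQueryProcessingPool stands in where Execs.directExecutor() was used, so it
// presumably executes submitted work on the calling thread.
QueryProcessingPool direct = DirectQueryProcessingPool.INSTANCE;

// ForwardingQueryProcessingPool adapts an existing ExecutorService, as the updated
// ChainedExecutionQueryRunnerTest above does with its test executor.
QueryProcessingPool forwarding = new ForwardingQueryProcessingPool(Execs.directExecutor());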
diff --git a/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java b/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
new file mode 100644
index 0000000..71b3a1c
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.druid.java.util.emitter.core.Emitter;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MetricsEmittingQueryProcessingPoolTest
+{
+  @Test
+  public void testPrioritizedExecutorDelegate()
+  {
+    PrioritizedExecutorService service = Mockito.mock(PrioritizedExecutorService.class);
+    Mockito.when(service.getQueueSize()).thenReturn(10);
+    ExecutorServiceMonitor monitor = new ExecutorServiceMonitor();
+    List<Event> events = new ArrayList<>();
+    MetricsEmittingQueryProcessingPool processingPool = new MetricsEmittingQueryProcessingPool(service, monitor);
+    Assert.assertSame(service, processingPool.delegate());
+
+    ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", Mockito.mock(Emitter.class))
+    {
+      @Override
+      public void emit(Event event)
+      {
+        events.add(event);
+      }
+    };
+    monitor.doMonitor(serviceEmitter);
+    Assert.assertEquals(1, events.size());
+    Assert.assertEquals(((ServiceMetricEvent) (events.get(0))).getMetric(), "segment/scan/pending");
+    Assert.assertEquals(((ServiceMetricEvent) (events.get(0))).getValue(), 10);
+  }
+
+  @Test
+  public void testNonPrioritizedExecutorDelegate()
+  {
+    ListeningExecutorService service = Mockito.mock(ListeningExecutorService.class);
+    ExecutorServiceMonitor monitor = new ExecutorServiceMonitor();
+    MetricsEmittingQueryProcessingPool processingPool = new MetricsEmittingQueryProcessingPool(service, monitor);
+    Assert.assertSame(service, processingPool.delegate());
+
+    ServiceEmitter serviceEmitter = Mockito.mock(ServiceEmitter.class);
+    monitor.doMonitor(serviceEmitter);
+    Mockito.verifyNoInteractions(serviceEmitter);
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index d6c2ca8..2d73c71 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -52,6 +52,7 @@
 import org.apache.druid.query.BySegmentResultValue;
 import org.apache.druid.query.BySegmentResultValueClass;
 import org.apache.druid.query.ChainedExecutionQueryRunner;
+import org.apache.druid.query.DirectQueryProcessingPool;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.FinalizeResultsQueryRunner;
 import org.apache.druid.query.QueryContexts;
@@ -10960,7 +10961,7 @@
     );
 
     ChainedExecutionQueryRunner ceqr = new ChainedExecutionQueryRunner(
-        Execs.directExecutor(),
+        DirectQueryProcessingPool.INSTANCE,
         (query1, future) -> {
           return;
         },
diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
index 5970016..b02ae36 100644
--- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
@@ -36,7 +36,6 @@
 import org.apache.druid.collections.StupidPool;
 import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.guice.annotations.Merging;
-import org.apache.druid.guice.annotations.Processing;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
@@ -45,8 +44,9 @@
 import org.apache.druid.offheap.OffheapBufferGenerator;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.ExecutorServiceMonitor;
-import org.apache.druid.query.MetricsEmittingExecutorService;
+import org.apache.druid.query.MetricsEmittingQueryProcessingPool;
 import org.apache.druid.query.PrioritizedExecutorService;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.server.metrics.MetricsModule;
 import org.apache.druid.utils.JvmUtils;
 
@@ -93,15 +93,14 @@
   }
 
   @Provides
-  @Processing
   @ManageLifecycle
-  public ExecutorService getProcessingExecutorService(
+  public QueryProcessingPool getProcessingExecutorPool(
       DruidProcessingConfig config,
       ExecutorServiceMonitor executorServiceMonitor,
       Lifecycle lifecycle
   )
   {
-    return new MetricsEmittingExecutorService(
+    return new MetricsEmittingQueryProcessingPool(
         PrioritizedExecutorService.create(
             lifecycle,
             config
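DruidProcessingModule now provides a QueryProcessingPool (a MetricsEmittingQueryProcessingPool wrapping the usual PrioritizedExecutorService) instead of a @Processing ExecutorService, which is the injection point custom extensions can target. A hedged sketch of such an extension pool that inspects the runner attached to each task before delegating; it assumes ForwardingQueryProcessingPool is extensible and that submitRunnerTask has the generic shape inferred from the call sites above, so adapt it to the actual interface if either assumption is off:

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.PrioritizedQueryRunnerCallable;

import java.util.concurrent.ExecutorService;

// Hypothetical class; not part of this PR.
public class RunnerAwareQueryProcessingPool extends ForwardingQueryProcessingPool
{
  private static final Logger log = new Logger(RunnerAwareQueryProcessingPool.class);

  public RunnerAwareQueryProcessingPool(ExecutorService delegate)
  {
    super(delegate);
  }

  @Override
  public <T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task)
  {
    // The attached runner can carry state handed over by the QuerySegmentWalker, which is
    // what the PR's new hook is for; here it is only logged before delegating.
    log.debug("Scheduling prioritized task for runner[%s]", task.getRunner());
    return super.submitRunnerTask(task);
  }
}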
diff --git a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
index fdce8e8..59af28a 100644
--- a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
@@ -28,16 +28,16 @@
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.guice.annotations.Merging;
-import org.apache.druid.guice.annotations.Processing;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.ExecutorServiceMonitor;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.server.metrics.MetricsModule;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutorService;
 
 /**
  * This module is used to fulfill dependency injection of query processing and caching resources: buffer pools and
@@ -58,14 +58,13 @@
   }
 
   @Provides
-  @Processing
   @ManageLifecycle
-  public ExecutorService getProcessingExecutorService(DruidProcessingConfig config)
+  public QueryProcessingPool getProcessingExecutorPool(DruidProcessingConfig config)
   {
     if (config.getNumThreadsConfigured() != ExecutorServiceConfig.DEFAULT_NUM_THREADS) {
       log.error("numThreads[%d] configured, that is ignored on Router", config.getNumThreadsConfigured());
     }
-    return Execs.dummy();
+    return new ForwardingQueryProcessingPool(Execs.dummy());
   }
 
   @Provides
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index ff8799d..6a86f3e 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -25,6 +25,7 @@
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
@@ -38,8 +39,6 @@
 import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 
-import java.util.concurrent.ExecutorService;
-
 public class Appenderators
 {
   public static Appenderator createRealtime(
@@ -54,7 +53,7 @@
       QueryRunnerFactoryConglomerate conglomerate,
       DataSegmentAnnouncer segmentAnnouncer,
       ServiceEmitter emitter,
-      ExecutorService queryExecutorService,
+      QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
@@ -79,7 +78,7 @@
             objectMapper,
             emitter,
             conglomerate,
-            queryExecutorService,
+            queryProcessingPool,
             joinableFactory,
             Preconditions.checkNotNull(cache, "cache"),
             cacheConfig,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index c5b22cf..0bbdd40 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -25,6 +25,7 @@
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.SegmentDescriptor;
@@ -39,8 +40,6 @@
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.joda.time.Interval;
 
-import java.util.concurrent.ExecutorService;
-
 /**
  * This interface defines entities that create and manage potentially multiple {@link Appenderator} instances.
  *
@@ -76,7 +75,7 @@
       QueryRunnerFactoryConglomerate conglomerate,
       DataSegmentAnnouncer segmentAnnouncer,
       ServiceEmitter emitter,
-      ExecutorService queryExecutorService,
+      QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
index 6ee6b4b..d99aa81 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
@@ -25,8 +25,8 @@
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.guice.annotations.Json;
-import org.apache.druid.guice.annotations.Processing;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
@@ -42,14 +42,13 @@
 import org.apache.druid.timeline.partition.ShardSpec;
 
 import java.io.File;
-import java.util.concurrent.ExecutorService;
 
 public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
 {
   private final ServiceEmitter emitter;
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final DataSegmentAnnouncer segmentAnnouncer;
-  private final ExecutorService queryExecutorService;
+  private final QueryProcessingPool queryProcessingPool;
   private final JoinableFactory joinableFactory;
   private final DataSegmentPusher dataSegmentPusher;
   private final ObjectMapper jsonMapper;
@@ -63,7 +62,7 @@
       @JacksonInject ServiceEmitter emitter,
       @JacksonInject QueryRunnerFactoryConglomerate conglomerate,
       @JacksonInject DataSegmentAnnouncer segmentAnnouncer,
-      @JacksonInject @Processing ExecutorService queryExecutorService,
+      @JacksonInject QueryProcessingPool queryProcessingPool,
       @JacksonInject JoinableFactory joinableFactory,
       @JacksonInject DataSegmentPusher dataSegmentPusher,
       @JacksonInject @Json ObjectMapper jsonMapper,
@@ -77,7 +76,7 @@
     this.emitter = emitter;
     this.conglomerate = conglomerate;
     this.segmentAnnouncer = segmentAnnouncer;
-    this.queryExecutorService = queryExecutorService;
+    this.queryProcessingPool = queryProcessingPool;
     this.joinableFactory = joinableFactory;
     this.dataSegmentPusher = dataSegmentPusher;
     this.jsonMapper = jsonMapper;
@@ -114,7 +113,7 @@
         conglomerate,
         segmentAnnouncer,
         emitter,
-        queryExecutorService,
+        queryProcessingPool,
         joinableFactory,
         cache,
         cacheConfig,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index f1e2f3c..35b0884 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -26,6 +26,7 @@
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.SegmentDescriptor;
@@ -40,8 +41,6 @@
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.joda.time.Interval;
 
-import java.util.concurrent.ExecutorService;
-
 /**
  * This implementation is needed because Overlords and MiddleManagers operate on Task objects which
  * can require an AppenderatorsManager to be injected.
@@ -67,7 +66,7 @@
       QueryRunnerFactoryConglomerate conglomerate,
       DataSegmentAnnouncer segmentAnnouncer,
       ServiceEmitter emitter,
-      ExecutorService queryExecutorService,
+      QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 88a4f57..323122a 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -26,6 +26,7 @@
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.SegmentDescriptor;
@@ -40,8 +41,6 @@
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.joda.time.Interval;
 
-import java.util.concurrent.ExecutorService;
-
 /**
  * Manages Appenderators for tasks running within a CliPeon process.
  *
@@ -73,7 +72,7 @@
       QueryRunnerFactoryConglomerate conglomerate,
       DataSegmentAnnouncer segmentAnnouncer,
       ServiceEmitter emitter,
-      ExecutorService queryExecutorService,
+      QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
@@ -99,7 +98,7 @@
           conglomerate,
           segmentAnnouncer,
           emitter,
-          queryExecutorService,
+          queryProcessingPool,
           joinableFactory,
           cache,
           cacheConfig,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 6097857..da626c1 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -32,19 +32,20 @@
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.guava.FunctionalIterable;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.BySegmentQueryRunner;
 import org.apache.druid.query.CPUTimeMetricQueryRunner;
+import org.apache.druid.query.DirectQueryProcessingPool;
 import org.apache.druid.query.FinalizeResultsQueryRunner;
 import org.apache.druid.query.MetricsEmittingQueryRunner;
 import org.apache.druid.query.NoopQueryRunner;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -71,7 +72,6 @@
 
 import java.io.Closeable;
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
@@ -89,7 +89,7 @@
   private final ObjectMapper objectMapper;
   private final ServiceEmitter emitter;
   private final QueryRunnerFactoryConglomerate conglomerate;
-  private final ExecutorService queryExecutorService;
+  private final QueryProcessingPool queryProcessingPool;
   private final JoinableFactoryWrapper joinableFactoryWrapper;
   private final Cache cache;
   private final CacheConfig cacheConfig;
@@ -101,7 +101,7 @@
       ObjectMapper objectMapper,
       ServiceEmitter emitter,
       QueryRunnerFactoryConglomerate conglomerate,
-      ExecutorService queryExecutorService,
+      QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
@@ -113,7 +113,7 @@
     this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
     this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
-    this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
+    this.queryProcessingPool = Preconditions.checkNotNull(queryProcessingPool, "queryProcessingPool");
     this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
     this.cache = Preconditions.checkNotNull(cache, "cache");
     this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
@@ -272,7 +272,7 @@
                       sinkSegmentId,
                       descriptor.getInterval().getStart(),
                       factory.mergeRunners(
-                          Execs.directExecutor(),
+                          DirectQueryProcessingPool.INSTANCE,
                           perHydrantRunners
                       )
                   ),
@@ -287,7 +287,7 @@
     final QueryRunner<T> mergedRunner =
         toolChest.mergeResults(
             factory.mergeRunners(
-                queryExecutorService,
+                queryProcessingPool,
                 perSegmentRunners
             )
         );
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 3780c37..faba506 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -31,7 +31,6 @@
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.guice.annotations.Processing;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.IAE;
@@ -41,6 +40,7 @@
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.SegmentDescriptor;
@@ -76,7 +76,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 
 /**
  * Manages {@link Appenderator} instances for the CliIndexer task execution service, which runs all tasks in
@@ -108,7 +107,7 @@
 
   private final Map<String, DatasourceBundle> datasourceBundles = new HashMap<>();
 
-  private final ExecutorService queryExecutorService;
+  private final QueryProcessingPool queryProcessingPool;
   private final JoinableFactory joinableFactory;
   private final WorkerConfig workerConfig;
   private final Cache cache;
@@ -122,7 +121,7 @@
 
   @Inject
   public UnifiedIndexerAppenderatorsManager(
-      @Processing ExecutorService queryExecutorService,
+      QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       WorkerConfig workerConfig,
       Cache cache,
@@ -133,7 +132,7 @@
       Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider
   )
   {
-    this.queryExecutorService = queryExecutorService;
+    this.queryProcessingPool = queryProcessingPool;
     this.joinableFactory = joinableFactory;
     this.workerConfig = workerConfig;
     this.cache = cache;
@@ -161,7 +160,7 @@
       QueryRunnerFactoryConglomerate conglomerate,
       DataSegmentAnnouncer segmentAnnouncer,
       ServiceEmitter emitter,
-      ExecutorService queryExecutorService,
+      QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
@@ -347,7 +346,7 @@
           objectMapper,
           serviceEmitter,
           queryRunnerFactoryConglomerateProvider.get(),
-          queryExecutorService,
+          queryProcessingPool,
           joinableFactory,
           Preconditions.checkNotNull(cache, "cache"),
           cacheConfig,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
index 20d4b75..0b2ce80 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
@@ -31,6 +31,7 @@
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
@@ -46,7 +47,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 /**
@@ -70,7 +70,7 @@
       ServiceEmitter emitter,
       QueryRunnerFactoryConglomerate conglomerate,
       DataSegmentAnnouncer segmentAnnouncer,
-      ExecutorService queryExecutorService,
+      QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       IndexMerger indexMerger,
       IndexIO indexIO,
@@ -88,7 +88,7 @@
         emitter,
         conglomerate,
         segmentAnnouncer,
-        queryExecutorService,
+        queryProcessingPool,
         joinableFactory,
         null,
         null,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumberSchool.java
index da827bb..787ea26 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumberSchool.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumberSchool.java
@@ -27,8 +27,8 @@
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
-import org.apache.druid.guice.annotations.Processing;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
@@ -39,8 +39,6 @@
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.joda.time.Duration;
 
-import java.util.concurrent.ExecutorService;
-
 /**
  * This plumber just drops segments at the end of a flush duration instead of handing them off. It is only useful if you want to run
  * a real time node without the rest of the Druid cluster.
@@ -54,7 +52,7 @@
   private final ServiceEmitter emitter;
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final DataSegmentAnnouncer segmentAnnouncer;
-  private final ExecutorService queryExecutorService;
+  private final QueryProcessingPool queryProcessingPool;
   private final JoinableFactory joinableFactory;
   private final IndexMergerV9 indexMergerV9;
   private final IndexIO indexIO;
@@ -69,7 +67,7 @@
       @JacksonInject ServiceEmitter emitter,
       @JacksonInject QueryRunnerFactoryConglomerate conglomerate,
       @JacksonInject DataSegmentAnnouncer segmentAnnouncer,
-      @JacksonInject @Processing ExecutorService queryExecutorService,
+      @JacksonInject QueryProcessingPool queryProcessingPool,
       @JacksonInject JoinableFactory joinableFactory,
       @JacksonInject IndexMergerV9 indexMergerV9,
       @JacksonInject IndexIO indexIO,
@@ -86,7 +84,7 @@
         segmentAnnouncer,
         null,
         null,
-        queryExecutorService,
+        queryProcessingPool,
         joinableFactory,
         indexMergerV9,
         indexIO,
@@ -100,7 +98,7 @@
     this.emitter = emitter;
     this.conglomerate = conglomerate;
     this.segmentAnnouncer = segmentAnnouncer;
-    this.queryExecutorService = queryExecutorService;
+    this.queryProcessingPool = queryProcessingPool;
     this.joinableFactory = joinableFactory;
     this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
@@ -127,7 +125,7 @@
         emitter,
         conglomerate,
         segmentAnnouncer,
-        queryExecutorService,
+        queryProcessingPool,
         joinableFactory,
         indexMergerV9,
         indexIO,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
index 0c97d85..ca0484c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -47,6 +47,7 @@
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.QuerySegmentWalker;
@@ -138,7 +139,7 @@
       ServiceEmitter emitter,
       QueryRunnerFactoryConglomerate conglomerate,
       DataSegmentAnnouncer segmentAnnouncer,
-      ExecutorService queryExecutorService,
+      QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       DataSegmentPusher dataSegmentPusher,
       SegmentPublisher segmentPublisher,
@@ -168,7 +169,7 @@
         objectMapper,
         emitter,
         conglomerate,
-        queryExecutorService,
+        queryProcessingPool,
         joinableFactory,
         cache,
         cacheConfig,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
index 3f808f9..e2ba02c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
@@ -26,8 +26,8 @@
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
-import org.apache.druid.guice.annotations.Processing;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
@@ -40,8 +40,6 @@
 import org.apache.druid.segment.realtime.SegmentPublisher;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 
-import java.util.concurrent.ExecutorService;
-
 /**
  *
  */
@@ -53,7 +51,7 @@
   private final DataSegmentAnnouncer segmentAnnouncer;
   private final SegmentPublisher segmentPublisher;
   private final SegmentHandoffNotifierFactory handoffNotifierFactory;
-  private final ExecutorService queryExecutorService;
+  private final QueryProcessingPool queryProcessingPool;
   private final JoinableFactory joinableFactory;
   private final IndexMergerV9 indexMergerV9;
   private final IndexIO indexIO;
@@ -70,7 +68,7 @@
       @JacksonInject DataSegmentAnnouncer segmentAnnouncer,
       @JacksonInject SegmentPublisher segmentPublisher,
       @JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory,
-      @JacksonInject @Processing ExecutorService executorService,
+      @JacksonInject QueryProcessingPool queryProcessingPool,
       @JacksonInject JoinableFactory joinableFactory,
       @JacksonInject IndexMergerV9 indexMergerV9,
       @JacksonInject IndexIO indexIO,
@@ -86,7 +84,7 @@
     this.segmentAnnouncer = segmentAnnouncer;
     this.segmentPublisher = segmentPublisher;
     this.handoffNotifierFactory = handoffNotifierFactory;
-    this.queryExecutorService = executorService;
+    this.queryProcessingPool = queryProcessingPool;
     this.joinableFactory = joinableFactory;
     this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
@@ -113,7 +111,7 @@
         emitter,
         conglomerate,
         segmentAnnouncer,
-        queryExecutorService,
+        queryProcessingPool,
         joinableFactory,
         dataSegmentPusher,
         segmentPublisher,
diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
index 5ae9700..5c71d84 100644
--- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
@@ -22,9 +22,9 @@
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.FunctionalIterable;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DirectQueryProcessingPool;
 import org.apache.druid.query.FluentQueryRunnerBuilder;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -104,7 +104,7 @@
 
     final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);
     final QueryRunner<T> baseRunner = queryRunnerFactory.mergeRunners(
-        Execs.directExecutor(),
+        DirectQueryProcessingPool.INSTANCE,
         () -> StreamSupport.stream(segments.spliterator(), false)
                            .map(segmentMapFn)
                            .map(queryRunnerFactory::createRunner).iterator()
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
index ffdb38d..7f2b811 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
@@ -26,7 +26,6 @@
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulator;
-import org.apache.druid.guice.annotations.Processing;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
@@ -44,6 +43,7 @@
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -72,7 +72,6 @@
 
 import java.util.Collections;
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
@@ -86,7 +85,7 @@
   private static final EmittingLogger log = new EmittingLogger(ServerManager.class);
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final ServiceEmitter emitter;
-  private final ExecutorService exec;
+  private final QueryProcessingPool queryProcessingPool;
   private final CachePopulator cachePopulator;
   private final Cache cache;
   private final ObjectMapper objectMapper;
@@ -99,7 +98,7 @@
   public ServerManager(
       QueryRunnerFactoryConglomerate conglomerate,
       ServiceEmitter emitter,
-      @Processing ExecutorService exec,
+      QueryProcessingPool queryProcessingPool,
       CachePopulator cachePopulator,
       @Smile ObjectMapper objectMapper,
       Cache cache,
@@ -112,7 +111,7 @@
     this.conglomerate = conglomerate;
     this.emitter = emitter;
 
-    this.exec = exec;
+    this.queryProcessingPool = queryProcessingPool;
     this.cachePopulator = cachePopulator;
     this.cache = cache;
     this.objectMapper = objectMapper;
@@ -229,7 +228,7 @@
 
     return CPUTimeMetricQueryRunner.safeBuild(
         new FinalizeResultsQueryRunner<>(
-            toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
+            toolChest.mergeResults(factory.mergeRunners(queryProcessingPool, queryRunners)),
             toolChest
         ),
         toolChest,
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
index 600662e..4940659 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -37,6 +37,7 @@
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
 import org.apache.druid.query.QueryRunnerTestHelper;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -283,7 +284,7 @@
         ),
         new NoopDataSegmentAnnouncer(),
         emitter,
-        queryExecutor,
+        new ForwardingQueryProcessingPool(queryExecutor),
         NoopJoinableFactory.INSTANCE,
         MapCache.create(2048),
         new CacheConfig(),
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
index 728e980..daa14ca 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
@@ -26,9 +26,9 @@
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
+import org.apache.druid.query.DirectQueryProcessingPool;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -56,7 +56,7 @@
   public final ExpectedException expectedException = ExpectedException.none();
 
   private final UnifiedIndexerAppenderatorsManager manager = new UnifiedIndexerAppenderatorsManager(
-      Execs.directExecutor(),
+      DirectQueryProcessingPool.INSTANCE,
       NoopJoinableFactory.INSTANCE,
       new WorkerConfig(),
       MapCache.create(10),
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index 0039690..80f5461 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -37,10 +37,10 @@
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
+import org.apache.druid.query.DirectQueryProcessingPool;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.QueryableIndex;
@@ -230,7 +230,7 @@
         announcer,
         segmentPublisher,
         handoffNotifierFactory,
-        Execs.directExecutor(),
+        DirectQueryProcessingPool.INSTANCE,
         NoopJoinableFactory.INSTANCE,
         TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory),
         TestHelper.getTestIndexIO(),
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
index 6f8d5a6..902acd8 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
@@ -46,10 +46,12 @@
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.DefaultQueryMetrics;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
 import org.apache.druid.query.NoopQueryRunner;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryMetrics;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -101,6 +103,7 @@
 import org.junit.rules.ExpectedException;
 
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -187,7 +190,7 @@
           }
         },
         new NoopServiceEmitter(),
-        serverManagerExec,
+        new ForwardingQueryProcessingPool(serverManagerExec),
         new ForegroundCachePopulator(new DefaultObjectMapper(), new CachePopulatorStats(), -1),
         new DefaultObjectMapper(),
         new LocalCacheProvider().get(),
@@ -737,7 +740,7 @@
 
     @Override
     public QueryRunner<Result<SearchResultValue>> mergeRunners(
-        ExecutorService queryExecutor,
+        QueryProcessingPool queryProcessingPool,
         Iterable<QueryRunner<Result<SearchResultValue>>> queryRunners
     )
     {
diff --git a/services/src/main/java/org/apache/druid/cli/DumpSegment.java b/services/src/main/java/org/apache/druid/cli/DumpSegment.java
index eab178a..a9aa9c4 100644
--- a/services/src/main/java/org/apache/druid/cli/DumpSegment.java
+++ b/services/src/main/java/org/apache/druid/cli/DumpSegment.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -45,11 +46,11 @@
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.DirectQueryProcessingPool;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
@@ -477,14 +478,15 @@
     );
   }
 
-  private static <T> Sequence<T> executeQuery(final Injector injector, final QueryableIndex index, final Query<T> query)
+  @VisibleForTesting
+  static <T> Sequence<T> executeQuery(final Injector injector, final QueryableIndex index, final Query<T> query)
   {
     final QueryRunnerFactoryConglomerate conglomerate = injector.getInstance(QueryRunnerFactoryConglomerate.class);
     final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
     final QueryRunner<T> runner = factory.createRunner(new QueryableIndexSegment(index, SegmentId.dummy("segment")));
     return factory
         .getToolchest()
-        .mergeResults(factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner)))
+        .mergeResults(factory.mergeRunners(DirectQueryProcessingPool.INSTANCE, ImmutableList.of(runner)))
         .run(QueryPlus.wrap(query), ResponseContext.createEmpty());
   }
 
diff --git a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java
new file mode 100644
index 0000000..e22c7a3
--- /dev/null
+++ b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.cli;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Injector;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.DirectQueryProcessingPool;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+public class DumpSegmentTest
+{
+  @Test
+  public void testExecuteQuery()
+  {
+    Injector injector = Mockito.mock(Injector.class);
+    QueryRunnerFactoryConglomerate conglomerate = Mockito.mock(QueryRunnerFactoryConglomerate.class);
+    QueryRunnerFactory factory = Mockito.mock(QueryRunnerFactory.class, Mockito.RETURNS_DEEP_STUBS);
+    QueryRunner runner = Mockito.mock(QueryRunner.class);
+    QueryRunner mergeRunner = Mockito.mock(QueryRunner.class);
+    Query query = Mockito.mock(Query.class);
+    Sequence expected = Sequences.simple(Collections.singletonList(123));
+    Mockito.when(injector.getInstance(QueryRunnerFactoryConglomerate.class)).thenReturn(conglomerate);
+    Mockito.when(conglomerate.findFactory(ArgumentMatchers.any())).thenReturn(factory);
+    Mockito.when(factory.createRunner(ArgumentMatchers.any())).thenReturn(runner);
+    Mockito.when(factory.getToolchest().mergeResults(factory.mergeRunners(DirectQueryProcessingPool.INSTANCE, ImmutableList.of(runner)))).thenReturn(mergeRunner);
+    Mockito.when(mergeRunner.run(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(expected);
+    Sequence actual = DumpSegment.executeQuery(injector, null, query);
+    Assert.assertSame(expected, actual);
+  }
+}
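
Usage note for reviewers (illustrative only; this sketch is not part of the patch): the call-site pattern after this change is to hand QueryRunnerFactory#mergeRunners a QueryProcessingPool instead of an ExecutorService. DirectQueryProcessingPool.INSTANCE stands in wherever Execs.directExecutor() was used for same-thread execution, and ForwardingQueryProcessingPool wraps a pre-existing executor, as the test changes above do. Class, method, and variable names below are hypothetical placeholders.

import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;

import java.util.concurrent.ExecutorService;

public class MergeRunnersUsageSketch
{
  // Same-thread merging, mirroring the LocalQuerySegmentWalker and DumpSegment changes above.
  public <T> QueryRunner<T> mergeDirect(
      QueryRunnerFactory<T, Query<T>> factory,
      Iterable<QueryRunner<T>> runners
  )
  {
    return factory.mergeRunners(DirectQueryProcessingPool.INSTANCE, runners);
  }

  // Reuse an existing executor by wrapping it, mirroring the AppenderatorTester and ServerManagerTest changes above.
  public <T> QueryRunner<T> mergeOnExecutor(
      QueryRunnerFactory<T, Query<T>> factory,
      Iterable<QueryRunner<T>> runners,
      ExecutorService queryExecutor
  )
  {
    QueryProcessingPool queryProcessingPool = new ForwardingQueryProcessingPool(queryExecutor);
    return factory.mergeRunners(queryProcessingPool, runners);
  }
}

Either form satisfies the new mergeRunners signature; callers that previously injected a @Processing ExecutorService can keep their existing pool by wrapping it in ForwardingQueryProcessingPool.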