Add a new metric query/segments/count that is not emitted by default (#11394)

* Add a new metric query/segments/count that is not emitted by default

* docs

* test the default implementation of the metric

* fix spelling error in docs

* document the fact that query retries will result in additional metric emissions

* update using recommended text from @jihoonson
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index fe89751..36512ad 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -107,6 +107,7 @@
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
 import org.apache.druid.timeline.SegmentId;
@@ -342,7 +343,8 @@
         processingConfig,
         forkJoinPool,
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
   }
 
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 71cc330..d4ca7ad 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -59,6 +59,7 @@
 |`query/failed/count`|number of failed queries|This metric is only available if the QueryCountStatsMonitor module is included.||
 |`query/interrupted/count`|number of queries interrupted due to cancellation.|This metric is only available if the QueryCountStatsMonitor module is included.||
 |`query/timeout/count`|number of timed out queries.|This metric is only available if the QueryCountStatsMonitor module is included.||
+|`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker. The broker will re-send the query to the new servers that serve those segments after move. In this case, those segments can be counted more than once in this metric.|Varies.|
 |`sqlQuery/time`|Milliseconds taken to complete a SQL query.|id, nativeQueryIds, dataSource, remoteAddress, success.|< 1s|
 |`sqlQuery/bytes`|number of bytes returned in SQL query response.|id, nativeQueryIds, dataSource, remoteAddress, success.| |
 
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index f1173aa..15fdfa7 100644
--- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -70,6 +70,7 @@
 import org.apache.druid.server.ClientQuerySegmentWalker;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.TimelineLookup;
 import org.hamcrest.core.IsInstanceOf;
@@ -367,7 +368,8 @@
         },
         ForkJoinPool.commonPool(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
 
     ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(
diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
index a351eeb..095fa17 100644
--- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
@@ -341,6 +341,13 @@
   }
 
   @Override
+  public QueryMetrics<QueryType> reportQueriedSegmentCount(long segmentCount)
+  {
+    // Don't emit by default.
+    return this;
+  }
+
+  @Override
   public void emit(ServiceEmitter emitter)
   {
     checkModifiedFromOwnerThread();
diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
index 9457fbd..304ed6c 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
@@ -271,6 +271,11 @@
   QueryMetrics<QueryType> reportQueryBytes(long byteCount);
 
   /**
+   * Registeres "segments queried count" metric.
+   */
+  QueryMetrics<QueryType> reportQueriedSegmentCount(long segmentCount);
+
+  /**
    * Registers "wait time" metric.
    */
   QueryMetrics<QueryType> reportWaitTime(long timeNs);
diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
index dd52bf5..108518a 100644
--- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
@@ -292,6 +292,12 @@
   }
 
   @Override
+  public QueryMetrics reportQueriedSegmentCount(long segmentCount)
+  {
+    return delegateQueryMetrics.reportQueriedSegmentCount(segmentCount);
+  }
+
+  @Override
   public void emit(ServiceEmitter emitter)
   {
     delegateQueryMetrics.emit(emitter);
diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
index e0fe0b4..5091ae0 100644
--- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
@@ -148,5 +148,12 @@
     actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
     Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
     Assert.assertEquals(10L, actualEvent.get("value"));
+
+    // Here we are testing that Queried Segment Count does not get emitted by the DefaultQueryMetrics and the last
+    // metric remains as query/node/bytes
+    queryMetrics.reportQueriedSegmentCount(25).emit(serviceEmitter);
+    actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
+    Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
+    Assert.assertEquals(10L, actualEvent.get("value"));
   }
 }
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 4481849..8175e2a 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -55,6 +55,7 @@
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.BySegmentResultValueClass;
 import org.apache.druid.query.CacheStrategy;
 import org.apache.druid.query.DruidProcessingConfig;
@@ -129,6 +130,7 @@
   private final ForkJoinPool pool;
   private final QueryScheduler scheduler;
   private final JoinableFactoryWrapper joinableFactoryWrapper;
+  private final ServiceEmitter emitter;
 
   @Inject
   public CachingClusteredClient(
@@ -142,7 +144,8 @@
       DruidProcessingConfig processingConfig,
       @Merging ForkJoinPool pool,
       QueryScheduler scheduler,
-      JoinableFactory joinableFactory
+      JoinableFactory joinableFactory,
+      ServiceEmitter emitter
   )
   {
     this.warehouse = warehouse;
@@ -156,6 +159,7 @@
     this.pool = pool;
     this.scheduler = scheduler;
     this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
+    this.emitter = emitter;
 
     if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {
       log.warn(
@@ -369,6 +373,8 @@
 
       query = scheduler.prioritizeAndLaneQuery(queryPlus, segmentServers);
       queryPlus = queryPlus.withQuery(query);
+      queryPlus = queryPlus.withQueryMetrics(toolChest);
+      queryPlus.getQueryMetrics().reportQueriedSegmentCount(segmentServers.size()).emit(emitter);
 
       final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = groupSegmentsByServer(segmentServers);
       LazySequence<T> mergedResultSequence = new LazySequence<>(() -> {
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index 1d591e2..a897e09 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -51,6 +51,7 @@
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineLookup;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -335,7 +336,8 @@
         },
         ForkJoinPool.commonPool(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java
index ab1d6cd..218cac5 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java
@@ -53,6 +53,7 @@
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.coordination.ServerManagerTest;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -138,7 +139,8 @@
         Mockito.mock(DruidProcessingConfig.class),
         ForkJoinPool.commonPool(),
         queryScheduler,
-        NoopJoinableFactory.INSTANCE
+        NoopJoinableFactory.INSTANCE,
+        new NoopServiceEmitter()
     );
 
     Query<SegmentDescriptor> fakeQuery = makeFakeQuery(interval);
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index cda0170..6fe08c4 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -125,6 +125,7 @@
 import org.apache.druid.server.ServerTestHelper;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
 import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
 import org.apache.druid.timeline.DataSegment;
@@ -2850,7 +2851,8 @@
             NoQueryLaningStrategy.INSTANCE,
             new ServerConfig()
         ),
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
index ae0c269..94c6c59 100644
--- a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
+++ b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
@@ -52,6 +52,7 @@
 import org.apache.druid.segment.generator.SegmentGenerator;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.joda.time.Interval;
@@ -145,7 +146,8 @@
         ),
         ForkJoinPool.commonPool(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
     servers = new ArrayList<>();
   }