RATIS-1998. Add watch request metrics (#1009)

diff --git a/ratis-docs/src/site/markdown/metrics.md b/ratis-docs/src/site/markdown/metrics.md
index 10c78cc..2348679 100644
--- a/ratis-docs/src/site/markdown/metrics.md
+++ b/ratis-docs/src/site/markdown/metrics.md
@@ -80,31 +80,34 @@
 
 ### Raft Server Metrics
 
-| Application | Component | Name                             | Type    | Description                                                         |
-|-------------|-----------|----------------------------------|---------|---------------------------------------------------------------------|
-| ratis       | server    | {peer}_lastHeartbeatElapsedTime  | Gauge   | Time elapsed since last heartbeat rpc response                      |
-| ratis       | server    | follower_append_entry_latency    | Timer   | Time taken for followers to append log entries                      |
-| ratis       | server    | {peer}_peerCommitIndex           | Gauge   | Commit index of peer                                                |
-| ratis       | server    | clientReadRequest                | Timer   | Time taken to process read requests from client                     |
-| ratis       | server    | clientStaleReadRequest           | Timer   | Time taken to process stale-read requests from client               |
-| ratis       | server    | clientWriteRequest               | Timer   | Time taken to process write requests from client                    |
-| ratis       | server    | clientWatch{level}Request        | Timer   | Time taken to process watch(replication_level) requests from client |
-| ratis       | server    | numRequestQueueLimitHits         | Counter | Number of (total client requests in queue) limit hits               |
-| ratis       | server    | numRequestsByteSizeLimitHits     | Counter | Number of (total size of client requests in queue) limit hits       |
-| ratis       | server    | numResourceLimitHits             | Counter | Sum of numRequestQueueLimitHits and numRequestsByteSizeLimitHits    |
-| ratis       | server    | numPendingRequestInQueue         | Gauge   | Number of pending client requests in queue                          |
-| ratis       | server    | numPendingRequestMegaByteSize    | Gauge   | Total size of pending client requests in queue                      |
-| ratis       | server    | retryCacheEntryCount             | Gauge   | Number of entries in retry cache                                    |
-| ratis       | server    | retryCacheHitCount               | Gauge   | Number of retry cache hits                                          |
-| ratis       | server    | retryCacheHitRate                | Gauge   | Retry cache hit rate                                                |
-| ratis       | server    | retryCacheMissCount              | Gauge   | Number of retry cache misses                                        |
-| ratis       | server    | retryCacheMissRate               | Gauge   | Retry cache miss rate                                               |
-| ratis       | server    | numFailedClientStaleReadOnServer | Counter | Number of failed stale-read requests                                |
-| ratis       | server    | numFailedClientReadOnServer      | Counter | Number of failed read requests                                      |
-| ratis       | server    | numFailedClientWriteOnServer     | Counter | Number of failed write requests                                     |
-| ratis       | server    | numFailedClientWatchOnServer     | Counter | Number of failed watch requests                                     |
-| ratis       | server    | numFailedClientStreamOnServer    | Counter | Number of failed stream requests                                    |
-| ratis       | server    | numInstallSnapshot               | Counter | Number of install-snapshot requests                                 |
+| Application | Component | Name                                 | Type    | Description                                                         |
+|-------------|-----------|--------------------------------------|---------|---------------------------------------------------------------------|
+| ratis       | server    | {peer}_lastHeartbeatElapsedTime      | Gauge   | Time elapsed since last heartbeat rpc response                      |
+| ratis       | server    | follower_append_entry_latency        | Timer   | Time taken for followers to append log entries                      |
+| ratis       | server    | {peer}_peerCommitIndex               | Gauge   | Commit index of peer                                                |
+| ratis       | server    | clientReadRequest                    | Timer   | Time taken to process read requests from client                     |
+| ratis       | server    | clientStaleReadRequest               | Timer   | Time taken to process stale-read requests from client               |
+| ratis       | server    | clientWriteRequest                   | Timer   | Time taken to process write requests from client                    |
+| ratis       | server    | clientWatch{level}Request            | Timer   | Time taken to process watch(replication_level) requests from client |
+| ratis       | server    | numRequestQueueLimitHits             | Counter | Number of (total client requests in queue) limit hits               |
+| ratis       | server    | numRequestsByteSizeLimitHits         | Counter | Number of (total size of client requests in queue) limit hits       |
+| ratis       | server    | numResourceLimitHits                 | Counter | Sum of numRequestQueueLimitHits and numRequestsByteSizeLimitHits    |
+| ratis       | server    | numPendingRequestInQueue             | Gauge   | Number of pending client requests in queue                          |
+| ratis       | server    | numPendingRequestMegaByteSize        | Gauge   | Total size of pending client requests in queue                      |
+| ratis       | server    | retryCacheEntryCount                 | Gauge   | Number of entries in retry cache                                    |
+| ratis       | server    | retryCacheHitCount                   | Gauge   | Number of retry cache hits                                          |
+| ratis       | server    | retryCacheHitRate                    | Gauge   | Retry cache hit rate                                                |
+| ratis       | server    | retryCacheMissCount                  | Gauge   | Number of retry cache misses                                        |
+| ratis       | server    | retryCacheMissRate                   | Gauge   | Retry cache miss rate                                               |
+| ratis       | server    | numFailedClientStaleReadOnServer     | Counter | Number of failed stale-read requests                                |
+| ratis       | server    | numFailedClientReadOnServer          | Counter | Number of failed read requests                                      |
+| ratis       | server    | numFailedClientWriteOnServer         | Counter | Number of failed write requests                                     |
+| ratis       | server    | numFailedClientWatchOnServer         | Counter | Number of failed watch requests                                     |
+| ratis       | server    | numFailedClientStreamOnServer        | Counter | Number of failed stream requests                                    |
+| ratis       | server    | numInstallSnapshot                   | Counter | Number of install-snapshot requests                                 |
+| ratis       | server    | numWatch{level}RequestTimeout        | Counter | Number of watch(replication_level) request timeouts                 |
+| ratis       | server    | numWatch{level}RequestInQueue        | Gauge   | Number of watch(replication_level) requests in queue                |
+| ratis       | server    | numWatch{level}RequestQueueLimitHits | Counter | Number of (total watch requests in queue) limit hits                |
 
 
 ## Ratis Netty Metrics
diff --git a/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java b/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
index 32bcf52..eafc384 100644
--- a/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
+++ b/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
@@ -49,6 +49,13 @@
     return Collections.unmodifiableMap(maps);
   }
 
+  protected static <T extends Enum<T>> Map<T, LongCounter> newCounterMap(
+      Class<T> clazz, Function<T, LongCounter> constructor) { // constructor maps a constant to its counter
+    final EnumMap<T, LongCounter> map = new EnumMap<>(clazz); // gets one entry per enum constant of clazz
+    Arrays.stream(clazz.getEnumConstants()).forEach(t -> map.put(t, constructor.apply(t)));
+    return Collections.unmodifiableMap(map); // callers receive a read-only view
+  }
+
   protected static <T extends Enum<T>> Map<T, Timekeeper> newTimerMap(
       Class<T> clazz, Function<T, Timekeeper> constructor) {
     final EnumMap<T, Timekeeper> map = new EnumMap<>(clazz);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 8864c22..4175424 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -379,7 +379,7 @@
     raftServerMetrics = server.getRaftServerMetrics();
     logAppenderMetrics = new LogAppenderMetrics(server.getMemberId());
     this.pendingRequests = new PendingRequests(server.getMemberId(), properties, raftServerMetrics);
-    this.watchRequests = new WatchRequests(server.getMemberId(), properties);
+    this.watchRequests = new WatchRequests(server.getMemberId(), properties, raftServerMetrics);
     this.messageStreamRequests = new MessageStreamRequests(server.getMemberId());
     this.pendingStepDown = new PendingStepDown(this);
     this.readIndexHeartbeats = new ReadIndexHeartbeats();
@@ -457,6 +457,7 @@
     logAppenderMetrics.unregister();
     raftServerMetrics.unregister();
     pendingRequests.close();
+    watchRequests.close();
     return f;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index f4c6200..6988bfb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -24,6 +24,7 @@
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
 import org.apache.ratis.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,11 +76,15 @@
     private final SortedMap<PendingWatch, PendingWatch> q = new TreeMap<>(
         Comparator.comparingLong(PendingWatch::getIndex).thenComparing(PendingWatch::getCreationTime));
     private final ResourceSemaphore resource;
+    private final RaftServerMetricsImpl raftServerMetrics;
     private volatile long index; //Invariant: q.isEmpty() or index < any element q
 
-    WatchQueue(ReplicationLevel replication, int elementLimit) {
+    WatchQueue(ReplicationLevel replication, int elementLimit, RaftServerMetricsImpl raftServerMetrics) {
       this.replication = replication;
       this.resource = new ResourceSemaphore(elementLimit);
+      this.raftServerMetrics = raftServerMetrics;
+
+      raftServerMetrics.addNumPendingWatchRequestsGauge(resource::used, replication);
     }
 
     long getIndex() {
@@ -103,6 +108,7 @@
 
       if (computed == null) {
         // failed to acquire
+        raftServerMetrics.onWatchRequestQueueLimitHit(replication);
         return JavaUtils.completeExceptionally(new ResourceUnavailableException(
             "Failed to acquire a pending watch request in " + name + " for " + request));
       }
@@ -123,6 +129,7 @@
         pending.getFuture().completeExceptionally(
             new NotReplicatedException(request.getCallId(), replication, pending.getIndex()));
         LOG.debug("{}: timeout {}, {}", name, pending, request);
+        raftServerMetrics.onWatchRequestTimeout(replication);
       }
     }
 
@@ -162,6 +169,12 @@
       q.clear();
       resource.close();
     }
+
+    void close() { // remove this queue's pending-watch gauge from the server metrics
+      if (raftServerMetrics != null) { // NOTE(review): the constructor uses raftServerMetrics without a null check
+        raftServerMetrics.removeNumPendingWatchRequestsGauge(replication);
+      }
+    }
   }
 
   private final String name;
@@ -171,7 +184,7 @@
   private final TimeDuration watchTimeoutDenominationNanos;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
-  WatchRequests(Object name, RaftProperties properties) {
+  WatchRequests(Object name, RaftProperties properties, RaftServerMetricsImpl raftServerMetrics) {
     this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
 
     final TimeDuration watchTimeout = RaftServerConfigKeys.Watch.timeout(properties);
@@ -183,7 +196,8 @@
             + watchTimeoutDenomination + ").");
 
     final int elementLimit = RaftServerConfigKeys.Watch.elementLimit(properties);
-    Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new WatchQueue(r, elementLimit)));
+    Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r,
+        new WatchQueue(r, elementLimit, raftServerMetrics)));
   }
 
   CompletableFuture<Long> add(RaftClientRequest request) {
@@ -207,4 +221,8 @@
   void failWatches(Exception e) {
     queues.values().forEach(q -> q.failAll(e));
   }
+
+  void close() { // close the WatchQueue of every replication level
+    queues.values().forEach(WatchQueue::close);
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
index cdbce6e..70711c3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
@@ -57,9 +57,14 @@
   public static final String REQUEST_QUEUE_LIMIT_HIT_COUNTER = "numRequestQueueLimitHits";
   public static final String REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER = "numRequestsByteSizeLimitHits";
   public static final String RESOURCE_LIMIT_HIT_COUNTER = "numResourceLimitHits";
+  public static final String WATCH_REQUEST_QUEUE_LIMIT_HIT_COUNTER = "numWatch%sRequestQueueLimitHits";
 
   public static final String REQUEST_QUEUE_SIZE = "numPendingRequestInQueue";
   public static final String REQUEST_MEGA_BYTE_SIZE = "numPendingRequestMegaByteSize";
+
+  public static final String WATCH_REQUEST_QUEUE_SIZE = "numWatch%sRequestInQueue";
+  public static final String WATCH_REQUEST_TIMEOUT_COUNTER = "numWatch%sRequestTimeout";
+
   public static final String RETRY_CACHE_ENTRY_COUNT_METRIC = "retryCacheEntryCount";
   public static final String RETRY_CACHE_HIT_COUNT_METRIC = "retryCacheHitCount";
   public static final String RETRY_CACHE_HIT_RATE_METRIC = "retryCacheHitRate";
@@ -76,6 +81,11 @@
   private final LongCounter numRequestQueueLimitHits = getRegistry().counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER);
   private final LongCounter numRequestsByteSizeLimitHits = getRegistry().counter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER);
   private final LongCounter numResourceLimitHits = getRegistry().counter(RESOURCE_LIMIT_HIT_COUNTER);
+  private final Map<ReplicationLevel, LongCounter> numWatchRequestQueueLimitHits = newCounterMap(ReplicationLevel.class,
+      replication -> getRegistry().counter(
+          String.format(WATCH_REQUEST_QUEUE_LIMIT_HIT_COUNTER, Type.toString(replication))));
+  private final Map<ReplicationLevel, LongCounter> numWatchRequestsTimeout = newCounterMap(ReplicationLevel.class,
+      replication -> getRegistry().counter(String.format(WATCH_REQUEST_TIMEOUT_COUNTER, Type.toString(replication))));
 
   private final LongCounter numFailedClientStaleRead
       = getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT);
@@ -150,6 +160,14 @@
     return numInstallSnapshot;
   }
 
+  public LongCounter getNumWatchRequestQueueLimitHits(ReplicationLevel replication) {
+    return numWatchRequestQueueLimitHits.get(replication); // queue-limit-hit counter of this level
+  }
+
+  public LongCounter getNumWatchRequestsTimeout(ReplicationLevel replication) {
+    return numWatchRequestsTimeout.get(replication); // watch-timeout counter of this level
+  }
+
   private static RatisMetricRegistry createRegistry(String serverId) {
     return create(new MetricRegistryInfo(serverId,
         RATIS_APPLICATION_NAME_METRICS, RATIS_SERVER_METRICS,
@@ -237,6 +255,22 @@
     return getRegistry().remove(REQUEST_MEGA_BYTE_SIZE);
   }
 
+  public void onWatchRequestQueueLimitHit(ReplicationLevel replicationLevel) {
+    numWatchRequestQueueLimitHits.get(replicationLevel).inc(); // count one queue-limit hit for this level
+  }
+
+  public void onWatchRequestTimeout(ReplicationLevel replicationLevel) {
+    numWatchRequestsTimeout.get(replicationLevel).inc(); // count one timed-out watch for this level
+  }
+
+  public void addNumPendingWatchRequestsGauge(Supplier<Integer> queueSize, ReplicationLevel replication) {
+    getRegistry().gauge(String.format(WATCH_REQUEST_QUEUE_SIZE, Type.toString(replication)), () -> queueSize);
+  } // registers queueSize under a gauge name that embeds the replication level
+
+  public boolean removeNumPendingWatchRequestsGauge(ReplicationLevel replication) { // returns the registry result
+    return getRegistry().remove(String.format(WATCH_REQUEST_QUEUE_SIZE, Type.toString(replication)));
+  }
+
   public void onRequestByteSizeLimitHit() {
     numRequestsByteSizeLimitHits.inc();
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 1f19f9d..a9bdd1a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -31,8 +31,10 @@
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerConfigKeys.Watch;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
 import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.Slf4jUtils;
@@ -47,12 +49,14 @@
 import org.slf4j.event.Level;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.junit.Assert.fail;
@@ -469,6 +473,59 @@
     }
   }
 
+  @Test
+  public void testWatchMetrics() throws Exception {
+    final RaftProperties properties = getProperties();
+    Watch.setElementLimit(properties, 10); // small queue so that the limit is easy to hit
+    Watch.setTimeout(properties, TimeDuration.valueOf(2, TimeUnit.SECONDS));
+    try {
+      runWithNewCluster(NUM_SERVERS,
+          cluster -> runSingleTest(WatchRequestTests::runTestWatchMetrics, cluster, LOG));
+    } finally { // restore the defaults on the properties object
+      Watch.setElementLimit(properties, Watch.ELEMENT_LIMIT_DEFAULT);
+      Watch.setTimeout(properties, Watch.TIMEOUT_DEFAULT);
+    }
+  }
+
+  static RaftServerMetricsImpl getRaftServerMetrics(RaftServer.Division division) {
+    return (RaftServerMetricsImpl) division.getRaftServerMetrics(); // cast to reach the impl-only counter getters
+  }
+
+  static void runTestWatchMetrics(TestParameters p) throws Exception { // verifies watch timeout/limit-hit metrics
+    final MiniRaftCluster cluster = p.cluster;
+
+    List<RaftClient> clients = new ArrayList<>();
+
+    final ReplicationLevel replicationLevel = ReplicationLevel.MAJORITY;
+    try {
+      long initialWatchRequestTimeoutCount = getRaftServerMetrics(cluster.getLeader())
+          .getNumWatchRequestsTimeout(replicationLevel).getCount();
+      long initialLimitHit = getRaftServerMetrics(cluster.getLeader())
+          .getNumWatchRequestQueueLimitHits(replicationLevel).getCount();
+
+      int uncommittedBaseIndex = 10000;
+      // Indices 10001 - 10011 will never be committed, so the queued watches time out with NotReplicatedException
+      for (int i = 1; i <= 11; i++) {
+        RaftClient client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
+        clients.add(client);
+        client.async().watch(uncommittedBaseIndex + i, replicationLevel);
+      }
+
+      // Each watch that times out on its unique index should increment the timeout metric
+      RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
+              .getNumWatchRequestsTimeout(replicationLevel).getCount() == initialWatchRequestTimeoutCount + 10,
+          300, 5000);
+      // There are 11 watch requests, but the pending watch request limit is 10, so one request hits the limit
+      RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
+          .getNumWatchRequestQueueLimitHits(replicationLevel).getCount() ==
+          initialLimitHit + 1, 300, 5000);
+    } finally {
+      for(RaftClient client : clients) {
+        client.close();
+      }
+    }
+  }
+
   static void checkTimeout(List<CompletableFuture<RaftClientReply>> replies,
       List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws Exception {
     for(int i = 0; i < replies.size(); i++) {