RATIS-1998. Add watch request metrics (#1009)
diff --git a/ratis-docs/src/site/markdown/metrics.md b/ratis-docs/src/site/markdown/metrics.md
index 10c78cc..2348679 100644
--- a/ratis-docs/src/site/markdown/metrics.md
+++ b/ratis-docs/src/site/markdown/metrics.md
@@ -80,31 +80,34 @@
### Raft Server Metrics
-| Application | Component | Name | Type | Description |
-|-------------|-----------|----------------------------------|---------|---------------------------------------------------------------------|
-| ratis | server | {peer}_lastHeartbeatElapsedTime | Gauge | Time elapsed since last heartbeat rpc response |
-| ratis | server | follower_append_entry_latency | Timer | Time taken for followers to append log entries |
-| ratis | server | {peer}_peerCommitIndex | Gauge | Commit index of peer |
-| ratis | server | clientReadRequest | Timer | Time taken to process read requests from client |
-| ratis | server | clientStaleReadRequest | Timer | Time taken to process stale-read requests from client |
-| ratis | server | clientWriteRequest | Timer | Time taken to process write requests from client |
-| ratis | server | clientWatch{level}Request | Timer | Time taken to process watch(replication_level) requests from client |
-| ratis | server | numRequestQueueLimitHits | Counter | Number of (total client requests in queue) limit hits |
-| ratis | server | numRequestsByteSizeLimitHits | Counter | Number of (total size of client requests in queue) limit hits |
-| ratis | server | numResourceLimitHits | Counter | Sum of numRequestQueueLimitHits and numRequestsByteSizeLimitHits |
-| ratis | server | numPendingRequestInQueue | Gauge | Number of pending client requests in queue |
-| ratis | server | numPendingRequestMegaByteSize | Gauge | Total size of pending client requests in queue |
-| ratis | server | retryCacheEntryCount | Gauge | Number of entries in retry cache |
-| ratis | server | retryCacheHitCount | Gauge | Number of retry cache hits |
-| ratis | server | retryCacheHitRate | Gauge | Retry cache hit rate |
-| ratis | server | retryCacheMissCount | Gauge | Number of retry cache misses |
-| ratis | server | retryCacheMissRate | Gauge | Retry cache miss rate |
-| ratis | server | numFailedClientStaleReadOnServer | Counter | Number of failed stale-read requests |
-| ratis | server | numFailedClientReadOnServer | Counter | Number of failed read requests |
-| ratis | server | numFailedClientWriteOnServer | Counter | Number of failed write requests |
-| ratis | server | numFailedClientWatchOnServer | Counter | Number of failed watch requests |
-| ratis | server | numFailedClientStreamOnServer | Counter | Number of failed stream requests |
-| ratis | server | numInstallSnapshot | Counter | Number of install-snapshot requests |
+| Application | Component | Name | Type | Description |
+|-------------|-----------|--------------------------------------|---------|---------------------------------------------------------------------|
+| ratis | server | {peer}_lastHeartbeatElapsedTime | Gauge | Time elapsed since last heartbeat rpc response |
+| ratis | server | follower_append_entry_latency | Timer | Time taken for followers to append log entries |
+| ratis | server | {peer}_peerCommitIndex | Gauge | Commit index of peer |
+| ratis | server | clientReadRequest | Timer | Time taken to process read requests from client |
+| ratis | server | clientStaleReadRequest | Timer | Time taken to process stale-read requests from client |
+| ratis | server | clientWriteRequest | Timer | Time taken to process write requests from client |
+| ratis | server | clientWatch{level}Request | Timer | Time taken to process watch(replication_level) requests from client |
+| ratis | server | numRequestQueueLimitHits | Counter | Number of (total client requests in queue) limit hits |
+| ratis | server | numRequestsByteSizeLimitHits | Counter | Number of (total size of client requests in queue) limit hits |
+| ratis | server | numResourceLimitHits | Counter | Sum of numRequestQueueLimitHits and numRequestsByteSizeLimitHits |
+| ratis | server | numPendingRequestInQueue | Gauge | Number of pending client requests in queue |
+| ratis | server | numPendingRequestMegaByteSize | Gauge | Total size of pending client requests in queue |
+| ratis | server | retryCacheEntryCount | Gauge | Number of entries in retry cache |
+| ratis | server | retryCacheHitCount | Gauge | Number of retry cache hits |
+| ratis | server | retryCacheHitRate | Gauge | Retry cache hit rate |
+| ratis | server | retryCacheMissCount | Gauge | Number of retry cache misses |
+| ratis | server | retryCacheMissRate | Gauge | Retry cache miss rate |
+| ratis | server | numFailedClientStaleReadOnServer | Counter | Number of failed stale-read requests |
+| ratis | server | numFailedClientReadOnServer | Counter | Number of failed read requests |
+| ratis | server | numFailedClientWriteOnServer | Counter | Number of failed write requests |
+| ratis | server | numFailedClientWatchOnServer | Counter | Number of failed watch requests |
+| ratis | server | numFailedClientStreamOnServer | Counter | Number of failed stream requests |
+| ratis | server | numInstallSnapshot | Counter | Number of install-snapshot requests |
+| ratis | server | numWatch{level}RequestTimeout | Counter | Number of watch(replication_level) request timeout |
+| ratis | server | numWatch{level}RequestInQueue | Gauge | Number of watch(replication_level) requests in queue |
+| ratis | server | numWatch{level}RequestQueueLimitHits | Counter | Number of (total watch request in queue) limit hits |
## Ratis Netty Metrics
diff --git a/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java b/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
index 32bcf52..eafc384 100644
--- a/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
+++ b/ratis-metrics-api/src/main/java/org/apache/ratis/metrics/RatisMetrics.java
@@ -49,6 +49,13 @@
return Collections.unmodifiableMap(maps);
}
+ protected static <T extends Enum<T>> Map<T, LongCounter> newCounterMap(
+ Class<T> clazz, Function<T, LongCounter> constructor) {
+ final EnumMap<T, LongCounter> map = new EnumMap<>(clazz);
+ Arrays.stream(clazz.getEnumConstants()).forEach(t -> map.put(t, constructor.apply(t)));
+ return Collections.unmodifiableMap(map);
+ }
+
protected static <T extends Enum<T>> Map<T, Timekeeper> newTimerMap(
Class<T> clazz, Function<T, Timekeeper> constructor) {
final EnumMap<T, Timekeeper> map = new EnumMap<>(clazz);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 8864c22..4175424 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -379,7 +379,7 @@
raftServerMetrics = server.getRaftServerMetrics();
logAppenderMetrics = new LogAppenderMetrics(server.getMemberId());
this.pendingRequests = new PendingRequests(server.getMemberId(), properties, raftServerMetrics);
- this.watchRequests = new WatchRequests(server.getMemberId(), properties);
+ this.watchRequests = new WatchRequests(server.getMemberId(), properties, raftServerMetrics);
this.messageStreamRequests = new MessageStreamRequests(server.getMemberId());
this.pendingStepDown = new PendingStepDown(this);
this.readIndexHeartbeats = new ReadIndexHeartbeats();
@@ -457,6 +457,7 @@
logAppenderMetrics.unregister();
raftServerMetrics.unregister();
pendingRequests.close();
+ watchRequests.close();
return f;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index f4c6200..6988bfb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -24,6 +24,7 @@
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,11 +76,15 @@
private final SortedMap<PendingWatch, PendingWatch> q = new TreeMap<>(
Comparator.comparingLong(PendingWatch::getIndex).thenComparing(PendingWatch::getCreationTime));
private final ResourceSemaphore resource;
+ private final RaftServerMetricsImpl raftServerMetrics;
private volatile long index; //Invariant: q.isEmpty() or index < any element q
- WatchQueue(ReplicationLevel replication, int elementLimit) {
+ WatchQueue(ReplicationLevel replication, int elementLimit, RaftServerMetricsImpl raftServerMetrics) {
this.replication = replication;
this.resource = new ResourceSemaphore(elementLimit);
+ this.raftServerMetrics = raftServerMetrics;
+
+ raftServerMetrics.addNumPendingWatchRequestsGauge(resource::used, replication);
}
long getIndex() {
@@ -103,6 +108,7 @@
if (computed == null) {
// failed to acquire
+ raftServerMetrics.onWatchRequestQueueLimitHit(replication);
return JavaUtils.completeExceptionally(new ResourceUnavailableException(
"Failed to acquire a pending watch request in " + name + " for " + request));
}
@@ -123,6 +129,7 @@
pending.getFuture().completeExceptionally(
new NotReplicatedException(request.getCallId(), replication, pending.getIndex()));
LOG.debug("{}: timeout {}, {}", name, pending, request);
+ raftServerMetrics.onWatchRequestTimeout(replication);
}
}
@@ -162,6 +169,12 @@
q.clear();
resource.close();
}
+
+ void close() {
+ if (raftServerMetrics != null) {
+ raftServerMetrics.removeNumPendingWatchRequestsGauge(replication);
+ }
+ }
}
private final String name;
@@ -171,7 +184,7 @@
private final TimeDuration watchTimeoutDenominationNanos;
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
- WatchRequests(Object name, RaftProperties properties) {
+ WatchRequests(Object name, RaftProperties properties, RaftServerMetricsImpl raftServerMetrics) {
this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
final TimeDuration watchTimeout = RaftServerConfigKeys.Watch.timeout(properties);
@@ -183,7 +196,8 @@
+ watchTimeoutDenomination + ").");
final int elementLimit = RaftServerConfigKeys.Watch.elementLimit(properties);
- Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new WatchQueue(r, elementLimit)));
+ Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r,
+ new WatchQueue(r, elementLimit, raftServerMetrics)));
}
CompletableFuture<Long> add(RaftClientRequest request) {
@@ -207,4 +221,8 @@
void failWatches(Exception e) {
queues.values().forEach(q -> q.failAll(e));
}
+
+ void close() {
+ queues.values().forEach(WatchQueue::close);
+ }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
index cdbce6e..70711c3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
@@ -57,9 +57,14 @@
public static final String REQUEST_QUEUE_LIMIT_HIT_COUNTER = "numRequestQueueLimitHits";
public static final String REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER = "numRequestsByteSizeLimitHits";
public static final String RESOURCE_LIMIT_HIT_COUNTER = "numResourceLimitHits";
+ public static final String WATCH_REQUEST_QUEUE_LIMIT_HIT_COUNTER = "numWatch%sRequestQueueLimitHits";
public static final String REQUEST_QUEUE_SIZE = "numPendingRequestInQueue";
public static final String REQUEST_MEGA_BYTE_SIZE = "numPendingRequestMegaByteSize";
+
+ public static final String WATCH_REQUEST_QUEUE_SIZE = "numWatch%sRequestInQueue";
+ public static final String WATCH_REQUEST_TIMEOUT_COUNTER = "numWatch%sRequestTimeout";
+
public static final String RETRY_CACHE_ENTRY_COUNT_METRIC = "retryCacheEntryCount";
public static final String RETRY_CACHE_HIT_COUNT_METRIC = "retryCacheHitCount";
public static final String RETRY_CACHE_HIT_RATE_METRIC = "retryCacheHitRate";
@@ -76,6 +81,11 @@
private final LongCounter numRequestQueueLimitHits = getRegistry().counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER);
private final LongCounter numRequestsByteSizeLimitHits = getRegistry().counter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER);
private final LongCounter numResourceLimitHits = getRegistry().counter(RESOURCE_LIMIT_HIT_COUNTER);
+ private final Map<ReplicationLevel, LongCounter> numWatchRequestQueueLimitHits = newCounterMap(ReplicationLevel.class,
+ replication -> getRegistry().counter(
+ String.format(WATCH_REQUEST_QUEUE_LIMIT_HIT_COUNTER, Type.toString(replication))));
+ private final Map<ReplicationLevel, LongCounter> numWatchRequestsTimeout = newCounterMap(ReplicationLevel.class,
+ replication -> getRegistry().counter(String.format(WATCH_REQUEST_TIMEOUT_COUNTER, Type.toString(replication))));
private final LongCounter numFailedClientStaleRead
= getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT);
@@ -150,6 +160,14 @@
return numInstallSnapshot;
}
+ public LongCounter getNumWatchRequestQueueLimitHits(ReplicationLevel replication) {
+ return numWatchRequestQueueLimitHits.get(replication);
+ }
+
+ public LongCounter getNumWatchRequestsTimeout(ReplicationLevel replication) {
+ return numWatchRequestsTimeout.get(replication);
+ }
+
private static RatisMetricRegistry createRegistry(String serverId) {
return create(new MetricRegistryInfo(serverId,
RATIS_APPLICATION_NAME_METRICS, RATIS_SERVER_METRICS,
@@ -237,6 +255,22 @@
return getRegistry().remove(REQUEST_MEGA_BYTE_SIZE);
}
+ public void onWatchRequestQueueLimitHit(ReplicationLevel replicationLevel) {
+ numWatchRequestQueueLimitHits.get(replicationLevel).inc();
+ }
+
+ public void onWatchRequestTimeout(ReplicationLevel replicationLevel) {
+ numWatchRequestsTimeout.get(replicationLevel).inc();
+ }
+
+ public void addNumPendingWatchRequestsGauge(Supplier<Integer> queueSize, ReplicationLevel replication) {
+ getRegistry().gauge(String.format(WATCH_REQUEST_QUEUE_SIZE, Type.toString(replication)), () -> queueSize);
+ }
+
+ public boolean removeNumPendingWatchRequestsGauge(ReplicationLevel replication) {
+ return getRegistry().remove(String.format(WATCH_REQUEST_QUEUE_SIZE, Type.toString(replication)));
+ }
+
public void onRequestByteSizeLimitHit() {
numRequestsByteSizeLimitHits.inc();
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 1f19f9d..a9bdd1a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -31,8 +31,10 @@
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerConfigKeys.Watch;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.Slf4jUtils;
@@ -47,12 +49,14 @@
import org.slf4j.event.Level;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.fail;
@@ -469,6 +473,59 @@
}
}
+ @Test
+ public void testWatchMetrics() throws Exception {
+ final RaftProperties p = getProperties();
+ RaftServerConfigKeys.Watch.setElementLimit(p, 10);
+ RaftServerConfigKeys.Watch.setTimeout(p, TimeDuration.valueOf(2, TimeUnit.SECONDS));
+ try {
+ runWithNewCluster(NUM_SERVERS,
+ cluster -> runSingleTest(WatchRequestTests::runTestWatchMetrics, cluster, LOG));
+ } finally {
+ RaftServerConfigKeys.Watch.setElementLimit(p, Watch.ELEMENT_LIMIT_DEFAULT);
+ RaftServerConfigKeys.Watch.setTimeout(p, RaftServerConfigKeys.Watch.TIMEOUT_DEFAULT);
+ }
+ }
+
+ static RaftServerMetricsImpl getRaftServerMetrics(RaftServer.Division division) {
+ return (RaftServerMetricsImpl) division.getRaftServerMetrics();
+ }
+
+ static void runTestWatchMetrics(TestParameters p) throws Exception {
+ final MiniRaftCluster cluster = p.cluster;
+
+ List<RaftClient> clients = new ArrayList<>();
+
+ final ReplicationLevel replicationLevel = ReplicationLevel.MAJORITY;
+ try {
+ long initialWatchRequestTimeoutCount = getRaftServerMetrics(cluster.getLeader())
+ .getNumWatchRequestsTimeout(replicationLevel).getCount();
+ long initialLimitHit = getRaftServerMetrics(cluster.getLeader())
+ .getNumWatchRequestQueueLimitHits(replicationLevel).getCount();
+
+ int uncommittedBaseIndex = 10000;
+ // Logs with indices 10001 - 10011 will never be committed, so it should fail with NotReplicatedException
+ for (int i = 1; i <= 11; i++) {
+ RaftClient client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
+ clients.add(client);
+ client.async().watch(uncommittedBaseIndex + i, replicationLevel);
+ }
+
+ // All the watch timeout for each unique index should increment the metric
+ RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
+ .getNumWatchRequestsTimeout(replicationLevel).getCount() == initialWatchRequestTimeoutCount + 10,
+ 300, 5000);
+ // There are 11 pending watch request, but the pending watch request limit is 10
+ RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
+ .getNumWatchRequestQueueLimitHits(replicationLevel).getCount() ==
+ initialLimitHit + 1, 300, 5000);
+ } finally {
+ for(RaftClient client : clients) {
+ client.close();
+ }
+ }
+ }
+
static void checkTimeout(List<CompletableFuture<RaftClientReply>> replies,
List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws Exception {
for(int i = 0; i < replies.size(); i++) {