RATIS-1386. Some MetricsRegistry instances never unregistered (#486)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index dc8def1..48084c9 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -92,6 +92,7 @@
private final AtomicReference<AsyncStreamObservers> orderedStreamObservers = new AtomicReference<>();
private final AtomicReference<AsyncStreamObservers> unorderedStreamObservers = new AtomicReference<>();
+ private final MetricClientInterceptor metricClientInterceptor;
GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties,
GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig) {
@@ -99,7 +100,7 @@
this.target = target;
final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
- final MetricClientInterceptor monitoringInterceptor = new MetricClientInterceptor(getName());
+ metricClientInterceptor = new MetricClientInterceptor(getName());
final String clientAddress = Optional.ofNullable(target.getClientAddress())
.filter(x -> !x.isEmpty()).orElse(target.getAddress());
@@ -108,10 +109,10 @@
final boolean separateAdminChannel = !Objects.equals(clientAddress, adminAddress);
clientChannel = buildChannel(clientAddress, clientTlsConfig,
- flowControlWindow, maxMessageSize, monitoringInterceptor);
+ flowControlWindow, maxMessageSize);
adminChannel = separateAdminChannel
? buildChannel(adminAddress, adminTlsConfig,
- flowControlWindow, maxMessageSize, monitoringInterceptor)
+ flowControlWindow, maxMessageSize)
: clientChannel;
asyncStub = RaftClientProtocolServiceGrpc.newStub(clientChannel);
@@ -122,8 +123,7 @@
}
private ManagedChannel buildChannel(String address, GrpcTlsConfig tlsConf,
- SizeInBytes flowControlWindow, SizeInBytes maxMessageSize,
- MetricClientInterceptor monitoringInterceptor) {
+ SizeInBytes flowControlWindow, SizeInBytes maxMessageSize) {
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forTarget(address);
@@ -155,7 +155,7 @@
return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt())
.maxInboundMessageSize(maxMessageSize.getSizeInt())
- .intercept(monitoringInterceptor)
+ .intercept(metricClientInterceptor)
.build();
}
@@ -172,6 +172,7 @@
GrpcUtil.shutdownManagedChannel(adminChannel);
}
scheduler.close();
+ metricClientInterceptor.close();
}
RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java
index 7597687..85ccba1 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/client/MetricClientInterceptor.java
@@ -21,13 +21,15 @@
import org.apache.ratis.grpc.metrics.MessageMetrics;
import org.apache.ratis.thirdparty.io.grpc.*;
+import java.io.Closeable;
+
/**
* An implementation of a client interceptor.
* Intercepts the messages and increments metrics accordingly
* before sending them.
*/
-public class MetricClientInterceptor implements ClientInterceptor {
+public class MetricClientInterceptor implements ClientInterceptor, Closeable {
private final String identifier;
private final MessageMetrics metrics;
@@ -53,4 +55,9 @@
getMethodMetricPrefix(methodDescriptor)
);
}
+
+ @Override
+ public void close() {
+ metrics.unregister();
+ }
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java
index 6ec5468..a11d5ca 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/intercept/server/MetricServerInterceptor.java
@@ -26,6 +26,7 @@
import org.apache.ratis.grpc.metrics.MessageMetrics;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
+import java.io.Closeable;
import java.util.function.Supplier;
/**
@@ -34,7 +35,7 @@
* before handling them.
*/
-public class MetricServerInterceptor implements ServerInterceptor {
+public class MetricServerInterceptor implements ServerInterceptor, Closeable {
private String identifier;
private MessageMetrics metrics;
private final Supplier<RaftPeerId> peerIdSupplier;
@@ -77,4 +78,12 @@
return new MetricServerCallListener<>(
next.startCall(monitoringCall, requestHeaders), metricNamePrefix, metrics);
}
+
+ @Override
+ public void close() {
+ final MessageMetrics m = metrics;
+ if (m != null) {
+ m.unregister();
+ }
+ }
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index a618678..0fbdbfc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -270,6 +270,8 @@
}
LOG.info("{} successfully", name);
}
+
+ serverInterceptor.close();
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 6f80cdd..41caef4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -28,6 +28,7 @@
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.Preconditions;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -87,6 +88,12 @@
}
@Override
+ public void close() throws IOException {
+ super.close();
+ metrics.unregister();
+ }
+
+ @Override
public RaftLogMetricsBase getRaftLogMetrics() {
return metrics;
}