RATIS-1978. Add tests assertions to verify all zero-copy messages are released properly (#1023)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
new file mode 100644
index 0000000..d801868
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to
+ * observe resource object life-cycle and assert proper resource closure before they are GCed.
+ *
+ * <p>
+ * Example usage:
+ *
+ * <pre> {@code
+ * class MyResource implements AutoClosable {
+ * static final LeakDetector LEAK_DETECTOR = new LeakDetector("MyResource");
+ *
+ * private final UncheckedAutoCloseable leakTracker = LEAK_DETECTOR.track(this, () -> {
+ * // report leaks, don't refer to the original object (MyResource) here.
+ * System.out.println("MyResource is not closed before being discarded.");
+ * });
+ *
+ * @Override
+ * public void close() {
+ * // proper resources cleanup...
+ * // inform tracker that this object is closed properly.
+ * leakTracker.close();
+ * }
+ * }
+ *
+ * }</pre>
+ */
+public class LeakDetector {
+ private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class);
+ private static final AtomicLong COUNTER = new AtomicLong();
+
+ private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
+ private final Set<LeakTracker> allLeaks = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final String name;
+
+ public LeakDetector(String name) {
+ this.name = name + COUNTER.getAndIncrement();
+ }
+
+ LeakDetector start() {
+ Thread t = new Thread(this::run);
+ t.setName(LeakDetector.class.getSimpleName() + "-" + name);
+ t.setDaemon(true);
+ LOG.info("Starting leak detector thread {}.", name);
+ t.start();
+ return this;
+ }
+
+ private void run() {
+ while (true) {
+ try {
+ LeakTracker tracker = (LeakTracker) queue.remove();
+ // Original resource already been GCed, if tracker is not closed yet,
+ // report a leak.
+ if (allLeaks.remove(tracker)) {
+ tracker.reportLeak();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Thread interrupted, exiting.", e);
+ break;
+ }
+ }
+
+ LOG.warn("Exiting leak detector {}.", name);
+ }
+
+ public UncheckedAutoCloseable track(Object leakable, Runnable reportLeak) {
+ // A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%,
+ // if we have proofs that leak tracking impacts performance, or a single LeakDetector
+ // thread can't keep up with the pace of object allocation.
+ // For now, it looks effective enough and let keep it simple.
+ LeakTracker tracker = new LeakTracker(leakable, queue, allLeaks, reportLeak);
+ allLeaks.add(tracker);
+ return tracker;
+ }
+
+ public void assertNoLeaks() {
+ Preconditions.assertTrue(allLeaks.isEmpty(), this::allLeaksString);
+ }
+
+ String allLeaksString() {
+ if (allLeaks.isEmpty()) {
+ return "allLeaks = <empty>";
+ }
+ allLeaks.forEach(LeakTracker::reportLeak);
+ return "allLeaks.size = " + allLeaks.size();
+ }
+
+ private static final class LeakTracker extends WeakReference<Object> implements UncheckedAutoCloseable {
+ private final Set<LeakTracker> allLeaks;
+ private final Runnable leakReporter;
+ LeakTracker(Object referent, ReferenceQueue<Object> referenceQueue,
+ Set<LeakTracker> allLeaks, Runnable leakReporter) {
+ super(referent, referenceQueue);
+ this.allLeaks = allLeaks;
+ this.leakReporter = leakReporter;
+ }
+
+ /**
+ * Called by the tracked resource when closing.
+ */
+ @Override
+ public void close() {
+ allLeaks.remove(this);
+ }
+
+ void reportLeak() {
+ leakReporter.run();
+ }
+ }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java
new file mode 100644
index 0000000..32abe80
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * A utility to detect leaks from @{@link ReferenceCountedObject}.
+ */
+public final class ReferenceCountedLeakDetector {
+ private static final Logger LOG = LoggerFactory.getLogger(ReferenceCountedLeakDetector.class);
+ // Leak detection is turned off by default.
+
+ private static final AtomicReference<Mode> FACTORY = new AtomicReference<>(Mode.NONE);
+ private static final Supplier<LeakDetector> SUPPLIER
+ = MemoizedSupplier.valueOf(() -> new LeakDetector(FACTORY.get().name()).start());
+
+ static Factory getFactory() {
+ return FACTORY.get();
+ }
+
+ public static LeakDetector getLeakDetector() {
+ return SUPPLIER.get();
+ }
+
+ private ReferenceCountedLeakDetector() {
+ }
+
+ static synchronized void enable(boolean advanced) {
+ FACTORY.set(advanced ? Mode.ADVANCED : Mode.SIMPLE);
+ }
+
+ interface Factory {
+ <V> ReferenceCountedObject<V> create(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod);
+ }
+
+ private enum Mode implements Factory {
+ /** Leak detector is not enable in production to avoid performance impacts. */
+ NONE {
+ @Override
+ public <V> ReferenceCountedObject<V> create(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod) {
+ return new Impl<>(value, retainMethod, releaseMethod);
+ }
+ },
+ /** Leak detector is enabled to detect leaks. This is intended to use in every tests. */
+ SIMPLE {
+ @Override
+ public <V> ReferenceCountedObject<V> create(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod) {
+ return new SimpleTracing<>(value, retainMethod, releaseMethod, getLeakDetector());
+ }
+ },
+ /**
+ * Leak detector is enabled to detect leaks and report object creation stacktrace as well as every retain and
+ * release stacktraces. This has severe impact in performance and only used to debug specific test cases.
+ */
+ ADVANCED {
+ @Override
+ public <V> ReferenceCountedObject<V> create(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod) {
+ return new AdvancedTracing<>(value, retainMethod, releaseMethod, getLeakDetector());
+ }
+ }
+ }
+
+ private static class Impl<V> implements ReferenceCountedObject<V> {
+ private final AtomicInteger count;
+ private final V value;
+ private final Runnable retainMethod;
+ private final Consumer<Boolean> releaseMethod;
+
+ Impl(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod) {
+ this.value = value;
+ this.retainMethod = retainMethod;
+ this.releaseMethod = releaseMethod;
+ count = new AtomicInteger();
+ }
+
+ @Override
+ public V get() {
+ final int previous = count.get();
+ if (previous < 0) {
+ throw new IllegalStateException("Failed to get: object has already been completely released.");
+ } else if (previous == 0) {
+ throw new IllegalStateException("Failed to get: object has not yet been retained.");
+ }
+ return value;
+ }
+
+ @Override
+ public V retain() {
+ // n < 0: exception
+ // n >= 0: n++
+ if (count.getAndUpdate(n -> n < 0? n : n + 1) < 0) {
+ throw new IllegalStateException("Failed to retain: object has already been completely released.");
+ }
+
+ retainMethod.run();
+ return value;
+ }
+
+ @Override
+ public boolean release() {
+ // n <= 0: exception
+ // n > 1: n--
+ // n == 1: n = -1
+ final int previous = count.getAndUpdate(n -> n <= 1? -1: n - 1);
+ if (previous < 0) {
+ throw new IllegalStateException("Failed to release: object has already been completely released.");
+ } else if (previous == 0) {
+ throw new IllegalStateException("Failed to release: object has not yet been retained.");
+ }
+ final boolean completedReleased = previous == 1;
+ releaseMethod.accept(completedReleased);
+ return completedReleased;
+ }
+ }
+
+ private static class SimpleTracing<T> extends Impl<T> {
+ private final UncheckedAutoCloseable leakTracker;
+
+ SimpleTracing(T value, Runnable retainMethod, Consumer<Boolean> releaseMethod, LeakDetector leakDetector) {
+ super(value, retainMethod, releaseMethod);
+ final Class<?> clazz = value.getClass();
+ this.leakTracker = leakDetector.track(this,
+ () -> LOG.warn("LEAK: A {} is not released properly", clazz.getName()));
+ }
+
+ @Override
+ public boolean release() {
+ boolean released = super.release();
+ if (released) {
+ leakTracker.close();
+ }
+ return released;
+ }
+ }
+
+ private static class AdvancedTracing<T> extends Impl<T> {
+ private final UncheckedAutoCloseable leakTracker;
+ private final List<StackTraceElement[]> retainsTraces;
+ private final List<StackTraceElement[]> releaseTraces;
+
+ AdvancedTracing(T value, Runnable retainMethod, Consumer<Boolean> releaseMethod, LeakDetector leakDetector) {
+ super(value, retainMethod, releaseMethod);
+
+ StackTraceElement[] createStrace = Thread.currentThread().getStackTrace();
+ final Class<?> clazz = value.getClass();
+ final List<StackTraceElement[]> localRetainsTraces = new LinkedList<>();
+ final List<StackTraceElement[]> localReleaseTraces = new LinkedList<>();
+
+ this.leakTracker = leakDetector.track(this, () ->
+ LOG.warn("LEAK: A {} is not released properly.\nCreation trace:\n{}\n" +
+ "Retain traces({}):\n{}\nRelease traces({}):\n{}",
+ clazz.getName(), formatStackTrace(createStrace, 3),
+ localRetainsTraces.size(), formatStackTraces(localRetainsTraces, 2),
+ localReleaseTraces.size(), formatStackTraces(localReleaseTraces, 2)));
+
+ this.retainsTraces = localRetainsTraces;
+ this.releaseTraces = localReleaseTraces;
+ }
+
+ @Override
+ public T retain() {
+ T retain = super.retain();
+ retainsTraces.add(Thread.currentThread().getStackTrace());
+ return retain;
+ }
+
+ @Override
+ public boolean release() {
+ boolean released = super.release();
+ if (released) {
+ leakTracker.close();
+ }
+ releaseTraces.add(Thread.currentThread().getStackTrace());
+ return released;
+ }
+ }
+
+ private static String formatStackTrace(StackTraceElement[] stackTrace, int startIdx) {
+ final StringBuilder sb = new StringBuilder();
+ for (int line = startIdx; line < stackTrace.length; line++) {
+ sb.append(stackTrace[line]).append("\n");
+ }
+ return sb.toString();
+ }
+
+ private static String formatStackTraces(List<StackTraceElement[]> stackTraces, int startIdx) {
+ final StringBuilder sb = new StringBuilder();
+ stackTraces.forEach(stackTrace -> {
+ if (sb.length() > 0) {
+ sb.append("\n");
+ }
+ for (int line = startIdx; line < stackTrace.length; line++) {
+ sb.append(stackTrace[line]).append("\n");
+ }
+ });
+ return sb.toString();
+ }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
index 4cca3a9..eb5ff30 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -22,7 +22,6 @@
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
@@ -45,6 +44,7 @@
* @param <T> The object type.
*/
public interface ReferenceCountedObject<T> {
+
/** @return the object. */
T get();
@@ -167,52 +167,20 @@
Objects.requireNonNull(retainMethod, "retainMethod == null");
Objects.requireNonNull(releaseMethod, "releaseMethod == null");
- return new ReferenceCountedObject<V>() {
- private final AtomicInteger count = new AtomicInteger();
-
- @Override
- public V get() {
- final int previous = count.get();
- if (previous < 0) {
- throw new IllegalStateException("Failed to get: object has already been completely released.");
- } else if (previous == 0) {
- throw new IllegalStateException("Failed to get: object has not yet been retained.");
- }
- return value;
- }
-
- @Override
- public V retain() {
- // n < 0: exception
- // n >= 0: n++
- if (count.getAndUpdate(n -> n < 0? n : n + 1) < 0) {
- throw new IllegalStateException("Failed to retain: object has already been completely released.");
- }
-
- retainMethod.run();
- return value;
- }
-
- @Override
- public boolean release() {
- // n <= 0: exception
- // n > 1: n--
- // n == 1: n = -1
- final int previous = count.getAndUpdate(n -> n <= 1? -1: n - 1);
- if (previous < 0) {
- throw new IllegalStateException("Failed to release: object has already been completely released.");
- } else if (previous == 0) {
- throw new IllegalStateException("Failed to release: object has not yet been retained.");
- }
- final boolean completedReleased = previous == 1;
- releaseMethod.accept(completedReleased);
- return completedReleased;
- }
- };
+ return ReferenceCountedLeakDetector.getFactory().create(value, retainMethod, releaseMethod);
}
/** The same as wrap(value, retainMethod, ignored -> releaseMethod.run()). */
static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Runnable releaseMethod) {
return wrap(value, retainMethod, ignored -> releaseMethod.run());
}
+
+ static void enableLeakDetection() {
+ ReferenceCountedLeakDetector.enable(false);
+ }
+
+ static void enableAdvancedLeakDetection() {
+ ReferenceCountedLeakDetector.enable(true);
+ }
+
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/ZeroCopyMetrics.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/ZeroCopyMetrics.java
index 20da4ee..fec2135 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/ZeroCopyMetrics.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/ZeroCopyMetrics.java
@@ -21,6 +21,7 @@
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.metrics.RatisMetrics;
+import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessage;
public class ZeroCopyMetrics extends RatisMetrics {
@@ -55,4 +56,18 @@
releasedMessages.inc();
}
+ @VisibleForTesting
+ public long zeroCopyMessages() {
+ return zeroCopyMessages.getCount();
+ }
+
+ @VisibleForTesting
+ public long nonZeroCopyMessages() {
+ return nonZeroCopyMessages.getCount();
+ }
+
+ @VisibleForTesting
+ public long releasedMessages() {
+ return releasedMessages.getCount();
+ }
}
\ No newline at end of file
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index d2dadcd..fa93586 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -30,6 +30,7 @@
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpcWithProxy;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
+import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
@@ -328,6 +329,7 @@
serverInterceptor.close();
ConcurrentUtils.shutdownAndWait(executor);
+ zeroCopyMetrics.unregister();
}
@Override
@@ -385,4 +387,8 @@
return getProxies().getProxy(target).startLeaderElection(request);
}
+ @VisibleForTesting
+ public ZeroCopyMetrics getZeroCopyMetrics() {
+ return zeroCopyMetrics;
+ }
}
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
index 18c65c5..68556af 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -21,14 +21,19 @@
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.metrics.ZeroCopyMetrics;
import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.ReferenceCountedObject;
+import org.junit.Assert;
import java.util.Optional;
@@ -45,6 +50,10 @@
}
};
+ static {
+ ReferenceCountedObject.enableLeakDetection();
+ }
+
public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGrpc> {
@Override
default Factory<MiniRaftClusterWithGrpc> getFactory() {
@@ -55,7 +64,8 @@
public static final DelayLocalExecutionInjection sendServerRequestInjection =
new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);
- protected MiniRaftClusterWithGrpc(String[] ids, String[] listenerIds, RaftProperties properties, Parameters parameters) {
+ protected MiniRaftClusterWithGrpc(String[] ids, String[] listenerIds, RaftProperties properties,
+ Parameters parameters) {
super(ids, listenerIds, properties, parameters);
}
@@ -75,4 +85,22 @@
RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection,
leaderId, delayMs, getTimeoutMax());
}
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ assertZeroCopyMetrics();
+ }
+
+ public void assertZeroCopyMetrics() {
+ getServers().forEach(server -> server.getGroupIds().forEach(id -> {
+ LOG.info("Checking {}-{}", server.getId(), id);
+ RaftServer.Division division = RaftServerTestUtil.getDivision(server, id);
+ GrpcService service = (GrpcService) RaftServerTestUtil.getServerRpc(division);
+ ZeroCopyMetrics zeroCopyMetrics = service.getZeroCopyMetrics();
+ Assert.assertEquals(0, zeroCopyMetrics.nonZeroCopyMessages());
+ Assert.assertEquals("Zero copy messages are not released, please check logs to find leaks. ",
+ zeroCopyMetrics.zeroCopyMessages(), zeroCopyMetrics.releasedMessages());
+ }));
+ }
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index ce08e4a..2f1492d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -168,6 +168,7 @@
assertTrue(t.getTimer().getMeanRate() > 0.0d);
assertTrue(t.getTimer().getCount() > 0L);
}
+ cluster.shutdown();
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 9b6d811..5fa0332 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -138,7 +138,7 @@
@Test
public void testGroupMismatchException() throws Exception {
- runWithSameCluster(NUM_PEERS, this::runTestGroupMismatchException);
+ runWithNewCluster(NUM_PEERS, this::runTestGroupMismatchException);
}
void runTestGroupMismatchException(CLUSTER cluster) throws Exception {
@@ -171,7 +171,7 @@
@Test
public void testStaleReadException() throws Exception {
- runWithSameCluster(NUM_PEERS, this::runTestStaleReadException);
+ runWithNewCluster(NUM_PEERS, this::runTestStaleReadException);
}
void runTestStaleReadException(CLUSTER cluster) throws Exception {
@@ -186,7 +186,7 @@
@Test
public void testLogAppenderBufferCapacity() throws Exception {
- runWithSameCluster(NUM_PEERS, this::runTestLogAppenderBufferCapacity);
+ runWithNewCluster(NUM_PEERS, this::runTestLogAppenderBufferCapacity);
}
void runTestLogAppenderBufferCapacity(CLUSTER cluster) throws Exception {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 8cd15aa..024a60d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -50,6 +50,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.lang.ref.WeakReference;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
@@ -565,4 +566,18 @@
Assert.assertNotNull("reply == null", reply);
Assert.assertTrue("reply is not success: " + reply, reply.isSuccess());
}
+
+ static void gc() throws InterruptedException {
+ // use WeakReference to detect gc
+ Object obj = new Object();
+ final WeakReference<Object> weakRef = new WeakReference<>(obj);
+ obj = null;
+
+ // loop until gc has completed.
+ for (int i = 0; weakRef.get() != null; i++) {
+ LOG.info("gc {}", i);
+ System.gc();
+ Thread.sleep(100);
+ }
+ }
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 6453e8e..6def81c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -353,6 +353,7 @@
10, ONE_SECOND, "getLeaderId", LOG);
LOG.info(cluster.printServers());
Assert.assertEquals(leader.getId(), lastServerLeaderId);
+ cluster.shutdown();
}
protected void testDisconnectLeader() throws Exception {
@@ -523,6 +524,7 @@
Long leaderElectionLatency = (Long) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(LAST_LEADER_ELECTION_ELAPSED_TIME)).values().iterator().next().getValue();
assertTrue(leaderElectionLatency > 0L);
+ cluster.shutdown();
}
@Test
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index f6dd612..933a995 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -50,6 +50,7 @@
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedLeakDetector;
import org.apache.ratis.util.ReflectionUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedConsumer;
@@ -854,6 +855,14 @@
Optional.ofNullable(timer.get()).ifPresent(Timer::cancel);
ExitUtils.assertNotTerminated();
LOG.info("{} shutdown completed", JavaUtils.getClassSimpleName(getClass()));
+
+ // GC to ensure leak detection work.
+ try {
+ RaftTestUtil.gc();
+ } catch (InterruptedException e) {
+ LOG.info("gc interrupted.");
+ }
+ ReferenceCountedLeakDetector.getLeakDetector().assertNoLeaks();
}
/**