RATIS-1978. Add tests assertions to verify all zero-copy messages are released properly (#1023)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
new file mode 100644
index 0000000..d801868
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to
+ * observe resource object life-cycle and assert proper resource closure before they are GCed.
+ *
+ * <p>
+ * Example usage:
+ *
+ * <pre> {@code
+ * class MyResource implements AutoClosable {
+ *   static final LeakDetector LEAK_DETECTOR = new LeakDetector("MyResource");
+ *
+ *   private final UncheckedAutoCloseable leakTracker = LEAK_DETECTOR.track(this, () -> {
+ *      // report leaks, don't refer to the original object (MyResource) here.
+ *      System.out.println("MyResource is not closed before being discarded.");
+ *   });
+ *
+ *   @Override
+ *   public void close() {
+ *     // proper resources cleanup...
+ *     // inform tracker that this object is closed properly.
+ *     leakTracker.close();
+ *   }
+ * }
+ *
+ * }</pre>
+ */
+public class LeakDetector {
+  private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class);
+  private static final AtomicLong COUNTER = new AtomicLong();
+
+  private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
+  private final Set<LeakTracker> allLeaks = Collections.newSetFromMap(new ConcurrentHashMap<>());
+  private final String name;
+
+  public LeakDetector(String name) {
+    this.name = name + COUNTER.getAndIncrement();
+  }
+
+  LeakDetector start() {
+    Thread t = new Thread(this::run);
+    t.setName(LeakDetector.class.getSimpleName() + "-" + name);
+    t.setDaemon(true);
+    LOG.info("Starting leak detector thread {}.", name);
+    t.start();
+    return this;
+  }
+
+  private void run() {
+    while (true) {
+      try {
+        LeakTracker tracker = (LeakTracker) queue.remove();
+        // Original resource already been GCed, if tracker is not closed yet,
+        // report a leak.
+        if (allLeaks.remove(tracker)) {
+          tracker.reportLeak();
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Thread interrupted, exiting.", e);
+        break;
+      }
+    }
+
+    LOG.warn("Exiting leak detector {}.", name);
+  }
+
+  public UncheckedAutoCloseable track(Object leakable, Runnable reportLeak) {
+    // A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%,
+    // if we have proofs that leak tracking impacts performance, or a single LeakDetector
+    // thread can't keep up with the pace of object allocation.
+    // For now, it looks effective enough and let keep it simple.
+    LeakTracker tracker = new LeakTracker(leakable, queue, allLeaks, reportLeak);
+    allLeaks.add(tracker);
+    return tracker;
+  }
+
+  public void assertNoLeaks() {
+    Preconditions.assertTrue(allLeaks.isEmpty(), this::allLeaksString);
+  }
+
+  String allLeaksString() {
+    if (allLeaks.isEmpty()) {
+      return "allLeaks = <empty>";
+    }
+    allLeaks.forEach(LeakTracker::reportLeak);
+    return "allLeaks.size = " + allLeaks.size();
+  }
+
+  private static final class LeakTracker extends WeakReference<Object> implements UncheckedAutoCloseable {
+    private final Set<LeakTracker> allLeaks;
+    private final Runnable leakReporter;
+    LeakTracker(Object referent, ReferenceQueue<Object> referenceQueue,
+        Set<LeakTracker> allLeaks, Runnable leakReporter) {
+      super(referent, referenceQueue);
+      this.allLeaks = allLeaks;
+      this.leakReporter = leakReporter;
+    }
+
+    /**
+     * Called by the tracked resource when closing.
+     */
+    @Override
+    public void close() {
+      allLeaks.remove(this);
+    }
+
+    void reportLeak() {
+      leakReporter.run();
+    }
+  }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java
new file mode 100644
index 0000000..32abe80
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * A utility to detect leaks from @{@link ReferenceCountedObject}.
+ */
+public final class ReferenceCountedLeakDetector {
+  private static final Logger LOG = LoggerFactory.getLogger(ReferenceCountedLeakDetector.class);
+  // Leak detection is turned off by default.
+
+  private static final AtomicReference<Mode> FACTORY = new AtomicReference<>(Mode.NONE);
+  private static final Supplier<LeakDetector> SUPPLIER
+      = MemoizedSupplier.valueOf(() -> new LeakDetector(FACTORY.get().name()).start());
+
+  static Factory getFactory() {
+    return FACTORY.get();
+  }
+
+  public static LeakDetector getLeakDetector() {
+    return SUPPLIER.get();
+  }
+
+  private ReferenceCountedLeakDetector() {
+  }
+
+  static synchronized void enable(boolean advanced) {
+    FACTORY.set(advanced ? Mode.ADVANCED : Mode.SIMPLE);
+  }
+
+  interface Factory {
+    <V> ReferenceCountedObject<V> create(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod);
+  }
+
+  private enum Mode implements Factory {
+    /** Leak detector is not enable in production to avoid performance impacts. */
+    NONE {
+      @Override
+      public <V> ReferenceCountedObject<V> create(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod) {
+        return new Impl<>(value, retainMethod, releaseMethod);
+      }
+    },
+    /** Leak detector is enabled to detect leaks. This is intended to use in every tests. */
+    SIMPLE {
+      @Override
+      public <V> ReferenceCountedObject<V> create(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod) {
+        return new SimpleTracing<>(value, retainMethod, releaseMethod, getLeakDetector());
+      }
+    },
+    /**
+     * Leak detector is enabled to detect leaks and report object creation stacktrace as well as every retain and
+     * release stacktraces. This has severe impact in performance and only used to debug specific test cases.
+     */
+    ADVANCED {
+      @Override
+      public <V> ReferenceCountedObject<V> create(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod) {
+        return new AdvancedTracing<>(value, retainMethod, releaseMethod, getLeakDetector());
+      }
+    }
+  }
+
+  private static class Impl<V> implements ReferenceCountedObject<V> {
+    private final AtomicInteger count;
+    private final V value;
+    private final Runnable retainMethod;
+    private final Consumer<Boolean> releaseMethod;
+
+    Impl(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod) {
+      this.value = value;
+      this.retainMethod = retainMethod;
+      this.releaseMethod = releaseMethod;
+      count = new AtomicInteger();
+    }
+
+    @Override
+    public V get() {
+      final int previous = count.get();
+      if (previous < 0) {
+        throw new IllegalStateException("Failed to get: object has already been completely released.");
+      } else if (previous == 0) {
+        throw new IllegalStateException("Failed to get: object has not yet been retained.");
+      }
+      return value;
+    }
+
+    @Override
+    public V retain() {
+      // n <  0: exception
+      // n >= 0: n++
+      if (count.getAndUpdate(n -> n < 0? n : n + 1) < 0) {
+        throw new IllegalStateException("Failed to retain: object has already been completely released.");
+      }
+
+      retainMethod.run();
+      return value;
+    }
+
+    @Override
+    public boolean release() {
+      // n <= 0: exception
+      // n >  1: n--
+      // n == 1: n = -1
+      final int previous = count.getAndUpdate(n -> n <= 1? -1: n - 1);
+      if (previous < 0) {
+        throw new IllegalStateException("Failed to release: object has already been completely released.");
+      } else if (previous == 0) {
+        throw new IllegalStateException("Failed to release: object has not yet been retained.");
+      }
+      final boolean completedReleased = previous == 1;
+      releaseMethod.accept(completedReleased);
+      return completedReleased;
+    }
+  }
+
+  private static class SimpleTracing<T> extends Impl<T> {
+    private final UncheckedAutoCloseable leakTracker;
+
+    SimpleTracing(T value, Runnable retainMethod, Consumer<Boolean> releaseMethod, LeakDetector leakDetector) {
+      super(value, retainMethod, releaseMethod);
+      final Class<?> clazz = value.getClass();
+      this.leakTracker = leakDetector.track(this,
+          () -> LOG.warn("LEAK: A {} is not released properly", clazz.getName()));
+    }
+
+    @Override
+    public boolean release() {
+      boolean released = super.release();
+      if (released) {
+        leakTracker.close();
+      }
+      return released;
+    }
+  }
+
+  private static class AdvancedTracing<T> extends Impl<T> {
+    private final UncheckedAutoCloseable leakTracker;
+    private final List<StackTraceElement[]> retainsTraces;
+    private final List<StackTraceElement[]> releaseTraces;
+
+    AdvancedTracing(T value, Runnable retainMethod, Consumer<Boolean> releaseMethod, LeakDetector leakDetector) {
+      super(value, retainMethod, releaseMethod);
+
+      StackTraceElement[] createStrace = Thread.currentThread().getStackTrace();
+      final Class<?> clazz = value.getClass();
+      final List<StackTraceElement[]> localRetainsTraces = new LinkedList<>();
+      final List<StackTraceElement[]> localReleaseTraces = new LinkedList<>();
+
+      this.leakTracker = leakDetector.track(this, () ->
+          LOG.warn("LEAK: A {} is not released properly.\nCreation trace:\n{}\n" +
+              "Retain traces({}):\n{}\nRelease traces({}):\n{}",
+              clazz.getName(), formatStackTrace(createStrace, 3),
+              localRetainsTraces.size(), formatStackTraces(localRetainsTraces, 2),
+              localReleaseTraces.size(), formatStackTraces(localReleaseTraces, 2)));
+
+      this.retainsTraces = localRetainsTraces;
+      this.releaseTraces = localReleaseTraces;
+    }
+
+    @Override
+    public T retain() {
+      T retain = super.retain();
+      retainsTraces.add(Thread.currentThread().getStackTrace());
+      return retain;
+    }
+
+    @Override
+    public boolean release() {
+      boolean released = super.release();
+      if (released) {
+        leakTracker.close();
+      }
+      releaseTraces.add(Thread.currentThread().getStackTrace());
+      return released;
+    }
+  }
+
+  private static String formatStackTrace(StackTraceElement[] stackTrace, int startIdx) {
+    final StringBuilder sb = new StringBuilder();
+    for (int line = startIdx; line < stackTrace.length; line++) {
+      sb.append(stackTrace[line]).append("\n");
+    }
+    return sb.toString();
+  }
+
+  private static String formatStackTraces(List<StackTraceElement[]> stackTraces, int startIdx) {
+    final StringBuilder sb = new StringBuilder();
+    stackTraces.forEach(stackTrace -> {
+      if (sb.length() > 0) {
+        sb.append("\n");
+      }
+      for (int line = startIdx; line < stackTrace.length; line++) {
+        sb.append(stackTrace[line]).append("\n");
+      }
+    });
+    return sb.toString();
+  }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
index 4cca3a9..eb5ff30 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -22,7 +22,6 @@
 import java.util.Collection;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 /**
@@ -45,6 +44,7 @@
  * @param <T> The object type.
  */
 public interface ReferenceCountedObject<T> {
+
   /** @return the object. */
   T get();
 
@@ -167,52 +167,20 @@
     Objects.requireNonNull(retainMethod, "retainMethod == null");
     Objects.requireNonNull(releaseMethod, "releaseMethod == null");
 
-    return new ReferenceCountedObject<V>() {
-      private final AtomicInteger count = new AtomicInteger();
-
-      @Override
-      public V get() {
-        final int previous = count.get();
-        if (previous < 0) {
-          throw new IllegalStateException("Failed to get: object has already been completely released.");
-        } else if (previous == 0) {
-          throw new IllegalStateException("Failed to get: object has not yet been retained.");
-        }
-        return value;
-      }
-
-      @Override
-      public V retain() {
-        // n <  0: exception
-        // n >= 0: n++
-        if (count.getAndUpdate(n -> n < 0? n : n + 1) < 0) {
-          throw new IllegalStateException("Failed to retain: object has already been completely released.");
-        }
-
-        retainMethod.run();
-        return value;
-      }
-
-      @Override
-      public boolean release() {
-        // n <= 0: exception
-        // n >  1: n--
-        // n == 1: n = -1
-        final int previous = count.getAndUpdate(n -> n <= 1? -1: n - 1);
-        if (previous < 0) {
-          throw new IllegalStateException("Failed to release: object has already been completely released.");
-        } else if (previous == 0) {
-          throw new IllegalStateException("Failed to release: object has not yet been retained.");
-        }
-        final boolean completedReleased = previous == 1;
-        releaseMethod.accept(completedReleased);
-        return completedReleased;
-      }
-    };
+    return ReferenceCountedLeakDetector.getFactory().create(value, retainMethod, releaseMethod);
   }
 
   /** The same as wrap(value, retainMethod, ignored -> releaseMethod.run()). */
   static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Runnable releaseMethod) {
     return wrap(value, retainMethod, ignored -> releaseMethod.run());
   }
+
+  static void enableLeakDetection() {
+    ReferenceCountedLeakDetector.enable(false);
+  }
+
+  static void enableAdvancedLeakDetection() {
+    ReferenceCountedLeakDetector.enable(true);
+  }
+
 }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/ZeroCopyMetrics.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/ZeroCopyMetrics.java
index 20da4ee..fec2135 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/ZeroCopyMetrics.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/ZeroCopyMetrics.java
@@ -21,6 +21,7 @@
 import org.apache.ratis.metrics.MetricRegistryInfo;
 import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.metrics.RatisMetrics;
+import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessage;
 
 public class ZeroCopyMetrics extends RatisMetrics {
@@ -55,4 +56,18 @@
     releasedMessages.inc();
   }
 
+  @VisibleForTesting
+  public long zeroCopyMessages() {
+    return zeroCopyMessages.getCount();
+  }
+
+  @VisibleForTesting
+  public long nonZeroCopyMessages() {
+    return nonZeroCopyMessages.getCount();
+  }
+
+  @VisibleForTesting
+  public long releasedMessages() {
+    return releasedMessages.getCount();
+  }
 }
\ No newline at end of file
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index d2dadcd..fa93586 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -30,6 +30,7 @@
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpcWithProxy;
 import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
+import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
@@ -328,6 +329,7 @@
 
     serverInterceptor.close();
     ConcurrentUtils.shutdownAndWait(executor);
+    zeroCopyMetrics.unregister();
   }
 
   @Override
@@ -385,4 +387,8 @@
     return getProxies().getProxy(target).startLeaderElection(request);
   }
 
+  @VisibleForTesting
+  public ZeroCopyMetrics getZeroCopyMetrics() {
+    return zeroCopyMetrics;
+  }
 }
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
index 18c65c5..68556af 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -21,14 +21,19 @@
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.metrics.ZeroCopyMetrics;
 import org.apache.ratis.grpc.server.GrpcService;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
 import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.ReferenceCountedObject;
+import org.junit.Assert;
 
 import java.util.Optional;
 
@@ -45,6 +50,10 @@
     }
   };
 
+  static {
+    ReferenceCountedObject.enableLeakDetection();
+  }
+
   public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGrpc> {
     @Override
     default Factory<MiniRaftClusterWithGrpc> getFactory() {
@@ -55,7 +64,8 @@
   public static final DelayLocalExecutionInjection sendServerRequestInjection =
       new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);
 
-  protected MiniRaftClusterWithGrpc(String[] ids, String[] listenerIds, RaftProperties properties, Parameters parameters) {
+  protected MiniRaftClusterWithGrpc(String[] ids, String[] listenerIds, RaftProperties properties,
+      Parameters parameters) {
     super(ids, listenerIds, properties, parameters);
   }
 
@@ -75,4 +85,22 @@
     RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection,
         leaderId, delayMs, getTimeoutMax());
   }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    assertZeroCopyMetrics();
+  }
+
+  public void assertZeroCopyMetrics() {
+    getServers().forEach(server -> server.getGroupIds().forEach(id -> {
+      LOG.info("Checking {}-{}", server.getId(), id);
+      RaftServer.Division division = RaftServerTestUtil.getDivision(server, id);
+      GrpcService service = (GrpcService) RaftServerTestUtil.getServerRpc(division);
+      ZeroCopyMetrics zeroCopyMetrics = service.getZeroCopyMetrics();
+      Assert.assertEquals(0, zeroCopyMetrics.nonZeroCopyMessages());
+      Assert.assertEquals("Zero copy messages are not released, please check logs to find leaks. ",
+          zeroCopyMetrics.zeroCopyMessages(), zeroCopyMetrics.releasedMessages());
+    }));
+  }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index ce08e4a..2f1492d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -168,6 +168,7 @@
         assertTrue(t.getTimer().getMeanRate() > 0.0d);
         assertTrue(t.getTimer().getCount() > 0L);
       }
+      cluster.shutdown();
     }
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 9b6d811..5fa0332 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -138,7 +138,7 @@
 
   @Test
   public void testGroupMismatchException() throws Exception {
-    runWithSameCluster(NUM_PEERS, this::runTestGroupMismatchException);
+    runWithNewCluster(NUM_PEERS, this::runTestGroupMismatchException);
   }
 
   void runTestGroupMismatchException(CLUSTER cluster) throws Exception {
@@ -171,7 +171,7 @@
 
   @Test
   public void testStaleReadException() throws Exception {
-    runWithSameCluster(NUM_PEERS, this::runTestStaleReadException);
+    runWithNewCluster(NUM_PEERS, this::runTestStaleReadException);
   }
 
   void runTestStaleReadException(CLUSTER cluster) throws Exception {
@@ -186,7 +186,7 @@
 
   @Test
   public void testLogAppenderBufferCapacity() throws Exception {
-    runWithSameCluster(NUM_PEERS, this::runTestLogAppenderBufferCapacity);
+    runWithNewCluster(NUM_PEERS, this::runTestLogAppenderBufferCapacity);
   }
 
   void runTestLogAppenderBufferCapacity(CLUSTER cluster) throws Exception {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 8cd15aa..024a60d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -50,6 +50,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.lang.ref.WeakReference;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -565,4 +566,18 @@
     Assert.assertNotNull("reply == null", reply);
     Assert.assertTrue("reply is not success: " + reply, reply.isSuccess());
   }
+
+  static void gc() throws InterruptedException {
+    // use WeakReference to detect gc
+    Object obj = new Object();
+    final WeakReference<Object> weakRef = new WeakReference<>(obj);
+    obj = null;
+
+    // loop until gc has completed.
+    for (int i = 0; weakRef.get() != null; i++) {
+      LOG.info("gc {}", i);
+      System.gc();
+      Thread.sleep(100);
+    }
+  }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 6453e8e..6def81c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -353,6 +353,7 @@
         10, ONE_SECOND, "getLeaderId", LOG);
     LOG.info(cluster.printServers());
     Assert.assertEquals(leader.getId(), lastServerLeaderId);
+    cluster.shutdown();
   }
 
   protected void testDisconnectLeader() throws Exception {
@@ -523,6 +524,7 @@
     Long leaderElectionLatency = (Long) ratisMetricRegistry.getGauges((s, metric) ->
         s.contains(LAST_LEADER_ELECTION_ELAPSED_TIME)).values().iterator().next().getValue();
     assertTrue(leaderElectionLatency > 0L);
+    cluster.shutdown();
   }
 
   @Test
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index f6dd612..933a995 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -50,6 +50,7 @@
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.NetUtils;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedLeakDetector;
 import org.apache.ratis.util.ReflectionUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedConsumer;
@@ -854,6 +855,14 @@
     Optional.ofNullable(timer.get()).ifPresent(Timer::cancel);
     ExitUtils.assertNotTerminated();
     LOG.info("{} shutdown completed", JavaUtils.getClassSimpleName(getClass()));
+
+    // GC to ensure leak detection work.
+    try {
+      RaftTestUtil.gc();
+    } catch (InterruptedException e) {
+      LOG.info("gc interrupted.");
+    }
+    ReferenceCountedLeakDetector.getLeakDetector().assertNoLeaks();
   }
 
   /**