Fix WeakHashMap implementation in SharedRateLimiterFactory (#1965)

* Fix WeakHashMap implementation
* Rename `update`/`report` methods to `updateAll`/`reportAll` and
  reference using method reference, and make them private, since they
  don't need to be any more visible than that
* Reuse the implementation for operating on a copy of the
  `activeLimiters` for both `updateAll` and `reportAll`
* Convert syncronized long to AtomicLong
* Make SharedRateLimiter immune to system clock changes

Co-authored-by: Christopher Tubbs <ctubbsii@apache.org>
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
index c24a3d2..c5c6890 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -18,10 +18,14 @@
  */
 package org.apache.accumulo.core.util.ratelimit;
 
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.WeakHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -38,7 +42,8 @@
   private static final long UPDATE_RATE = 1000;
   private static SharedRateLimiterFactory instance = null;
   private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
-  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
+  private final WeakHashMap<String,WeakReference<SharedRateLimiter>> activeLimiters =
+      new WeakHashMap<>();
 
   private SharedRateLimiterFactory() {}
 
@@ -48,15 +53,13 @@
       instance = new SharedRateLimiterFactory();
 
       ScheduledThreadPoolExecutor svc = ThreadPools.createGeneralScheduledExecutorService(conf);
-      svc.scheduleWithFixedDelay(
-          Threads.createNamedRunnable("SharedRateLimiterFactory update polling", () -> {
-            instance.update();
-          }), UPDATE_RATE, UPDATE_RATE, TimeUnit.MILLISECONDS);
+      svc.scheduleWithFixedDelay(Threads
+          .createNamedRunnable("SharedRateLimiterFactory update polling", instance::updateAll),
+          UPDATE_RATE, UPDATE_RATE, TimeUnit.MILLISECONDS);
 
-      svc.scheduleWithFixedDelay(
-          Threads.createNamedRunnable("SharedRateLimiterFactory report polling", () -> {
-            instance.report();
-          }), REPORT_RATE, REPORT_RATE, TimeUnit.MILLISECONDS);
+      svc.scheduleWithFixedDelay(Threads
+          .createNamedRunnable("SharedRateLimiterFactory report polling", instance::reportAll),
+          REPORT_RATE, REPORT_RATE, TimeUnit.MILLISECONDS);
 
     }
     return instance;
@@ -86,57 +89,55 @@
    */
   public RateLimiter create(String name, RateProvider rateProvider) {
     synchronized (activeLimiters) {
-      if (activeLimiters.containsKey(name)) {
-        return activeLimiters.get(name);
-      } else {
-        long initialRate;
-        initialRate = rateProvider.getDesiredRate();
-        SharedRateLimiter limiter = new SharedRateLimiter(name, rateProvider, initialRate);
-        activeLimiters.put(name, limiter);
-        return limiter;
+      var limiterRef = activeLimiters.get(name);
+      var limiter = limiterRef == null ? null : limiterRef.get();
+      if (limiter == null) {
+        limiter = new SharedRateLimiter(name, rateProvider, rateProvider.getDesiredRate());
+        activeLimiters.put(name, new WeakReference<>(limiter));
       }
+      return limiter;
     }
   }
 
+  private void copyAndThen(String actionName, Consumer<SharedRateLimiter> action) {
+    Map<String,SharedRateLimiter> limitersCopy = new HashMap<>();
+    // synchronize only for copy
+    synchronized (activeLimiters) {
+      activeLimiters.forEach((name, limiterRef) -> {
+        var limiter = limiterRef.get();
+        if (limiter != null) {
+          limitersCopy.put(name, limiter);
+        }
+      });
+    }
+    limitersCopy.forEach((name, limiter) -> {
+      try {
+        action.accept(limiter);
+      } catch (RuntimeException e) {
+        log.error("Failed to {} limiter {}", actionName, name, e);
+      }
+    });
+  }
+
   /**
    * Walk through all of the currently active RateLimiters, having each update its current rate.
    * This is called periodically so that we can dynamically update as configuration changes.
    */
-  protected void update() {
-    Map<String,SharedRateLimiter> limitersCopy;
-    synchronized (activeLimiters) {
-      limitersCopy = Map.copyOf(activeLimiters);
-    }
-    for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) {
-      try {
-        entry.getValue().update();
-      } catch (Exception ex) {
-        log.error(String.format("Failed to update limiter %s", entry.getKey()), ex);
-      }
-    }
+  private void updateAll() {
+    copyAndThen("update", SharedRateLimiter::update);
   }
 
   /**
    * Walk through all of the currently active RateLimiters, having each report its activity to the
    * debug log.
    */
-  protected void report() {
-    Map<String,SharedRateLimiter> limitersCopy;
-    synchronized (activeLimiters) {
-      limitersCopy = Map.copyOf(activeLimiters);
-    }
-    for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) {
-      try {
-        entry.getValue().report();
-      } catch (Exception ex) {
-        log.error(String.format("Failed to report limiter %s", entry.getKey()), ex);
-      }
-    }
+  private void reportAll() {
+    copyAndThen("report", SharedRateLimiter::report);
   }
 
   protected class SharedRateLimiter extends GuavaRateLimiter {
-    private volatile long permitsAcquired = 0;
-    private volatile long lastUpdate;
+    private AtomicLong permitsAcquired = new AtomicLong();
+    private AtomicLong lastUpdate = new AtomicLong();
 
     private final RateProvider rateProvider;
     private final String name;
@@ -145,13 +146,13 @@
       super(initialRate);
       this.name = name;
       this.rateProvider = rateProvider;
-      this.lastUpdate = System.currentTimeMillis();
+      this.lastUpdate.set(System.nanoTime());
     }
 
     @Override
     public void acquire(long permits) {
       super.acquire(permits);
-      permitsAcquired += permits;
+      permitsAcquired.addAndGet(permits);
     }
 
     /** Poll the callback, updating the current rate if necessary. */
@@ -166,14 +167,14 @@
     /** Report the current throughput and usage of this rate limiter to the debug log. */
     public void report() {
       if (log.isDebugEnabled()) {
-        long duration = System.currentTimeMillis() - lastUpdate;
+        long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastUpdate.get());
         if (duration == 0) {
           return;
         }
-        lastUpdate = System.currentTimeMillis();
+        lastUpdate.set(System.nanoTime());
 
-        long sum = permitsAcquired;
-        permitsAcquired = 0;
+        long sum = permitsAcquired.get();
+        permitsAcquired.set(0);
 
         if (sum > 0) {
           log.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name,