Fix WeakHashMap implementation in SharedRateLimiterFactory (#1965)
* Fix WeakHashMap implementation
* Rename `update`/`report` methods to `updateAll`/`reportAll` and
reference using method reference, and make them private, since they
don't need to be any more visible than that
* Reuse the implementation for operating on a copy of the
`activeLimiters` for both `updateAll` and `reportAll`
* Convert syncronized long to AtomicLong
* Make SharedRateLimiter immune to system clock changes
Co-authored-by: Christopher Tubbs <ctubbsii@apache.org>
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
index c24a3d2..c5c6890 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -18,10 +18,14 @@
*/
package org.apache.accumulo.core.util.ratelimit;
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -38,7 +42,8 @@
private static final long UPDATE_RATE = 1000;
private static SharedRateLimiterFactory instance = null;
private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
- private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
+ private final WeakHashMap<String,WeakReference<SharedRateLimiter>> activeLimiters =
+ new WeakHashMap<>();
private SharedRateLimiterFactory() {}
@@ -48,15 +53,13 @@
instance = new SharedRateLimiterFactory();
ScheduledThreadPoolExecutor svc = ThreadPools.createGeneralScheduledExecutorService(conf);
- svc.scheduleWithFixedDelay(
- Threads.createNamedRunnable("SharedRateLimiterFactory update polling", () -> {
- instance.update();
- }), UPDATE_RATE, UPDATE_RATE, TimeUnit.MILLISECONDS);
+ svc.scheduleWithFixedDelay(Threads
+ .createNamedRunnable("SharedRateLimiterFactory update polling", instance::updateAll),
+ UPDATE_RATE, UPDATE_RATE, TimeUnit.MILLISECONDS);
- svc.scheduleWithFixedDelay(
- Threads.createNamedRunnable("SharedRateLimiterFactory report polling", () -> {
- instance.report();
- }), REPORT_RATE, REPORT_RATE, TimeUnit.MILLISECONDS);
+ svc.scheduleWithFixedDelay(Threads
+ .createNamedRunnable("SharedRateLimiterFactory report polling", instance::reportAll),
+ REPORT_RATE, REPORT_RATE, TimeUnit.MILLISECONDS);
}
return instance;
@@ -86,57 +89,55 @@
*/
public RateLimiter create(String name, RateProvider rateProvider) {
synchronized (activeLimiters) {
- if (activeLimiters.containsKey(name)) {
- return activeLimiters.get(name);
- } else {
- long initialRate;
- initialRate = rateProvider.getDesiredRate();
- SharedRateLimiter limiter = new SharedRateLimiter(name, rateProvider, initialRate);
- activeLimiters.put(name, limiter);
- return limiter;
+ var limiterRef = activeLimiters.get(name);
+ var limiter = limiterRef == null ? null : limiterRef.get();
+ if (limiter == null) {
+ limiter = new SharedRateLimiter(name, rateProvider, rateProvider.getDesiredRate());
+ activeLimiters.put(name, new WeakReference<>(limiter));
}
+ return limiter;
}
}
+ private void copyAndThen(String actionName, Consumer<SharedRateLimiter> action) {
+ Map<String,SharedRateLimiter> limitersCopy = new HashMap<>();
+ // synchronize only for copy
+ synchronized (activeLimiters) {
+ activeLimiters.forEach((name, limiterRef) -> {
+ var limiter = limiterRef.get();
+ if (limiter != null) {
+ limitersCopy.put(name, limiter);
+ }
+ });
+ }
+ limitersCopy.forEach((name, limiter) -> {
+ try {
+ action.accept(limiter);
+ } catch (RuntimeException e) {
+ log.error("Failed to {} limiter {}", actionName, name, e);
+ }
+ });
+ }
+
/**
* Walk through all of the currently active RateLimiters, having each update its current rate.
* This is called periodically so that we can dynamically update as configuration changes.
*/
- protected void update() {
- Map<String,SharedRateLimiter> limitersCopy;
- synchronized (activeLimiters) {
- limitersCopy = Map.copyOf(activeLimiters);
- }
- for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) {
- try {
- entry.getValue().update();
- } catch (Exception ex) {
- log.error(String.format("Failed to update limiter %s", entry.getKey()), ex);
- }
- }
+ private void updateAll() {
+ copyAndThen("update", SharedRateLimiter::update);
}
/**
* Walk through all of the currently active RateLimiters, having each report its activity to the
* debug log.
*/
- protected void report() {
- Map<String,SharedRateLimiter> limitersCopy;
- synchronized (activeLimiters) {
- limitersCopy = Map.copyOf(activeLimiters);
- }
- for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) {
- try {
- entry.getValue().report();
- } catch (Exception ex) {
- log.error(String.format("Failed to report limiter %s", entry.getKey()), ex);
- }
- }
+ private void reportAll() {
+ copyAndThen("report", SharedRateLimiter::report);
}
protected class SharedRateLimiter extends GuavaRateLimiter {
- private volatile long permitsAcquired = 0;
- private volatile long lastUpdate;
+ private AtomicLong permitsAcquired = new AtomicLong();
+ private AtomicLong lastUpdate = new AtomicLong();
private final RateProvider rateProvider;
private final String name;
@@ -145,13 +146,13 @@
super(initialRate);
this.name = name;
this.rateProvider = rateProvider;
- this.lastUpdate = System.currentTimeMillis();
+ this.lastUpdate.set(System.nanoTime());
}
@Override
public void acquire(long permits) {
super.acquire(permits);
- permitsAcquired += permits;
+ permitsAcquired.addAndGet(permits);
}
/** Poll the callback, updating the current rate if necessary. */
@@ -166,14 +167,14 @@
/** Report the current throughput and usage of this rate limiter to the debug log. */
public void report() {
if (log.isDebugEnabled()) {
- long duration = System.currentTimeMillis() - lastUpdate;
+ long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastUpdate.get());
if (duration == 0) {
return;
}
- lastUpdate = System.currentTimeMillis();
+ lastUpdate.set(System.nanoTime());
- long sum = permitsAcquired;
- permitsAcquired = 0;
+ long sum = permitsAcquired.get();
+ permitsAcquired.set(0);
if (sum > 0) {
log.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name,