HBASE-29351 Quotas: adaptive wait intervals (#7396)
Co-authored-by: Ray Mattingly <rmattingly@hubspot.com>
Signed-off-by: Charles Connell <cconnell@apache.org>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FeedbackAdaptiveRateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FeedbackAdaptiveRateLimiter.java
new file mode 100644
index 0000000..6acfd07
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FeedbackAdaptiveRateLimiter.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicDouble;
+
+/**
+ * An adaptive rate limiter that dynamically adjusts its behavior based on observed usage patterns
+ * to achieve stable, full utilization of configured quota allowances while managing client
+ * contention.
+ * <p>
+ * <b>Core Algorithm:</b> This rate limiter divides time into fixed refill intervals (configurable
+ * via {@code hbase.quota.rate.limiter.refill.interval.ms}, default is 1 refill per TimeUnit of the
+ * RateLimiter). At the beginning of each interval, a fresh allocation of resources becomes
+ * available based on the configured limit. Clients consume resources as they make requests. When
+ * resources are exhausted, clients must wait until the next refill, or until enough resources
+ * become available.
+ * <p>
+ * <b>Adaptive Backpressure:</b> When multiple threads compete for limited resources (contention),
+ * this limiter detects the contention and applies increasing backpressure by extending wait
+ * intervals. This prevents thundering herd behavior where many threads wake simultaneously and
+ * compete for the same resources. The backoff multiplier increases by a small increment (see
+ * {@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT}) per interval when contention occurs, and
+ * decreases (see {@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT}) when no contention is
+ * detected, converging toward optimal throughput. The multiplier is capped at a maximum value (see
+ * {@link #FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER}) to prevent unbounded waits.
+ * <p>
+ * Contention is detected when {@link #getWaitInterval} is called with insufficient available
+ * resources (i.e., {@code amount > available}), indicating a thread needs to wait for resources. A
+ * single such call is enough to mark the current refill interval as contended; at the next refill
+ * the multiplier is increased if the interval was contended and decreased otherwise.
+ * <p>
+ * <b>Oversubscription for Full Utilization:</b> In practice, synchronization overhead and timing
+ * variations often prevent clients from consuming exactly their full allowance, resulting in
+ * consistent under-utilization. This limiter addresses this by tracking utilization via an
+ * exponentially weighted moving average (EWMA). When average utilization falls below the target
+ * range (determined by {@link #FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET}), the limiter gradually
+ * increases the oversubscription proportion (see
+ * {@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT}), allowing more resources per interval than
+ * the base limit. Conversely, when utilization exceeds the target range, oversubscription is
+ * decreased (see {@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT}). Oversubscription is capped
+ * (see {@link #FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION}) to prevent excessive bursts while still
+ * enabling consistent full utilization.
+ * The EWMA starts at 0.0, so a newly created limiter raises oversubscription on each refill until
+ * the average reaches the target range; refill intervals in which nothing was consumed leave the
+ * EWMA unchanged.
+ * <p>
+ * <b>Example Scenario:</b> Consider a quota of 1000 requests per second with a 1-second refill
+ * interval. Without oversubscription, clients might typically achieve only 950 req/s due to
+ * coordination delays. This limiter would detect the under-utilization, gradually increase
+ * oversubscription, allowing slightly more resources per interval, which compensates for
+ * inefficiencies and achieves stable throughput closer to the configured quota. If multiple threads
+ * simultaneously try to consume resources and repeatedly wait, the backoff multiplier increases
+ * their wait times, spreading out their retry attempts and reducing wasted CPU cycles.
+ * <p>
+ * <b>Configuration Parameters:</b>
+ * <ul>
+ * <li>{@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT}: Controls rate of backpressure
+ * increase</li>
+ * <li>{@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT}: Controls rate of backpressure
+ * decrease</li>
+ * <li>{@link #FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER}: Caps the maximum wait time extension</li>
+ * <li>{@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT}: Controls rate of oversubscription
+ * increase</li>
+ * <li>{@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT}: Controls rate of oversubscription
+ * decrease</li>
+ * <li>{@link #FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION}: Caps the maximum burst capacity</li>
+ * <li>{@link #FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET}: Defines the acceptable range around full
+ * utilization</li>
+ * </ul>
+ * <p>
+ * This algorithm converges toward stable operation where: (1) wait intervals are just long enough
+ * to prevent excessive contention, and (2) oversubscription is just high enough to achieve
+ * consistent full utilization of the configured allowance.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FeedbackAdaptiveRateLimiter extends RateLimiter {
+
+  /**
+   * Amount to increase the backoff multiplier when contention is detected per refill interval. In
+   * other words, if any request is throttled during a refill interval, then we will increase our
+   * wait intervals (increase backpressure, decrease throughput).
+   */
+  public static final String FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT =
+    "hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.increment";
+  public static final double DEFAULT_BACKOFF_MULTIPLIER_INCREMENT = 0.0005;
+
+  /**
+   * Amount to decrease the backoff multiplier when no contention is detected per refill interval.
+   * In other words, if no request is throttled during a refill interval, then we will decrease our
+   * wait interval (decrease backpressure, increase throughput).
+   */
+  public static final String FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT =
+    "hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.decrement";
+  public static final double DEFAULT_BACKOFF_MULTIPLIER_DECREMENT = 0.0001;
+
+  /**
+   * Maximum ceiling for the backoff multiplier to avoid unbounded waits.
+   */
+  public static final String FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER =
+    "hbase.quota.rate.limiter.feedback.adaptive.max.backoff.multiplier";
+  public static final double DEFAULT_MAX_BACKOFF_MULTIPLIER = 10.0;
+
+  /**
+   * Amount to increase the oversubscription proportion when utilization is below (1.0-errorBudget).
+   */
+  public static final String FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT =
+    "hbase.quota.rate.limiter.feedback.adaptive.oversubscription.increment";
+  public static final double DEFAULT_OVERSUBSCRIPTION_INCREMENT = 0.001;
+
+  /**
+   * Amount to decrease the oversubscription proportion when utilization exceeds (1.0+errorBudget).
+   */
+  public static final String FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT =
+    "hbase.quota.rate.limiter.feedback.adaptive.oversubscription.decrement";
+  public static final double DEFAULT_OVERSUBSCRIPTION_DECREMENT = 0.00005;
+
+  /**
+   * Maximum ceiling for oversubscription to prevent unbounded bursts. Some oversubscription can be
+   * nice, because it allows you to balance the inefficiency and latency of retries, landing on
+   * stable usage at approximately your configured allowance. Without adequate oversubscription,
+   * your steady state may often seem significantly, and suspiciously, lower than your configured
+   * allowance.
+   */
+  public static final String FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION =
+    "hbase.quota.rate.limiter.feedback.adaptive.max.oversubscription";
+  public static final double DEFAULT_MAX_OVERSUBSCRIPTION = 0.25;
+
+  /**
+   * Acceptable deviation around full utilization (1.0) for adjusting oversubscription. If stable
+   * throttle usage is typically under (1.0-errorBudget), then we will allow more oversubscription.
+   * If stable throttle usage is typically over (1.0+errorBudget), then we will pull back
+   * oversubscription.
+   */
+  public static final String FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET =
+    "hbase.quota.rate.limiter.feedback.adaptive.utilization.error.budget";
+  public static final double DEFAULT_UTILIZATION_ERROR_BUDGET = 0.025;
+
+  /** Window, in ms, from which the smoothing factor of the utilization EWMA is derived. */
+  private static final int WINDOW_TIME_MS = 60_000;
+
+  /**
+   * Reads the feedback-adaptive settings from a {@link Configuration} once, so that any number of
+   * identically configured limiters can then be created from them.
+   */
+  public static class FeedbackAdaptiveRateLimiterFactory {
+
+    private final long refillInterval;
+    private final double backoffMultiplierIncrement;
+    private final double backoffMultiplierDecrement;
+    private final double maxBackoffMultiplier;
+    private final double oversubscriptionIncrement;
+    private final double oversubscriptionDecrement;
+    private final double maxOversubscription;
+    private final double utilizationErrorBudget;
+
+    public FeedbackAdaptiveRateLimiterFactory(Configuration conf) {
+      refillInterval = conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS,
+        RateLimiter.DEFAULT_TIME_UNIT);
+
+      maxBackoffMultiplier =
+        conf.getDouble(FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, DEFAULT_MAX_BACKOFF_MULTIPLIER);
+
+      backoffMultiplierIncrement = conf.getDouble(FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT,
+        DEFAULT_BACKOFF_MULTIPLIER_INCREMENT);
+      backoffMultiplierDecrement = conf.getDouble(FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT,
+        DEFAULT_BACKOFF_MULTIPLIER_DECREMENT);
+
+      oversubscriptionIncrement = conf.getDouble(FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT,
+        DEFAULT_OVERSUBSCRIPTION_INCREMENT);
+      oversubscriptionDecrement = conf.getDouble(FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT,
+        DEFAULT_OVERSUBSCRIPTION_DECREMENT);
+
+      maxOversubscription =
+        conf.getDouble(FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, DEFAULT_MAX_OVERSUBSCRIPTION);
+      utilizationErrorBudget = conf.getDouble(FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET,
+        DEFAULT_UTILIZATION_ERROR_BUDGET);
+    }
+
+    public FeedbackAdaptiveRateLimiter create() {
+      return new FeedbackAdaptiveRateLimiter(refillInterval, backoffMultiplierIncrement,
+        backoffMultiplierDecrement, maxBackoffMultiplier, oversubscriptionIncrement,
+        oversubscriptionDecrement, maxOversubscription, utilizationErrorBudget);
+    }
+  }
+
+  // -1 until the first call to refill() starts the first refill interval.
+  private volatile long nextRefillTime = -1L;
+  private final long refillInterval;
+  private final double backoffMultiplierIncrement;
+  private final double backoffMultiplierDecrement;
+  private final double maxBackoffMultiplier;
+  private final double oversubscriptionIncrement;
+  private final double oversubscriptionDecrement;
+  private final double maxOversubscription;
+  private final double minTargetUtilization;
+  private final double maxTargetUtilization;
+
+  // Adaptive backoff state
+  private final AtomicDouble currentBackoffMultiplier = new AtomicDouble(1.0);
+  private volatile boolean hadContentionThisInterval = false;
+
+  // Over-subscription proportion state
+  private final AtomicDouble oversubscriptionProportion = new AtomicDouble(0.0);
+
+  // EWMA tracking
+  private final double emaAlpha;
+  private volatile double utilizationEma = 0.0;
+  private final AtomicLong lastIntervalConsumed;
+
+  FeedbackAdaptiveRateLimiter(long refillInterval, double backoffMultiplierIncrement,
+    double backoffMultiplierDecrement, double maxBackoffMultiplier,
+    double oversubscriptionIncrement, double oversubscriptionDecrement, double maxOversubscription,
+    double utilizationErrorBudget) {
+    super();
+    // refill() divides by refillInterval, so it must be positive.
+    Preconditions.checkArgument(refillInterval > 0,
+      String.format("Refill interval %s must be > 0", refillInterval));
+    Preconditions.checkArgument(getTimeUnitInMillis() >= refillInterval, String.format(
+      "Refill interval %s must be ≤ TimeUnit millis %s", refillInterval, getTimeUnitInMillis()));
+
+    Preconditions.checkArgument(backoffMultiplierIncrement > 0.0,
+      String.format("Backoff multiplier increment %s must be > 0.0", backoffMultiplierIncrement));
+    Preconditions.checkArgument(backoffMultiplierDecrement > 0.0,
+      String.format("Backoff multiplier decrement %s must be > 0.0", backoffMultiplierDecrement));
+    Preconditions.checkArgument(maxBackoffMultiplier > 1.0,
+      String.format("Max backoff multiplier %s must be > 1.0", maxBackoffMultiplier));
+    Preconditions.checkArgument(utilizationErrorBudget > 0.0 && utilizationErrorBudget <= 1.0,
+      String.format("Utilization error budget %s must be between 0.0 and 1.0",
+        utilizationErrorBudget));
+    Preconditions.checkArgument(oversubscriptionIncrement >= 0.0,
+      String.format("Oversubscription increment %s must be >= 0.0", oversubscriptionIncrement));
+    Preconditions.checkArgument(oversubscriptionDecrement >= 0.0,
+      String.format("Oversubscription decrement %s must be >= 0.0", oversubscriptionDecrement));
+    Preconditions.checkArgument(maxOversubscription >= 0.0,
+      String.format("Max oversubscription %s must be >= 0.0", maxOversubscription));
+
+    this.refillInterval = refillInterval;
+    this.backoffMultiplierIncrement = backoffMultiplierIncrement;
+    this.backoffMultiplierDecrement = backoffMultiplierDecrement;
+    this.maxBackoffMultiplier = maxBackoffMultiplier;
+    this.oversubscriptionIncrement = oversubscriptionIncrement;
+    this.oversubscriptionDecrement = oversubscriptionDecrement;
+    this.maxOversubscription = maxOversubscription;
+    this.minTargetUtilization = 1.0 - utilizationErrorBudget;
+    this.maxTargetUtilization = 1.0 + utilizationErrorBudget;
+
+    // EWMA smoothing factor: the weight given to the most recent interval's utilization.
+    this.emaAlpha = refillInterval / (double) (WINDOW_TIME_MS + refillInterval);
+    this.lastIntervalConsumed = new AtomicLong(0);
+  }
+
+  /**
+   * Returns how much to add to the available resources. The first call starts the first refill
+   * interval and grants the full oversubscribed limit. Later calls return 0 until the current
+   * interval has elapsed. After that, each call folds the finished interval's consumption into
+   * the utilization EWMA, adjusts the backoff multiplier and the oversubscription proportion, and
+   * grants one interval-adjusted allowance per elapsed interval, up to the oversubscribed limit.
+   */
+  @Override
+  public long refill(long limit) {
+    final long now = EnvironmentEdgeManager.currentTime();
+    if (nextRefillTime == -1) {
+      nextRefillTime = now + refillInterval;
+      hadContentionThisInterval = false;
+      return getOversubscribedLimit(limit);
+    }
+    if (now < nextRefillTime) {
+      return 0;
+    }
+    // One refill is due at nextRefillTime, plus one more for every whole interval since then.
+    long diff = refillInterval + now - nextRefillTime;
+    long refills = diff / refillInterval;
+    nextRefillTime = now + refillInterval;
+
+    long intendedUsage = getRefillIntervalAdjustedLimit(limit);
+    if (intendedUsage > 0) {
+      long consumed = lastIntervalConsumed.get();
+      // Idle intervals (nothing consumed) leave the average untouched.
+      if (consumed > 0) {
+        double util = (double) consumed / intendedUsage;
+        utilizationEma = emaAlpha * util + (1.0 - emaAlpha) * utilizationEma;
+      }
+    }
+
+    // A single throttled request in the finished interval is enough to raise backpressure.
+    if (hadContentionThisInterval) {
+      currentBackoffMultiplier.set(Math
+        .min(currentBackoffMultiplier.get() + backoffMultiplierIncrement, maxBackoffMultiplier));
+    } else {
+      currentBackoffMultiplier
+        .set(Math.max(currentBackoffMultiplier.get() - backoffMultiplierDecrement, 1.0));
+    }
+
+    double avgUtil = utilizationEma;
+    if (avgUtil < minTargetUtilization) {
+      oversubscriptionProportion.set(Math
+        .min(oversubscriptionProportion.get() + oversubscriptionIncrement, maxOversubscription));
+    } else if (avgUtil >= maxTargetUtilization) {
+      oversubscriptionProportion
+        .set(Math.max(oversubscriptionProportion.get() - oversubscriptionDecrement, 0.0));
+    }
+
+    hadContentionThisInterval = false;
+    lastIntervalConsumed.set(0);
+
+    long maxRefill = getOversubscribedLimit(limit);
+    // Saturate instead of overflowing refills * intendedUsage after a long idle gap.
+    if (intendedUsage > 0 && refills > maxRefill / intendedUsage) {
+      return maxRefill;
+    }
+    return Math.min(maxRefill, refills * intendedUsage);
+  }
+
+  /** Returns {@code limit} increased by the current oversubscription proportion (truncated). */
+  private long getOversubscribedLimit(long limit) {
+    return limit + (long) (limit * oversubscriptionProportion.get());
+  }
+
+  /** Consumes {@code amount} and also records it toward the current interval's utilization. */
+  @Override
+  public void consume(long amount) {
+    super.consume(amount);
+    lastIntervalConsumed.addAndGet(amount);
+  }
+
+  /**
+   * Computes how long to wait for {@code amount} resources, stretched by the current backoff
+   * multiplier. Any call where {@code amount} exceeds {@code available} marks the current refill
+   * interval as contended.
+   */
+  @Override
+  public long getWaitInterval(long limit, long available, long amount) {
+    limit = getRefillIntervalAdjustedLimit(limit);
+    if (nextRefillTime == -1) {
+      return 0;
+    }
+
+    final long now = EnvironmentEdgeManager.currentTime();
+    final long refillTime = nextRefillTime;
+    long diff = amount - available;
+    if (diff > 0) {
+      hadContentionThisInterval = true;
+    }
+
+    long nextInterval = refillTime - now;
+    if (diff <= limit) {
+      return applyBackoffMultiplier(nextInterval);
+    }
+
+    // ceil(diff / limit) refills are needed; the first arrives at refillTime and each later one
+    // a full refillInterval after the previous, so count only the extra ones here.
+    long extra = diff / limit;
+    if (diff % limit == 0) {
+      extra--;
+    }
+    long baseWait = nextInterval + (extra * refillInterval);
+    return applyBackoffMultiplier(baseWait);
+  }
+
+  /** Scales a per-TimeUnit {@code limit} down to one refill interval, rounding up. */
+  private long getRefillIntervalAdjustedLimit(long limit) {
+    return (long) Math.ceil(refillInterval / (double) getTimeUnitInMillis() * limit);
+  }
+
+  /** Stretches a wait interval by the current backoff multiplier, truncating the result. */
+  private long applyBackoffMultiplier(long baseWaitInterval) {
+    return (long) (baseWaitInterval * currentBackoffMultiplier.get());
+  }
+
+  // strictly for testing
+  @Override
+  public void setNextRefillTime(long nextRefillTime) {
+    this.nextRefillTime = nextRefillTime;
+  }
+
+  @Override
+  public long getNextRefillTime() {
+    return this.nextRefillTime;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index 43dfab7..38d171f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -46,11 +46,10 @@
private RateLimiter reqHandlerUsageTimeLimiter = null;
private TimeBasedLimiter(Configuration conf) {
- if (
- FixedIntervalRateLimiter.class.getName().equals(
- conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class)
- .getName())
- ) {
+ String limiterClassName =
+ conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class)
+ .getName();
+ if (FixedIntervalRateLimiter.class.getName().equals(limiterClassName)) {
long refillInterval = conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS,
RateLimiter.DEFAULT_TIME_UNIT);
reqsLimiter = new FixedIntervalRateLimiter(refillInterval);
@@ -66,6 +65,22 @@
atomicReadSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
atomicWriteSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
reqHandlerUsageTimeLimiter = new FixedIntervalRateLimiter(refillInterval);
+ } else if (FeedbackAdaptiveRateLimiter.class.getName().equals(limiterClassName)) {
+ FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory feedbackLimiterFactory =
+ new FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory(conf);
+ reqsLimiter = feedbackLimiterFactory.create();
+ reqSizeLimiter = feedbackLimiterFactory.create();
+ writeReqsLimiter = feedbackLimiterFactory.create();
+ writeSizeLimiter = feedbackLimiterFactory.create();
+ readReqsLimiter = feedbackLimiterFactory.create();
+ readSizeLimiter = feedbackLimiterFactory.create();
+ reqCapacityUnitLimiter = feedbackLimiterFactory.create();
+ writeCapacityUnitLimiter = feedbackLimiterFactory.create();
+ readCapacityUnitLimiter = feedbackLimiterFactory.create();
+ atomicReqLimiter = feedbackLimiterFactory.create();
+ atomicReadSizeLimiter = feedbackLimiterFactory.create();
+ atomicWriteSizeLimiter = feedbackLimiterFactory.create();
+ reqHandlerUsageTimeLimiter = feedbackLimiterFactory.create();
} else {
reqsLimiter = new AverageIntervalRateLimiter();
reqSizeLimiter = new AverageIntervalRateLimiter();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFeedbackAdaptiveRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFeedbackAdaptiveRateLimiter.java
new file mode 100644
index 0000000..a6a2332
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFeedbackAdaptiveRateLimiter.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Verify the behavior of the FeedbackAdaptiveRateLimiter including adaptive backoff multipliers and
+ * over-subscription functionality.
+ */
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestFeedbackAdaptiveRateLimiter {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFeedbackAdaptiveRateLimiter.class);
+  // Recreated by setUp() before every test.
+  private FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory factory;
+  private ManualEnvironmentEdge testEdge;
+
+  @Before
+  public void setUp() {
+    // Step sizes far above the defaults make a few intervals of adaptation observable.
+    Configuration conf = HBaseConfiguration.create();
+    // 500 ms refills: a limit of 10/s grants an interval-adjusted allowance of 5 per refill.
+    conf.setLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS, 500);
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT, 0.1);
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT,
+      0.05);
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, 3.0);
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT, 0.01);
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT, 0.005);
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, 0.2);
+    // A utilization EWMA inside [0.9, 1.1) leaves the oversubscription proportion unchanged.
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET, 0.1);
+    factory = new FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory(conf);
+
+    // Manual clock so that tests decide exactly when refill intervals elapse.
+    testEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(testEdge);
+  }
+
+  @After
+  public void tearDown() {
+    EnvironmentEdgeManager.reset(); // undo the injectEdge() call made in setUp()
+  }
+
+  @Test
+  public void testBasicFunctionality() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+    // Each half-limit consumption is preceded by a zero wait, like a plain rate limiter.
+    for (int half = 0; half < 2; half++) {
+      assertEquals(0, limiter.getWaitIntervalMs());
+      limiter.consume(5);
+    }
+
+    // With the whole limit of 10 consumed, the caller now has to wait.
+    long waitAfterExhaustion = limiter.getWaitIntervalMs();
+    assertTrue(waitAfterExhaustion > 0);
+  }
+
+  @Test
+  public void testAdaptiveBackoffIncreases() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    long now = 1000;
+    testEdge.setValue(now);
+    limiter.refill(10);
+    // Baseline with the multiplier at 1.0; this throttled call marks the interval contended.
+    limiter.consume(10);
+    long initialWaitInterval = limiter.getWaitInterval(10, 0, 1);
+    assertTrue("Initial wait interval should be positive", initialWaitInterval > 0);
+
+    // Every refill below closes a contended interval, raising the multiplier by 0.1 each time.
+    for (int interval = 1; interval <= 5; interval++) {
+      now += 500;
+      testEdge.setValue(now);
+      limiter.refill(10);
+      limiter.consume(10);
+      limiter.getWaitInterval(10, 0, 1); // asks for more than is available
+    }
+
+    // One interval later, measure again under the grown multiplier.
+    testEdge.setValue(4000);
+    limiter.refill(10);
+    limiter.consume(10);
+    long increasedWaitInterval = limiter.getWaitInterval(10, 0, 1);
+
+    // Six contended intervals have closed, so the multiplier is ~1.6 and the wait grows with it,
+    // well past the 1.3x threshold asserted here.
+    assertTrue(
+      "Wait interval should increase with contention. Initial: " + initialWaitInterval
+        + ", After contention: " + increasedWaitInterval,
+      increasedWaitInterval > initialWaitInterval * 1.3);
+  }
+
+  @Test
+  public void testAdaptiveBackoffDecreases() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(10);
+
+    // Five contended intervals: each throttled call is followed by the refill that closes it.
+    for (long t = 1500; t <= 3500; t += 500) {
+      limiter.consume(10);
+      limiter.getWaitInterval(10, 0, 1); // more than available, so contention is recorded
+      testEdge.setValue(t);
+      limiter.refill(10);
+    }
+
+    // The multiplier is now ~1.5; this measurement also marks the current interval contended.
+    limiter.consume(10);
+    long elevatedWaitInterval = limiter.getWaitInterval(10, 0, 1);
+
+    // Ten refills: the first closes the interval contended above (~1.6), the other nine close
+    // quiet intervals and each subtract 0.05.
+    for (long t = 4000; t <= 8500; t += 500) {
+      testEdge.setValue(t);
+      limiter.refill(10);
+      limiter.consume(3); // no throttled getWaitInterval() call, so the interval stays quiet
+    }
+
+    // This refill closes one more quiet interval before the second measurement.
+    testEdge.setValue(9500);
+    limiter.refill(10);
+    limiter.consume(10);
+    long reducedWaitInterval = limiter.getWaitInterval(10, 0, 1);
+
+    // The multiplier went ~1.5 -> ~1.6 -> ~1.1, so the wait shrinks by roughly a quarter,
+    // comfortably more than the 10% required here.
+    assertTrue("Wait interval should decrease without contention. Elevated: " + elevatedWaitInterval
+      + ", Reduced: " + reducedWaitInterval, reducedWaitInterval < elevatedWaitInterval * 0.9);
+  }
+
+  @Test
+  public void testOversubscriptionIncreasesWithLowUtilization() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    long now = 1000;
+    testEdge.setValue(now);
+
+    // The very first refill starts the first interval and grants the full limit.
+    long initialRefill = limiter.refill(10);
+    assertEquals("Initial refill should match limit", 10, initialRefill);
+
+    // Low utilization: 2 of every interval-adjusted allowance of 5 (500 ms / 1000 ms * 10) is
+    // 40%, far below the 0.9 floor implied by the 0.1 error budget, so each refill raises the
+    // oversubscription proportion until it reaches the 0.2 cap. Consuming before advancing the
+    // clock makes each amount count toward the interval that the following refill closes.
+    for (int interval = 0; interval < 30; interval++) {
+      limiter.consume(2);
+      now += 500;
+      testEdge.setValue(now);
+      limiter.refill(10);
+    }
+
+    // Drain whatever is left so that the next refill starts from an empty bucket.
+    long leftover = limiter.getAvailable();
+    limiter.consume((int) leftover);
+
+    // Skip three refill intervals (1500 ms): the pending refill is 3 * 5 = 15, so only the
+    // oversubscribed limit caps it, at 10 * (1 + 0.2) = 12 once the proportion is maxed out.
+    // Without oversubscription the cap would be the base limit: min(10, 15) = 10.
+    now += 1500;
+    testEdge.setValue(now);
+    long multiIntervalRefill = limiter.refill(10);
+
+    // Anything above 10 proves that the oversubscription proportion grew; see the arithmetic
+    // above for the expected value of 12.
+    assertTrue("With oversubscription from low utilization, refill should exceed base limit. Got: "
+      + multiIntervalRefill, multiIntervalRefill > 10);
+  }
+
+  @Test
+  public void testOversubscriptionDecreasesWithHighUtilization() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+    long now = 1000;
+    testEdge.setValue(now);
+    limiter.refill(10);
+
+    // Low utilization (1 of 5 per interval) first drives oversubscription to its 0.2 cap, which
+    // a 3-interval refill (3 * 5 = 15) exposes as the oversubscribed limit 10 * (1 + 0.2) = 12.
+    for (int i = 0; i < 25; i++) {
+      now += 500;
+      testEdge.setValue(now);
+      limiter.refill(10);
+      limiter.consume(1);
+    }
+    now += 1500;
+    testEdge.setValue(now);
+    long refillAtMaxOversubscription = limiter.refill(10);
+    assertEquals(12, refillAtMaxOversubscription);
+
+    // Then consume 4x the allowance every interval. The utilization EWMA (alpha = 500 / 60500)
+    // passes the 1.1 ceiling after roughly 40 intervals, and from then on every refill removes
+    // 0.005 of oversubscription until none is left.
+    for (int i = 0; i < 80; i++) {
+      now += 500;
+      testEdge.setValue(now);
+      limiter.refill(10);
+      limiter.consume(20);
+    }
+    now += 1500;
+    testEdge.setValue(now);
+    long refillAfterHighUtil = limiter.refill(10);
+    assertEquals("High utilization should remove oversubscription", 10, refillAfterHighUtil);
+  }
+
+  @Test
+  public void testBackoffMultiplierCapsAtMaximum() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    // Baseline: exhaust the first interval while the multiplier is still 1.0. The throttled
+    // getWaitInterval call also marks this interval as contended.
+    testEdge.setValue(1000);
+    limiter.refill(10);
+    limiter.consume(10);
+    long baseWaitInterval = limiter.getWaitInterval(10, 0, 1);
+
+    // Each refill below closes a contended interval (+0.1). Going from 1.0 to the 3.0 cap takes
+    // 20 such intervals; running 25 deliberately pushes past the cap.
+    long now = 1000;
+    for (int round = 0; round < 25; round++) {
+      now += 500;
+      testEdge.setValue(now);
+      limiter.refill(10);
+      limiter.consume(10);
+      limiter.getWaitInterval(10, 0, 1); // throttled: asks for more than is available
+    }
+
+    // One interval later the multiplier is pinned at 3.0, so the wait should be roughly three
+    // times the baseline.
+    testEdge.setValue(14000);
+    limiter.refill(10);
+    limiter.consume(10);
+    long maxBackoffWaitInterval = limiter.getWaitInterval(10, 0, 1);
+
+    // Accept anything within 2.5x..3.5x of the baseline.
+    boolean nearTripleBase = maxBackoffWaitInterval >= baseWaitInterval * 2.5
+      && maxBackoffWaitInterval <= baseWaitInterval * 3.5;
+    assertTrue("Wait interval should cap at max multiplier. Base: " + baseWaitInterval
+      + ", Max backoff: " + maxBackoffWaitInterval, nearTripleBase);
+
+    // Two more contended intervals must not stretch the wait any further: the multiplier stays
+    // at its cap.
+    long stillMaxWaitInterval = 0;
+    for (long t = 14500; t <= 15000; t += 500) {
+      testEdge.setValue(t);
+      limiter.refill(10);
+      limiter.consume(10);
+      stillMaxWaitInterval = limiter.getWaitInterval(10, 0, 1);
+    }
+
+    // The final measurement should match the one taken right after reaching the cap.
+    long drift = Math.abs(stillMaxWaitInterval - maxBackoffWaitInterval);
+    assertTrue("Wait should remain capped. Previous: " + maxBackoffWaitInterval + ", Current: "
+      + stillMaxWaitInterval, drift < baseWaitInterval * 0.2);
+  }
+
+  @Test
+  public void testOversubscriptionCapsAtMaximum() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(10);
+
+    // Create extreme low utilization to push oversubscription to max
+    // With increment=0.01 and max=0.2, we need 0.2/0.01 = 20 intervals
+    // (each refill sees a utilization EWMA far below the 0.9 floor)
+    for (int i = 0; i < 25; i++) {
+      testEdge.setValue(1000 + (i + 1) * 500);
+      limiter.refill(10);
+      // Very low consumption to maximize oversubscription increase
+      limiter.consume(1);
+    }
+
+    // A one-interval refill only yields the interval-adjusted 5, which hides oversubscription.
+    // The loop's last refill ran at 13500, so skipping 3 intervals (3 * 5 = 15) exposes the
+    // oversubscribed cap instead: 10 * (1 + 0.2) = 12.
+    testEdge.setValue(13500 + 1500);
+    long refillWithMaxOversubscription = limiter.refill(10);
+    assertEquals("Refill should cap at max oversubscription", 12, refillWithMaxOversubscription);
+
+    // Further low utilization should not increase refill
+    testEdge.setValue(15500);
+    limiter.refill(10);
+    limiter.consume(1);
+
+    // Skip 3 intervals again: the proportion is still pinned at 0.2, so the cap is unchanged.
+    testEdge.setValue(15500 + 1500);
+    long stillMaxRefill = limiter.refill(10);
+
+    // Should remain at cap
+    assertEquals("Refill should remain at max oversubscription", refillWithMaxOversubscription,
+      stillMaxRefill);
+  }
+
+  /**
+   * Verifies that a refill spanning several elapsed refill intervals yields at least the full
+   * limit, rather than only a single interval's worth.
+   */
+  @Test
+  public void testMultipleRefillIntervals() {
+    final long startMillis = 1000;
+    final long elapsedMillis = 3 * 500; // three 500ms refill intervals
+
+    FeedbackAdaptiveRateLimiter rateLimiter = factory.create();
+    rateLimiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(startMillis);
+    rateLimiter.refill(10);
+    rateLimiter.consume(10);
+
+    testEdge.setValue(startMillis + elapsedMillis);
+    long refilled = rateLimiter.refill(10);
+
+    // Three intervals at 5 units each would be 15. The expectation is that this is capped at the
+    // 10-unit limit, so the full limit is the minimum acceptable result.
+    assertTrue("Multiple interval refill should provide multiple refill amounts. Got: " + refilled,
+      refilled >= 10);
+  }
+
+  /**
+   * Checks two refill amounts. The first refill hands out the whole limit. A refill exactly one
+   * 500ms interval later hands out only that interval's share of the per-second limit.
+   */
+  @Test
+  public void testRefillIntervalAdjustment() {
+    FeedbackAdaptiveRateLimiter rateLimiter = factory.create();
+    rateLimiter.set(10, TimeUnit.SECONDS);
+    testEdge.setValue(1000);
+
+    assertEquals("First refill should give full limit", 10, rateLimiter.refill(10));
+    rateLimiter.consume(10);
+
+    // 500ms is half of the one-second time unit, so half of the 10-unit limit is expected.
+    testEdge.setValue(1000 + 500);
+    assertEquals("Refill after one interval should be interval-adjusted", 5,
+      rateLimiter.refill(10));
+  }
+
+  /**
+   * Verifies that a long stretch without contention does not push the backoff multiplier below
+   * 1.0. The wait measured afterwards must equal the baseline measured before any backoff.
+   */
+  @Test
+  public void testBackoffMultiplierBottomsAtOne() {
+    FeedbackAdaptiveRateLimiter rateLimiter = factory.create();
+    rateLimiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    rateLimiter.refill(10);
+    rateLimiter.consume(10);
+    long waitBeforeIdle = rateLimiter.getWaitInterval(10, 0, 1);
+
+    // Twenty 500ms intervals of light consumption (3 units each) with no wait-interval requests.
+    for (int step = 1; step <= 20; step++) {
+      testEdge.setValue(1000 + step * 500);
+      rateLimiter.refill(10);
+      rateLimiter.consume(3);
+    }
+
+    testEdge.setValue(11500);
+    rateLimiter.refill(10);
+    rateLimiter.consume(10);
+    long waitAfterIdle = rateLimiter.getWaitInterval(10, 0, 1);
+
+    assertEquals("Wait interval should not go below baseline (multiplier=1.0)", waitBeforeIdle,
+      waitAfterIdle);
+  }
+
+  /**
+   * Exercises one limiter from ten threads that concurrently call {@code consume} and
+   * {@code getWaitInterval}. Any exception or error raised on a worker thread is collected and
+   * fails the test. Without this, a worker's exception would only reach the thread's default
+   * uncaught-exception handler, and the test would still pass.
+   */
+  @Test
+  public void testConcurrentAccess() throws InterruptedException {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(100, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(100);
+
+    // Worker threads cannot fail the test directly, so they record failures here instead.
+    java.util.Queue<Throwable> failures = new java.util.concurrent.ConcurrentLinkedQueue<>();
+
+    Thread[] threads = new Thread[10];
+    for (int i = 0; i < threads.length; i++) {
+      threads[i] = new Thread(() -> {
+        try {
+          for (int j = 0; j < 10; j++) {
+            limiter.consume(1);
+            limiter.getWaitInterval(100, 50, 1);
+          }
+        } catch (Throwable t) {
+          failures.add(t);
+        }
+      });
+    }
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    assertTrue("Concurrent access threw on worker threads: " + failures, failures.isEmpty());
+  }
+
+  /**
+   * Verifies that consuming twice the limit results in a wait of at least one 500ms refill
+   * interval.
+   */
+  @Test
+  public void testOverconsumptionBehavior() {
+    FeedbackAdaptiveRateLimiter rateLimiter = factory.create();
+    rateLimiter.set(10, TimeUnit.SECONDS);
+    testEdge.setValue(1000);
+    rateLimiter.refill(10);
+
+    // Take 20 units against a limit of 10. The -10 passed below stands for the resulting deficit.
+    rateLimiter.consume(20);
+
+    long requiredWait = rateLimiter.getWaitInterval(10, -10, 1);
+    assertTrue("Should require substantial wait after over-consumption", requiredWait >= 500);
+  }
+
+  /**
+   * Alternates bursts of contention with quiet periods. In every cycle, the wait measured after
+   * the quiet period must be shorter than the wait measured right after the burst.
+   */
+  @Test
+  public void testOscillatingLoadPattern() {
+    FeedbackAdaptiveRateLimiter rateLimiter = factory.create();
+    rateLimiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    rateLimiter.refill(10);
+
+    for (int cycle = 0; cycle < 3; cycle++) {
+      // Each cycle spans 3000ms: a 1500ms burst followed by a 1500ms quiet period.
+      final int cycleStart = 1000 + cycle * 3000;
+
+      // Burst: drain the limit every interval and immediately ask for more, creating contention.
+      for (int step = 0; step < 3; step++) {
+        testEdge.setValue(cycleStart + step * 500);
+        rateLimiter.refill(10);
+        rateLimiter.consume(10);
+        rateLimiter.getWaitInterval(10, 0, 1);
+      }
+      long waitAfterBurst = rateLimiter.getWaitInterval(10, 0, 1);
+
+      // Quiet period: light consumption and no wait-interval requests.
+      for (int step = 0; step < 3; step++) {
+        testEdge.setValue(cycleStart + 1500 + step * 500);
+        rateLimiter.refill(10);
+        rateLimiter.consume(3);
+      }
+
+      // NOTE(review): this timestamp equals the next cycle's first burst timestamp, so that first
+      // burst step starts with no time elapsed since this refill.
+      testEdge.setValue(cycleStart + 3000);
+      rateLimiter.refill(10);
+      rateLimiter.consume(10);
+      long waitAfterQuiet = rateLimiter.getWaitInterval(10, 0, 1);
+
+      assertTrue("Wait should decrease after low contention phase in cycle " + cycle + ". High: "
+        + waitAfterBurst + ", Low: " + waitAfterQuiet, waitAfterQuiet < waitAfterBurst);
+    }
+  }
+
+  /**
+   * Runs the limiter at roughly 80% utilization and then at 100% utilization. Checks that the
+   * adaptive refill handed out at the end of each regime stays within the oversubscription
+   * ceiling.
+   */
+  @Test
+  public void testUtilizationEmaConvergence() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(10);
+
+    // Consistently consume about 80% utilization (4 of the 5 interval-adjusted units). If
+    // oversubscription raises the refill above 5, actual utilization dips a little lower.
+    for (int i = 0; i < 30; i++) {
+      testEdge.setValue(1000 + (i + 1) * 500);
+      limiter.refill(10);
+      limiter.consume(4);
+    }
+
+    // Below the (expected 90%) utilization target, oversubscription is expected to grow. It must
+    // still respect the ceiling: one 500ms interval is 5 units, at most 6 with 0.2
+    // oversubscription, and 7 is the same slack used by testOversubscriptionCapsAtMaximum.
+    testEdge.setValue(16500);
+    long refillAfterLowUtilization = limiter.refill(10);
+    assertTrue("Refill after low utilization should stay within [0, 7]. Got: "
+      + refillAfterLowUtilization, refillAfterLowUtilization >= 0 && refillAfterLowUtilization <= 7);
+
+    // Now switch to 100% utilization by consuming everything each refill hands out.
+    for (int i = 0; i < 30; i++) {
+      testEdge.setValue(16500 + (i + 1) * 500);
+      long refilled = limiter.refill(10);
+      limiter.consume((int) refilled);
+    }
+
+    // After the switch, the adapted refill must still be bounded by the same ceiling.
+    testEdge.setValue(32000);
+    long refillAfterFullUtilization = limiter.refill(10);
+    assertTrue("Refill after full utilization should stay within [0, 7]. Got: "
+      + refillAfterFullUtilization,
+      refillAfterFullUtilization >= 0 && refillAfterFullUtilization <= 7);
+  }
+
+  /** Test clock whose reported time changes only when {@link #setValue(long)} is called. */
+  private static final class ManualEnvironmentEdge implements EnvironmentEdge {
+    private long now = 1000;
+
+    /** Sets the time that subsequent {@link #currentTime()} calls will report. */
+    public void setValue(long time) {
+      now = time;
+    }
+
+    @Override
+    public long currentTime() {
+      return now;
+    }
+  }
+}