HBASE-28385 Improve scan quota estimates when using block bytes scanned (#5713)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index a4ff8b2..2e26765 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -27,10 +27,17 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DefaultOperationQuota implements OperationQuota {
+ // A single scan estimate can consume no more than this proportion of the limiter's limit.
+ // This prevents a long-running scan from being estimated at, say, 100MB of IO against
+ // a throttle whose limit is below 100MB (because such a request could never succeed).
+ private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9;
+
protected final List<QuotaLimiter> limiters;
private final long writeCapacityUnit;
private final long readCapacityUnit;
@@ -53,6 +60,7 @@
protected long readCapacityUnitDiff = 0;
private boolean useResultSizeBytes;
private long blockSizeBytes;
+ private long maxScanEstimate;
public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
final QuotaLimiter... limiters) {
@@ -60,6 +68,9 @@
this.useResultSizeBytes =
conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);
this.blockSizeBytes = blockSizeBytes;
+ long readSizeLimit =
+ Arrays.stream(limiters).mapToLong(QuotaLimiter::getReadLimit).min().orElse(Long.MAX_VALUE);
+ maxScanEstimate = Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit);
}
/**
@@ -80,21 +91,34 @@
}
@Override
- public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
- updateEstimateConsumeQuota(numWrites, numReads, numScans);
+ public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
+ updateEstimateConsumeBatchQuota(numWrites, numReads);
+ checkQuota(numWrites, numReads);
+ }
+ @Override
+ public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
+ long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
+ updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
+ prevBlockBytesScannedDifference);
+ checkQuota(0, 1);
+ }
+
+ private void checkQuota(long numWrites, long numReads) throws RpcThrottlingException {
readAvailable = Long.MAX_VALUE;
for (final QuotaLimiter limiter : limiters) {
- if (limiter.isBypass()) continue;
+ if (limiter.isBypass()) {
+ continue;
+ }
- limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
+ limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
}
for (final QuotaLimiter limiter : limiters) {
- limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
- writeCapacityUnitConsumed, readCapacityUnitConsumed);
+ limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed,
+ readCapacityUnitConsumed);
}
}
@@ -158,24 +182,69 @@
* Update estimate quota(read/write size/capacityUnits) which will be consumed
* @param numWrites the number of write requests
* @param numReads the number of read requests
- * @param numScans the number of scan requests
*/
- protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) {
+ protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) {
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
if (useResultSizeBytes) {
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
- readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
} else {
// assume 1 block required for reads. this is probably a low estimate, which is okay
readConsumed = numReads > 0 ? blockSizeBytes : 0;
- readConsumed += numScans > 0 ? blockSizeBytes : 0;
}
writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
}
+ /**
+ * Update the estimated read quota (read size/capacityUnits) which a scan will consume
+ * @param scanRequest the scan to be executed
+ * @param maxScannerResultSize the maximum bytes to be returned by the scanner
+ * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
+ * scanner
+ * @param prevBlockBytesScannedDifference the difference in block bytes scanned (BBS) between
+ * the previous two next calls
+ */
+ protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest,
+ long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
+ if (useResultSizeBytes) {
+ readConsumed = estimateConsume(OperationType.SCAN, 1, 1000);
+ } else {
+ long estimate = getScanReadConsumeEstimate(blockSizeBytes, scanRequest.getNextCallSeq(),
+ maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference);
+ readConsumed = Math.min(maxScanEstimate, estimate);
+ }
+
+ readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
+ }
+
+ protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq,
+ long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
+ /*
+ * Estimating scan workload is more complicated, and if we severely underestimate workloads then
+ * throttled clients will exhaust retries too quickly, and could saturate the RPC layer
+ */
+ if (nextCallSeq == 0) {
+ // start scanners with an optimistic 1 block IO estimate
+ // it is better to underestimate a large scan in the beginning
+ // than to overestimate, and block, a small scan
+ return blockSizeBytes;
+ }
+
+ boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes;
+ if (isWorkloadGrowing) {
+ // if nextCallSeq > 0 and the workload is growing then our estimate
+ // should consider that the workload may continue to increase
+ return Math.min(maxScannerResultSize, nextCallSeq * maxBlockBytesScanned);
+ } else {
+ // if nextCallSeq > 0 and the workload is shrinking or flat
+ // then our workload has likely plateaued. We can just rely on the existing
+ // maxBlockBytesScanned as our estimate in this case.
+ return maxBlockBytesScanned;
+ }
+ }
+
private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
if (numReqs > 0) {
return avgSize * numReqs;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
index 1788e55..3077d6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
@@ -23,6 +23,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
/*
* Internal class used to check and consume quota if exceed throttle quota is enabled. Exceed
* throttle quota means, user can over consume user/namespace/table quota if region server has
@@ -47,15 +49,32 @@
}
@Override
- public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
+ public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
+ Runnable estimateQuota = () -> updateEstimateConsumeBatchQuota(numWrites, numReads);
+ CheckQuotaRunnable checkQuota = () -> super.checkBatchQuota(numWrites, numReads);
+ checkQuota(estimateQuota, checkQuota, numWrites, numReads, 0);
+ }
+
+ @Override
+ public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
+ long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
+ Runnable estimateQuota = () -> updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize,
+ maxBlockBytesScanned, prevBlockBytesScannedDifference);
+ CheckQuotaRunnable checkQuota = () -> super.checkScanQuota(scanRequest, maxScannerResultSize,
+ maxBlockBytesScanned, prevBlockBytesScannedDifference);
+ checkQuota(estimateQuota, checkQuota, 0, 0, 1);
+ }
+
+ private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, int numWrites,
+ int numReads, int numScans) throws RpcThrottlingException {
if (regionServerLimiter.isBypass()) {
// If region server limiter is bypass, which means no region server quota is set, check and
// throttle by all other quotas. In this condition, exceed throttle quota will not work.
LOG.warn("Exceed throttle quota is enabled but no region server quotas found");
- super.checkQuota(numWrites, numReads, numScans);
+ checkQuota.run();
} else {
// 1. Update estimate quota which will be consumed
- updateEstimateConsumeQuota(numWrites, numReads, numScans);
+ estimateQuota.run();
// 2. Check if region server limiter is enough. If not, throw RpcThrottlingException.
regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
@@ -63,11 +82,11 @@
// limiter is enough.
boolean exceed = false;
try {
- super.checkQuota(numWrites, numReads, numScans);
+ checkQuota.run();
} catch (RpcThrottlingException e) {
exceed = true;
if (LOG.isDebugEnabled()) {
- LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{} scan:{}, "
+ LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{}, scans:{}, "
+ "try use region server quota", numWrites, numReads, numScans);
}
}
@@ -96,4 +115,8 @@
regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff);
}
}
+
+ private interface CheckQuotaRunnable {
+ void run() throws RpcThrottlingException;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
index b64429d..736560e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
@@ -23,6 +23,8 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
/**
* Noop operation quota returned when no quota is associated to the user/table
*/
@@ -40,7 +42,13 @@
}
@Override
- public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
+ public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
+ // no-op
+ }
+
+ @Override
+ public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
+ long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
// no-op
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
index 63d7610..cf1e49c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -71,6 +71,11 @@
}
@Override
+ public long getReadLimit() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
public String toString() {
return "NoopQuotaLimiter";
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index bedad5e..ef0a35f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -23,6 +23,8 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
/**
* Interface that allows to check the quota available for an operation.
*/
@@ -51,11 +53,25 @@
* on the number of operations to perform and the average size accumulated during time.
* @param numWrites number of write operation that will be performed
* @param numReads number of small-read operation that will be performed
- * @param numScans number of long-read operation that will be performed
* @throws RpcThrottlingException if the operation cannot be performed because RPC quota is
* exceeded.
*/
- void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException;
+ void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException;
+
+ /**
+ * Checks if it is possible to execute the scan. The quota will be estimated based on the
+ * composition of the scan.
+ * @param scanRequest the given scan operation
+ * @param maxScannerResultSize the maximum bytes to be returned by the scanner
+ * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
+ * scanner
+ * @param prevBlockBytesScannedDifference the difference in block bytes scanned (BBS) between
+ * the previous two next calls
+ * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is
+ * exceeded.
+ */
+ void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
+ long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException;
/** Cleanup method on operation completion */
void close();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
index 14326e4..8d00a70 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -76,6 +76,9 @@
/** Returns the number of bytes available to read to avoid exceeding the quota */
long getReadAvailable();
+ /** Returns the maximum number of bytes ever available to read */
+ long getReadLimit();
+
/** Returns the number of bytes available to write to avoid exceeding the quota */
long getWriteAvailable();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
index 3c72c66..92a0cfd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -156,38 +156,82 @@
/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
- * available quota and to report the data/usage of the operation.
+ * available quota and to report the data/usage of the operation. This method is specific to scans
+ * because estimating a scan's workload is more complicated than estimating the workload of a
+ * get/put.
+ * @param region the region where the operation will be performed
+ * @param scanRequest the scan to be estimated against the quota
+ * @param maxScannerResultSize the maximum bytes to be returned by the scanner
+ * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
+ * scanner
+ * @param prevBlockBytesScannedDifference the difference in block bytes scanned (BBS) between
+ * the previous two next calls
+ * @return the OperationQuota
+ * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
+ */
+ public OperationQuota checkScanQuota(final Region region,
+ final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
+ long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
+ throws IOException, RpcThrottlingException {
+ Optional<User> user = RpcServer.getRequestUser();
+ UserGroupInformation ugi;
+ if (user.isPresent()) {
+ ugi = user.get().getUGI();
+ } else {
+ ugi = User.getCurrent().getUGI();
+ }
+ TableDescriptor tableDescriptor = region.getTableDescriptor();
+ TableName table = tableDescriptor.getTableName();
+
+ OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
+ try {
+ quota.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
+ prevBlockBytesScannedDifference);
+ } catch (RpcThrottlingException e) {
+ LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " scan="
+ + scanRequest.getScannerId() + ": " + e.getMessage());
+ throw e;
+ }
+ return quota;
+ }
+
+ /**
+ * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
+ * available quota and to report the data/usage of the operation. This method does not support
+ * scans because estimating a scan's workload is more complicated than estimating the workload of
+ * a get/put.
* @param region the region where the operation will be performed
* @param type the operation type
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
- public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type)
- throws IOException, RpcThrottlingException {
+ public OperationQuota checkBatchQuota(final Region region,
+ final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
switch (type) {
- case SCAN:
- return checkQuota(region, 0, 0, 1);
case GET:
- return checkQuota(region, 0, 1, 0);
+ return this.checkBatchQuota(region, 0, 1);
case MUTATE:
- return checkQuota(region, 1, 0, 0);
+ return this.checkBatchQuota(region, 1, 0);
case CHECK_AND_MUTATE:
- return checkQuota(region, 1, 1, 0);
+ return this.checkBatchQuota(region, 1, 1);
}
throw new RuntimeException("Invalid operation type: " + type);
}
/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
- * available quota and to report the data/usage of the operation.
+ * available quota and to report the data/usage of the operation. This method does not support
+ * scans because estimating a scan's workload is more complicated than estimating the workload of
+ * a get/put.
* @param region the region where the operation will be performed
* @param actions the "multi" actions to perform
* @param hasCondition whether the RegionAction has a condition
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
- public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions,
- boolean hasCondition) throws IOException, RpcThrottlingException {
+ public OperationQuota checkBatchQuota(final Region region,
+ final List<ClientProtos.Action> actions, boolean hasCondition)
+ throws IOException, RpcThrottlingException {
int numWrites = 0;
int numReads = 0;
for (final ClientProtos.Action action : actions) {
@@ -202,7 +246,7 @@
numReads++;
}
}
- return checkQuota(region, numWrites, numReads, 0);
+ return checkBatchQuota(region, numWrites, numReads);
}
/**
@@ -211,12 +255,11 @@
* @param region the region where the operation will be performed
* @param numWrites number of writes to perform
* @param numReads number of short-reads to perform
- * @param numScans number of scan to perform
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
- private OperationQuota checkQuota(final Region region, final int numWrites, final int numReads,
- final int numScans) throws IOException, RpcThrottlingException {
+ private OperationQuota checkBatchQuota(final Region region, final int numWrites,
+ final int numReads) throws IOException, RpcThrottlingException {
Optional<User> user = RpcServer.getRequestUser();
UserGroupInformation ugi;
if (user.isPresent()) {
@@ -229,11 +272,10 @@
OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
try {
- quota.checkQuota(numWrites, numReads, numScans);
+ quota.checkBatchQuota(numWrites, numReads);
} catch (RpcThrottlingException e) {
- LOG.debug(
- "Throttling exception for user=" + ugi.getUserName() + " table=" + table + " numWrites="
- + numWrites + " numReads=" + numReads + " numScans=" + numScans + ": " + e.getMessage());
+ LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table
+ + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage());
throw e;
}
return quota;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index 8ae2cae..483edbc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -244,6 +244,11 @@
}
@Override
+ public long getReadLimit() {
+ return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
+ }
+
+ @Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("TimeBasedLimiter(");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7043b78..a2b9a93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -429,6 +429,9 @@
private boolean fullRegionScan;
private final String clientIPAndPort;
private final String userName;
+ private volatile long maxBlockBytesScanned = 0;
+ private volatile long prevBlockBytesScanned = 0;
+ private volatile long prevBlockBytesScannedDifference = 0;
RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
@@ -452,6 +455,22 @@
return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
}
+ long getMaxBlockBytesScanned() {
+ return maxBlockBytesScanned;
+ }
+
+ long getPrevBlockBytesScannedDifference() {
+ return prevBlockBytesScannedDifference;
+ }
+
+ void updateBlockBytesScanned(long blockBytesScanned) {
+ prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
+ prevBlockBytesScanned = blockBytesScanned;
+ if (blockBytesScanned > maxBlockBytesScanned) {
+ maxBlockBytesScanned = blockBytesScanned;
+ }
+ }
+
// Should be called only when we need to print lease expired messages otherwise
// cache the String once made.
@Override
@@ -2466,7 +2485,7 @@
}
Boolean existence = null;
Result r = null;
- quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
+ quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);
Get clientGet = ProtobufUtil.toGet(get);
if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
@@ -2683,7 +2702,7 @@
try {
region = getRegion(regionSpecifier);
- quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
+ quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
regionAction.hasCondition());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
@@ -2746,7 +2765,7 @@
try {
region = getRegion(regionSpecifier);
- quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
+ quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
regionAction.hasCondition());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
@@ -2931,7 +2950,7 @@
}
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
- quota = getRpcQuotaManager().checkQuota(region, operationType);
+ quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
ActivePolicyEnforcement spaceQuotaEnforcement =
getSpaceQuotaManager().getActiveEnforcements();
@@ -3487,6 +3506,7 @@
if (rpcCall != null) {
responseCellSize = rpcCall.getResponseCellSize();
blockBytesScanned = rpcCall.getBlockBytesScanned();
+ rsh.updateBlockBytesScanned(blockBytesScanned);
}
region.getMetrics().updateScan();
final MetricsRegionServer metricsRegionServer = server.getMetrics();
@@ -3590,7 +3610,8 @@
}
OperationQuota quota;
try {
- quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
+ quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize,
+ rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
} catch (IOException e) {
addScannerLeaseBack(lease);
throw new ServiceException(e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
index 5de9a2d..c058abe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
@@ -23,6 +23,7 @@
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans;
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh;
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota;
+import static org.junit.Assert.assertTrue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -60,12 +61,17 @@
private static final byte[] QUALIFIER = Bytes.toBytes("q");
private static final TableName TABLE_NAME = TableName.valueOf("BlockBytesScannedQuotaTest");
+ private static final long MAX_SCANNER_RESULT_SIZE = 100 * 1024 * 1024;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// client should fail fast
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
+ MAX_SCANNER_RESULT_SIZE);
+ TEST_UTIL.getConfiguration().setClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY,
+ AverageIntervalRateLimiter.class, RateLimiter.class);
// quotas enabled, using block bytes scanned
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
@@ -140,27 +146,75 @@
waitMinuteQuota();
// should execute 1 request
- testTraffic(() -> doScans(5, table), 1, 0);
+ testTraffic(() -> doScans(5, table, 1), 1, 0);
// Remove all the limits
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
- testTraffic(() -> doScans(100, table), 100, 0);
- testTraffic(() -> doScans(100, table), 100, 0);
+ testTraffic(() -> doScans(100, table, 1), 100, 0);
+ testTraffic(() -> doScans(100, table, 1), 100, 0);
// Add ~3 block/sec limit. This should support >1 scans
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
Math.round(3.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+ waitMinuteQuota();
- // should execute some requests, but not all
- testTraffic(() -> doScans(100, table), 100, 90);
+ // Add 50 block/sec limit. This should support >1 scans
+ admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
+ Math.round(50.1 * blockSize), TimeUnit.SECONDS));
+ triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+ waitMinuteQuota();
+
+ // This will produce some throttling exceptions, but all/most should succeed within the timeout
+ testTraffic(() -> doScans(100, table, 1), 75, 25);
+ triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+ waitMinuteQuota();
+
+ // With large caching, a big scan should succeed
+ testTraffic(() -> doScans(10_000, table, 10_000), 10_000, 0);
+ triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+ waitMinuteQuota();
// Remove all the limits
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
- testTraffic(() -> doScans(100, table), 100, 0);
- testTraffic(() -> doScans(100, table), 100, 0);
+ testTraffic(() -> doScans(100, table, 1), 100, 0);
+ testTraffic(() -> doScans(100, table, 1), 100, 0);
+ }
+
+ @Test
+ public void testSmallScanNeverBlockedByLargeEstimate() throws Exception {
+ final Admin admin = TEST_UTIL.getAdmin();
+ final String userName = User.getCurrent().getShortName();
+ Table table = admin.getConnection().getTable(TABLE_NAME);
+
+ doPuts(10_000, FAMILY, QUALIFIER, table);
+ TEST_UTIL.flush(TABLE_NAME);
+
+ // Add 99MB/sec limit.
+ // This should never be blocked, but with a sequence number approaching 10k, without
+ // other intervention, we would estimate a scan workload approaching 625MB or the
+ // maxScannerResultSize (both larger than the 99MB limit). This test ensures that all
+ // requests succeed, so the estimate never becomes large enough to cause read downtime
+ long limit = 99 * 1024 * 1024;
+ assertTrue(limit <= MAX_SCANNER_RESULT_SIZE); // always true, but protecting against code
+ // changes
+ admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, limit,
+ TimeUnit.SECONDS));
+ triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+ waitMinuteQuota();
+
+ // should execute all requests
+ testTraffic(() -> doScans(10_000, table, 1), 10_000, 0);
+ triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+ waitMinuteQuota();
+
+ // Remove all the limits
+ admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
+ triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+ testTraffic(() -> doScans(100, table, 1), 100, 0);
+ testTraffic(() -> doScans(100, table, 1), 100, 0);
}
@Test
@@ -223,9 +277,8 @@
boolean success = (actualSuccess >= expectedSuccess - marginOfError)
&& (actualSuccess <= expectedSuccess + marginOfError);
if (!success) {
- triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+ triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
waitMinuteQuota();
- Thread.sleep(15_000L);
}
return success;
});
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java
new file mode 100644
index 0000000..4684be0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestDefaultOperationQuota {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestDefaultOperationQuota.class);
+
+ @Test
+ public void testScanEstimateNewScanner() {
+ long blockSize = 64 * 1024;
+ long nextCallSeq = 0;
+ long maxScannerResultSize = 100 * 1024 * 1024;
+ long maxBlockBytesScanned = 0;
+ long prevBBSDifference = 0;
+ long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
+ maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
+
+ // new scanner should estimate scan read as 1 block
+ assertEquals(blockSize, estimate);
+ }
+
+ @Test
+ public void testScanEstimateSecondNextCall() {
+ long blockSize = 64 * 1024;
+ long nextCallSeq = 1;
+ long maxScannerResultSize = 100 * 1024 * 1024;
+ long maxBlockBytesScanned = 10 * blockSize;
+ long prevBBSDifference = 10 * blockSize;
+ long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
+ maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
+
+ // 2nd next call should be estimated at maxBBS
+ assertEquals(maxBlockBytesScanned, estimate);
+ }
+
+ @Test
+ public void testScanEstimateFlatWorkload() {
+ long blockSize = 64 * 1024;
+ long nextCallSeq = 100;
+ long maxScannerResultSize = 100 * 1024 * 1024;
+ long maxBlockBytesScanned = 10 * blockSize;
+ long prevBBSDifference = 0;
+ long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
+ maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
+
+ // flat workload should not overestimate
+ assertEquals(maxBlockBytesScanned, estimate);
+ }
+
+ @Test
+ public void testScanEstimateVariableFlatWorkload() {
+ long blockSize = 64 * 1024;
+ long nextCallSeq = 1;
+ long maxScannerResultSize = 100 * 1024 * 1024;
+ long maxBlockBytesScanned = 10 * blockSize;
+ long prevBBSDifference = 0;
+ for (int i = 0; i < 100; i++) {
+ long variation = Math.round(Math.random() * blockSize);
+ if (variation % 2 == 0) {
+ variation *= -1;
+ }
+ // despite +/- <1 block variation, we consider this workload flat
+ prevBBSDifference = variation;
+
+ long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq + i,
+ maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
+
+ // flat workload should not overestimate
+ assertEquals(maxBlockBytesScanned, estimate);
+ }
+ }
+
+ @Test
+ public void testScanEstimateGrowingWorkload() {
+ long blockSize = 64 * 1024;
+ long nextCallSeq = 100;
+ long maxScannerResultSize = 100 * 1024 * 1024;
+ long maxBlockBytesScanned = 20 * blockSize;
+ long prevBBSDifference = 10 * blockSize;
+ long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
+ maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
+
+ // growing workload should overestimate: either nextCallSeq * maxBBS or maxScannerResultSize
+ assertTrue(nextCallSeq * maxBlockBytesScanned == estimate || maxScannerResultSize == estimate);
+ }
+
+ @Test
+ public void testScanEstimateShrinkingWorkload() {
+ long blockSize = 64 * 1024;
+ long nextCallSeq = 100;
+ long maxScannerResultSize = 100 * 1024 * 1024;
+ long maxBlockBytesScanned = 20 * blockSize;
+ long prevBBSDifference = -10 * blockSize;
+ long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
+ maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
+
+ // shrinking workload should only shrink estimate to maxBBS
+ assertEquals(maxBlockBytesScanned, estimate);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java
index ff34c52..8da2989 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java
@@ -152,22 +152,21 @@
return opCount;
}
- static long doScans(int maxOps, Table table) {
+ static long doScans(int desiredRows, Table table, int caching) {
int count = 0;
- int caching = 100;
try {
Scan scan = new Scan();
scan.setCaching(caching);
scan.setCacheBlocks(false);
ResultScanner scanner = table.getScanner(scan);
- while (count < (maxOps * caching)) {
+ while (count < desiredRows) {
scanner.next();
count += 1;
}
} catch (IOException e) {
LOG.error("scan failed after nRetries=" + count, e);
}
- return count / caching;
+ return count;
}
static void triggerUserCacheRefresh(HBaseTestingUtil testUtil, boolean bypass,