HADOOP-15778. ABFS: Fix client side throttling for read.
Contributed by Sneha Varma.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
index 97ea2a6..1c6ce17 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -19,9 +19,12 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.net.HttpURLConnection;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+
/**
* Throttles Azure Blob File System read and write operations to achieve maximum
* throughput by minimizing errors. The errors occur when the account ingress
@@ -37,6 +40,7 @@
public final class AbfsClientThrottlingIntercept {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingIntercept.class);
+ private static final String RANGE_PREFIX = "bytes=";
private static AbfsClientThrottlingIntercept singleton = null;
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
@@ -82,7 +86,8 @@
}
break;
case ReadFile:
- contentLength = abfsHttpOperation.getBytesReceived();
+ String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE);
+ contentLength = getContentLengthIfKnown(range);
if (contentLength > 0) {
singleton.readThrottler.addBytesTransferred(contentLength,
isFailedOperation);
@@ -114,4 +119,33 @@
break;
}
}
-}
\ No newline at end of file
+
+  /**
+   * Derives the number of bytes requested from an HTTP Range header value.
+   *
+   * <p>Only the closed form {@code bytes=<first>-<last>} is recognized. Any
+   * other value (null, a different unit, an open-ended or suffix range,
+   * non-numeric offsets) yields 0, so a header this method cannot parse
+   * never raises an exception into the caller.
+   *
+   * @param range value of the Range request header; may be null.
+   * @return last - first + 1 when both offsets parse, otherwise 0.
+   */
+  private static long getContentLengthIfKnown(String range) {
+    long contentLength = 0;
+    // Format is "bytes=%d-%d"
+    if (range != null && range.startsWith(RANGE_PREFIX)) {
+      String[] offsets = range.substring(RANGE_PREFIX.length()).split("-");
+      if (offsets.length == 2) {
+        try {
+          contentLength = Long.parseLong(offsets[1])
+              - Long.parseLong(offsets[0]) + 1;
+        } catch (NumberFormatException ignored) {
+          // Not "%d-%d" after all, e.g. the suffix form "bytes=-500" splits
+          // into {"", "500"}. The length is unknown, so leave it at 0.
+        }
+      }
+    }
+    return contentLength;
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 9a71879..3f5717e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -156,9 +156,10 @@
client.getAccessToken());
}
+ AbfsClientThrottlingIntercept.sendingRequest(operationType);
+
if (hasRequestBody) {
// HttpUrlConnection requires
- AbfsClientThrottlingIntercept.sendingRequest(operationType);
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
}