HADOOP-15778. ABFS: Fix client side throttling for read.
Contributed by Sneha Varma.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
index 97ea2a6..1c6ce17 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -19,9 +19,12 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.net.HttpURLConnection;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+
 /**
  * Throttles Azure Blob File System read and write operations to achieve maximum
  * throughput by minimizing errors.  The errors occur when the account ingress
@@ -37,6 +40,7 @@
 public final class AbfsClientThrottlingIntercept {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbfsClientThrottlingIntercept.class);
+  private static final String RANGE_PREFIX = "bytes=";
   private static AbfsClientThrottlingIntercept singleton = null;
   private AbfsClientThrottlingAnalyzer readThrottler = null;
   private AbfsClientThrottlingAnalyzer writeThrottler = null;
@@ -82,7 +86,8 @@
         }
         break;
       case ReadFile:
-        contentLength = abfsHttpOperation.getBytesReceived();
+        String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE);
+        contentLength = getContentLengthIfKnown(range);
         if (contentLength > 0) {
           singleton.readThrottler.addBytesTransferred(contentLength,
               isFailedOperation);
@@ -114,4 +119,17 @@
         break;
     }
   }
-}
\ No newline at end of file
+
+  /** Bytes spanned by a "bytes=start-end" Range value; 0 when not known. */
+  private static long getContentLengthIfKnown(String range) {
+    String[] offsets = range != null && range.startsWith(RANGE_PREFIX)
+        ? range.substring(RANGE_PREFIX.length()).split("-") : new String[0];
+    try {
+      // Both offsets are inclusive, hence the +1.
+      return offsets.length != 2 ? 0
+          : Long.parseLong(offsets[1]) - Long.parseLong(offsets[0]) + 1;
+    } catch (NumberFormatException ignored) {
+      return 0; // e.g. suffix range "bytes=-500": length is not known
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 9a71879..3f5717e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -156,9 +156,10 @@
                 client.getAccessToken());
       }
 
+      AbfsClientThrottlingIntercept.sendingRequest(operationType);
+
       if (hasRequestBody) {
         // HttpUrlConnection requires
-        AbfsClientThrottlingIntercept.sendingRequest(operationType);
         httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
       }