Bump a throttling counter on BigQueryRead retries due to RESOURCE_EXHAUSTED
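
When the BigQuery Storage Read API client retries a ReadRows request after a
RESOURCE_EXHAUSTED error, the response trailers may carry a google.rpc.RetryInfo
message with the server-recommended retry delay. The new RetryAttemptCounter
listener converts that delay to milliseconds and adds it to a "throttling-msecs"
counter in the StorageClientImpl namespace; BatchModeExecutionContext then folds
that counter into the total throttle time it reports, which the Dataflow runner
can use for autoscaling decisions.

For reference, a minimal standalone sketch of the delay-to-milliseconds
conversion the listener performs, using the same values as the new unit test
(the sketch and its class name are illustrative only, not part of this patch):

    import com.google.protobuf.Duration;
    import com.google.rpc.RetryInfo;

    public class RetryDelayToMillisSketch {
      public static void main(String[] args) {
        // RetryInfo as a server would attach it to a throttled response:
        // 123 seconds plus 456,000,000 nanoseconds of recommended backoff.
        RetryInfo retryInfo =
            RetryInfo.newBuilder()
                .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456000000).build())
                .build();
        // Same conversion RetryAttemptCounter applies before incrementing the counter.
        long delayMsecs =
            retryInfo.getRetryDelay().getSeconds() * 1000
                + retryInfo.getRetryDelay().getNanos() / 1000000;
        System.out.println(delayMsecs); // prints 123456
      }
    }
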
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
index cb54c60..19670f4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
@@ -75,6 +75,8 @@
       "org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer$LoggingHttpBackOffHandler";
   protected static final String BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE =
       "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl";
+  protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE =
+      "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";
   protected static final String THROTTLE_TIME_COUNTER_NAME = "throttling-msecs";
 
   private BatchModeExecutionContext(
@@ -555,6 +557,13 @@
         totalThrottleMsecs += bigqueryStreamingInsertThrottleTime.getCumulative();
       }
 
+      CounterCell bigqueryReadThrottleTime =
+          container.tryGetCounter(
+              MetricName.named(BIGQUERY_READ_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME));
+      if (bigqueryReadThrottleTime != null) {
+        totalThrottleMsecs += bigqueryReadThrottleTime.getCumulative();
+      }
+
       CounterCell throttlingMsecs =
           container.tryGetCounter(DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME);
       if (throttlingMsecs != null) {
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 215a66b..731cf33 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -91,6 +91,7 @@
   compile library.java.grpc_netty
   compile library.java.grpc_netty_shaded
   permitUnusedDeclared library.java.grpc_netty_shaded // BEAM-11761
+  compile library.java.grpc_protobuf
   compile library.java.grpc_stub
   permitUnusedDeclared library.java.grpc_stub // BEAM-11761
   compile library.java.grpc_google_cloud_pubsub_v1
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index afa5ae7..a636b04 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -83,6 +83,11 @@
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Int64Value;
+import com.google.rpc.RetryInfo;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.protobuf.ProtoUtils;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -162,6 +167,9 @@
           MonitoringInfoConstants.Labels.SERVICE, "BigQuery",
           MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite");
 
+  private static final Metadata.Key<RetryInfo> KEY_RETRY_INFO =
+      ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
+
   @Override
   public JobService getJobService(BigQueryOptions options) {
     return new JobServiceImpl(options);
@@ -1304,6 +1312,32 @@
 
   static class StorageClientImpl implements StorageClient {
 
+    // If the client retries a ReadRows request due to a RESOURCE_EXHAUSTED error, bump
+    // the throttlingMsecs counter by the server-supplied retry delay. The runtime can use
+    // this information for autoscaling decisions.
+    @VisibleForTesting
+    public static class RetryAttemptCounter implements BigQueryReadSettings.RetryAttemptListener {
+      public final Counter throttlingMsecs =
+          Metrics.counter(StorageClientImpl.class, "throttling-msecs");
+
+      @SuppressWarnings("ProtoDurationGetSecondsGetNano")
+      @Override
+      public void onRetryAttempt(Status status, Metadata metadata) {
+        if (status != null
+            && status.getCode() == Code.RESOURCE_EXHAUSTED
+            && metadata != null
+            && metadata.containsKey(KEY_RETRY_INFO)) {
+          RetryInfo retryInfo = metadata.get(KEY_RETRY_INFO);
+          if (retryInfo.hasRetryDelay()) {
+            long delay =
+                retryInfo.getRetryDelay().getSeconds() * 1000
+                    + retryInfo.getRetryDelay().getNanos() / 1000000;
+            throttlingMsecs.inc(delay);
+          }
+        }
+      }
+    }
+
     private static final HeaderProvider USER_AGENT_HEADER_PROVIDER =
         FixedHeaderProvider.create(
             "user-agent", "Apache_Beam_Java/" + ReleaseInfo.getReleaseInfo().getVersion());
@@ -1317,7 +1351,8 @@
               .setTransportChannelProvider(
                   BigQueryReadSettings.defaultGrpcTransportProviderBuilder()
                       .setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
-                      .build());
+                      .build())
+              .setReadRowsRetryAttemptListener(new RetryAttemptCounter());
 
       UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession> createReadSessionSettings =
           settingsBuilder.getStubSettingsBuilder().createReadSessionSettings();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 53d8db0..a9a3609 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -73,6 +73,10 @@
 import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.cloud.hadoop.util.RetryBoundedBackOff;
+import com.google.protobuf.Parser;
+import com.google.rpc.RetryInfo;
+import io.grpc.Metadata;
+import io.grpc.Status;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -90,6 +94,7 @@
 import org.apache.beam.sdk.extensions.gcp.util.Transport;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl;
+import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
@@ -152,6 +157,7 @@
     // Setup the ProcessWideContainer for testing metrics are set.
     MetricsContainerImpl container = new MetricsContainerImpl(null);
     MetricsEnvironment.setProcessWideContainer(container);
+    MetricsEnvironment.setCurrentContainer(container);
   }
 
   @FunctionalInterface
@@ -1560,4 +1566,65 @@
     client.splitReadStream(request, "myproject:mydataset.mytable");
     verifyReadMetricWasSet("myproject", "mydataset", "mytable", "resource_exhausted", 1);
   }
+
+  @Test
+  public void testRetryAttemptCounter() {
+    BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter counter =
+        new BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter();
+
+    RetryInfo retryInfo =
+        RetryInfo.newBuilder()
+            .setRetryDelay(
+                com.google.protobuf.Duration.newBuilder()
+                    .setSeconds(123)
+                    .setNanos(456000000)
+                    .build())
+            .build();
+
+    Metadata metadata = new Metadata();
+    metadata.put(
+        Metadata.Key.of(
+            "google.rpc.retryinfo-bin",
+            new Metadata.BinaryMarshaller<RetryInfo>() {
+              @Override
+              public byte[] toBytes(RetryInfo value) {
+                return value.toByteArray();
+              }
+
+              @Override
+              public RetryInfo parseBytes(byte[] serialized) {
+                try {
+                  Parser<RetryInfo> parser = (RetryInfo.newBuilder().build()).getParserForType();
+                  return parser.parseFrom(serialized);
+                } catch (Exception e) {
+                  return null;
+                }
+              }
+            }),
+        retryInfo);
+
+    MetricName metricName =
+        MetricName.named(
+            "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl",
+            "throttling-msecs");
+    MetricsContainerImpl container =
+        (MetricsContainerImpl) MetricsEnvironment.getCurrentContainer();
+
+    // Nulls don't bump the counter.
+    counter.onRetryAttempt(null, null);
+    assertEquals(0, (long) container.getCounter(metricName).getCumulative());
+
+    // Resource exhausted with empty metadata doesn't bump the counter.
+    counter.onRetryAttempt(
+        Status.RESOURCE_EXHAUSTED.withDescription("You have consumed some quota"), new Metadata());
+    assertEquals(0, (long) container.getCounter(metricName).getCumulative());
+
+    // Resource exhausted with retry info bumps the counter.
+    counter.onRetryAttempt(Status.RESOURCE_EXHAUSTED.withDescription("Stop for a while"), metadata);
+    assertEquals(123456, (long) container.getCounter(metricName).getCumulative());
+
+    // Other errors with retry info don't bump the counter.
+    counter.onRetryAttempt(Status.UNAVAILABLE.withDescription("Server is gone"), metadata);
+    assertEquals(123456, (long) container.getCounter(metricName).getCumulative());
+  }
 }
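
Note on the metadata key used in testRetryAttemptCounter: ProtoUtils.keyForProto
derives the key name from the proto's full descriptor name ("google.rpc.RetryInfo")
plus gRPC's "-bin" binary-header suffix, and io.grpc.Metadata.Key normalizes names
to lowercase. The hand-built "google.rpc.retryinfo-bin" entry in the test therefore
addresses the same header that KEY_RETRY_INFO reads in StorageClientImpl. A minimal
sketch of that equivalence (illustrative only, not part of this patch):

    import com.google.rpc.RetryInfo;
    import io.grpc.Metadata;
    import io.grpc.protobuf.ProtoUtils;

    public class RetryInfoKeyNameSketch {
      public static void main(String[] args) {
        // Built the same way BigQueryServicesImpl builds KEY_RETRY_INFO.
        Metadata.Key<RetryInfo> key = ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
        // The normalized key name matches the literal used in the test.
        System.out.println(key.name()); // prints google.rpc.retryinfo-bin
      }
    }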