Bump a throttling counter on BigQueryRead retries due to RESOURCE_EXHAUSTED
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
index cb54c60..19670f4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
@@ -75,6 +75,8 @@
"org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer$LoggingHttpBackOffHandler";
protected static final String BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl";
+ protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE =
+ "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";
protected static final String THROTTLE_TIME_COUNTER_NAME = "throttling-msecs";
private BatchModeExecutionContext(
@@ -555,6 +557,13 @@
totalThrottleMsecs += bigqueryStreamingInsertThrottleTime.getCumulative();
}
+ CounterCell bigqueryReadThrottleTime =
+ container.tryGetCounter(
+ MetricName.named(BIGQUERY_READ_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME));
+ if (bigqueryReadThrottleTime != null) {
+ totalThrottleMsecs += bigqueryReadThrottleTime.getCumulative();
+ }
+
CounterCell throttlingMsecs =
container.tryGetCounter(DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME);
if (throttlingMsecs != null) {
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 215a66b..731cf33 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -91,6 +91,7 @@
compile library.java.grpc_netty
compile library.java.grpc_netty_shaded
permitUnusedDeclared library.java.grpc_netty_shaded // BEAM-11761
+ compile library.java.grpc_protobuf
compile library.java.grpc_stub
permitUnusedDeclared library.java.grpc_stub // BEAM-11761
compile library.java.grpc_google_cloud_pubsub_v1
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index afa5ae7..a636b04 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -83,6 +83,11 @@
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Int64Value;
+import com.google.rpc.RetryInfo;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.protobuf.ProtoUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -162,6 +167,9 @@
MonitoringInfoConstants.Labels.SERVICE, "BigQuery",
MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite");
+ private static final Metadata.Key<RetryInfo> KEY_RETRY_INFO =
+ ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
+
@Override
public JobService getJobService(BigQueryOptions options) {
return new JobServiceImpl(options);
@@ -1304,6 +1312,32 @@
static class StorageClientImpl implements StorageClient {
+ // If client retries ReadRows requests due to RESOURCE_EXHAUSTED error, bump
+ // throttlingMsecs according to delay. Runtime can use this information for
+ // autoscaling decisions.
+ @VisibleForTesting
+ public static class RetryAttemptCounter implements BigQueryReadSettings.RetryAttemptListener {
+ public final Counter throttlingMsecs =
+ Metrics.counter(StorageClientImpl.class, "throttling-msecs");
+
+ @SuppressWarnings("ProtoDurationGetSecondsGetNano")
+ @Override
+ public void onRetryAttempt(Status status, Metadata metadata) {
+ if (status != null
+ && status.getCode() == Code.RESOURCE_EXHAUSTED
+ && metadata != null
+ && metadata.containsKey(KEY_RETRY_INFO)) {
+ RetryInfo retryInfo = metadata.get(KEY_RETRY_INFO);
+ if (retryInfo.hasRetryDelay()) {
+ long delay =
+ retryInfo.getRetryDelay().getSeconds() * 1000
+ + retryInfo.getRetryDelay().getNanos() / 1000000;
+ throttlingMsecs.inc(delay);
+ }
+ }
+ }
+ }
+
private static final HeaderProvider USER_AGENT_HEADER_PROVIDER =
FixedHeaderProvider.create(
"user-agent", "Apache_Beam_Java/" + ReleaseInfo.getReleaseInfo().getVersion());
@@ -1317,7 +1351,8 @@
.setTransportChannelProvider(
BigQueryReadSettings.defaultGrpcTransportProviderBuilder()
.setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
- .build());
+ .build())
+ .setReadRowsRetryAttemptListener(new RetryAttemptCounter());
UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession> createReadSessionSettings =
settingsBuilder.getStubSettingsBuilder().createReadSessionSettings();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 53d8db0..a9a3609 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -73,6 +73,10 @@
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.RetryBoundedBackOff;
+import com.google.protobuf.Parser;
+import com.google.rpc.RetryInfo;
+import io.grpc.Metadata;
+import io.grpc.Status;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -90,6 +94,7 @@
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl;
+import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
@@ -152,6 +157,7 @@
// Setup the ProcessWideContainer for testing metrics are set.
MetricsContainerImpl container = new MetricsContainerImpl(null);
MetricsEnvironment.setProcessWideContainer(container);
+ MetricsEnvironment.setCurrentContainer(container);
}
@FunctionalInterface
@@ -1560,4 +1566,65 @@
client.splitReadStream(request, "myproject:mydataset.mytable");
verifyReadMetricWasSet("myproject", "mydataset", "mytable", "resource_exhausted", 1);
}
+
+ @Test
+ public void testRetryAttemptCounter() {
+ BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter counter =
+ new BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter();
+
+ RetryInfo retryInfo =
+ RetryInfo.newBuilder()
+ .setRetryDelay(
+ com.google.protobuf.Duration.newBuilder()
+ .setSeconds(123)
+ .setNanos(456000000)
+ .build())
+ .build();
+
+ Metadata metadata = new Metadata();
+ metadata.put(
+ Metadata.Key.of(
+ "google.rpc.retryinfo-bin",
+ new Metadata.BinaryMarshaller<RetryInfo>() {
+ @Override
+ public byte[] toBytes(RetryInfo value) {
+ return value.toByteArray();
+ }
+
+ @Override
+ public RetryInfo parseBytes(byte[] serialized) {
+ try {
+ Parser<RetryInfo> parser = (RetryInfo.newBuilder().build()).getParserForType();
+ return parser.parseFrom(serialized);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+ }),
+ retryInfo);
+
+ MetricName metricName =
+ MetricName.named(
+ "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl",
+ "throttling-msecs");
+ MetricsContainerImpl container =
+ (MetricsContainerImpl) MetricsEnvironment.getCurrentContainer();
+
+ // Nulls don't bump the counter.
+ counter.onRetryAttempt(null, null);
+ assertEquals(0, (long) container.getCounter(metricName).getCumulative());
+
+ // Resource exhausted with empty metadata doesn't bump the counter.
+ counter.onRetryAttempt(
+ Status.RESOURCE_EXHAUSTED.withDescription("You have consumed some quota"), new Metadata());
+ assertEquals(0, (long) container.getCounter(metricName).getCumulative());
+
+ // Resource exhausted with retry info bumps the counter.
+ counter.onRetryAttempt(Status.RESOURCE_EXHAUSTED.withDescription("Stop for a while"), metadata);
+ assertEquals(123456, (long) container.getCounter(metricName).getCumulative());
+
+ // Other errors with retry info doesn't bump the counter.
+ counter.onRetryAttempt(Status.UNAVAILABLE.withDescription("Server is gone"), metadata);
+ assertEquals(123456, (long) container.getCounter(metricName).getCumulative());
+ }
}