[BEAM-12670] Fix regression: properly report API call metric on error for BQ streaming_inserts
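
The streaming inserts path now calls BigQuery through the
google-cloud-bigquery client, whose insert_rows_json surfaces failures
as google.api_core exceptions (ClientError, GoogleAPICallError) rather
than apitools' HttpError, so failed insertAll requests were no longer
counted in the API request metric. Catch those exception types as well,
record the numeric HTTP status code on the service call metric, and
re-raise so the existing retry behavior is unchanged. Also adds a unit
test that drives the failure path through a mocked insert_rows_json.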
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index eefdb05..114c42df 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -617,6 +617,8 @@
     Docs for this BQ call: https://cloud.google.com/bigquery/docs/reference\
       /rest/v2/tabledata/insertAll."""
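+    # Deferred import: google-api-core is an optional (GCP extra) dependency.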
+    from google.api_core.exceptions import ClientError
+    from google.api_core.exceptions import GoogleAPICallError
     # The rows argument is a list of
     # bigquery.TableDataInsertAllRequest.RowsValueListEntry instances as
     # required by the InsertAll() method.
@@ -652,9 +654,13 @@
       else:
         for insert_error in errors:
           service_call_metric.call(insert_error['errors'][0])
+    except (ClientError, GoogleAPICallError) as e:
+      # e.code.value contains the numeric HTTP status code.
+      service_call_metric.call(e.code.value)
+      # Re-raise the exception so that we retry appropriately.
+      raise
     except HttpError as e:
       service_call_metric.call(e)
-
-      # Re-reise the exception so that we re-try appropriately.
+      # Re-raise the exception so that we retry appropriately.
       raise
     finally:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index c9ed827..1252ffb 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -33,6 +33,7 @@
 import apache_beam as beam
 from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.io.gcp import resource_identifiers
 from apache_beam.io.gcp.bigquery import TableRowJsonCoder
 from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
 from apache_beam.io.gcp.bigquery_tools import AvroRowWriter
@@ -44,6 +45,8 @@
 from apache_beam.io.gcp.bigquery_tools import parse_table_reference
 from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
 from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.value_provider import StaticValueProvider
@@ -422,6 +425,62 @@
     upload = client.jobs.Insert.call_args[1]["upload"]
     self.assertEqual(b'some,data', upload.stream.read())
+  def verify_write_call_metric(
+      self, project_id, dataset_id, table_id, status, count):
+    """Check that a metric was recorded for the BQ IO write API call."""
+    process_wide_monitoring_infos = list(
+        MetricsEnvironment.process_wide_container().
+        to_runner_api_monitoring_infos(None).values())
+    resource = resource_identifiers.BigQueryTable(
+        project_id, dataset_id, table_id)
+    labels = {
+        # TODO(ajamato): Add Ptransform label.
+        monitoring_infos.SERVICE_LABEL: 'BigQuery',
+        # Refer to any method which writes elements to BigQuery in batches
+        # as "BigQueryBatchWrite", i.e. the tabledata insertAll method or
+        # any batch-write APIs introduced in the future.
+        monitoring_infos.METHOD_LABEL: 'BigQueryBatchWrite',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.BIGQUERY_PROJECT_ID_LABEL: project_id,
+        monitoring_infos.BIGQUERY_DATASET_LABEL: dataset_id,
+        monitoring_infos.BIGQUERY_TABLE_LABEL: table_id,
+        monitoring_infos.STATUS_LABEL: status,
+    }
+    expected_mi = monitoring_infos.int64_counter(
+        monitoring_infos.API_REQUEST_COUNT_URN, count, labels=labels)
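+    # start_time differs from run to run; clear it on both protos before
+    # comparing for equality.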
+    expected_mi.ClearField("start_time")
+
+    found = False
+    for actual_mi in process_wide_monitoring_infos:
+      actual_mi.ClearField("start_time")
+      if expected_mi == actual_mi:
+        found = True
+        break
+    self.assertTrue(
+        found, "Did not find write call metric with status: %s" % status)
+
+  def test_insert_rows_sets_metric_on_failure(self):
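+    """Each failed or successful insert call should record a status metric."""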
+    from google.api_core import exceptions
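+    # Reset the process-wide metrics container so counts recorded by other
+    # tests do not leak into this one.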
+    MetricsEnvironment.process_wide_container().reset()
+    client = mock.Mock()
+    client.insert_rows_json = mock.Mock(
+        # Fail twice, then succeed.
+        side_effect=[
+            exceptions.DeadlineExceeded("Deadline Exceeded"),
+            exceptions.InternalServerError("Internal Error"),
+            [],
+        ])
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
+    wrapper.insert_rows("my_project", "my_dataset", "my_table", [])
+
+    # Expect two failing calls, then a success (i.e. two retries).
+    self.verify_write_call_metric(
+        "my_project", "my_dataset", "my_table", "deadline_exceeded", 1)
+    self.verify_write_call_metric(
+        "my_project", "my_dataset", "my_table", "internal", 1)
+    self.verify_write_call_metric(
+        "my_project", "my_dataset", "my_table", "ok", 1)
+
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryReader(unittest.TestCase):