[BEAM-12670] Fix regression, properly report API call metric properly on error for BQ streaming_inserts
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index eefdb05..114c42df 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -617,6 +617,8 @@
 
     Docs for this BQ call: https://cloud.google.com/bigquery/docs/reference\
       /rest/v2/tabledata/insertAll."""
+    from google.api_core.exceptions import ClientError
+    from google.api_core.exceptions import GoogleAPICallError
     # The rows argument is a list of
     # bigquery.TableDataInsertAllRequest.RowsValueListEntry instances as
     # required by the InsertAll() method.
@@ -652,9 +654,13 @@
       else:
         for insert_error in errors:
           service_call_metric.call(insert_error['errors'][0])
+    except (ClientError, GoogleAPICallError) as e:
+      # e.code.value contains the numeric http status code.
+      service_call_metric.call(e.code.value)
+      # Re-reise the exception so that we re-try appropriately.
+      raise
     except HttpError as e:
       service_call_metric.call(e)
-
       # Re-reise the exception so that we re-try appropriately.
       raise
     finally:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index c9ed827..1252ffb 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -33,6 +33,7 @@
 
 import apache_beam as beam
 from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.io.gcp import resource_identifiers
 from apache_beam.io.gcp.bigquery import TableRowJsonCoder
 from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
 from apache_beam.io.gcp.bigquery_tools import AvroRowWriter
@@ -44,6 +45,8 @@
 from apache_beam.io.gcp.bigquery_tools import parse_table_reference
 from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
 from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.value_provider import StaticValueProvider
 
@@ -422,6 +425,62 @@
     upload = client.jobs.Insert.call_args[1]["upload"]
     self.assertEqual(b'some,data', upload.stream.read())
 
+  def verify_write_call_metric(
+      self, project_id, dataset_id, table_id, status, count):
+    """Check if an metric was recorded for the BQ IO write API call."""
+    process_wide_monitoring_infos = list(
+        MetricsEnvironment.process_wide_container().
+        to_runner_api_monitoring_infos(None).values())
+    resource = resource_identifiers.BigQueryTable(
+        project_id, dataset_id, table_id)
+    labels = {
+        # TODO(ajamato): Add Ptransform label.
+        monitoring_infos.SERVICE_LABEL: 'BigQuery',
+        # Refer to any method which writes elements to BigQuery in batches
+        # as "BigQueryBatchWrite". I.e. storage API's insertAll, or future
+        # APIs introduced.
+        monitoring_infos.METHOD_LABEL: 'BigQueryBatchWrite',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.BIGQUERY_PROJECT_ID_LABEL: project_id,
+        monitoring_infos.BIGQUERY_DATASET_LABEL: dataset_id,
+        monitoring_infos.BIGQUERY_TABLE_LABEL: table_id,
+        monitoring_infos.STATUS_LABEL: status,
+    }
+    expected_mi = monitoring_infos.int64_counter(
+        monitoring_infos.API_REQUEST_COUNT_URN, count, labels=labels)
+    expected_mi.ClearField("start_time")
+
+    found = False
+    for actual_mi in process_wide_monitoring_infos:
+      actual_mi.ClearField("start_time")
+      if expected_mi == actual_mi:
+        found = True
+        break
+    self.assertTrue(
+        found, "Did not find write call metric with status: %s" % status)
+
+  def test_insert_rows_sets_metric_on_failure(self):
+    from google.api_core import exceptions
+    MetricsEnvironment.process_wide_container().reset()
+    client = mock.Mock()
+    client.insert_rows_json = mock.Mock(
+        # Fail a few times, then succeed.
+        side_effect=[
+            exceptions.DeadlineExceeded("Deadline Exceeded"),
+            exceptions.InternalServerError("Internal Error"),
+            [],
+        ])
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
+    wrapper.insert_rows("my_project", "my_dataset", "my_table", [])
+
+    # Expect two failing calls, then a success (i.e. two retries).
+    self.verify_write_call_metric(
+        "my_project", "my_dataset", "my_table", "deadline_exceeded", 1)
+    self.verify_write_call_metric(
+        "my_project", "my_dataset", "my_table", "internal", 1)
+    self.verify_write_call_metric(
+        "my_project", "my_dataset", "my_table", "ok", 1)
+
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryReader(unittest.TestCase):