[BEAM-11994] Update BigQueryStorageStreamSource and BigQueryServicesImpl to capture API_REQUEST_COUNT metrics/errors for storage API reads
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index f7b95e7..29d5c00 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -231,14 +231,23 @@
 
   /** An interface representing a client object for making calls to the BigQuery Storage API. */
   interface StorageClient extends AutoCloseable {
-    /** Create a new read session against an existing table. */
+    /**
+     * Create a new read session against an existing table. This method variant collects request
+     * count metric, table id in the request.
+     */
     ReadSession createReadSession(CreateReadSessionRequest request);
 
     /** Read rows in the context of a specific read stream. */
     BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request);
 
+    /* This method variant collects request count metric, using the fullTableID metadata. */
+    BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request, String fullTableId);
+
     SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request);
 
+    /* This method variant collects request count metric, using the fullTableID metadata. */
+    SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request, String fullTableId);
+
     /**
      * Close the client object.
      *
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index d788bfe..951f1fd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -29,6 +29,7 @@
 import com.google.api.client.util.Sleeper;
 import com.google.api.core.ApiFuture;
 import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.FixedHeaderProvider;
 import com.google.api.gax.rpc.HeaderProvider;
 import com.google.api.gax.rpc.ServerStream;
@@ -100,7 +101,6 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.runners.core.metrics.ServiceCallMetric;
 import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
@@ -843,22 +843,7 @@
         idsToPublish = insertIdList;
       }
 
-      HashMap<String, String> baseLabels = new HashMap<String, String>();
-      // TODO(ajamato): Add Ptransform label. Populate it as empty for now to prevent the
-      // SpecMonitoringInfoValidator from dropping the MonitoringInfo.
-      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
-      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
-      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite");
-      baseLabels.put(
-          MonitoringInfoConstants.Labels.RESOURCE,
-          GcpResourceIdentifiers.bigQueryTable(
-              ref.getProjectId(), ref.getDatasetId(), ref.getTableId()));
-      baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, ref.getProjectId());
-      baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_DATASET, ref.getDatasetId());
-      baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, ref.getTableId());
-
-      ServiceCallMetric serviceCallMetric =
-          new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+      ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
 
       while (true) {
         List<FailsafeValueInSingleWindow<TableRow, TableRow>> retryRows = new ArrayList<>();
@@ -1363,9 +1348,31 @@
       this.client = BigQueryReadClient.create(settingsBuilder.build());
     }
 
+    // Since BigQueryReadClient client's methods are final they cannot be mocked with Mockito for
+    // testing
+    // So this wrapper method can be mocked in tests, instead.
+    ReadSession callCreateReadSession(CreateReadSessionRequest request) {
+      return client.createReadSession(request);
+    }
+
     @Override
     public ReadSession createReadSession(CreateReadSessionRequest request) {
-      return client.createReadSession(request);
+      TableReference tableReference =
+          BigQueryUtils.toTableReference(request.getReadSession().getTable());
+      ServiceCallMetric serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
+      try {
+        ReadSession session = callCreateReadSession(request);
+        if (serviceCallMetric != null) {
+          serviceCallMetric.call("ok");
+        }
+        return session;
+
+      } catch (ApiException e) {
+        if (serviceCallMetric != null) {
+          serviceCallMetric.call(e.getStatusCode().getCode().name());
+        }
+        throw e;
+      }
     }
 
     @Override
@@ -1374,11 +1381,48 @@
     }
 
     @Override
+    public BigQueryServerStream<ReadRowsResponse> readRows(
+        ReadRowsRequest request, String fullTableId) {
+      TableReference tableReference = BigQueryUtils.toTableReference(fullTableId);
+      ServiceCallMetric serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
+      try {
+        BigQueryServerStream<ReadRowsResponse> response = readRows(request);
+        serviceCallMetric.call("ok");
+        return response;
+      } catch (ApiException e) {
+        if (serviceCallMetric != null) {
+          serviceCallMetric.call(e.getStatusCode().getCode().name());
+        }
+        throw e;
+      }
+    }
+
+    @Override
     public SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request) {
       return client.splitReadStream(request);
     }
 
     @Override
+    public SplitReadStreamResponse splitReadStream(
+        SplitReadStreamRequest request, String fullTableId) {
+      TableReference tableReference = BigQueryUtils.toTableReference(fullTableId);
+      ServiceCallMetric serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
+      try {
+        SplitReadStreamResponse response = splitReadStream(request);
+
+        if (serviceCallMetric != null) {
+          serviceCallMetric.call("ok");
+        }
+        return response;
+      } catch (ApiException e) {
+        if (serviceCallMetric != null) {
+          serviceCallMetric.call(e.getStatusCode().getCode().name());
+        }
+        throw e;
+      }
+    }
+
+    @Override
     public void close() {
       client.close();
     }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index 712456a..120adc1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -175,4 +175,9 @@
             kmsKey);
     return bqServices.getDatasetService(options).getTable(queryResultTable);
   }
+
+  @Override
+  protected @Nullable String getTargetTableId(BigQueryOptions options) throws Exception {
+    return null;
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
index ac8f2ae..504ec9e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
@@ -94,6 +94,8 @@
    */
   protected abstract Table getTargetTable(BigQueryOptions options) throws Exception;
 
+  protected abstract @Nullable String getTargetTableId(BigQueryOptions options) throws Exception;
+
   @Override
   public Coder<T> getOutputCoder() {
     return outputCoder;
@@ -105,9 +107,16 @@
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     Table targetTable = getTargetTable(bqOptions);
 
-    ReadSession.Builder readSessionBuilder =
-        ReadSession.newBuilder()
-            .setTable(BigQueryHelpers.toTableResourceName(targetTable.getTableReference()));
+    String tableReferenceId = "";
+    if (targetTable != null) {
+      tableReferenceId = BigQueryHelpers.toTableResourceName(targetTable.getTableReference());
+    } else {
+      // If the table does not exist targetTable will be null.
+      // Construct the table id if we can generate it. For error recording/logging.
+      tableReferenceId = getTargetTableId(bqOptions);
+    }
+
+    ReadSession.Builder readSessionBuilder = ReadSession.newBuilder().setTable(tableReferenceId);
 
     if (selectedFieldsProvider != null || rowRestrictionProvider != null) {
       ReadSession.TableReadOptions.Builder tableReadOptionsBuilder =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
index 1a7bc59..462c720 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
@@ -21,7 +21,9 @@
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.FailedPreconditionException;
+import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
 import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
@@ -33,6 +35,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
@@ -162,6 +165,9 @@
     private long rowsConsumedFromCurrentResponse;
     private long totalRowsInCurrentResponse;
 
+    private TableReference tableReference;
+    private ServiceCallMetric serviceCallMetric;
+
     private BigQueryStorageStreamReader(
         BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException {
       this.source = source;
@@ -186,7 +192,9 @@
               .setOffset(currentOffset)
               .build();
 
-      responseStream = storageClient.readRows(request);
+      tableReference = BigQueryUtils.toTableReference(source.readSession.getTable());
+      serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
+      responseStream = storageClient.readRows(request, source.readSession.getTable());
       responseIterator = responseStream.iterator();
       LOG.info("Started BigQuery Storage API read from stream {}.", source.readStream.getName());
       return readNextRecord();
@@ -205,7 +213,23 @@
           return false;
         }
 
-        ReadRowsResponse response = responseIterator.next();
+        ReadRowsResponse response;
+        try {
+          response = responseIterator.next();
+          // Since we don't have a direct hook to the underlying
+          // API call, record success ever time we read a record successfully.
+          if (serviceCallMetric != null) {
+            serviceCallMetric.call("ok");
+          }
+        } catch (ApiException e) {
+          // Occasionally the iterator will fail and raise an exception.
+          // Capture it here and record the error in the metric.
+          if (serviceCallMetric != null) {
+            serviceCallMetric.call(e.getStatusCode().getCode().name());
+          }
+          throw e;
+        }
+
         progressAtResponseStart = response.getStats().getProgress().getAtResponseStart();
         progressAtResponseEnd = response.getStats().getProgress().getAtResponseEnd();
         totalRowsInCurrentResponse = response.getRowCount();
@@ -315,7 +339,8 @@
                   ReadRowsRequest.newBuilder()
                       .setReadStream(splitResponse.getPrimaryStream().getName())
                       .setOffset(currentOffset + 1)
-                      .build());
+                      .build(),
+                  source.readSession.getTable());
           newResponseIterator = newResponseStream.iterator();
           newResponseIterator.hasNext();
         } catch (FailedPreconditionException e) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
index 39ee5fb..2a14cd5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
@@ -102,11 +102,39 @@
 
   @Override
   public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-    return getTargetTable(options.as(BigQueryOptions.class)).getNumBytes();
+    Table table = getTargetTable(options.as(BigQueryOptions.class));
+    if (table != null) {
+      return table.getNumBytes();
+    }
+    // If the table does not exist, then it will be null.
+    // Avoid the NullPointerException here, allow a more meaningful table "not_found"
+    // error to be shown to the user, upon table read.
+    return 0;
   }
 
   @Override
-  protected Table getTargetTable(BigQueryOptions options) throws Exception {
+  protected String getTargetTableId(BigQueryOptions options) throws Exception {
+    TableReference tableReference = tableReferenceProvider.get();
+    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+      checkState(
+          !Strings.isNullOrEmpty(options.getProject()),
+          "No project ID set in %s or %s, cannot construct a complete %s",
+          TableReference.class.getSimpleName(),
+          BigQueryOptions.class.getSimpleName(),
+          TableReference.class.getSimpleName());
+      LOG.info(
+          "Project ID not set in {}. Using default project from {}.",
+          TableReference.class.getSimpleName(),
+          BigQueryOptions.class.getSimpleName());
+      tableReference.setProjectId(options.getProject());
+    }
+    return String.format(
+        "projects/%s/datasets/%s/tables/%s",
+        tableReference.getProjectId(), tableReference.getDatasetId(), tableReference.getTableId());
+  }
+
+  @Override
+  protected @Nullable Table getTargetTable(BigQueryOptions options) throws Exception {
     if (cachedTable.get() == null) {
       TableReference tableReference = tableReferenceProvider.get();
       if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 9166228..6025b22 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -24,6 +24,7 @@
 import static org.apache.beam.sdk.values.Row.toRow;
 
 import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.auto.value.AutoValue;
@@ -34,17 +35,23 @@
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.IntStream;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
+import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.schemas.Schema;
@@ -75,6 +82,20 @@
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class BigQueryUtils {
+
+  // For parsing the format returned on the API proto:
+  // google.cloud.bigquery.storage.v1.ReadSession.getTable()
+  // "projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
+  private static final Pattern TABLE_RESOURCE_PATTERN =
+      Pattern.compile(
+          "^projects/(?<PROJECT>[^/]+)/datasets/(?<DATASET>[^/]+)/tables/(?<TABLE>[^/]+)$");
+
+  // For parsing the format used to refer to tables parameters in BigQueryIO.
+  // "{project_id}:{dataset_id}.{table_id}" or
+  // "{project_id}.{dataset_id}.{table_id}"
+  private static final Pattern SIMPLE_TABLE_PATTERN =
+      Pattern.compile("^(?<PROJECT>[^\\.:]+)[\\.:](?<DATASET>[^\\.:]+)[\\.](?<TABLE>[^\\.:]+)$");
+
   /** Options for how to convert BigQuery data to Beam data. */
   @AutoValue
   public abstract static class ConversionOptions implements Serializable {
@@ -909,4 +930,85 @@
           "Does not support converting avro format: " + value.getClass().getName());
     }
   }
+
+  /**
+   * @param fullTableId - Is one of the two forms commonly used to refer to bigquery tables in the
+   *     beam codebase:
+   *     <ul>
+   *       <li>projects/{project_id}/datasets/{dataset_id}/tables/{table_id}
+   *       <li>myproject:mydataset.mytable
+   *       <li>myproject.mydataset.mytable
+   *     </ul>
+   *
+   * @return a BigQueryTableIdentifier by parsing the fullTableId. If it cannot be parsed properly
+   *     null is returned.
+   */
+  public static @Nullable TableReference toTableReference(String fullTableId) {
+    // Try parsing the format:
+    // "projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
+    Matcher m = TABLE_RESOURCE_PATTERN.matcher(fullTableId);
+    if (m.matches()) {
+      return new TableReference()
+          .setProjectId(m.group("PROJECT"))
+          .setDatasetId(m.group("DATASET"))
+          .setTableId(m.group("TABLE"));
+    }
+
+    // If that failed, try the format:
+    // "{project_id}:{dataset_id}.{table_id}" or
+    // "{project_id}.{dataset_id}.{table_id}"
+    m = SIMPLE_TABLE_PATTERN.matcher(fullTableId);
+    if (m.matches()) {
+      return new TableReference()
+          .setProjectId(m.group("PROJECT"))
+          .setDatasetId(m.group("DATASET"))
+          .setTableId(m.group("TABLE"));
+    }
+    return null;
+  }
+
+  private static ServiceCallMetric callMetricForMethod(
+      TableReference tableReference, String method) {
+    if (tableReference != null) {
+      // TODO(ajamato): Add Ptransform label. Populate it as empty for now to prevent the
+      // SpecMonitoringInfoValidator from dropping the MonitoringInfo.
+      HashMap<String, String> baseLabels = new HashMap<String, String>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, method);
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigQueryTable(
+              tableReference.getProjectId(),
+              tableReference.getDatasetId(),
+              tableReference.getTableId()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, tableReference.getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGQUERY_DATASET, tableReference.getDatasetId());
+      baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, tableReference.getTableId());
+      return new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+    }
+    return null;
+  }
+
+  /**
+   * @param tableReference - The table being read from. Can be a temporary BQ table used to read
+   *     from a SQL query.
+   * @return a ServiceCallMetric for recording statuses for all BQ API responses related to reading
+   *     elements directly from BigQuery in a process-wide metric. Such as: calls to readRows,
+   *     splitReadStream, createReadSession.
+   */
+  public static ServiceCallMetric readCallMetric(TableReference tableReference) {
+    return callMetricForMethod(tableReference, "BigQueryBatchRead");
+  }
+
+  /**
+   * @param tableReference - The table being written to.
+   * @return a ServiceCallMetric for recording statuses for all BQ responses related to writing
+   *     elements directly to BigQuery in a process-wide metric. Such as: insertAll.
+   */
+  public static ServiceCallMetric writeCallMetric(TableReference tableReference) {
+    return callMetricForMethod(tableReference, "BigQueryBatchWrite");
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index 81e1027..86f538a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -821,7 +821,7 @@
 
     StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
     when(fakeStorageClient.createReadSession(any())).thenReturn(readSession);
-    when(fakeStorageClient.readRows(expectedReadRowsRequest))
+    when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
         .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
 
     BigQueryIO.TypedRead<KV<String, Long>> typedRead =
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index c39cd11..135a536 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -774,7 +774,7 @@
             createResponse(AVRO_SCHEMA, records.subList(2, 3), 0.5, 0.75));
 
     StorageClient fakeStorageClient = mock(StorageClient.class);
-    when(fakeStorageClient.readRows(expectedRequest))
+    when(fakeStorageClient.readRows(expectedRequest, ""))
         .thenReturn(new FakeBigQueryServerStream<>(responses));
 
     BigQueryStorageStreamSource<TableRow> streamSource =
@@ -830,7 +830,7 @@
             createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.7, 1.0));
 
     StorageClient fakeStorageClient = mock(StorageClient.class);
-    when(fakeStorageClient.readRows(expectedRequest))
+    when(fakeStorageClient.readRows(expectedRequest, ""))
         .thenReturn(new FakeBigQueryServerStream<>(responses));
 
     BigQueryStorageStreamSource<TableRow> streamSource =
@@ -899,7 +899,7 @@
             createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.800, 0.875));
 
     StorageClient fakeStorageClient = mock(StorageClient.class);
-    when(fakeStorageClient.readRows(expectedRequest))
+    when(fakeStorageClient.readRows(expectedRequest, ""))
         .thenReturn(new FakeBigQueryServerStream<>(parentResponses));
 
     when(fakeStorageClient.splitReadStream(
@@ -916,7 +916,7 @@
             createResponse(AVRO_SCHEMA, records.subList(3, 4), 0.8, 1.0));
 
     when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build()))
+            ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build(), ""))
         .thenReturn(new FakeBigQueryServerStream<>(primaryResponses));
 
     BigQueryStorageStreamSource<TableRow> streamSource =
@@ -974,7 +974,7 @@
 
     StorageClient fakeStorageClient = mock(StorageClient.class);
     when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+            ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
         .thenReturn(new FakeBigQueryServerStream<>(parentResponses));
 
     // Mocks the split call.
@@ -993,11 +993,12 @@
                 // This test will read rows 0 and 1 from the parent before calling split,
                 // so we expect the primary read to start at offset 2.
                 .setOffset(2)
-                .build()))
+                .build(),
+            ""))
         .thenReturn(new FakeBigQueryServerStream<>(parentResponses.subList(1, 2)));
 
     when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream("remainderStream").build()))
+            ReadRowsRequest.newBuilder().setReadStream("remainderStream").build(), ""))
         .thenReturn(
             new FakeBigQueryServerStream<>(parentResponses.subList(2, parentResponses.size())));
 
@@ -1051,7 +1052,7 @@
 
     // Mock the initial ReadRows call.
     when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build()))
+            ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build(), ""))
         .thenReturn(
             new FakeBigQueryServerStream<>(
                 Lists.newArrayList(
@@ -1091,7 +1092,8 @@
             ReadRowsRequest.newBuilder()
                 .setReadStream(readStreams.get(1).getName())
                 .setOffset(1)
-                .build()))
+                .build(),
+            ""))
         .thenReturn(
             new FakeBigQueryServerStream<>(
                 Lists.newArrayList(
@@ -1125,7 +1127,8 @@
             ReadRowsRequest.newBuilder()
                 .setReadStream(readStreams.get(2).getName())
                 .setOffset(2)
-                .build()))
+                .build(),
+            ""))
         .thenReturn(
             new FakeBigQueryServerStream<>(
                 Lists.newArrayList(
@@ -1191,7 +1194,7 @@
 
     StorageClient fakeStorageClient = mock(StorageClient.class);
     when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+            ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
         .thenReturn(new FakeBigQueryServerStream<>(parentResponses));
 
     // Mocks the split call. A response without a primary_stream and remainder_stream means
@@ -1259,7 +1262,7 @@
 
     StorageClient fakeStorageClient = mock(StorageClient.class);
     when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+            ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
         .thenReturn(new FakeBigQueryServerStream<>(parentResponses));
 
     // Mocks the split call. A response without a primary_stream and remainder_stream means
@@ -1280,7 +1283,8 @@
                 // This test will read rows 0 and 1 from the parent before calling split,
                 // so we expect the primary read to start at offset 2.
                 .setOffset(2)
-                .build()))
+                .build(),
+            ""))
         .thenThrow(
             new FailedPreconditionException(
                 "Given row offset is invalid for stream.",
@@ -1373,7 +1377,7 @@
     StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
     when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
         .thenReturn(readSession);
-    when(fakeStorageClient.readRows(expectedReadRowsRequest))
+    when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
         .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
 
     PCollection<KV<String, Long>> output =
@@ -1439,7 +1443,7 @@
     StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
     when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
         .thenReturn(readSession);
-    when(fakeStorageClient.readRows(expectedReadRowsRequest))
+    when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
         .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
 
     PCollection<TableRow> output =
@@ -1507,7 +1511,7 @@
     StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
     when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
         .thenReturn(readSession);
-    when(fakeStorageClient.readRows(expectedReadRowsRequest))
+    when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
         .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
 
     PCollection<KV<String, Long>> output =
@@ -1553,7 +1557,7 @@
                 ARROW_SCHEMA, names.subList(2, 3), values.subList(2, 3), 0.5, 0.75));
 
     StorageClient fakeStorageClient = mock(StorageClient.class);
-    when(fakeStorageClient.readRows(expectedRequest))
+    when(fakeStorageClient.readRows(expectedRequest, ""))
         .thenReturn(new FakeBigQueryServerStream<>(responses));
 
     BigQueryStorageStreamSource<TableRow> streamSource =
@@ -1602,7 +1606,7 @@
             createResponseArrow(ARROW_SCHEMA, names.subList(4, 7), values.subList(4, 7), 0.7, 1.0));
 
     StorageClient fakeStorageClient = mock(StorageClient.class);
-    when(fakeStorageClient.readRows(expectedRequest))
+    when(fakeStorageClient.readRows(expectedRequest, ""))
         .thenReturn(new FakeBigQueryServerStream<>(responses));
 
     BigQueryStorageStreamSource<TableRow> streamSource =
@@ -1668,7 +1672,7 @@
                 ARROW_SCHEMA, names.subList(4, 7), values.subList(4, 7), 0.7, 0.875));
 
     StorageClient fakeStorageClient = mock(StorageClient.class);
-    when(fakeStorageClient.readRows(expectedRequest))
+    when(fakeStorageClient.readRows(expectedRequest, ""))
         .thenReturn(new FakeBigQueryServerStream<>(parentResponse));
 
     when(fakeStorageClient.splitReadStream(
@@ -1685,7 +1689,7 @@
             createResponseArrow(ARROW_SCHEMA, names.subList(3, 4), values.subList(3, 4), 0.8, 1.0));
 
     when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build()))
+            ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build(), ""))
         .thenReturn(new FakeBigQueryServerStream<>(primaryResponses));
 
     BigQueryStorageStreamSource<TableRow> streamSource =
@@ -1734,7 +1738,7 @@
 
     StorageClient fakeStorageClient = mock(StorageClient.class);
     when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+            ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
         .thenReturn(new FakeBigQueryServerStream<>(parentResponses));
 
     // Mocks the split call.
@@ -1753,11 +1757,12 @@
                 // This test will read rows 0 and 1 from the parent before calling split,
                 // so we expect the primary read to start at offset 2.
                 .setOffset(2)
-                .build()))
+                .build(),
+            ""))
         .thenReturn(new FakeBigQueryServerStream<>(parentResponses.subList(1, 2)));
 
     when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream("remainderStream").build()))
+            ReadRowsRequest.newBuilder().setReadStream("remainderStream").build(), ""))
         .thenReturn(
             new FakeBigQueryServerStream<>(parentResponses.subList(2, parentResponses.size())));
 
@@ -1824,7 +1829,7 @@
 
     // Mock the initial ReadRows call.
     when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build()))
+            ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build(), ""))
         .thenReturn(new FakeBigQueryServerStream<>(parentResponses));
 
     // Mock the first SplitReadStream call.
@@ -1850,7 +1855,8 @@
             ReadRowsRequest.newBuilder()
                 .setReadStream(readStreams.get(1).getName())
                 .setOffset(1)
-                .build()))
+                .build(),
+            ""))
         .thenReturn(new FakeBigQueryServerStream<>(otherResponses));
 
     // Mock the second SplitReadStream call.
@@ -1875,7 +1881,8 @@
             ReadRowsRequest.newBuilder()
                 .setReadStream(readStreams.get(2).getName())
                 .setOffset(2)
-                .build()))
+                .build(),
+            ""))
         .thenReturn(new FakeBigQueryServerStream<>(lastResponses));
 
     BoundedSource<TableRow> source =
@@ -1929,7 +1936,7 @@
 
     StorageClient fakeStorageClient = mock(StorageClient.class);
     when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+            ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
         .thenReturn(new FakeBigQueryServerStream<>(parentResponses));
 
     // Mocks the split call. A response without a primary_stream and remainder_stream means
@@ -1994,7 +2001,7 @@
 
     StorageClient fakeStorageClient = mock(StorageClient.class);
     when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+            ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
         .thenReturn(new FakeBigQueryServerStream<>(parentResponses));
 
     // Mocks the split call. A response without a primary_stream and remainder_stream means
@@ -2015,7 +2022,8 @@
                 // This test will read rows 0 and 1 from the parent before calling split,
                 // so we expect the primary read to start at offset 2.
                 .setOffset(2)
-                .build()))
+                .build(),
+            ""))
         .thenThrow(
             new FailedPreconditionException(
                 "Given row offset is invalid for stream.",
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 61b94f6..53d8db0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -27,6 +27,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -48,6 +49,8 @@
 import com.google.api.client.testing.util.MockSleeper;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.Job;
@@ -62,6 +65,12 @@
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
+import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.cloud.hadoop.util.RetryBoundedBackOff;
 import java.io.ByteArrayInputStream;
@@ -69,6 +78,7 @@
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
@@ -179,15 +189,15 @@
     }
   }
 
-  private void verifyWriteMetricWasSet(
-      String projectId, String dataset, String table, String status, long count) {
+  private void verifyRequestMetricWasSet(
+      String method, String projectId, String dataset, String table, String status, long count) {
     // Verify the metric as reported.
     HashMap<String, String> labels = new HashMap<String, String>();
     // TODO(ajamato): Add Ptransform label. Populate it as empty for now to prevent the
     // SpecMonitoringInfoValidator from dropping the MonitoringInfo.
     labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
     labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
-    labels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite");
+    labels.put(MonitoringInfoConstants.Labels.METHOD, method);
     labels.put(
         MonitoringInfoConstants.Labels.RESOURCE,
         GcpResourceIdentifiers.bigQueryTable(projectId, dataset, table));
@@ -203,6 +213,16 @@
     assertEquals(count, (long) container.getCounter(name).getCumulative());
   }
 
+  private void verifyWriteMetricWasSet(
+      String projectId, String dataset, String table, String status, long count) {
+    verifyRequestMetricWasSet("BigQueryBatchWrite", projectId, dataset, table, status, count);
+  }
+
+  private void verifyReadMetricWasSet(
+      String projectId, String dataset, String table, String status, long count) {
+    verifyRequestMetricWasSet("BigQueryBatchRead", projectId, dataset, table, status, count);
+  }
+
   /** Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds. */
   @Test
   public void testStartLoadJobSucceeds() throws IOException, InterruptedException {
@@ -1399,4 +1419,145 @@
 
     assertThat(failedInserts, is(expected));
   }
+
+  @Test
+  public void testCreateReadSessionSetsRequestCountMetric()
+      throws InterruptedException, IOException {
+    BigQueryServicesImpl.StorageClientImpl client =
+        mock(BigQueryServicesImpl.StorageClientImpl.class);
+
+    CreateReadSessionRequest.Builder builder = CreateReadSessionRequest.newBuilder();
+    builder.getReadSessionBuilder().setTable("myproject:mydataset.mytable");
+    CreateReadSessionRequest request = builder.build();
+    when(client.callCreateReadSession(request))
+        .thenReturn(ReadSession.newBuilder().build()); // Mock implementation.
+    when(client.createReadSession(any())).thenCallRealMethod(); // Real implementation.
+
+    client.createReadSession(request);
+    verifyReadMetricWasSet("myproject", "mydataset", "mytable", "ok", 1);
+  }
+
+  @Test
+  public void testCreateReadSessionSetsRequestCountMetricOnError()
+      throws InterruptedException, IOException {
+    BigQueryServicesImpl.StorageClientImpl client =
+        mock(BigQueryServicesImpl.StorageClientImpl.class);
+
+    CreateReadSessionRequest.Builder builder = CreateReadSessionRequest.newBuilder();
+    builder.getReadSessionBuilder().setTable("myproject:mydataset.mytable");
+    CreateReadSessionRequest request = builder.build();
+    StatusCode statusCode =
+        new StatusCode() {
+          @Override
+          public Code getCode() {
+            return Code.NOT_FOUND;
+          }
+
+          @Override
+          public Object getTransportCode() {
+            return null;
+          }
+        };
+    when(client.callCreateReadSession(request))
+        .thenThrow(new ApiException("Not Found", null, statusCode, false)); // Mock implementation.
+    when(client.createReadSession(any())).thenCallRealMethod(); // Real implementation.
+
+    thrown.expect(ApiException.class);
+    thrown.expectMessage("Not Found");
+
+    client.createReadSession(request);
+    verifyReadMetricWasSet("myproject", "mydataset", "mytable", "not_found", 1);
+  }
+
+  @Test
+  public void testReadRowsSetsRequestCountMetric() throws InterruptedException, IOException {
+    BigQueryServices.StorageClient client = mock(BigQueryServicesImpl.StorageClientImpl.class);
+    ReadRowsRequest request = null;
+    BigQueryServices.BigQueryServerStream<ReadRowsResponse> response =
+        new BigQueryServices.BigQueryServerStream<ReadRowsResponse>() {
+          @Override
+          public Iterator<ReadRowsResponse> iterator() {
+            return null;
+          }
+
+          @Override
+          public void cancel() {}
+        };
+
+    when(client.readRows(request)).thenReturn(response); // Mock implementation.
+    when(client.readRows(any(), any())).thenCallRealMethod(); // Real implementation.
+
+    client.readRows(request, "myproject:mydataset.mytable");
+    verifyReadMetricWasSet("myproject", "mydataset", "mytable", "ok", 1);
+  }
+
+  @Test
+  public void testReadRowsSetsRequestCountMetricOnError() throws InterruptedException, IOException {
+    BigQueryServices.StorageClient client = mock(BigQueryServicesImpl.StorageClientImpl.class);
+    ReadRowsRequest request = null;
+    StatusCode statusCode =
+        new StatusCode() {
+          @Override
+          public Code getCode() {
+            return Code.INTERNAL;
+          }
+
+          @Override
+          public Object getTransportCode() {
+            return null;
+          }
+        };
+    when(client.readRows(request))
+        .thenThrow(new ApiException("Internal", null, statusCode, false)); // Mock implementation.
+    when(client.readRows(any(), any())).thenCallRealMethod(); // Real implementation.
+
+    thrown.expect(ApiException.class);
+    thrown.expectMessage("Internal");
+
+    client.readRows(request, "myproject:mydataset.mytable");
+    verifyReadMetricWasSet("myproject", "mydataset", "mytable", "internal", 1);
+  }
+
+  @Test
+  public void testSplitReadStreamSetsRequestCountMetric() throws InterruptedException, IOException {
+    BigQueryServices.StorageClient client = mock(BigQueryServicesImpl.StorageClientImpl.class);
+
+    SplitReadStreamRequest request = null;
+    when(client.splitReadStream(request))
+        .thenReturn(SplitReadStreamResponse.newBuilder().build()); // Mock implementation.
+    when(client.splitReadStream(any(), any())).thenCallRealMethod(); // Real implementation.
+
+    client.splitReadStream(request, "myproject:mydataset.mytable");
+    verifyReadMetricWasSet("myproject", "mydataset", "mytable", "ok", 1);
+  }
+
+  @Test
+  public void testSplitReadStreamSetsRequestCountMetricOnError()
+      throws InterruptedException, IOException {
+    BigQueryServices.StorageClient client = mock(BigQueryServicesImpl.StorageClientImpl.class);
+    SplitReadStreamRequest request = null;
+    StatusCode statusCode =
+        new StatusCode() {
+          @Override
+          public Code getCode() {
+            return Code.RESOURCE_EXHAUSTED;
+          }
+
+          @Override
+          public Object getTransportCode() {
+            return null;
+          }
+        };
+    when(client.splitReadStream(request))
+        .thenThrow(
+            new ApiException(
+                "Resource Exhausted", null, statusCode, false)); // Mock implementation.
+    when(client.splitReadStream(any(), any())).thenCallRealMethod(); // Real implementation.
+
+    thrown.expect(ApiException.class);
+    thrown.expectMessage("Resource Exhausted");
+
+    client.splitReadStream(request, "myproject:mydataset.mytable");
+    verifyReadMetricWasSet("myproject", "mydataset", "mytable", "resource_exhausted", 1);
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 75c1315..0782622 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -26,9 +26,11 @@
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.collection.IsMapContaining.hasEntry;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 
 import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.math.BigDecimal;
@@ -882,4 +884,82 @@
             record, AVRO_ARRAY_ARRAY_TYPE, BigQueryUtils.ConversionOptions.builder().build());
     assertEquals(expected, beamRow);
   }
+
+  @Test
+  public void testToTableReference() {
+    {
+      TableReference tr =
+          BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables/mytable");
+      assertEquals("myproject", tr.getProjectId());
+      assertEquals("mydataset", tr.getDatasetId());
+      assertEquals("mytable", tr.getTableId());
+    }
+
+    {
+      // Test colon(":") after project format
+      TableReference tr = BigQueryUtils.toTableReference("myprojectwithcolon:mydataset.mytable");
+      assertEquals("myprojectwithcolon", tr.getProjectId());
+      assertEquals("mydataset", tr.getDatasetId());
+      assertEquals("mytable", tr.getTableId());
+    }
+
+    {
+      // Test dot(".") after project format
+      TableReference tr = BigQueryUtils.toTableReference("myprojectwithdot.mydataset.mytable");
+      assertEquals("myprojectwithdot", tr.getProjectId());
+      assertEquals("mydataset", tr.getDatasetId());
+      assertEquals("mytable", tr.getTableId());
+    }
+
+    // Invalid scenarios
+    assertNull(BigQueryUtils.toTableReference(""));
+    assertNull(BigQueryUtils.toTableReference(":."));
+    assertNull(BigQueryUtils.toTableReference(".."));
+    assertNull(BigQueryUtils.toTableReference("myproject"));
+    assertNull(BigQueryUtils.toTableReference("myproject:"));
+    assertNull(BigQueryUtils.toTableReference("myproject."));
+    assertNull(BigQueryUtils.toTableReference("myproject:mydataset"));
+    assertNull(BigQueryUtils.toTableReference("myproject:mydataset."));
+    assertNull(BigQueryUtils.toTableReference("myproject:mydataset.mytable."));
+    assertNull(BigQueryUtils.toTableReference("myproject:mydataset:mytable:"));
+    assertNull(BigQueryUtils.toTableReference(".invalidleadingdot.mydataset.mytable"));
+    assertNull(BigQueryUtils.toTableReference("invalidtrailingdot.mydataset.mytable."));
+    assertNull(BigQueryUtils.toTableReference(":invalidleadingcolon.mydataset.mytable"));
+    assertNull(BigQueryUtils.toTableReference("invalidtrailingcolon.mydataset.mytable:"));
+    assertNull(BigQueryUtils.toTableReference("myproject.mydataset.mytable.myinvalidpart"));
+    assertNull(BigQueryUtils.toTableReference("myproject:mydataset.mytable.myinvalidpart"));
+
+    assertNull(
+        BigQueryUtils.toTableReference("/projects/extraslash/datasets/mydataset/tables/mytable"));
+    assertNull(
+        BigQueryUtils.toTableReference("projects//extraslash/datasets/mydataset/tables/mytable"));
+    assertNull(
+        BigQueryUtils.toTableReference("projects/extraslash//datasets/mydataset/tables/mytable"));
+    assertNull(
+        BigQueryUtils.toTableReference("projects/extraslash/datasets//mydataset/tables/mytable"));
+    assertNull(
+        BigQueryUtils.toTableReference("projects/extraslash/datasets/mydataset//tables/mytable"));
+    assertNull(
+        BigQueryUtils.toTableReference("projects/extraslash/datasets/mydataset/tables//mytable"));
+    assertNull(
+        BigQueryUtils.toTableReference("projects/extraslash/datasets/mydataset/tables/mytable/"));
+
+    assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables//"));
+    assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets//tables/mytable/"));
+    assertNull(BigQueryUtils.toTableReference("projects//datasets/mydataset/tables/mytable/"));
+    assertNull(BigQueryUtils.toTableReference("projects//datasets//tables//"));
+
+    assertNull(
+        BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables/mytable/"));
+    assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables/"));
+    assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables"));
+    assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/"));
+    assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset"));
+    assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/"));
+    assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets"));
+    assertNull(BigQueryUtils.toTableReference("projects/myproject/"));
+    assertNull(BigQueryUtils.toTableReference("projects/myproject"));
+    assertNull(BigQueryUtils.toTableReference("projects/"));
+    assertNull(BigQueryUtils.toTableReference("projects"));
+  }
 }