[BEAM-12769] Fix typo in test class name, CLass -> Class

diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto
index 10166e1..8f819b6 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -417,6 +417,9 @@
     GCS_PROJECT_ID = 17 [(label_props) = { name: "GCS_PROJECT_ID"}];
     DATASTORE_PROJECT = 18 [(label_props) = { name: "DATASTORE_PROJECT" }];
     DATASTORE_NAMESPACE = 19 [(label_props) = { name: "DATASTORE_NAMESPACE" }];
+    BIGTABLE_PROJECT_ID = 20 [(label_props) = { name: "BIGTABLE_PROJECT_ID"}];
+    INSTANCE_ID = 21 [(label_props) = { name: "INSTANCE_ID"}];
+    TABLE_ID = 22 [(label_props) = { name: "TABLE_ID"}];
   }
 
   // A set of key and value labels which define the scope of the metric. For
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
index 800413c..3133cc1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
@@ -33,6 +33,16 @@
         projectId, datasetId, tableId);
   }
 
+  public static String bigtableTableID(String project, String instance, String table) {
+    return String.format("projects/%s/instances/%s/tables/%s", project, instance, table);
+  }
+
+  public static String bigtableResource(String projectId, String instanceId, String tableId) {
+    return String.format(
+        "//bigtable.googleapis.com/projects/%s/instances/%s/tables/%s",
+        projectId, instanceId, tableId);
+  }
+
   public static String datastoreResource(String projectId, String namespace) {
     return String.format(
         "//bigtable.googleapis.com/projects/%s/namespaces/%s", projectId, namespace);
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index 88e55db..57ee58a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -80,6 +80,9 @@
     public static final String BIGQUERY_QUERY_NAME = "BIGQUERY_QUERY_NAME";
     public static final String DATASTORE_PROJECT = "DATASTORE_PROJECT";
     public static final String DATASTORE_NAMESPACE = "DATASTORE_NAMESPACE";
+    public static final String BIGTABLE_PROJECT_ID = "BIGTABLE_PROJECT_ID";
+    public static final String INSTANCE_ID = "INSTANCE_ID";
+    public static final String TABLE_ID = "TABLE_ID";
 
     static {
       // Note: One benefit of defining these strings above, instead of pulling them in from
@@ -109,6 +112,10 @@
       checkArgument(DATASTORE_PROJECT.equals(extractLabel(MonitoringInfoLabels.DATASTORE_PROJECT)));
       checkArgument(
           DATASTORE_NAMESPACE.equals(extractLabel(MonitoringInfoLabels.DATASTORE_NAMESPACE)));
+      checkArgument(
+          BIGTABLE_PROJECT_ID.equals(extractLabel(MonitoringInfoLabels.BIGTABLE_PROJECT_ID)));
+      checkArgument(INSTANCE_ID.equals(extractLabel(MonitoringInfoLabels.INSTANCE_ID)));
+      checkArgument(TABLE_ID.equals(extractLabel(MonitoringInfoLabels.TABLE_ID)));
     }
   }
 
diff --git a/sdks/go/examples/xlang/transforms.go b/sdks/go/examples/xlang/transforms.go
index acad214..3d410e8 100644
--- a/sdks/go/examples/xlang/transforms.go
+++ b/sdks/go/examples/xlang/transforms.go
@@ -20,7 +20,6 @@
 	"reflect"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 )
@@ -51,7 +50,7 @@
 	pl := beam.CrossLanguagePayload(prefixPayload{Data: prefix})
 	outT := beam.UnnamedOutput(typex.New(reflectx.String))
 	outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:prefix", pl, addr, beam.UnnamedInput(col), outT)
-	return outs[graph.UnnamedOutputTag]
+	return outs[beam.UnnamedOutputTag()]
 }
 
 func CoGroupByKey(s beam.Scope, addr string, col1, col2 beam.PCollection) beam.PCollection {
@@ -59,21 +58,21 @@
 	namedInputs := map[string]beam.PCollection{"col1": col1, "col2": col2}
 	outT := beam.UnnamedOutput(typex.NewCoGBK(typex.New(reflectx.Int64), typex.New(reflectx.String)))
 	outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:cgbk", nil, addr, namedInputs, outT)
-	return outs[graph.UnnamedOutputTag]
+	return outs[beam.UnnamedOutputTag()]
 }
 
 func CombinePerKey(s beam.Scope, addr string, col beam.PCollection) beam.PCollection {
 	s = s.Scope("XLangTest.CombinePerKey")
 	outT := beam.UnnamedOutput(typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64)))
 	outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:compk", nil, addr, beam.UnnamedInput(col), outT)
-	return outs[graph.UnnamedOutputTag]
+	return outs[beam.UnnamedOutputTag()]
 }
 
 func CombineGlobally(s beam.Scope, addr string, col beam.PCollection) beam.PCollection {
 	s = s.Scope("XLangTest.CombineGlobally")
 	outT := beam.UnnamedOutput(typex.New(reflectx.Int64))
 	outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:comgl", nil, addr, beam.UnnamedInput(col), outT)
-	return outs[graph.UnnamedOutputTag]
+	return outs[beam.UnnamedOutputTag()]
 }
 
 func Flatten(s beam.Scope, addr string, col1, col2 beam.PCollection) beam.PCollection {
@@ -81,14 +80,14 @@
 	namedInputs := map[string]beam.PCollection{"col1": col1, "col2": col2}
 	outT := beam.UnnamedOutput(typex.New(reflectx.Int64))
 	outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:flatten", nil, addr, namedInputs, outT)
-	return outs[graph.UnnamedOutputTag]
+	return outs[beam.UnnamedOutputTag()]
 }
 
 func GroupByKey(s beam.Scope, addr string, col beam.PCollection) beam.PCollection {
 	s = s.Scope("XLangTest.GroupByKey")
 	outT := beam.UnnamedOutput(typex.NewCoGBK(typex.New(reflectx.String), typex.New(reflectx.Int64)))
 	outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:gbk", nil, addr, beam.UnnamedInput(col), outT)
-	return outs[graph.UnnamedOutputTag]
+	return outs[beam.UnnamedOutputTag()]
 }
 
 func Multi(s beam.Scope, addr string, main1, main2, side beam.PCollection) (mainOut, sideOut beam.PCollection) {
@@ -112,5 +111,5 @@
 	s = s.Scope("XLang.Count")
 	outT := beam.UnnamedOutput(typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64)))
 	c := beam.CrossLanguage(s, "beam:transforms:xlang:count", nil, addr, beam.UnnamedInput(col), outT)
-	return c[graph.UnnamedOutputTag]
+	return c[beam.UnnamedOutputTag()]
 }
diff --git a/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go b/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
index f2317f0..8781cff 100644
--- a/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
+++ b/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
@@ -48,7 +48,6 @@
 	"reflect"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 )
@@ -131,7 +130,7 @@
 	pl := beam.CrossLanguagePayload(rpl)
 	outT := beam.UnnamedOutput(typex.NewKV(typex.New(rcfg.key), typex.New(rcfg.val)))
 	out := beam.CrossLanguage(s, readURN, pl, addr, nil, outT)
-	return out[graph.UnnamedOutputTag]
+	return out[beam.UnnamedOutputTag()]
 }
 
 type readOption func(*readConfig)
diff --git a/sdks/go/pkg/beam/xlang.go b/sdks/go/pkg/beam/xlang.go
index 4ec0162..b9c6ec7 100644
--- a/sdks/go/pkg/beam/xlang.go
+++ b/sdks/go/pkg/beam/xlang.go
@@ -22,32 +22,44 @@
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 )
 
-// xlang exposes an API to execute cross-language transforms within the Go SDK.
-// It is experimental and likely to change. It exposes convenient wrappers
-// around the core functions to pass in any combination of named/unnamed
-// inputs/outputs.
-
 // UnnamedInput is a helper function for passing single unnamed inputs to
-// `beam.CrossLanguage`.
+// beam.CrossLanguage.
 //
 // Example:
-//    beam.CrossLanguage(s, urn, payload, addr, UnnamedInput(input), outputs);
+//    beam.CrossLanguage(s, urn, payload, addr, UnnamedInput(input), outputs)
 func UnnamedInput(col PCollection) map[string]PCollection {
 	return map[string]PCollection{graph.UnnamedInputTag: col}
 }
 
 // UnnamedOutput is a helper function for passing single unnamed output types to
-// `beam.CrossLanguage`.
+// beam.CrossLanguage. The associated output can be accessed with beam.UnnamedOutputTag.
 //
 // Example:
-//    beam.CrossLanguage(s, urn, payload, addr, inputs, UnnamedOutput(output));
+//    resultMap := beam.CrossLanguage(s, urn, payload, addr, inputs, UnnamedOutput(output))
+//    result := resultMap[beam.UnnamedOutputTag()]
 func UnnamedOutput(t FullType) map[string]FullType {
 	return map[string]FullType{graph.UnnamedOutputTag: t}
 }
 
-// CrossLanguagePayload encodes a native Go struct into a payload for
-// cross-language transforms. To find the expected structure of a payload,
-// consult the documentation in the SDK performing the expansion.
+// UnnamedOutputTag provides the output tag used for an output passed to beam.UnnamedOutput.
+// It is needed to retrieve the unnamed output PCollection from the result of beam.CrossLanguage.
+func UnnamedOutputTag() string {
+	return graph.UnnamedOutputTag
+}
+
+// CrossLanguagePayload encodes a native Go struct into a payload for cross-language transforms.
+// Payloads are []byte-encoded ExternalConfigurationPayload protobufs. To fill in the contents of
+// the protobuf, the provided struct is converted to a row-encoded representation with an
+// accompanying schema, so the input struct must be compatible with schemas.
+//
+// See https://beam.apache.org/documentation/programming-guide/#schemas for basic information on
+// schemas, and pkg/beam/core/runtime/graphx/schema for details on schemas in the Go SDK.
+//
+// Example:
+//    type stringPayload struct {
+//        Data string
+//    }
+//    encodedPl := beam.CrossLanguagePayload(stringPayload{Data: "foo"})
 func CrossLanguagePayload(pl interface{}) []byte {
 	bytes, err := xlangx.EncodeStructPayload(pl)
 	if err != nil {
@@ -56,8 +68,73 @@
 	return bytes
 }
 
-// CrossLanguage executes a cross-language transform that uses named inputs and
-// returns named outputs.
+// CrossLanguage is a low-level transform for executing cross-language transforms written in other
+// SDKs. Because this is low-level, it is recommended to use one of the higher-level IO-specific
+// wrappers where available. These can be found in the pkg/beam/io/xlang subdirectory.
+// CrossLanguage is useful for executing cross-language transforms which do not have any existing
+// IO wrappers.
+//
+// Usage requires an address for an expansion service accessible during pipeline construction, a
+// URN identifying the desired transform, an optional payload with configuration information, and
+// named input PCollections and output types. It outputs a map of named output PCollections.
+//
+// For more information on expansion services and other aspects of cross-language transforms in
+// general, refer to the Beam programming guide: https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines
+//
+// Payload
+//
+// Payloads are configuration data that some cross-language transforms require for expansion.
+// Consult the documentation of the transform in the source SDK to find out what payload data it
+// requires. If no payload is required, pass in nil.
+//
+// CrossLanguage accepts payloads as a []byte containing an encoded ExternalConfigurationPayload
+// protobuf. The helper function beam.CrossLanguagePayload is the recommended way to easily encode
+// a standard Go struct for use as a payload.
+//
+// Inputs and Outputs
+//
+// Like most transforms, any input PCollections must be provided. Unlike most transforms, output
+// types must be provided because Go cannot infer output types from external transforms.
+//
+// Inputs and outputs to a cross-language transform may be either named or unnamed. Named
+// inputs/outputs are used when there is more than one input/output, and are provided as maps with
+// names as keys. Unnamed inputs/outputs are used when there is only one, and a map can be quickly
+// constructed with the UnnamedInput and UnnamedOutput functions.
+//
+// An example of defining named inputs and outputs:
+//
+//    namedInputs := map[string]beam.PCollection{"pcol1": pcol1, "pcol2": pcol2}
+//    namedOutputTypes := map[string]typex.FullType{
+//        "main": typex.New(reflectx.String),
+//        "side": typex.New(reflectx.Int64),
+//    }
+//
+// CrossLanguage outputs a map of PCollections with associated names. These names will match those
+// from provided named outputs. If the beam.UnnamedOutput function was used, the PCollection can be
+// retrieved with beam.UnnamedOutputTag().
+//
+// An example of retrieving named outputs from a call to CrossLanguage:
+//
+//    outputs := beam.CrossLanguage(...)
+//    mainPcol := outputs["main"]
+//    sidePcol := outputs["side"]
+//
+// Example
+//
+// This example shows using CrossLanguage to execute the Prefix cross-language transform using an
+// expansion service running on localhost:8099. Prefix requires a payload containing a prefix to
+// prepend to every input string.
+//
+//    type prefixPayload struct {
+//        Data string
+//    }
+//    encodedPl := beam.CrossLanguagePayload(prefixPayload{Data: "foo"})
+//    urn := "beam:transforms:xlang:test:prefix"
+//    expansionAddr := "localhost:8099"
+//    outputType := beam.UnnamedOutput(typex.New(reflectx.String))
+//    input := beam.UnnamedInput(inputPcol)
+//    outs := beam.CrossLanguage(s, urn, encodedPl, expansionAddr, input, outputType)
+//    outPcol := outs[beam.UnnamedOutputTag()]
 func CrossLanguage(
 	s Scope,
 	urn string,
@@ -86,7 +163,9 @@
 	return mapNodeToPCollection(namedOutputs)
 }
 
-// TryCrossLanguage coordinates the core functions required to execute the cross-language transform
+// TryCrossLanguage coordinates the core functions required to execute the cross-language transform.
+// This is mainly intended for internal use. For the general-use entry point, see
+// beam.CrossLanguage.
 func TryCrossLanguage(s Scope, ext *graph.ExternalTransform, ins []*graph.Inbound, outs []*graph.Outbound) (map[string]*graph.Node, error) {
 	// Adding an edge in the graph corresponding to the ExternalTransform
 	edge, isBoundedUpdater := graph.NewCrossLanguage(s.real, s.scope, ext, ins, outs)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
index 484fe3b..dfe797e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
@@ -86,7 +86,6 @@
   private final SerializableFunction<ElementT, TableRow> toTableRow;
   private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
   private final Set<String> allowedMetricUrns;
-  private @Nullable DatasetService datasetService;
 
   /** Tracks bytes written, exposed as "ByteCount" Counter. */
   private Counter byteCounter = SinkMetrics.bytesWritten();
@@ -222,6 +221,15 @@
     /** The list of unique ids for each BigQuery table row. */
     private transient Map<String, List<String>> uniqueIdsForTableRows;
 
+    private transient @Nullable DatasetService datasetService;
+
+    private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
+      if (datasetService == null) {
+        datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+      }
+      return datasetService;
+    }
+
     /** Prepares a target BigQuery table. */
     @StartBundle
     public void startBundle() {
@@ -257,10 +265,10 @@
           tableRows.entrySet()) {
         TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
         flushRows(
+            getDatasetService(options),
             tableReference,
             entry.getValue(),
             uniqueIdsForTableRows.get(entry.getKey()),
-            options,
             failedInserts,
             successfulInserts);
       }
@@ -272,6 +280,18 @@
       }
       reportStreamingApiLogging(options);
     }
+
+    @Teardown
+    public void onTeardown() {
+      try {
+        if (datasetService != null) {
+          datasetService.close();
+          datasetService = null;
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
   }
 
   // The max duration input records are allowed to be buffered in the state, if using ViaStateful.
@@ -325,13 +345,22 @@
   // shuffling.
   private class InsertBatchedElements
       extends DoFn<KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>>, Void> {
+    private transient @Nullable DatasetService datasetService;
+
+    private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
+      if (datasetService == null) {
+        datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+      }
+      return datasetService;
+    }
+
     @ProcessElement
     public void processElement(
         @Element KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>> input,
         BoundedWindow window,
         ProcessContext context,
         MultiOutputReceiver out)
-        throws InterruptedException {
+        throws InterruptedException, IOException {
       List<FailsafeValueInSingleWindow<TableRow, TableRow>> tableRows = new ArrayList<>();
       List<String> uniqueIds = new ArrayList<>();
       for (TableRowInfo<ElementT> row : input.getValue()) {
@@ -347,7 +376,13 @@
       TableReference tableReference = BigQueryHelpers.parseTableSpec(input.getKey().getKey());
       List<ValueInSingleWindow<ErrorT>> failedInserts = Lists.newArrayList();
       List<ValueInSingleWindow<TableRow>> successfulInserts = Lists.newArrayList();
-      flushRows(tableReference, tableRows, uniqueIds, options, failedInserts, successfulInserts);
+      flushRows(
+          getDatasetService(options),
+          tableReference,
+          tableRows,
+          uniqueIds,
+          failedInserts,
+          successfulInserts);
 
       for (ValueInSingleWindow<ErrorT> row : failedInserts) {
         out.get(failedOutputTag).output(row.getValue());
@@ -357,44 +392,43 @@
       }
       reportStreamingApiLogging(options);
     }
-  }
 
-  @Teardown
-  public void onTeardown() {
-    try {
-      if (datasetService != null) {
-        datasetService.close();
-        datasetService = null;
+    @Teardown
+    public void onTeardown() {
+      try {
+        if (datasetService != null) {
+          datasetService.close();
+          datasetService = null;
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
     }
   }
 
   /** Writes the accumulated rows into BigQuery with streaming API. */
   private void flushRows(
+      DatasetService datasetService,
       TableReference tableReference,
       List<FailsafeValueInSingleWindow<TableRow, TableRow>> tableRows,
       List<String> uniqueIds,
-      BigQueryOptions options,
       List<ValueInSingleWindow<ErrorT>> failedInserts,
       List<ValueInSingleWindow<TableRow>> successfulInserts)
       throws InterruptedException {
     if (!tableRows.isEmpty()) {
       try {
         long totalBytes =
-            getDatasetService(options)
-                .insertAll(
-                    tableReference,
-                    tableRows,
-                    uniqueIds,
-                    retryPolicy,
-                    failedInserts,
-                    errorContainer,
-                    skipInvalidRows,
-                    ignoreUnknownValues,
-                    ignoreInsertIds,
-                    successfulInserts);
+            datasetService.insertAll(
+                tableReference,
+                tableRows,
+                uniqueIds,
+                retryPolicy,
+                failedInserts,
+                errorContainer,
+                skipInvalidRows,
+                ignoreUnknownValues,
+                ignoreInsertIds,
+                successfulInserts);
         byteCounter.inc(totalBytes);
       } catch (IOException e) {
         throw new RuntimeException(e);
@@ -402,13 +436,6 @@
     }
   }
 
-  private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
-    if (datasetService == null) {
-      datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
-    }
-    return datasetService;
-  }
-
   private void reportStreamingApiLogging(BigQueryOptions options) {
     MetricsContainer processWideContainer = MetricsEnvironment.getProcessWideContainer();
     if (processWideContainer instanceof MetricsLogger) {
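
The DatasetService changes above move ownership of the service from the enclosing transform into each DoFn: the service is created lazily on first use and closed in the @Teardown of the same DoFn instance, so a client opened on a worker is always released by that worker. A condensed sketch of the lifecycle, with illustrative names (ExampleFn and the body of processElement are not from this change):

    class ExampleFn extends DoFn<String, Void> {
      private transient @Nullable DatasetService datasetService;

      private DatasetService getDatasetService(PipelineOptions opts) throws IOException {
        if (datasetService == null) { // created at most once per DoFn instance
          datasetService = bqServices.getDatasetService(opts.as(BigQueryOptions.class));
        }
        return datasetService;
      }

      @ProcessElement
      public void processElement(ProcessContext c) throws IOException {
        DatasetService service = getDatasetService(c.getPipelineOptions());
        // ... use the service, e.g. service.insertAll(...), as flushRows does above.
      }

      @Teardown
      public void onTeardown() {
        try {
          if (datasetService != null) {
            datasetService.close(); // release the client when the DoFn is discarded
            datasetService = null;
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    }
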
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 1d3d894..8b9b705 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -965,49 +965,53 @@
       // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
       // For these cases the withoutValidation method can be used to disable the check.
       if (getValidate()) {
-        if (table != null) {
-          checkArgument(table.isAccessible(), "Cannot call validate if table is dynamically set.");
-        }
-        if (table != null && table.get().getProjectId() != null) {
-          // Check for source table presence for early failure notification.
-          DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
-          BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
-          BigQueryHelpers.verifyTablePresence(datasetService, table.get());
-        } else if (getQuery() != null) {
-          checkArgument(
-              getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
-          JobService jobService = getBigQueryServices().getJobService(bqOptions);
-          try {
-            jobService.dryRunQuery(
-                bqOptions.getBigQueryProject() == null
-                    ? bqOptions.getProject()
-                    : bqOptions.getBigQueryProject(),
-                new JobConfigurationQuery()
-                    .setQuery(getQuery().get())
-                    .setFlattenResults(getFlattenResults())
-                    .setUseLegacySql(getUseLegacySql()),
-                getQueryLocation());
-          } catch (Exception e) {
-            throw new IllegalArgumentException(
-                String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
+        try (DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions)) {
+          if (table != null) {
+            checkArgument(
+                table.isAccessible(), "Cannot call validate if table is dynamically set.");
           }
+          if (table != null && table.get().getProjectId() != null) {
+            // Check for source table presence for early failure notification.
+            BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
+            BigQueryHelpers.verifyTablePresence(datasetService, table.get());
+          } else if (getQuery() != null) {
+            checkArgument(
+                getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
+            JobService jobService = getBigQueryServices().getJobService(bqOptions);
+            try {
+              jobService.dryRunQuery(
+                  bqOptions.getBigQueryProject() == null
+                      ? bqOptions.getProject()
+                      : bqOptions.getBigQueryProject(),
+                  new JobConfigurationQuery()
+                      .setQuery(getQuery().get())
+                      .setFlattenResults(getFlattenResults())
+                      .setUseLegacySql(getUseLegacySql()),
+                  getQueryLocation());
+            } catch (Exception e) {
+              throw new IllegalArgumentException(
+                  String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
+            }
 
-          DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
-          // If the user provided a temp dataset, check if the dataset exists before launching the
-          // query
-          if (getQueryTempDataset() != null) {
-            // The temp table is only used for dataset and project id validation, not for table name
-            // validation
-            TableReference tempTable =
-                new TableReference()
-                    .setProjectId(
-                        bqOptions.getBigQueryProject() == null
-                            ? bqOptions.getProject()
-                            : bqOptions.getBigQueryProject())
-                    .setDatasetId(getQueryTempDataset())
-                    .setTableId("dummy table");
-            BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
+            // If the user provided a temp dataset, check if the dataset exists before launching the
+            // query
+            if (getQueryTempDataset() != null) {
+              // The temp table reference is only used for dataset and project id
+              // validation, not for table name validation, so the table id below
+              // is just a placeholder.
+              TableReference tempTable =
+                  new TableReference()
+                      .setProjectId(
+                          bqOptions.getBigQueryProject() == null
+                              ? bqOptions.getProject()
+                              : bqOptions.getBigQueryProject())
+                      .setDatasetId(getQueryTempDataset())
+                      .setTableId("dummy table");
+              BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
+            }
           }
+        } catch (Exception e) {
+          throw new RuntimeException(e);
         }
       }
     }
@@ -1401,15 +1405,17 @@
                           options.getJobName(), jobUuid, JobType.QUERY),
                       queryTempDataset);
 
-              DatasetService datasetService = getBigQueryServices().getDatasetService(options);
-              LOG.info("Deleting temporary table with query results {}", tempTable);
-              datasetService.deleteTable(tempTable);
-              // Delete dataset only if it was created by Beam
-              boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
-              if (datasetCreatedByBeam) {
-                LOG.info(
-                    "Deleting temporary dataset with query results {}", tempTable.getDatasetId());
-                datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
+              try (DatasetService datasetService =
+                  getBigQueryServices().getDatasetService(options)) {
+                LOG.info("Deleting temporary table with query results {}", tempTable);
+                datasetService.deleteTable(tempTable);
+                // Delete dataset only if it was created by Beam
+                boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
+                if (datasetCreatedByBeam) {
+                  LOG.info(
+                      "Deleting temporary dataset with query results {}", tempTable.getDatasetId());
+                  datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
+                }
               }
             }
           };
@@ -2484,17 +2490,20 @@
       // The user specified a table.
       if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) {
         TableReference table = getTableWithDefaultProject(options).get();
-        DatasetService datasetService = getBigQueryServices().getDatasetService(options);
-        // Check for destination table presence and emptiness for early failure notification.
-        // Note that a presence check can fail when the table or dataset is created by an earlier
-        // stage of the pipeline. For these cases the #withoutValidation method can be used to
-        // disable the check.
-        BigQueryHelpers.verifyDatasetPresence(datasetService, table);
-        if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
-          BigQueryHelpers.verifyTablePresence(datasetService, table);
-        }
-        if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
-          BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
+        try (DatasetService datasetService = getBigQueryServices().getDatasetService(options)) {
+          // Check for destination table presence and emptiness for early failure notification.
+          // Note that a presence check can fail when the table or dataset is created by an earlier
+          // stage of the pipeline. For these cases the #withoutValidation method can be used to
+          // disable the check.
+          BigQueryHelpers.verifyDatasetPresence(datasetService, table);
+          if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
+            BigQueryHelpers.verifyTablePresence(datasetService, table);
+          }
+          if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
+            BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
+          }
+        } catch (Exception e) {
+          throw new RuntimeException(e);
         }
       }
     }
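
Both validation paths now rely on DatasetService being AutoCloseable: try-with-resources guarantees the service is closed even when a verification step throws, and the catch block rewraps because close() may throw a checked Exception. Reduced to its essentials (verifyDatasetPresence stands in for the several checks used above):

    try (DatasetService datasetService = getBigQueryServices().getDatasetService(options)) {
      // A verification failure propagates, but the service is still closed first.
      BigQueryHelpers.verifyDatasetPresence(datasetService, table);
    } catch (Exception e) {
      // AutoCloseable.close() is declared to throw Exception, so rewrap it.
      throw new RuntimeException(e);
    }
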
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index aeee3d2..eb9f8c4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -38,10 +38,14 @@
 import io.grpc.Status.Code;
 import io.grpc.StatusRuntimeException;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.values.KV;
@@ -130,12 +134,40 @@
       String tableNameSr =
           session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
 
+      HashMap<String, String> baseLabels = new HashMap<>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigtableResource(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.TABLE_ID,
+          GcpResourceIdentifiers.bigtableTableID(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      ServiceCallMetric serviceCallMetric =
+          new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
       ReadRowsRequest.Builder requestB =
           ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
       if (source.getRowFilter() != null) {
         requestB.setFilter(source.getRowFilter());
       }
-      results = session.getDataClient().readRows(requestB.build());
+      try {
+        results = session.getDataClient().readRows(requestB.build());
+        serviceCallMetric.call("ok");
+      } catch (StatusRuntimeException e) {
+        serviceCallMetric.call(e.getStatus().getCode().value());
+        throw e;
+      }
       return advance();
     }
 
@@ -182,10 +214,12 @@
   static class BigtableWriterImpl implements Writer {
     private BigtableSession session;
     private BulkMutation bulkMutation;
+    private BigtableTableName tableName;
 
     BigtableWriterImpl(BigtableSession session, BigtableTableName tableName) {
       this.session = session;
       bulkMutation = session.createBulkMutation(tableName);
+      this.tableName = tableName;
     }
 
     @Override
@@ -231,6 +265,28 @@
               .addAllMutations(record.getValue())
               .build();
 
+      HashMap<String, String> baseLabels = new HashMap<>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.MutateRows");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigtableResource(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              tableName.getTableId()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.TABLE_ID,
+          GcpResourceIdentifiers.bigtableTableID(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              tableName.getTableId()));
+      ServiceCallMetric serviceCallMetric =
+          new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
       CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
       Futures.addCallback(
           new VendoredListenableFutureAdapter<>(bulkMutation.add(request)),
@@ -238,10 +294,17 @@
             @Override
             public void onSuccess(MutateRowResponse mutateRowResponse) {
               result.complete(mutateRowResponse);
+              serviceCallMetric.call("ok");
             }
 
             @Override
             public void onFailure(Throwable throwable) {
+              if (throwable instanceof StatusRuntimeException) {
+                serviceCallMetric.call(
+                    ((StatusRuntimeException) throwable).getStatus().getCode().value());
+              } else {
+                serviceCallMetric.call("unknown");
+              }
               result.completeExceptionally(throwable);
             }
           },
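
The read and write paths share one ServiceCallMetric recipe: build the label map for the request, construct the metric, then record exactly one call per outcome, either "ok" or the canonical gRPC status code. Condensed, assuming baseLabels is populated as above and rpc() is an illustrative stand-in for readRows or the bulk mutation:

    ServiceCallMetric metric =
        new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
    try {
      rpc(); // stand-in for the actual client call
      metric.call("ok");
    } catch (StatusRuntimeException e) {
      metric.call(e.getStatus().getCode().value()); // numeric gRPC status code
      throw e;
    }
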
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
index 69be079..b983cc3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
@@ -42,9 +43,15 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.HashMap;
+import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -61,8 +68,12 @@
 @RunWith(JUnit4.class)
 public class BigtableServiceImplTest {
 
+  private static final String PROJECT_ID = "project";
+  private static final String INSTANCE_ID = "instance";
+  private static final String TABLE_ID = "table";
+
   private static final BigtableTableName TABLE_NAME =
-      new BigtableInstanceName("project", "instance").toTableName("table");
+      new BigtableInstanceName(PROJECT_ID, INSTANCE_ID).toTableName(TABLE_ID);
 
   @Mock private BigtableSession mockSession;
 
@@ -76,10 +87,13 @@
   public void setup() {
     MockitoAnnotations.initMocks(this);
     BigtableOptions options =
-        new BigtableOptions.Builder().setProjectId("project").setInstanceId("instance").build();
+        new BigtableOptions.Builder().setProjectId(PROJECT_ID).setInstanceId(INSTANCE_ID).build();
     when(mockSession.getOptions()).thenReturn(options);
     when(mockSession.createBulkMutation(eq(TABLE_NAME))).thenReturn(mockBulkMutation);
     when(mockSession.getDataClient()).thenReturn(mockBigtableDataClient);
+    // Set up the ProcessWideContainer so tests can verify that metrics are set.
+    MetricsContainerImpl container = new MetricsContainerImpl(null);
+    MetricsEnvironment.setProcessWideContainer(container);
   }
 
   /**
@@ -94,7 +108,7 @@
     ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
     ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
     when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
-    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of("table_name"));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
     @SuppressWarnings("unchecked")
     ResultScanner<Row> mockResultScanner = Mockito.mock(ResultScanner.class);
     Row expectedRow = Row.newBuilder().setKey(ByteString.copyFromUtf8("a")).build();
@@ -109,6 +123,7 @@
     underTest.close();
 
     verify(mockResultScanner, times(1)).close();
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
   /**
@@ -140,4 +155,27 @@
     underTest.close();
     verify(mockBulkMutation, times(1)).flush();
   }
+
+  private void verifyMetricWasSet(String method, String status, long count) {
+    // Verify that the metric was reported with the expected labels.
+    HashMap<String, String> labels = new HashMap<>();
+    labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+    labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+    labels.put(MonitoringInfoConstants.Labels.METHOD, method);
+    labels.put(
+        MonitoringInfoConstants.Labels.RESOURCE,
+        GcpResourceIdentifiers.bigtableResource(PROJECT_ID, INSTANCE_ID, TABLE_ID));
+    labels.put(MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, PROJECT_ID);
+    labels.put(MonitoringInfoConstants.Labels.INSTANCE_ID, INSTANCE_ID);
+    labels.put(
+        MonitoringInfoConstants.Labels.TABLE_ID,
+        GcpResourceIdentifiers.bigtableTableID(PROJECT_ID, INSTANCE_ID, TABLE_ID));
+    labels.put(MonitoringInfoConstants.Labels.STATUS, status);
+
+    MonitoringInfoMetricName name =
+        MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, labels);
+    MetricsContainerImpl container =
+        (MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer();
+    assertEquals(count, (long) container.getCounter(name).getCumulative());
+  }
 }
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index 886e56e..9ed0b25 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -208,8 +208,6 @@
 
   def load_pcoder(self, *labels):
     saved_pcoder = self._saved_pcoders.get(self._path(*labels), None)
-    # TODO(BEAM-12506): Get rid of the SafeFastPrimitivesCoder for
-    # WindowedValueHolder.
     if saved_pcoder is None or isinstance(saved_pcoder,
                                           coders.FastPrimitivesCoder):
       return self._default_pcoder
diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index 054c9a6..fc8a8aa 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -390,8 +390,6 @@
   def load_pcoder(self, *labels):
     saved_pcoder = self._saved_pcoders.get(
         os.path.join(self._cache_dir, *labels), None)
-    # TODO(BEAM-12506): Get rid of the SafeFastPrimitivesCoder for
-    # WindowedValueHolder.
     if saved_pcoder is None or isinstance(saved_pcoder,
                                           coders.FastPrimitivesCoder):
       return self._default_pcoder
diff --git a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
index cee3d34..1dc42e0 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
@@ -227,7 +227,11 @@
       endpoint=test_stream_service.endpoint)
   sql_source = {}
   for tag, output in output_pcolls.items():
-    sql_source[tag_to_name[tag]] = output
+    name = tag_to_name[tag]
+    # Must mark the element_type to avoid introducing a pickled Python coder
+    # to the Java expansion service.
+    output.element_type = name_to_pcoll[name].element_type
+    sql_source[name] = output
   return sql_source