Merge pull request #14572: [BEAM-12118] Fix racy precondition exception in QueueingBeamFnDataClient

diff --git a/CHANGES.md b/CHANGES.md
index a94662f..489a9c6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -38,6 +38,8 @@
 * X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Python Row objects are now sensitive to field order. So `Row(x=3, y=4)` is no
   longer considered equal to `Row(y=4, x=3)` (BEAM-11929).
+* Kafka Beam SQL tables now ascribe meaning to the LOCATION field; previously
+  it was ignored if provided.
 
 ## Deprecations
 
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 0947ce7..1fa4f5e 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -520,7 +520,7 @@
         google_auth_library_credentials             : "com.google.auth:google-auth-library-credentials", // google_cloud_platform_libraries_bom sets version
         google_auth_library_oauth2_http             : "com.google.auth:google-auth-library-oauth2-http", // google_cloud_platform_libraries_bom sets version
         google_cloud_bigquery                       : "com.google.cloud:google-cloud-bigquery", // google_cloud_platform_libraries_bom sets version
-        google_cloud_bigquery_storage               : "com.google.cloud:google-cloud-bigquerystorage:1.12.0",
+        google_cloud_bigquery_storage               : "com.google.cloud:google-cloud-bigquerystorage:1.18.1",
         google_cloud_bigtable_client_core           : "com.google.cloud.bigtable:bigtable-client-core:1.16.0",
         google_cloud_bigtable_emulator              : "com.google.cloud:google-cloud-bigtable-emulator:0.125.2",
         google_cloud_core                           : "com.google.cloud:google-cloud-core", // google_cloud_platform_libraries_bom sets version
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java
index f944572..9f32382 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java
@@ -55,13 +55,39 @@
     value.clear();
   }
 
-  /** Increment the distribution by the given amount. */
+  /** Increment the corresponding histogram bucket count for the value by 1. */
   @Override
   public void update(double value) {
     this.value.record(value);
     dirty.afterModification();
   }
 
+  /**
+   * Increment all of the bucket counts in this histogram by the bucket counts specified in other.
+   */
+  public void update(HistogramCell other) {
+    this.value.update(other.value);
+    dirty.afterModification();
+  }
+
+  // TODO(BEAM-12103): Update this method to allow incrementing the infinite buckets as well
+  // (using 0 and length - 1 as the bucketIndex) and remove the incTopBucketCount and
+  // incBottomBucketCount methods.
+  public void incBucketCount(int bucketIndex, long count) {
+    this.value.incBucketCount(bucketIndex, count);
+    dirty.afterModification();
+  }
+
+  public void incTopBucketCount(long count) {
+    this.value.incTopBucketCount(count);
+    dirty.afterModification();
+  }
+
+  public void incBottomBucketCount(long count) {
+    this.value.incBottomBucketCount(count);
+    dirty.afterModification();
+  }
+
   @Override
   public DirtyState getDirty() {
     return dirty;
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 40ea1b5..94ddc99 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -43,10 +43,10 @@
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsLogger;
 import org.apache.beam.sdk.util.HistogramData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -69,10 +69,10 @@
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class MetricsContainerImpl implements Serializable, MetricsContainer, MetricsLogger {
+public class MetricsContainerImpl implements Serializable, MetricsContainer {
   private static final Logger LOG = LoggerFactory.getLogger(MetricsContainerImpl.class);
 
-  private final @Nullable String stepName;
+  protected final @Nullable String stepName;
 
   private MetricsMap<MetricName, CounterCell> counters = new MetricsMap<>(CounterCell::new);
 
@@ -95,7 +95,6 @@
   private Map<MetricKey, Optional<String>> shortIdsByMetricKey = new ConcurrentHashMap<>();
 
   /** Reset the metrics. */
-  @Override
   public void reset() {
     reset(counters);
     reset(distributions);
@@ -376,6 +375,7 @@
     updateCounters(counters, other.counters);
     updateDistributions(distributions, other.distributions);
     updateGauges(gauges, other.gauges);
+    updateHistograms(histograms, other.histograms);
   }
 
   private void updateForSumInt64Type(MonitoringInfo monitoringInfo) {
@@ -446,6 +446,16 @@
     }
   }
 
+  private void updateHistograms(
+      MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> current,
+      MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> updates) {
+    for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> histogram :
+        updates.entries()) {
+      HistogramCell h = histogram.getValue();
+      current.get(histogram.getKey()).update(h);
+    }
+  }
+
   @Override
   public boolean equals(@Nullable Object object) {
     if (object instanceof MetricsContainerImpl) {
@@ -455,7 +465,6 @@
           && Objects.equals(distributions, metricsContainerImpl.distributions)
           && Objects.equals(gauges, metricsContainerImpl.gauges);
     }
-
     return false;
   }
 
@@ -466,21 +475,25 @@
 
   /**
    * Match a MetricName with a given metric filter. If the metric filter is null, the method always
-   * returns true.
+   * returns true. TODO(BEAM-10986): Consider making this use the MetricNameFilter and related
+   * classes.
    */
-  private boolean matchMetricName(MetricName metricName, @Nullable Set<MetricName> metricFilter) {
-    if (metricFilter == null) {
+  @VisibleForTesting
+  static boolean matchMetric(MetricName metricName, @Nullable Set<String> allowedMetricUrns) {
+    if (allowedMetricUrns == null) {
       return true;
-    } else {
-      return metricFilter.contains(metricName);
     }
+    if (metricName instanceof MonitoringInfoMetricName) {
+      return allowedMetricUrns.contains(((MonitoringInfoMetricName) metricName).getUrn());
+    }
+    return false;
   }
+
   /** Return a string representing the cumulative values of all metrics in this container. */
-  @Override
-  public String getCumulativeString(@Nullable Set<MetricName> metricFilter) {
+  public String getCumulativeString(@Nullable Set<String> allowedMetricUrns) {
     StringBuilder message = new StringBuilder();
     for (Map.Entry<MetricName, CounterCell> cell : counters.entries()) {
-      if (!matchMetricName(cell.getKey(), metricFilter)) {
+      if (!matchMetric(cell.getKey(), allowedMetricUrns)) {
         continue;
       }
       message.append(cell.getKey().toString());
@@ -489,7 +502,7 @@
       message.append("\n");
     }
     for (Map.Entry<MetricName, DistributionCell> cell : distributions.entries()) {
-      if (!matchMetricName(cell.getKey(), metricFilter)) {
+      if (!matchMetric(cell.getKey(), allowedMetricUrns)) {
         continue;
       }
       message.append(cell.getKey().toString());
@@ -502,7 +515,7 @@
       message.append("\n");
     }
     for (Map.Entry<MetricName, GaugeCell> cell : gauges.entries()) {
-      if (!matchMetricName(cell.getKey(), metricFilter)) {
+      if (!matchMetric(cell.getKey(), allowedMetricUrns)) {
         continue;
       }
       message.append(cell.getKey().toString());
@@ -513,7 +526,7 @@
     }
     for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
         histograms.entries()) {
-      if (!matchMetricName(cell.getKey().getKey(), metricFilter)) {
+      if (!matchMetric(cell.getKey().getKey(), allowedMetricUrns)) {
         continue;
       }
       message.append(cell.getKey().getKey().toString());
@@ -532,8 +545,43 @@
     return message.toString();
   }
 
-  @Override
-  public Logger getMetricLogger() {
-    return LOG;
+  /**
+   * Returns a MetricsContainerImpl with the delta values between two MetricsContainerImpls. The
+   * purpose of this method is to report the changes made to the metrics within a window of time.
+   * The deltas of the counter values and histogram bucket counts are calculated between curr and
+   * prev. The most recent value is used for gauges. Distribution metrics are dropped (as there is
+   * no meaningful way to calculate their delta). Returns curr if prev is null.
+   */
+  public static MetricsContainerImpl deltaContainer(
+      @Nullable MetricsContainerImpl prev, MetricsContainerImpl curr) {
+    if (prev == null) {
+      return curr;
+    }
+    MetricsContainerImpl deltaContainer = new MetricsContainerImpl(curr.stepName);
+    for (Map.Entry<MetricName, CounterCell> cell : curr.counters.entries()) {
+      Long prevValue = prev.counters.get(cell.getKey()).getCumulative();
+      Long currValue = cell.getValue().getCumulative();
+      deltaContainer.counters.get(cell.getKey()).inc(currValue - prevValue);
+    }
+    for (Map.Entry<MetricName, GaugeCell> cell : curr.gauges.entries()) {
+      // Simply take the most recent value for the gauge; there is no need to compute a delta.
+      deltaContainer.gauges.get(cell.getKey()).update(cell.getValue().getCumulative());
+    }
+    for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
+        curr.histograms.entries()) {
+      HistogramData.BucketType bt = cell.getKey().getValue();
+      HistogramData prevValue = prev.histograms.get(cell.getKey()).getCumulative();
+      HistogramData currValue = cell.getValue().getCumulative();
+      HistogramCell deltaValueCell = deltaContainer.histograms.get(cell.getKey());
+      deltaValueCell.incBottomBucketCount(
+          currValue.getBottomBucketCount() - prevValue.getBottomBucketCount());
+      for (int i = 0; i < bt.getNumBuckets(); i++) {
+        Long bucketCountDelta = currValue.getCount(i) - prevValue.getCount(i);
+        deltaValueCell.incBucketCount(i, bucketCountDelta);
+      }
+      deltaValueCell.incTopBucketCount(
+          currValue.getTopBucketCount() - prevValue.getTopBucketCount());
+    }
+    return deltaContainer;
   }
 }
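
A minimal usage sketch for the new MetricsContainerImpl.deltaContainer helper, mirroring the
testDeltaCounters case added below. The metric names, bucket layout, and values are illustrative
only; this is a sketch rather than part of the change itself.

    import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
    import org.apache.beam.sdk.metrics.MetricName;
    import org.apache.beam.sdk.util.HistogramData;

    public class DeltaContainerSketch {
      public static void main(String[] args) {
        MetricName counter = MetricName.named("namespace", "counter");
        MetricName histogram = MetricName.named("namespace", "histogram");
        HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 2, 5);

        // Two cumulative snapshots of the same logical metrics.
        MetricsContainerImpl prev = new MetricsContainerImpl(null);
        prev.getCounter(counter).inc(2L);
        prev.getHistogram(histogram, buckets).update(3);

        MetricsContainerImpl curr = new MetricsContainerImpl(null);
        curr.getCounter(counter).inc(9L);
        curr.getHistogram(histogram, buckets).update(3);
        curr.getHistogram(histogram, buckets).update(3);

        // Counter values and per-bucket histogram counts are subtracted; gauges keep the latest
        // value and distributions are dropped.
        MetricsContainerImpl delta = MetricsContainerImpl.deltaContainer(prev, curr);
        System.out.println(delta.getCounter(counter).getCumulative()); // 7
        System.out.println(
            delta.getHistogram(histogram, buckets).getCumulative().getCount(1)); // 1
      }
    }
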
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsLogger.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsLogger.java
new file mode 100644
index 0000000..c367382
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsLogger.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Experimental(Kind.METRICS)
+public class MetricsLogger extends MetricsContainerImpl {
+  private static final Logger LOG = LoggerFactory.getLogger(MetricsLogger.class);
+
+  Lock reportingLock = new ReentrantLock();
+  AtomicLong lastReportedMillis = new AtomicLong(System.currentTimeMillis());
+  @Nullable MetricsContainerImpl lastMetricsSnapshot = null;
+
+  public MetricsLogger(@Nullable String stepName) {
+    super(stepName);
+  }
+
+  public String generateLogMessage(
+      String header, Set<String> allowedMetricUrns, long lastReported) {
+    MetricsContainerImpl nextMetricsSnapshot = new MetricsContainerImpl(this.stepName);
+    nextMetricsSnapshot.update(this);
+    MetricsContainerImpl deltaContainer =
+        MetricsContainerImpl.deltaContainer(lastMetricsSnapshot, nextMetricsSnapshot);
+
+    StringBuilder logMessage = new StringBuilder();
+    logMessage.append(header);
+    logMessage.append(deltaContainer.getCumulativeString(allowedMetricUrns));
+    logMessage.append(String.format("(last reported at %s)%n", new Date(lastReported)));
+
+    lastMetricsSnapshot = nextMetricsSnapshot;
+    return logMessage.toString();
+  }
+
+  public void tryLoggingMetrics(
+      String header, Set<String> allowedMetricUrns, long minimumLoggingFrequencyMillis) {
+
+    if (reportingLock.tryLock()) {
+      try {
+        long currentTimeMillis = System.currentTimeMillis();
+        long lastReported = lastReportedMillis.get();
+        if (currentTimeMillis - lastReported > minimumLoggingFrequencyMillis) {
+          LOG.info(generateLogMessage(header, allowedMetricUrns, lastReported));
+          lastReportedMillis.set(currentTimeMillis);
+        }
+      } finally {
+        reportingLock.unlock();
+      }
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override
+  public boolean equals(@Nullable Object object) {
+    if (object instanceof MetricsLogger) {
+      return super.equals(object);
+    }
+    return false;
+  }
+}
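
A hedged usage sketch for the new runners-core MetricsLogger, following the MetricsLoggerTest added
further down. The URN allow-list, header text, and logging interval are illustrative assumptions.

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.beam.runners.core.metrics.MetricsLogger;
    import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
    import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
    import org.apache.beam.sdk.metrics.MetricName;

    public class MetricsLoggerSketch {
      public static void main(String[] args) {
        MetricsLogger logger = new MetricsLogger(null);
        MetricName elementCount =
            MonitoringInfoMetricName.named(
                MonitoringInfoConstants.Urns.ELEMENT_COUNT,
                Collections.singletonMap("name", "counter"));
        logger.getCounter(elementCount).inc(2L);

        Set<String> allowedMetricUrns = new HashSet<>();
        allowedMetricUrns.add(MonitoringInfoConstants.Urns.ELEMENT_COUNT);

        // Logs at most once per minimumLoggingFrequencyMillis; each message reports only the
        // delta since the previous snapshot.
        logger.tryLoggingMetrics("Metrics since last report:\n", allowedMetricUrns, 60_000L);
      }
    }
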
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index fa9ebc4..5161bb4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -71,9 +71,20 @@
     public static final String NAME = "NAME";
     public static final String SERVICE = "SERVICE";
     public static final String METHOD = "METHOD";
+    public static final String RESOURCE = "RESOURCE";
     public static final String STATUS = "STATUS";
+    public static final String BIGQUERY_PROJECT_ID = "BIGQUERY_PROJECT_ID";
+    public static final String BIGQUERY_DATASET = "BIGQUERY_DATASET";
+    public static final String BIGQUERY_TABLE = "BIGQUERY_TABLE";
+    public static final String BIGQUERY_VIEW = "BIGQUERY_VIEW";
+    public static final String BIGQUERY_QUERY_NAME = "BIGQUERY_QUERY_NAME";
 
     static {
+      // Note: One benefit of defining these strings above, instead of pulling them in from
+      // the proto files, is to ensure that this code will crash if the strings in the proto
+      // file are changed without this file being updated.
+      // That said, those strings should not be changed either, as Runner Harnesses running old
+      // versions would not be able to understand the new label names.
       checkArgument(PTRANSFORM.equals(extractLabel(MonitoringInfoLabels.TRANSFORM)));
       checkArgument(PCOLLECTION.equals(extractLabel(MonitoringInfoLabels.PCOLLECTION)));
       checkArgument(
@@ -84,7 +95,15 @@
       checkArgument(NAME.equals(extractLabel(MonitoringInfoLabels.NAME)));
       checkArgument(SERVICE.equals(extractLabel(MonitoringInfoLabels.SERVICE)));
       checkArgument(METHOD.equals(extractLabel(MonitoringInfoLabels.METHOD)));
+      checkArgument(RESOURCE.equals(extractLabel(MonitoringInfoLabels.RESOURCE)));
       checkArgument(STATUS.equals(extractLabel(MonitoringInfoLabels.STATUS)));
+      checkArgument(
+          BIGQUERY_PROJECT_ID.equals(extractLabel(MonitoringInfoLabels.BIGQUERY_PROJECT_ID)));
+      checkArgument(BIGQUERY_DATASET.equals(extractLabel(MonitoringInfoLabels.BIGQUERY_DATASET)));
+      checkArgument(BIGQUERY_TABLE.equals(extractLabel(MonitoringInfoLabels.BIGQUERY_TABLE)));
+      checkArgument(BIGQUERY_VIEW.equals(extractLabel(MonitoringInfoLabels.BIGQUERY_VIEW)));
+      checkArgument(
+          BIGQUERY_QUERY_NAME.equals(extractLabel(MonitoringInfoLabels.BIGQUERY_QUERY_NAME)));
     }
   }
 
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index f04fcea..837432f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -23,11 +23,18 @@
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.util.HistogramData;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -269,6 +276,61 @@
   }
 
   @Test
+  public void testDeltaCounters() {
+    MetricName cName = MetricName.named("namespace", "counter");
+    MetricName gName = MetricName.named("namespace", "gauge");
+    HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5);
+    MetricName hName = MetricName.named("namespace", "histogram");
+
+    MetricsContainerImpl prevContainer = new MetricsContainerImpl(null);
+    prevContainer.getCounter(cName).inc(2L);
+    prevContainer.getGauge(gName).set(4L);
+    // Set bucket counts (bottom, 5 linear buckets, top) to: [1,1,1,0,0,0,1]
+    prevContainer.getHistogram(hName, bucketType).update(-1);
+    prevContainer.getHistogram(hName, bucketType).update(1);
+    prevContainer.getHistogram(hName, bucketType).update(3);
+    prevContainer.getHistogram(hName, bucketType).update(20);
+
+    MetricsContainerImpl nextContainer = new MetricsContainerImpl(null);
+    nextContainer.getCounter(cName).inc(9L);
+    nextContainer.getGauge(gName).set(8L);
+    // Set bucket counts to: [2,4,5,0,0,0,3]
+    nextContainer.getHistogram(hName, bucketType).update(-1);
+    nextContainer.getHistogram(hName, bucketType).update(-1);
+    for (int i = 0; i < 4; i++) {
+      nextContainer.getHistogram(hName, bucketType).update(1);
+    }
+    for (int i = 0; i < 5; i++) {
+      nextContainer.getHistogram(hName, bucketType).update(3);
+    }
+    nextContainer.getHistogram(hName, bucketType).update(20);
+    nextContainer.getHistogram(hName, bucketType).update(20);
+    nextContainer.getHistogram(hName, bucketType).update(20);
+
+    MetricsContainerImpl deltaContainer =
+        MetricsContainerImpl.deltaContainer(prevContainer, nextContainer);
+    // Expect counter value: 7 = 9 - 2
+    long cValue = deltaContainer.getCounter(cName).getCumulative();
+    assertEquals(7L, cValue);
+
+    // Expect gauge value: 8.
+    GaugeData gValue = deltaContainer.getGauge(gName).getCumulative();
+    assertEquals(8L, gValue.value());
+
+    // Expect bucket counts: [1,3,4,0,0,0,2]
+    assertEquals(
+        1, deltaContainer.getHistogram(hName, bucketType).getCumulative().getBottomBucketCount());
+    long[] expectedBucketCounts = new long[] {3, 4, 0, 0, 0};
+    for (int i = 0; i < expectedBucketCounts.length; i++) {
+      assertEquals(
+          expectedBucketCounts[i],
+          deltaContainer.getHistogram(hName, bucketType).getCumulative().getCount(i));
+    }
+    assertEquals(
+        2, deltaContainer.getHistogram(hName, bucketType).getCumulative().getTopBucketCount());
+  }
+
+  @Test
   public void testNotEquals() {
     MetricsContainerImpl metricsContainerImpl = new MetricsContainerImpl("stepName");
 
@@ -293,4 +355,37 @@
     Assert.assertNotEquals(metricsContainerImpl, differentGauges);
     Assert.assertNotEquals(metricsContainerImpl.hashCode(), differentGauges.hashCode());
   }
+
+  @Test
+  public void testMatchMetric() {
+    String urn = MonitoringInfoConstants.Urns.API_REQUEST_COUNT;
+    Map<String, String> labels = new HashMap<String, String>();
+    labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "MyPtransform");
+    labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
+    labels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite");
+    labels.put(MonitoringInfoConstants.Labels.RESOURCE, "Resource");
+    labels.put(MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, "MyProject");
+    labels.put(MonitoringInfoConstants.Labels.BIGQUERY_DATASET, "MyDataset");
+    labels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, "MyTable");
+
+    // MonitoringInfoMetricName will copy the labels, so it's safe to reuse this reference.
+    labels.put(MonitoringInfoConstants.Labels.STATUS, "ok");
+    MonitoringInfoMetricName okName = MonitoringInfoMetricName.named(urn, labels);
+    labels.put(MonitoringInfoConstants.Labels.STATUS, "not_found");
+    MonitoringInfoMetricName notFoundName = MonitoringInfoMetricName.named(urn, labels);
+
+    Set<String> allowedMetricUrns = new HashSet<String>();
+    allowedMetricUrns.add(MonitoringInfoConstants.Urns.API_REQUEST_COUNT);
+    assertTrue(MetricsContainerImpl.matchMetric(okName, allowedMetricUrns));
+    assertTrue(MetricsContainerImpl.matchMetric(notFoundName, allowedMetricUrns));
+
+    MetricName userMetricName = MetricName.named("namespace", "name");
+    assertFalse(MetricsContainerImpl.matchMetric(userMetricName, allowedMetricUrns));
+
+    MetricName elementCountName =
+        MonitoringInfoMetricName.named(
+            MonitoringInfoConstants.Urns.ELEMENT_COUNT,
+            Collections.singletonMap("name", "counter"));
+    assertFalse(MetricsContainerImpl.matchMetric(elementCountName, allowedMetricUrns));
+  }
 }
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsLoggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsLoggerTest.java
new file mode 100644
index 0000000..a37a0c7
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsLoggerTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.util.HistogramData;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+
+public class MetricsLoggerTest {
+
+  @Test
+  public void testGeneratedLogMessageShowsDeltas() {
+    MetricName cName =
+        MonitoringInfoMetricName.named(
+            MonitoringInfoConstants.Urns.ELEMENT_COUNT,
+            Collections.singletonMap("name", "counter"));
+    HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5);
+    MetricName hName =
+        MonitoringInfoMetricName.named(
+            MonitoringInfoConstants.Urns.ELEMENT_COUNT,
+            Collections.singletonMap("name", "histogram"));
+
+    MetricsLogger logger = new MetricsLogger(null);
+    logger.getCounter(cName).inc(2L);
+    // Set bucket counts to: [0,1,1,0,0,...]
+    logger.getHistogram(hName, bucketType).update(1);
+    logger.getHistogram(hName, bucketType).update(3);
+
+    Set<String> allowedMetricUrns = new HashSet<String>();
+    allowedMetricUrns.add(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
+    String msg = logger.generateLogMessage("My Header: ", allowedMetricUrns, 0);
+    assertThat(msg, CoreMatchers.containsString("beam:metric:element_count:v1 {name=counter} = 2"));
+    assertThat(
+        msg,
+        CoreMatchers.containsString(
+            "{name=histogram} = {count: 2, p50: 2.000000, p90: 3.600000, p99: 3.960000}"));
+
+    logger.getCounter(cName).inc(3L);
+    // Set bucket counts to: [0,5,6,0,0,0]
+    // Which means a delta of: [0,4,5,0,0,0]
+    for (int i = 0; i < 4; i++) {
+      logger.getHistogram(hName, bucketType).update(1);
+    }
+    for (int i = 0; i < 5; i++) {
+      logger.getHistogram(hName, bucketType).update(3);
+    }
+    msg = logger.generateLogMessage("My Header: ", allowedMetricUrns, 0);
+    assertThat(msg, CoreMatchers.containsString("beam:metric:element_count:v1 {name=counter} = 3"));
+    assertThat(
+        msg,
+        CoreMatchers.containsString(
+            "{name=histogram} = {count: 9, p50: 2.200000, p90: 3.640000, p99: 3.964000}"));
+
+    logger.getCounter(cName).inc(4L);
+    // Set bucket counts to: [0,8,10,0,0,0]
+    // Which means a delta of: [0,3,4,0,0,0]
+    for (int i = 0; i < 3; i++) {
+      logger.getHistogram(hName, bucketType).update(1);
+    }
+    for (int i = 0; i < 4; i++) {
+      logger.getHistogram(hName, bucketType).update(3);
+    }
+    msg = logger.generateLogMessage("My Header: ", allowedMetricUrns, 0);
+    assertThat(
+        msg,
+        CoreMatchers.containsString(
+            "{name=histogram} = {count: 7, p50: 2.250000, p90: 3.650000, p99: 3.965000}"));
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 5a58612..bb5eccd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -68,7 +68,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
-import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MetricsLogger;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.internal.CustomSources;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
@@ -292,7 +292,9 @@
     StreamingDataflowWorker worker =
         StreamingDataflowWorker.fromDataflowWorkerHarnessOptions(options, sdkHarnessRegistry);
 
-    MetricsEnvironment.setProcessWideContainer(new MetricsContainerImpl(null));
+    // Use the MetricsLogger container, which BigQueryIO uses to periodically log process-wide
+    // metrics.
+    MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
 
     JvmInitializers.runBeforeProcessing(options);
     worker.startStatusPages();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLogger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLogger.java
deleted file mode 100644
index f5af592..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLogger.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import java.io.Serializable;
-import java.util.Date;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.slf4j.Logger;
-
-@Experimental(Kind.METRICS)
-public interface MetricsLogger extends Serializable {
-  Lock REPORTING_LOCK = new ReentrantLock();
-  AtomicLong LAST_REPORTED_MILLIS = new AtomicLong(System.currentTimeMillis());
-
-  default void tryLoggingMetrics(
-      String header,
-      Set<MetricName> metricFilter,
-      long minimumLoggingFrequencyMillis,
-      boolean resetMetrics) {
-    if (REPORTING_LOCK.tryLock()) {
-      try {
-        long currentTimeMillis = System.currentTimeMillis();
-        long lastReported = LAST_REPORTED_MILLIS.get();
-        if (currentTimeMillis - lastReported > minimumLoggingFrequencyMillis) {
-          StringBuilder logMessage = new StringBuilder();
-          logMessage.append(header);
-          logMessage.append(getCumulativeString(metricFilter));
-          if (resetMetrics) {
-            reset();
-            logMessage.append(String.format("(last reported at %s)%n", new Date(lastReported)));
-          }
-          getMetricLogger().info(logMessage.toString());
-          LAST_REPORTED_MILLIS.set(currentTimeMillis);
-        }
-      } finally {
-        REPORTING_LOCK.unlock();
-      }
-    }
-  }
-
-  Logger getMetricLogger();
-
-  String getCumulativeString(Set<MetricName> metricFilter);
-
-  void reset();
-}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
index 5bade78..28d4a38 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
@@ -40,8 +40,10 @@
 
   private final BucketType bucketType;
 
+  // TODO(BEAM-12103): Remove the numTopRecords and numBottomRecords fields and include those
+  // counters in the buckets array.
   private long[] buckets;
-  private long numOfRecords;
+  private long numBoundedBucketRecords;
   private long numTopRecords;
   private long numBottomRecords;
 
@@ -53,13 +55,18 @@
   public HistogramData(BucketType bucketType) {
     this.bucketType = bucketType;
     this.buckets = new long[bucketType.getNumBuckets()];
-    this.numOfRecords = 0;
+    this.numBoundedBucketRecords = 0;
     this.numTopRecords = 0;
     this.numBottomRecords = 0;
   }
 
+  public BucketType getBucketType() {
+    return this.bucketType;
+  }
+
   /**
-   * Create a histogram with linear buckets.
+   * Create a histogram with linear buckets. TODO(BEAM-12103): Update this function to define
+   * numBuckets total, including the infinite buckets.
    *
    * @param start Lower bound of a starting bucket.
    * @param width Bucket width. Smaller width implies a better resolution for percentile estimation.
@@ -77,9 +84,41 @@
     }
   }
 
+  public synchronized void update(HistogramData other) {
+    synchronized (other) {
+      if (!this.bucketType.equals(other.bucketType)
+          || this.buckets.length != other.buckets.length) {
+        LOG.warn("Failed to update HistogramData from another with a different buckets");
+        return;
+      }
+
+      incTopBucketCount(other.numTopRecords);
+      incBottomBucketCount(other.numBottomRecords);
+      for (int i = 0; i < other.buckets.length; i++) {
+        incBucketCount(i, other.buckets[i]);
+      }
+    }
+  }
+
+  // TODO(BEAM-12103): Update this method to allow incrementing the infinite buckets as well
+  // (using 0 and length - 1 as the bucketIndex) and remove the incTopBucketCount and
+  // incBottomBucketCount methods.
+  public synchronized void incBucketCount(int bucketIndex, long count) {
+    this.buckets[bucketIndex] += count;
+    this.numBoundedBucketRecords += count;
+  }
+
+  public synchronized void incTopBucketCount(long count) {
+    this.numTopRecords += count;
+  }
+
+  public synchronized void incBottomBucketCount(long count) {
+    this.numBottomRecords += count;
+  }
+
   public synchronized void clear() {
     this.buckets = new long[bucketType.getNumBuckets()];
-    this.numOfRecords = 0;
+    this.numBoundedBucketRecords = 0;
     this.numTopRecords = 0;
     this.numBottomRecords = 0;
   }
@@ -88,19 +127,17 @@
     double rangeTo = bucketType.getRangeTo();
     double rangeFrom = bucketType.getRangeFrom();
     if (value >= rangeTo) {
-      LOG.warn("record is out of upper bound {}: {}", rangeTo, value);
       numTopRecords++;
     } else if (value < rangeFrom) {
-      LOG.warn("record is out of lower bound {}: {}", rangeFrom, value);
       numBottomRecords++;
     } else {
       buckets[bucketType.getBucketIndex(value)]++;
-      numOfRecords++;
+      numBoundedBucketRecords++;
     }
   }
 
   public synchronized long getTotalCount() {
-    return numOfRecords + numTopRecords + numBottomRecords;
+    return numBoundedBucketRecords + numTopRecords + numBottomRecords;
   }
 
   public synchronized String getPercentileString(String elemType, String unit) {
@@ -117,7 +154,8 @@
   }
 
   /**
-   * Get the bucket count for the given bucketIndex.
+   * Get the bucket count for the given bucketIndex. TODO(BEAM-12103): Update this function to
+   * allow indexing the -INF and INF buckets (using 0 and length - 1).
    *
    * <p>This method does not guarantee the atomicity when sequentially accessing the multiple
    * buckets i.e. other threads may alter the value between consecutive invocations. For summing the
@@ -130,6 +168,14 @@
     return buckets[bucketIndex];
   }
 
+  public synchronized long getTopBucketCount() {
+    return numTopRecords;
+  }
+
+  public synchronized long getBottomBucketCount() {
+    return numBottomRecords;
+  }
+
   public double p99() {
     return getLinearInterpolation(0.99);
   }
@@ -232,5 +278,7 @@
     public double getRangeTo() {
       return getStart() + getNumBuckets() * getWidth();
     }
+
+    // Note: equals() and hashCode() are implemented by AutoValue.
   }
 }
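
A small sketch of the new HistogramData merge and bucket-increment API, consistent with the
HistogramDataTest cases added below; the bucket layout and values are illustrative.

    import org.apache.beam.sdk.util.HistogramData;

    public class HistogramDataSketch {
      public static void main(String[] args) {
        HistogramData first = HistogramData.linear(0, 2, 2); // buckets [0,2) and [2,4)
        first.record(1); // bucket 0
        first.record(3); // bucket 1
        first.record(5); // top (+Inf) bucket

        HistogramData second = HistogramData.linear(0, 2, 2);
        second.record(-1); // bottom (-Inf) bucket
        second.record(1); // bucket 0

        // Adds second's per-bucket counts, including the infinite buckets, into first.
        first.update(second);
        System.out.println(first.getCount(0)); // 2
        System.out.println(first.getBottomBucketCount()); // 1
        System.out.println(first.getTopBucketCount()); // 1
        System.out.println(first.getTotalCount()); // 5

        // Buckets can also be incremented by index, e.g. when applying a precomputed delta.
        first.incBucketCount(1, 3);
        System.out.println(first.getCount(1)); // 4
      }
    }
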
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index f51938b..b113dac 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -39,12 +39,14 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeFalse;
 
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.Writer;
+import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -66,6 +68,9 @@
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ToString;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -851,4 +856,121 @@
       p.run();
     }
   }
+
+  /** Tests for TextSource class. */
+  @RunWith(JUnit4.class)
+  public static class TextSourceTest {
+    @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testRemoveUtf8BOM() throws Exception {
+      Path p1 = createTestFile("test_txt_ascii", Charset.forName("US-ASCII"), "1,p1", "2,p1");
+      Path p2 =
+          createTestFile(
+              "test_txt_utf8_no_bom",
+              Charset.forName("UTF-8"),
+              "1,p2-Japanese:テスト",
+              "2,p2-Japanese:テスト");
+      Path p3 =
+          createTestFile(
+              "test_txt_utf8_bom",
+              Charset.forName("UTF-8"),
+              "\uFEFF1,p3-テストBOM",
+              "\uFEFF2,p3-テストBOM");
+      PCollection<String> contents =
+          pipeline
+              .apply("Create", Create.of(p1.toString(), p2.toString(), p3.toString()))
+              .setCoder(StringUtf8Coder.of())
+              // PCollection<String>
+              .apply("Read file", new TextIOReadTest.TextSourceTest.TextFileReadTransform());
+      // PCollection<KV<String, String>>: tableName, line
+
+      // Validate that the BOM bytes (\uFEFF) at the beginning of the first line have been removed.
+      PAssert.that(contents)
+          .containsInAnyOrder(
+              "1,p1",
+              "2,p1",
+              "1,p2-Japanese:テスト",
+              "2,p2-Japanese:テスト",
+              "1,p3-テストBOM",
+              "\uFEFF2,p3-テストBOM");
+
+      pipeline.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testPreserveNonBOMBytes() throws Exception {
+      // Contains \uFEFE, not UTF BOM.
+      Path p1 =
+          createTestFile(
+              "test_txt_utf_bom", Charset.forName("UTF-8"), "\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
+      PCollection<String> contents =
+          pipeline
+              .apply("Create", Create.of(p1.toString()))
+              .setCoder(StringUtf8Coder.of())
+              // PCollection<String>
+              .apply("Read file", new TextIOReadTest.TextSourceTest.TextFileReadTransform());
+
+      PAssert.that(contents).containsInAnyOrder("\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
+
+      pipeline.run();
+    }
+
+    private static class FileReadDoFn extends DoFn<FileIO.ReadableFile, String> {
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        FileIO.ReadableFile file = c.element();
+        ValueProvider<String> filenameProvider =
+            ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename());
+        // Create a TextSource, passing null as the delimiter to use the default
+        // delimiters ('\n', '\r', or '\r\n').
+        TextSource textSource = new TextSource(filenameProvider, null, null);
+        try {
+          BoundedSource.BoundedReader<String> reader =
+              textSource
+                  .createForSubrangeOfFile(file.getMetadata(), 0, file.getMetadata().sizeBytes())
+                  .createReader(c.getPipelineOptions());
+          for (boolean more = reader.start(); more; more = reader.advance()) {
+            c.output(reader.getCurrent());
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(
+              "Unable to readFile: " + file.getMetadata().resourceId().toString());
+        }
+      }
+    }
+
+    /** A transform that reads CSV file records. */
+    private static class TextFileReadTransform
+        extends PTransform<PCollection<String>, PCollection<String>> {
+      public TextFileReadTransform() {}
+
+      @Override
+      public PCollection<String> expand(PCollection<String> files) {
+        return files
+            // PCollection<String>
+            .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW))
+            // PCollection<Match.Metadata>
+            .apply(FileIO.readMatches())
+            // PCollection<FileIO.ReadableFile>
+            .apply("Read lines", ParDo.of(new TextIOReadTest.TextSourceTest.FileReadDoFn()));
+        // PCollection<String>: line
+      }
+    }
+
+    private Path createTestFile(String filename, Charset charset, String... lines)
+        throws IOException {
+      Path path = Files.createTempFile(filename, ".csv");
+      try (BufferedWriter writer = Files.newBufferedWriter(path, charset)) {
+        for (String line : lines) {
+          writer.write(line);
+          writer.write('\n');
+        }
+      }
+      return path;
+    }
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java
deleted file mode 100644
index 36a3f68..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.FileIO.ReadableFile;
-import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for TextSource class. */
-@RunWith(JUnit4.class)
-public class TextSourceTest {
-  @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testRemoveUtf8BOM() throws Exception {
-    Path p1 = createTestFile("test_txt_ascii", Charset.forName("US-ASCII"), "1,p1", "2,p1");
-    Path p2 =
-        createTestFile(
-            "test_txt_utf8_no_bom",
-            Charset.forName("UTF-8"),
-            "1,p2-Japanese:テスト",
-            "2,p2-Japanese:テスト");
-    Path p3 =
-        createTestFile(
-            "test_txt_utf8_bom",
-            Charset.forName("UTF-8"),
-            "\uFEFF1,p3-テストBOM",
-            "\uFEFF2,p3-テストBOM");
-    PCollection<String> contents =
-        pipeline
-            .apply("Create", Create.of(p1.toString(), p2.toString(), p3.toString()))
-            .setCoder(StringUtf8Coder.of())
-            // PCollection<String>
-            .apply("Read file", new TextFileReadTransform());
-    // PCollection<KV<String, String>>: tableName, line
-
-    // Validate that the BOM bytes (\uFEFF) at the beginning of the first line have been removed.
-    PAssert.that(contents)
-        .containsInAnyOrder(
-            "1,p1",
-            "2,p1",
-            "1,p2-Japanese:テスト",
-            "2,p2-Japanese:テスト",
-            "1,p3-テストBOM",
-            "\uFEFF2,p3-テストBOM");
-
-    pipeline.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testPreserveNonBOMBytes() throws Exception {
-    // Contains \uFEFE, not UTF BOM.
-    Path p1 =
-        createTestFile(
-            "test_txt_utf_bom", Charset.forName("UTF-8"), "\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
-    PCollection<String> contents =
-        pipeline
-            .apply("Create", Create.of(p1.toString()))
-            .setCoder(StringUtf8Coder.of())
-            // PCollection<String>
-            .apply("Read file", new TextFileReadTransform());
-
-    PAssert.that(contents).containsInAnyOrder("\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
-
-    pipeline.run();
-  }
-
-  private static class FileReadDoFn extends DoFn<ReadableFile, String> {
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      ReadableFile file = c.element();
-      ValueProvider<String> filenameProvider =
-          ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename());
-      // Create a TextSource, passing null as the delimiter to use the default
-      // delimiters ('\n', '\r', or '\r\n').
-      TextSource textSource = new TextSource(filenameProvider, null, null);
-      try {
-        BoundedSource.BoundedReader<String> reader =
-            textSource
-                .createForSubrangeOfFile(file.getMetadata(), 0, file.getMetadata().sizeBytes())
-                .createReader(c.getPipelineOptions());
-        for (boolean more = reader.start(); more; more = reader.advance()) {
-          c.output(reader.getCurrent());
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(
-            "Unable to readFile: " + file.getMetadata().resourceId().toString());
-      }
-    }
-  }
-
-  /** A transform that reads CSV file records. */
-  private static class TextFileReadTransform
-      extends PTransform<PCollection<String>, PCollection<String>> {
-    public TextFileReadTransform() {}
-
-    @Override
-    public PCollection<String> expand(PCollection<String> files) {
-      return files
-          // PCollection<String>
-          .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW))
-          // PCollection<Match.Metadata>
-          .apply(FileIO.readMatches())
-          // PCollection<FileIO.ReadableFile>
-          .apply("Read lines", ParDo.of(new FileReadDoFn()));
-      // PCollection<String>: line
-    }
-  }
-
-  private Path createTestFile(String filename, Charset charset, String... lines)
-      throws IOException {
-    Path path = Files.createTempFile(filename, ".csv");
-    try (BufferedWriter writer = Files.newBufferedWriter(path, charset)) {
-      for (String line : lines) {
-        writer.write(line);
-        writer.write('\n');
-      }
-    }
-    return path;
-  }
-}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/HistogramDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/HistogramDataTest.java
index c86ccd8..a940778 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/HistogramDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/HistogramDataTest.java
@@ -21,8 +21,6 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThrows;
 
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -30,14 +28,12 @@
 /** Tests for {@link HistogramData}. */
 @RunWith(JUnit4.class)
 public class HistogramDataTest {
-  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(HistogramData.class);
 
   @Test
   public void testOutOfRangeWarning() {
     HistogramData histogramData = HistogramData.linear(0, 20, 5);
     histogramData.record(100);
     assertThat(histogramData.getTotalCount(), equalTo(1L));
-    expectedLogs.verifyWarn("out of upper bound");
   }
 
   @Test
@@ -149,4 +145,53 @@
     assertThat(histogramData.getTotalCount(), equalTo(0L));
     assertThat(histogramData.getCount(5), equalTo(0L));
   }
+
+  @Test
+  public void testUpdateUsingDoublesAndCumulative() {
+    HistogramData data = HistogramData.linear(0, 2, 2);
+    data.record(-1); // to -Inf bucket
+    data.record(0); // bucket 0
+    data.record(1);
+    data.record(3); // bucket 1
+    data.record(4); // to Inf bucket
+    assertThat(data.getCount(0), equalTo(2L));
+    assertThat(data.getCount(1), equalTo(1L));
+    assertThat(data.getTotalCount(), equalTo(5L));
+
+    // Now try updating it with another HistogramData
+    HistogramData data2 = HistogramData.linear(0, 2, 2);
+    data2.record(-1); // to -Inf bucket
+    data2.record(-1); // to -Inf bucket
+    data2.record(0); // bucket 0
+    data2.record(0);
+    data2.record(1);
+    data2.record(1);
+    data2.record(3); // bucket 1
+    data2.record(3);
+    data2.record(4); // to Inf bucket
+    data2.record(4);
+    assertThat(data2.getCount(0), equalTo(4L));
+    assertThat(data2.getCount(1), equalTo(2L));
+    assertThat(data2.getTotalCount(), equalTo(10L));
+
+    data.update(data2);
+    assertThat(data.getCount(0), equalTo(6L));
+    assertThat(data.getCount(1), equalTo(3L));
+    assertThat(data.getTotalCount(), equalTo(15L));
+  }
+
+  @Test
+  public void testIncrementBucketCountByIndex() {
+    HistogramData data = HistogramData.linear(0, 2, 2);
+    data.incBottomBucketCount(1);
+    data.incBucketCount(0, 2);
+    data.incBucketCount(1, 3);
+    data.incTopBucketCount(4);
+
+    assertThat(data.getBottomBucketCount(), equalTo(1L));
+    assertThat(data.getCount(0), equalTo(2L));
+    assertThat(data.getCount(1), equalTo(3L));
+    assertThat(data.getTopBucketCount(), equalTo(4L));
+    assertThat(data.getTotalCount(), equalTo(10L));
+  }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
index 0c7396d..74f4dba 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
@@ -19,11 +19,10 @@
 
 import static org.apache.beam.sdk.extensions.sql.meta.provider.kafka.Schemas.PAYLOAD_FIELD;
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
-import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.google.auto.service.AutoService;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
@@ -33,6 +32,11 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * Kafka table provider.
@@ -45,23 +49,62 @@
  *   NAME VARCHAR(127) COMMENT 'this is the name'
  * )
  * COMMENT 'this is the table orders'
- * LOCATION 'kafka://localhost:2181/brokers?topic=test'
- * TBLPROPERTIES '{"bootstrap.servers":"localhost:9092", "topics": ["topic1", "topic2"]}'
+ * TYPE kafka
+ * // Optional. One broker host:port pair to bootstrap with and a topic.
+ * // Only one topic overall may be provided for writing.
+ * LOCATION 'my.company.url.com:2181/topic1'
+ * // Extra bootstrap_servers and topics can be provided explicitly. These will be merged
+ * // with the server and topic in LOCATION.
+ * TBLPROPERTIES '{
+ *   "bootstrap_servers": ["104.126.7.88:7743", "104.111.9.22:7743"],
+ *   "topics": ["topic2", "topic3"]
+ * }'
  * }</pre>
  */
 @AutoService(TableProvider.class)
 public class KafkaTableProvider extends InMemoryMetaTableProvider {
+  private static class ParsedLocation {
+    String brokerLocation = "";
+    String topic = "";
+  }
+
+  private static ParsedLocation parseLocation(String location) {
+    ParsedLocation parsed = new ParsedLocation();
+    List<String> split = Splitter.on('/').splitToList(location);
+    checkArgument(
+        split.size() >= 2,
+        "Location string `%s` invalid: must be <broker bootstrap location>/<topic>.",
+        location);
+    parsed.topic = Iterables.getLast(split);
+    parsed.brokerLocation = String.join("/", split.subList(0, split.size() - 1));
+    return parsed;
+  }
+
+  private static List<String> mergeParam(Optional<String> initial, @Nullable List<Object> toMerge) {
+    ImmutableList.Builder<String> merged = ImmutableList.builder();
+    initial.ifPresent(merged::add);
+    if (toMerge != null) {
+      toMerge.forEach(o -> merged.add(o.toString()));
+    }
+    return merged.build();
+  }
+
   @Override
   public BeamSqlTable buildBeamSqlTable(Table table) {
     Schema schema = table.getSchema();
-
     JSONObject properties = table.getProperties();
-    String bootstrapServers = properties.getString("bootstrap.servers");
-    JSONArray topicsArr = properties.getJSONArray("topics");
-    List<String> topics = new ArrayList<>(topicsArr.size());
-    for (Object topic : topicsArr) {
-      topics.add(topic.toString());
+
+    Optional<ParsedLocation> parsedLocation = Optional.empty();
+    if (!Strings.isNullOrEmpty(table.getLocation())) {
+      parsedLocation = Optional.of(parseLocation(checkArgumentNotNull(table.getLocation())));
     }
+    List<String> topics =
+        mergeParam(parsedLocation.map(loc -> loc.topic), properties.getJSONArray("topics"));
+    List<String> allBootstrapServers =
+        mergeParam(
+            parsedLocation.map(loc -> loc.brokerLocation),
+            properties.getJSONArray("bootstrap_servers"));
+    String bootstrapServers = String.join(",", allBootstrapServers);
 
     Optional<String> payloadFormat =
         properties.containsKey("format")
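
A hedged illustration of how the provider now combines LOCATION with TBLPROPERTIES, modeled on the
kafka provider tests updated below. The Table.builder() entry point, host names, topic names, and
schema are assumptions for illustration only, not part of the change.

    import com.alibaba.fastjson.JSON;
    import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
    import org.apache.beam.sdk.extensions.sql.meta.Table;
    import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.KafkaTableProvider;
    import org.apache.beam.sdk.schemas.Schema;

    public class KafkaTableLocationSketch {
      public static void main(String[] args) {
        Schema schema = Schema.builder().addStringField("name").build();
        Table table =
            Table.builder()
                .name("orders")
                .type("kafka")
                .schema(schema)
                // One bootstrap server and one topic come from LOCATION...
                .location("my.company.url.com:2181/topic1")
                // ...and are merged with those listed in TBLPROPERTIES.
                .properties(
                    JSON.parseObject(
                        "{ \"bootstrap_servers\": [\"104.126.7.88:7743\"],"
                            + " \"topics\": [\"topic2\"], \"format\": \"json\" }"))
                .build();
        BeamSqlTable sqlTable = new KafkaTableProvider().buildBeamSqlTable(table);
      }
    }
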
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
index 869c86b..fee04e4 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
@@ -93,8 +93,8 @@
                     .name("kafka")
                     .type("kafka")
                     .schema(TEST_SCHEMA)
-                    .properties(
-                        JSON.parseObject("{ \"topics\": [ \"mytopic\" ], \"format\": \"avro\" }"))
+                    .location("localhost/mytopic")
+                    .properties(JSON.parseObject("{ \"format\": \"avro\" }"))
                     .build()));
   }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java
index 6f97c4e..d33665c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java
@@ -66,8 +66,8 @@
                     .name("kafka")
                     .type("kafka")
                     .schema(TEST_SCHEMA)
-                    .properties(
-                        JSON.parseObject("{ \"topics\": [ \"mytopic\" ], \"format\": \"json\" }"))
+                    .location("localhost/mytopic")
+                    .properties(JSON.parseObject("{ \"format\": \"json\" }"))
                     .build()));
   }
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
index ba90cf9..a75dded 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
@@ -92,9 +92,10 @@
                     .name("kafka")
                     .type("kafka")
                     .schema(schema)
+                    .location("localhost/mytopic")
                     .properties(
                         JSON.parseObject(
-                            "{ \"topics\": [ \"mytopic\" ], \"format\": \"proto\", \"protoClass\": \""
+                            "{ \"format\": \"proto\", \"protoClass\": \""
                                 + PayloadMessages.TestMessage.class.getName()
                                 + "\" }"))
                     .build()));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java
index 860bc34..958ca63 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java
@@ -98,9 +98,10 @@
                     .name("kafka")
                     .type("kafka")
                     .schema(schema)
+                    .location("localhost/mytopic")
                     .properties(
                         JSON.parseObject(
-                            "{ \"topics\": [ \"mytopic\" ], \"format\": \"thrift\", \"thriftClass\": \""
+                            "{ \"format\": \"thrift\", \"thriftClass\": \""
                                 + TestThriftMessage.class.getName()
                                 + "\", \"thriftProtocolFactoryClass\": \""
                                 + TCompactProtocol.Factory.class.getName()
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
index 0546c7a..ec94b65 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
@@ -19,6 +19,7 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.alibaba.fastjson.JSON;
 import java.io.Serializable;
@@ -130,6 +131,13 @@
     kafkaOptions = pipeline.getOptions().as(KafkaOptions.class);
     kafkaOptions.setKafkaTopic(topic);
     kafkaOptions.setKafkaBootstrapServerAddress(KAFKA_CONTAINER.getBootstrapServers());
+    checkArgument(
+        !KAFKA_CONTAINER.getBootstrapServers().contains(","),
+        "This integration test expects exactly one bootstrap server.");
+  }
+
+  private static String buildLocation() {
+    return kafkaOptions.getKafkaBootstrapServerAddress() + "/" + kafkaOptions.getKafkaTopic();
   }
 
   @Test
@@ -139,7 +147,7 @@
         Table.builder()
             .name("kafka_table")
             .comment("kafka table")
-            .location("")
+            .location(buildLocation())
             .schema(TEST_TABLE_SCHEMA)
             .type("kafka")
             .properties(JSON.parseObject(objectsProvider.getKafkaPropertiesString()))
@@ -165,9 +173,9 @@
                 + "f_string VARCHAR NOT NULL \n"
                 + ") \n"
                 + "TYPE 'kafka' \n"
-                + "LOCATION ''\n"
+                + "LOCATION '%s'\n"
                 + "TBLPROPERTIES '%s'",
-            objectsProvider.getKafkaPropertiesString());
+            buildLocation(), objectsProvider.getKafkaPropertiesString());
     TableProvider tb = new KafkaTableProvider();
     BeamSqlEnv env = BeamSqlEnv.inMemory(tb);
 
@@ -213,9 +221,9 @@
                 + ">"
                 + ") \n"
                 + "TYPE 'kafka' \n"
-                + "LOCATION ''\n"
+                + "LOCATION '%s'\n"
                 + "TBLPROPERTIES '%s'",
-            objectsProvider.getKafkaPropertiesString());
+            buildLocation(), objectsProvider.getKafkaPropertiesString());
     TableProvider tb = new KafkaTableProvider();
     BeamSqlEnv env = BeamSqlEnv.inMemory(tb);
 
@@ -365,11 +373,7 @@
     protected String getKafkaPropertiesString() {
       return "{ "
           + (getPayloadFormat() == null ? "" : "\"format\" : \"" + getPayloadFormat() + "\",")
-          + "\"bootstrap.servers\" : \""
-          + kafkaOptions.getKafkaBootstrapServerAddress()
-          + "\",\"topics\":[\""
-          + kafkaOptions.getKafkaTopic()
-          + "\"] }";
+          + "}";
     }
   }
 
@@ -410,11 +414,6 @@
     protected String getKafkaPropertiesString() {
       return "{ "
           + "\"format\" : \"proto\","
-          + "\"bootstrap.servers\" : \""
-          + kafkaOptions.getKafkaBootstrapServerAddress()
-          + "\",\"topics\":[\""
-          + kafkaOptions.getKafkaTopic()
-          + "\"],"
           + "\"protoClass\": \""
           + PayloadMessages.ItMessage.class.getName()
           + "\"}";
@@ -473,11 +472,6 @@
     protected String getKafkaPropertiesString() {
       return "{ "
           + "\"format\" : \"thrift\","
-          + "\"bootstrap.servers\" : \""
-          + kafkaOptions.getKafkaBootstrapServerAddress()
-          + "\",\"topics\":[\""
-          + kafkaOptions.getKafkaTopic()
-          + "\"],"
           + "\"thriftClass\": \""
           + thriftClass.getName()
           + "\", \"thriftProtocolFactoryClass\": \""
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
index e7154d9..a2a663a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
@@ -23,12 +23,13 @@
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import java.util.List;
 import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.io.thrift.payloads.SimpleThriftMessage;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.thrift.TBase;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
@@ -38,6 +39,8 @@
 /** UnitTest for {@link KafkaTableProvider}. */
 public class KafkaTableProviderTest {
   private final KafkaTableProvider provider = new KafkaTableProvider();
+  private static final String LOCATION_BROKER = "104.126.7.88:7743";
+  private static final String LOCATION_TOPIC = "topic1";
 
   @Test
   public void testBuildBeamSqlCSVTable() {
@@ -47,9 +50,37 @@
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof BeamKafkaCSVTable);
 
-    BeamKafkaCSVTable csvTable = (BeamKafkaCSVTable) sqlTable;
-    assertEquals("localhost:9092", csvTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), csvTable.getTopics());
+    BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
+  }
+
+  @Test
+  public void testBuildWithExtraServers() {
+    Table table =
+        mockTableWithExtraServers("hello", ImmutableList.of("localhost:1111", "localhost:2222"));
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+    assertNotNull(sqlTable);
+    assertTrue(sqlTable instanceof BeamKafkaCSVTable);
+
+    BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+    assertEquals(
+        LOCATION_BROKER + ",localhost:1111,localhost:2222", kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
+  }
+
+  @Test
+  public void testBuildWithExtraTopics() {
+    Table table = mockTableWithExtraTopics("hello", ImmutableList.of("topic2", "topic3"));
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+    assertNotNull(sqlTable);
+    assertTrue(sqlTable instanceof BeamKafkaCSVTable);
+
+    BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC, "topic2", "topic3"), kafkaTable.getTopics());
   }
 
   @Test
@@ -60,9 +91,9 @@
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof BeamKafkaTable);
 
-    BeamKafkaTable csvTable = (BeamKafkaTable) sqlTable;
-    assertEquals("localhost:9092", csvTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), csvTable.getTopics());
+    BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
   }
 
   @Test
@@ -73,9 +104,9 @@
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof BeamKafkaTable);
 
-    BeamKafkaTable protoTable = (BeamKafkaTable) sqlTable;
-    assertEquals("localhost:9092", protoTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), protoTable.getTopics());
+    BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
   }
 
   @Test
@@ -87,9 +118,9 @@
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof BeamKafkaTable);
 
-    BeamKafkaTable thriftTable = (BeamKafkaTable) sqlTable;
-    assertEquals("localhost:9092", thriftTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), thriftTable.getTopics());
+    BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
   }
 
   @Test
@@ -100,9 +131,9 @@
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof NestedPayloadKafkaTable);
 
-    BeamKafkaTable thriftTable = (BeamKafkaTable) sqlTable;
-    assertEquals("localhost:9092", thriftTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), thriftTable.getTopics());
+    BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
   }
 
   @Test
@@ -114,9 +145,9 @@
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof NestedPayloadKafkaTable);
 
-    BeamKafkaTable thriftTable = (BeamKafkaTable) sqlTable;
-    assertEquals("localhost:9092", thriftTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), thriftTable.getTopics());
+    BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
   }
 
   @Test
@@ -125,48 +156,67 @@
   }
 
   private static Table mockTable(String name) {
-    return mockTable(name, false, null, null, null, null);
+    return mockTable(name, false, null, null, null, null, null, null);
+  }
+
+  private static Table mockTableWithExtraServers(String name, List<String> extraBootstrapServers) {
+    return mockTable(name, false, extraBootstrapServers, null, null, null, null, null);
+  }
+
+  private static Table mockTableWithExtraTopics(String name, List<String> extraTopics) {
+    return mockTable(name, false, null, extraTopics, null, null, null, null);
   }
 
   private static Table mockTable(String name, String payloadFormat) {
-    return mockTable(name, false, payloadFormat, null, null, null);
+    return mockTable(name, false, null, null, payloadFormat, null, null, null);
   }
 
   private static Table mockProtoTable(String name, Class<?> protoClass) {
-    return mockTable(name, false, "proto", protoClass, null, null);
+    return mockTable(name, false, null, null, "proto", protoClass, null, null);
   }
 
   private static Table mockThriftTable(
       String name,
       Class<? extends TBase<?, ?>> thriftClass,
       Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
-    return mockTable(name, false, "thrift", null, thriftClass, thriftProtocolFactoryClass);
+    return mockTable(
+        name, false, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
   }
 
   private static Table mockNestedBytesTable(String name) {
-    return mockTable(name, true, null, null, null, null);
+    return mockTable(name, true, null, null, null, null, null, null);
   }
 
   private static Table mockNestedThriftTable(
       String name,
       Class<? extends TBase<?, ?>> thriftClass,
       Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
-    return mockTable(name, true, "thrift", null, thriftClass, thriftProtocolFactoryClass);
+    return mockTable(
+        name, true, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
   }
 
   private static Table mockTable(
       String name,
       boolean isNested,
+      @Nullable List<String> extraBootstrapServers,
+      @Nullable List<String> extraTopics,
       @Nullable String payloadFormat,
       @Nullable Class<?> protoClass,
       @Nullable Class<? extends TBase<?, ?>> thriftClass,
       @Nullable Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
     JSONObject properties = new JSONObject();
-    properties.put("bootstrap.servers", "localhost:9092");
-    JSONArray topics = new JSONArray();
-    topics.add("topic1");
-    topics.add("topic2");
-    properties.put("topics", topics);
+
+    if (extraBootstrapServers != null) {
+      JSONArray bootstrapServers = new JSONArray();
+      bootstrapServers.addAll(extraBootstrapServers);
+      properties.put("bootstrap_servers", bootstrapServers);
+    }
+    if (extraTopics != null) {
+      JSONArray topics = new JSONArray();
+      topics.addAll(extraTopics);
+      properties.put("topics", topics);
+    }
+
     if (payloadFormat != null) {
       properties.put("format", payloadFormat);
     }
@@ -197,7 +247,7 @@
     return Table.builder()
         .name(name)
         .comment(name + " table")
-        .location("kafka://localhost:2181/brokers?topic=test")
+        .location(LOCATION_BROKER + "/" + LOCATION_TOPIC)
         .schema(schema)
         .type("kafka")
         .properties(properties)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
index e1bf02f..e5bba75 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
@@ -25,17 +25,15 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.runners.core.metrics.MetricsLogger;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
-import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.sdk.metrics.MetricsLogger;
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
@@ -56,7 +54,6 @@
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.joda.time.Duration;
@@ -80,7 +77,7 @@
   private final boolean ignoreInsertIds;
   private final SerializableFunction<ElementT, TableRow> toTableRow;
   private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
-  private final Set<MetricName> metricFilter;
+  private final Set<String> allowedMetricUrns;
 
   /** Tracks bytes written, exposed as "ByteCount" Counter. */
   private Counter byteCounter = SinkMetrics.bytesWritten();
@@ -109,7 +106,7 @@
     this.ignoreInsertIds = ignoreInsertIds;
     this.toTableRow = toTableRow;
     this.toFailsafeTableRow = toFailsafeTableRow;
-    this.metricFilter = getMetricFilter();
+    this.allowedMetricUrns = getAllowedMetricUrns();
     this.batchViaStateful = false;
   }
 
@@ -135,34 +132,14 @@
     this.ignoreInsertIds = ignoreInsertIds;
     this.toTableRow = toTableRow;
     this.toFailsafeTableRow = toFailsafeTableRow;
-    this.metricFilter = getMetricFilter();
+    this.allowedMetricUrns = getAllowedMetricUrns();
     this.batchViaStateful = batchViaStateful;
   }
 
-  private static Set<MetricName> getMetricFilter() {
-    ImmutableSet.Builder<MetricName> setBuilder = ImmutableSet.builder();
-    setBuilder.add(
-        MonitoringInfoMetricName.named(
-            MonitoringInfoConstants.Urns.API_REQUEST_LATENCIES,
-            BigQueryServicesImpl.API_METRIC_LABEL));
-    for (String status : BigQueryServicesImpl.DatasetServiceImpl.CANONICAL_STATUS_MAP.values()) {
-      setBuilder.add(
-          MonitoringInfoMetricName.named(
-              MonitoringInfoConstants.Urns.API_REQUEST_COUNT,
-              ImmutableMap.<String, String>builder()
-                  .putAll(BigQueryServicesImpl.API_METRIC_LABEL)
-                  .put(MonitoringInfoConstants.Labels.STATUS, status)
-                  .build()));
-    }
-    setBuilder.add(
-        MonitoringInfoMetricName.named(
-            MonitoringInfoConstants.Urns.API_REQUEST_COUNT,
-            ImmutableMap.<String, String>builder()
-                .putAll(BigQueryServicesImpl.API_METRIC_LABEL)
-                .put(
-                    MonitoringInfoConstants.Labels.STATUS,
-                    BigQueryServicesImpl.DatasetServiceImpl.CANONICAL_STATUS_UNKNOWN)
-                .build()));
+  private static Set<String> getAllowedMetricUrns() {
+    ImmutableSet.Builder<String> setBuilder = ImmutableSet.builder();
+    setBuilder.add(MonitoringInfoConstants.Urns.API_REQUEST_COUNT);
+    setBuilder.add(MonitoringInfoConstants.Urns.API_REQUEST_LATENCIES);
     return setBuilder.build();
   }
 
@@ -394,10 +371,9 @@
     if (processWideContainer instanceof MetricsLogger) {
       MetricsLogger processWideMetricsLogger = (MetricsLogger) processWideContainer;
       processWideMetricsLogger.tryLoggingMetrics(
-          "BigQuery HTTP API Metrics: \n",
-          metricFilter,
-          options.getBqStreamingApiLoggingFrequencySec() * 1000L,
-          true);
+          "API call Metrics: \n",
+          this.allowedMetricUrns,
+          options.getBqStreamingApiLoggingFrequencySec() * 1000L);
     }
   }
 }
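
The net effect of this change is that metric logging is filtered by monitoring-info URN alone rather than by fully labeled `MetricName`s, so every per-status `API_REQUEST_COUNT` metric passes the filter without enumerating each status. A rough sketch of that kind of URN-only filtering, using plain JDK collections and made-up URN/label strings rather than the real `MonitoringInfoConstants` values:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Illustrative only: URN-based metric filtering with plain JDK types. */
public class UrnFilterSketch {
  public static void main(String[] args) {
    // Allow everything reported under these URNs, regardless of labels (e.g. per-status counts).
    Set<String> allowedUrns =
        new HashSet<>(
            Arrays.asList("example:api_request_count", "example:api_request_latencies"));

    // Metrics keyed here as "urn|labels" purely for the sketch.
    Map<String, Long> metrics = new LinkedHashMap<>();
    metrics.put("example:api_request_count|status=ok", 120L);
    metrics.put("example:api_request_count|status=not_found", 3L);
    metrics.put("example:user_counter|name=myCounter", 7L);

    metrics.forEach(
        (key, value) -> {
          String urn = key.substring(0, key.indexOf('|'));
          if (allowedUrns.contains(urn)) {
            System.out.println(key + " = " + value); // only the API request metrics are logged
          }
        });
  }
}
```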
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 0bb5d2c..efc0fa9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -181,7 +181,8 @@
      * Create an append client for a given Storage API write stream. The stream must be created
      * first.
      */
-    StreamAppendClient getStreamAppendClient(String streamName) throws Exception;
+    StreamAppendClient getStreamAppendClient(String streamName, Descriptor descriptor)
+        throws Exception;
 
     /** Flush a given stream up to the given offset. The stream must have type BUFFERED. */
     ApiFuture<FlushRowsResponse> flush(String streamName, long flushOffset)
@@ -200,8 +201,7 @@
   /** An interface for appending records to a Storage API write stream. */
   interface StreamAppendClient extends AutoCloseable {
     /** Append rows to a Storage API write stream at the given offset. */
-    ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows, Descriptor descriptor)
-        throws Exception;
+    ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows) throws Exception;
 
     /**
      * Pin this object. If close() is called before all pins are removed, the underlying resources
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 8f7a097..290dca2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -63,7 +63,6 @@
 import com.google.cloud.bigquery.storage.v1.ReadSession;
 import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
 import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
-import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
 import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
 import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsRequest;
 import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse;
@@ -633,7 +632,7 @@
                       table.getTableReference().getProjectId(),
                       table.getTableReference().getDatasetId(),
                       table.getTableReference().getTableId(),
-                      TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
+                      TimeUnit.MILLISECONDS.toMinutes(RETRY_CREATE_TABLE_DURATION_MILLIS));
                   retry = true;
                 }
                 continue;
@@ -1095,8 +1094,12 @@
     }
 
     @Override
-    public StreamAppendClient getStreamAppendClient(String streamName) throws Exception {
-      StreamWriterV2 streamWriter = StreamWriterV2.newBuilder(streamName).build();
+    public StreamAppendClient getStreamAppendClient(String streamName, Descriptor descriptor)
+        throws Exception {
+      ProtoSchema protoSchema =
+          ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build();
+      StreamWriterV2 streamWriter =
+          StreamWriterV2.newBuilder(streamName).setWriterSchema(protoSchema).build();
       return new StreamAppendClient() {
         private int pins = 0;
         private boolean closed = false;
@@ -1136,20 +1139,9 @@
         }
 
         @Override
-        public ApiFuture<AppendRowsResponse> appendRows(
-            long offset, ProtoRows rows, Descriptor descriptor) throws Exception {
-          final AppendRowsRequest.ProtoData data =
-              AppendRowsRequest.ProtoData.newBuilder()
-                  .setWriterSchema(
-                      ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build())
-                  .setRows(rows)
-                  .build();
-          AppendRowsRequest.Builder appendRequestBuilder =
-              AppendRowsRequest.newBuilder().setProtoRows(data).setWriteStream(streamName);
-          if (offset >= 0) {
-            appendRequestBuilder = appendRequestBuilder.setOffset(Int64Value.of(offset));
-          }
-          return streamWriter.append(appendRequestBuilder.build());
+        public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
+            throws Exception {
+          return streamWriter.append(rows, offset);
         }
       };
     }
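
Seen from the caller's side, the schema `Descriptor` now travels with `getStreamAppendClient` and `appendRows` only takes the offset and rows, since `StreamWriterV2` carries the writer schema. A simplified, hypothetical sketch of the resulting call shape (placeholder interfaces, not the real Beam/BigQuery types):

```java
// Hypothetical, simplified shapes mirroring the diff above; not the real Beam/BigQuery types.
interface StreamAppendClientSketch extends AutoCloseable {
  // The descriptor is no longer a per-call argument: the writer schema is fixed at creation time.
  void appendRows(long offset, byte[] serializedRows) throws Exception;
}

interface DatasetServiceSketch {
  // The schema descriptor (here just a placeholder string) now travels with client creation.
  StreamAppendClientSketch getStreamAppendClient(String streamName, String schemaDescriptor)
      throws Exception;
}

class CallerSketch {
  void write(DatasetServiceSketch service, String streamName, String schemaDescriptor)
      throws Exception {
    // Before: appendRows(offset, rows, descriptor) on a schema-less client.
    // After: the descriptor is supplied once, and every append reuses it.
    try (StreamAppendClientSketch client =
        service.getStreamAppendClient(streamName, schemaDescriptor)) {
      client.appendRows(0L, new byte[0]);
      client.appendRows(1L, new byte[0]);
    }
  }
}
```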
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 6f21e8f..8f15121 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -167,7 +167,8 @@
                     .createWriteStream(tableUrn, Type.PENDING)
                     .getName();
             this.streamAppendClient =
-                Preconditions.checkNotNull(datasetService).getStreamAppendClient(streamName);
+                Preconditions.checkNotNull(datasetService)
+                    .getStreamAppendClient(streamName, messageConverter.getSchemaDescriptor());
             this.currentOffset = 0;
           }
           return streamAppendClient;
@@ -218,8 +219,7 @@
               try {
                 long offset = currentOffset;
                 currentOffset += inserts.getSerializedRowsCount();
-                return getWriteStream()
-                    .appendRows(offset, protoRows, messageConverter.getSchemaDescriptor());
+                return getWriteStream().appendRows(offset, protoRows);
               } catch (Exception e) {
                 throw new RuntimeException(e);
               }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index ca465dc..de04b12 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -333,6 +333,7 @@
       MessageConverter<ElementT> messageConverter =
           messageConverters.get(element.getKey().getKey(), dynamicDestinations);
       Descriptor descriptor = messageConverter.getSchemaDescriptor();
+
       // Each ProtoRows object contains at most 1MB of rows.
       // TODO: Push messageFromTableRow up to top level. That way we can skip TableRow entirely if
       // already proto or
@@ -381,7 +382,8 @@
               }
               String stream = getOrCreateStream(tableId, streamName, streamOffset, datasetService);
               StreamAppendClient appendClient =
-                  APPEND_CLIENTS.get(stream, () -> datasetService.getStreamAppendClient(stream));
+                  APPEND_CLIENTS.get(
+                      stream, () -> datasetService.getStreamAppendClient(stream, descriptor));
               for (AppendRowsContext context : contexts) {
                 context.streamName = stream;
                 appendClient.pin();
@@ -420,8 +422,8 @@
                 StreamAppendClient appendClient =
                     APPEND_CLIENTS.get(
                         context.streamName,
-                        () -> datasetService.getStreamAppendClient(context.streamName));
-                return appendClient.appendRows(context.offset, protoRows, descriptor);
+                        () -> datasetService.getStreamAppendClient(context.streamName, descriptor));
+                return appendClient.appendRows(context.offset, protoRows);
               } catch (Exception e) {
                 throw new RuntimeException(e);
               }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index b24bf16..a716586 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -415,11 +415,11 @@
   }
 
   @Override
-  public StreamAppendClient getStreamAppendClient(String streamName) {
+  public StreamAppendClient getStreamAppendClient(String streamName, Descriptor descriptor) {
     return new StreamAppendClient() {
       @Override
-      public ApiFuture<AppendRowsResponse> appendRows(
-          long offset, ProtoRows rows, Descriptor descriptor) throws Exception {
+      public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
+          throws Exception {
         synchronized (tables) {
           Stream stream = writeStreams.get(streamName);
           if (stream == null) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 4877d86..286df98 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -1063,7 +1063,7 @@
     verifyNotNull(ret.getTableReference());
     expectedLogs.verifyInfo(
         "Quota limit reached when creating table project:dataset.table, "
-            + "retrying up to 5.0 minutes");
+            + "retrying up to 5 minutes");
   }
 
   /** Tests that {@link DatasetServiceImpl#insertAll} uses the supplied {@link ErrorContainer}. */
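
The updated log expectation follows from the switch to `TimeUnit.MILLISECONDS.toMinutes`, which yields an integral minute count where the old `toSeconds(...) / 60.0` produced a double. A small sketch, assuming the retry window is the 5 minutes implied by the test string:

```java
import java.util.concurrent.TimeUnit;

public class RetryMessageSketch {
  public static void main(String[] args) {
    // Assumes the retry window is 5 minutes, matching the expected log text in the test above.
    long retryCreateTableDurationMillis = TimeUnit.MINUTES.toMillis(5);

    // Old formatting: a double, rendered as "5.0".
    double minutesAsDouble =
        TimeUnit.MILLISECONDS.toSeconds(retryCreateTableDurationMillis) / 60.0;
    // New formatting: a long, rendered as "5".
    long minutesAsLong = TimeUnit.MILLISECONDS.toMinutes(retryCreateTableDurationMillis);

    System.out.println("retrying up to " + minutesAsDouble + " minutes"); // retrying up to 5.0 minutes
    System.out.println("retrying up to " + minutesAsLong + " minutes");   // retrying up to 5 minutes
  }
}
```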
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 78031a4..3a8010a 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -37,71 +37,109 @@
 })
 
 
+def _get_deferred_args(*args):
+  return [
+      frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(arg, arg[0:0])) for arg in args
+  ]
+
+
 class DeferredFrameTest(unittest.TestCase):
-  def _run_test(self, func, *args, distributed=True, expect_error=False):
-    deferred_args = [
-        frame_base.DeferredFrame.wrap(
-            expressions.ConstantExpression(arg, arg[0:0])) for arg in args
-    ]
+  def _run_error_test(self, func, *args):
+    """Verify that func(*args) raises the same exception in pandas and in Beam.
+
+    Note that for Beam this only checks for exceptions that are raised during
+    expression generation (i.e. construction time). Exceptions raised at
+    execution time are not checked."""
+    deferred_args = _get_deferred_args(*args)
+
+    # Get expected error
     try:
       expected = func(*args)
     except Exception as e:
-      if not expect_error:
-        raise
-      expected = e
+      expected_error = e
     else:
-      if expect_error:
-        raise AssertionError(
-            "Expected an error but computing expected result successfully "
-            f"returned: {expected}")
+      raise AssertionError(
+          "Expected an error, but executing with pandas successfully "
+          f"returned:\n{expected}")
 
-    session_type = (
-        expressions.PartitioningSession if distributed else expressions.Session)
+    # Get actual error
     try:
-      actual = session_type({}).evaluate(func(*deferred_args)._expr)
+      _ = func(*deferred_args)._expr
     except Exception as e:
-      if not expect_error:
-        raise
       actual = e
     else:
-      if expect_error:
-        raise AssertionError(
-            "Expected an error:\n{expected}\nbut successfully "
-            f"returned:\n{actual}")
+      raise AssertionError(
+          f"Expected an error:\n{expected_error}\nbut Beam successfully "
+          "generated an expression.")
 
-    if expect_error:
-      if not isinstance(actual,
-                        type(expected)) or not str(actual) == str(expected):
-        raise AssertionError(
-            f'Expected {expected!r} to be raised, but got {actual!r}'
-        ) from actual
+    # Verify
+    if (not isinstance(actual, type(expected_error)) or
+        not str(actual) == str(expected_error)):
+      raise AssertionError(
+          f'Expected {expected_error!r} to be raised, but got {actual!r}'
+      ) from actual
+
+  def _run_test(self, func, *args, distributed=True, nonparallel=False):
+    """Verify that func(*args) produces the same result in pandas and in Beam.
+
+    Args:
+        distributed (bool): Whether or not to use PartitioningSession to
+            simulate parallel execution.
+        nonparallel (bool): Whether or not this function contains a
+            non-parallelizable operation. If True, the expression will be
+            generated twice, once outside of an allow_non_parallel_operations
+            block (to verify NonParallelOperation is raised), and again inside
+            of an allow_non_parallel_operations block to actually generate an
+            expression to verify."""
+    # Compute expected value
+    expected = func(*args)
+
+    # Compute actual value
+    deferred_args = _get_deferred_args(*args)
+    if nonparallel:
+      # First run outside a nonparallel block to confirm this raises as expected
+      with self.assertRaises(expressions.NonParallelOperation):
+        _ = func(*deferred_args)
+
+      # Re-run in an allow non parallel block to get an expression to verify
+      with beam.dataframe.allow_non_parallel_operations():
+        expr = func(*deferred_args)._expr
     else:
-      if isinstance(expected, pd.core.generic.NDFrame):
-        if distributed:
-          if expected.index.is_unique:
-            expected = expected.sort_index()
-            actual = actual.sort_index()
-          else:
-            expected = expected.sort_values(list(expected.columns))
-            actual = actual.sort_values(list(actual.columns))
+      expr = func(*deferred_args)._expr
 
-        if isinstance(expected, pd.Series):
-          pd.testing.assert_series_equal(expected, actual)
-        elif isinstance(expected, pd.DataFrame):
-          pd.testing.assert_frame_equal(expected, actual)
+    # Compute the result of the generated expression
+    session_type = (
+        expressions.PartitioningSession if distributed else expressions.Session)
+
+    actual = session_type({}).evaluate(expr)
+
+    # Verify
+    if isinstance(expected, pd.core.generic.NDFrame):
+      if distributed:
+        if expected.index.is_unique:
+          expected = expected.sort_index()
+          actual = actual.sort_index()
         else:
-          raise ValueError(
-              f"Expected value is a {type(expected)},"
-              "not a Series or DataFrame.")
+          expected = expected.sort_values(list(expected.columns))
+          actual = actual.sort_values(list(actual.columns))
+
+      if isinstance(expected, pd.Series):
+        pd.testing.assert_series_equal(expected, actual)
+      elif isinstance(expected, pd.DataFrame):
+        pd.testing.assert_frame_equal(expected, actual)
       else:
-        # Expectation is not a pandas object
-        if isinstance(expected, float):
-          cmp = lambda x: np.isclose(expected, x)
-        else:
-          cmp = expected.__eq__
-        self.assertTrue(
-            cmp(actual),
-            'Expected:\n\n%r\n\nActual:\n\n%r' % (expected, actual))
+        raise ValueError(
+            f"Expected value is a {type(expected)}, "
+            "not a Series or DataFrame.")
+    else:
+      # Expectation is not a pandas object
+      if isinstance(expected, float):
+        cmp = lambda x: np.isclose(expected, x)
+      else:
+        cmp = expected.__eq__
+      self.assertTrue(
+          cmp(actual), 'Expected:\n\n%r\n\nActual:\n\n%r' % (expected, actual))
 
   def test_series_arithmetic(self):
     a = pd.Series([1, 2, 3])
@@ -246,14 +284,10 @@
     self._run_test(lambda df: df.groupby('group').bar.sum(), df)
     self._run_test(lambda df: df.groupby('group')['foo'].sum(), df)
     self._run_test(lambda df: df.groupby('group')['baz'].sum(), df)
-    self._run_test(
-        lambda df: df.groupby('group')[['bar', 'baz']].bar.sum(),
-        df,
-        expect_error=True)
-    self._run_test(
-        lambda df: df.groupby('group')[['bat']].sum(), df, expect_error=True)
-    self._run_test(
-        lambda df: df.groupby('group').bat.sum(), df, expect_error=True)
+    self._run_error_test(
+        lambda df: df.groupby('group')[['bar', 'baz']].bar.sum(), df)
+    self._run_error_test(lambda df: df.groupby('group')[['bat']].sum(), df)
+    self._run_error_test(lambda df: df.groupby('group').bat.sum(), df)
 
     self._run_test(lambda df: df.groupby('group').median(), df)
     self._run_test(lambda df: df.groupby('group').foo.median(), df)
@@ -266,26 +300,19 @@
     df = GROUPBY_DF
 
     # non-existent projection column
-    self._run_test(
-        lambda df: df.groupby('group')[['bar', 'baz']].bar.median(),
-        df,
-        expect_error=True)
-    self._run_test(
-        lambda df: df.groupby('group')[['bad']].median(), df, expect_error=True)
+    self._run_error_test(
+        lambda df: df.groupby('group')[['bar', 'baz']].bar.median(), df)
+    self._run_error_test(lambda df: df.groupby('group')[['bad']].median(), df)
 
-    self._run_test(
-        lambda df: df.groupby('group').bad.median(), df, expect_error=True)
+    self._run_error_test(lambda df: df.groupby('group').bad.median(), df)
 
   def test_groupby_errors_non_existent_label(self):
     df = GROUPBY_DF
 
     # non-existent grouping label
-    self._run_test(
-        lambda df: df.groupby(['really_bad', 'foo', 'bad']).foo.sum(),
-        df,
-        expect_error=True)
-    self._run_test(
-        lambda df: df.groupby('bad').foo.sum(), df, expect_error=True)
+    self._run_error_test(
+        lambda df: df.groupby(['really_bad', 'foo', 'bad']).foo.sum(), df)
+    self._run_error_test(lambda df: df.groupby('bad').foo.sum(), df)
 
   def test_groupby_callable(self):
     df = GROUPBY_DF
@@ -307,11 +334,9 @@
     self._run_test(lambda df: df.set_index(['index1', 'index2'], drop=True), df)
     self._run_test(lambda df: df.set_index('values'), df)
 
-    self._run_test(lambda df: df.set_index('bad'), df, expect_error=True)
-    self._run_test(
-        lambda df: df.set_index(['index2', 'bad', 'really_bad']),
-        df,
-        expect_error=True)
+    self._run_error_test(lambda df: df.set_index('bad'), df)
+    self._run_error_test(
+        lambda df: df.set_index(['index2', 'bad', 'really_bad']), df)
 
   def test_series_drop_ignore_errors(self):
     midx = pd.MultiIndex(
@@ -397,22 +422,21 @@
     df2 = pd.DataFrame({
         'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
     })
-    with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(
-          lambda df1,
-          df2: df1.merge(df2, left_on='lkey', right_on='rkey').rename(
-              index=lambda x: '*'),
-          df1,
-          df2)
-      self._run_test(
-          lambda df1,
-          df2: df1.merge(
-              df2,
-              left_on='lkey',
-              right_on='rkey',
-              suffixes=('_left', '_right')).rename(index=lambda x: '*'),
-          df1,
-          df2)
+    self._run_test(
+        lambda df1,
+        df2: df1.merge(df2, left_on='lkey', right_on='rkey').rename(
+            index=lambda x: '*'),
+        df1,
+        df2,
+        nonparallel=True)
+    self._run_test(
+        lambda df1,
+        df2: df1.merge(
+            df2, left_on='lkey', right_on='rkey', suffixes=('_left', '_right')).
+        rename(index=lambda x: '*'),
+        df1,
+        df2,
+        nonparallel=True)
 
   def test_merge_left_join(self):
     # This is from the pandas doctests, but fails due to re-indexing being
@@ -420,12 +444,12 @@
     df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
     df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})
 
-    with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(
-          lambda df1,
-          df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*'),
-          df1,
-          df2)
+    self._run_test(
+        lambda df1,
+        df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*'),
+        df1,
+        df2,
+        nonparallel=True)
 
   def test_merge_on_index(self):
     # This is from the pandas doctests, but fails due to re-indexing being
@@ -436,12 +460,12 @@
     df2 = pd.DataFrame({
         'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
     }).set_index('rkey')
-    with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(
-          lambda df1,
-          df2: df1.merge(df2, left_index=True, right_index=True),
-          df1,
-          df2)
+
+    self._run_test(
+        lambda df1,
+        df2: df1.merge(df2, left_index=True, right_index=True),
+        df1,
+        df2)
 
   def test_merge_same_key(self):
     df1 = pd.DataFrame({
@@ -450,55 +474,58 @@
     df2 = pd.DataFrame({
         'key': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
     })
-    with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(
-          lambda df1,
-          df2: df1.merge(df2, on='key').rename(index=lambda x: '*'),
-          df1,
-          df2)
-      self._run_test(
-          lambda df1,
-          df2: df1.merge(df2, on='key', suffixes=('_left', '_right')).rename(
-              index=lambda x: '*'),
-          df1,
-          df2)
+    self._run_test(
+        lambda df1,
+        df2: df1.merge(df2, on='key').rename(index=lambda x: '*'),
+        df1,
+        df2,
+        nonparallel=True)
+    self._run_test(
+        lambda df1,
+        df2: df1.merge(df2, on='key', suffixes=('_left', '_right')).rename(
+            index=lambda x: '*'),
+        df1,
+        df2,
+        nonparallel=True)
 
   def test_merge_same_key_doctest(self):
     df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
     df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})
 
-    with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(
-          lambda df1,
-          df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*'),
-          df1,
-          df2)
-      # Test without specifying 'on'
-      self._run_test(
-          lambda df1,
-          df2: df1.merge(df2, how='left').rename(index=lambda x: '*'),
-          df1,
-          df2)
+    self._run_test(
+        lambda df1,
+        df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*'),
+        df1,
+        df2,
+        nonparallel=True)
+    # Test without specifying 'on'
+    self._run_test(
+        lambda df1,
+        df2: df1.merge(df2, how='left').rename(index=lambda x: '*'),
+        df1,
+        df2,
+        nonparallel=True)
 
   def test_merge_same_key_suffix_collision(self):
     df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2], 'a_lsuffix': [5, 6]})
     df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4], 'a_rsuffix': [7, 8]})
 
-    with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(
-          lambda df1,
-          df2: df1.merge(
-              df2, how='left', on='a', suffixes=('_lsuffix', '_rsuffix')).
-          rename(index=lambda x: '*'),
-          df1,
-          df2)
-      # Test without specifying 'on'
-      self._run_test(
-          lambda df1,
-          df2: df1.merge(df2, how='left', suffixes=('_lsuffix', '_rsuffix')).
-          rename(index=lambda x: '*'),
-          df1,
-          df2)
+    self._run_test(
+        lambda df1,
+        df2: df1.merge(
+            df2, how='left', on='a', suffixes=('_lsuffix', '_rsuffix')).rename(
+                index=lambda x: '*'),
+        df1,
+        df2,
+        nonparallel=True)
+    # Test without specifying 'on'
+    self._run_test(
+        lambda df1,
+        df2: df1.merge(df2, how='left', suffixes=('_lsuffix', '_rsuffix')).
+        rename(index=lambda x: '*'),
+        df1,
+        df2,
+        nonparallel=True)
 
   def test_series_getitem(self):
     s = pd.Series([x**2 for x in range(10)])
@@ -550,10 +577,9 @@
     s = pd.Series(list(range(16)))
     self._run_test(lambda s: s.agg('sum'), s)
     self._run_test(lambda s: s.agg(['sum']), s)
-    with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(lambda s: s.agg(['sum', 'mean']), s)
-      self._run_test(lambda s: s.agg(['mean']), s)
-      self._run_test(lambda s: s.agg('mean'), s)
+    self._run_test(lambda s: s.agg(['sum', 'mean']), s, nonparallel=True)
+    self._run_test(lambda s: s.agg(['mean']), s, nonparallel=True)
+    self._run_test(lambda s: s.agg('mean'), s, nonparallel=True)
 
   def test_append_sort(self):
     # yapf: disable
@@ -573,12 +599,20 @@
   def test_dataframe_agg(self):
     df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
     self._run_test(lambda df: df.agg('sum'), df)
-    with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(lambda df: df.agg(['sum', 'mean']), df)
-      self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df)
-      self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'mean'}), df)
-      self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df)
-      self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df)
+    self._run_test(lambda df: df.agg(['sum', 'mean']), df, nonparallel=True)
+    self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df)
+    self._run_test(
+        lambda df: df.agg({
+            'A': 'sum', 'B': 'mean'
+        }), df, nonparallel=True)
+    self._run_test(
+        lambda df: df.agg({'A': ['sum', 'mean']}), df, nonparallel=True)
+    self._run_test(
+        lambda df: df.agg({
+            'A': ['sum', 'mean'], 'B': 'min'
+        }),
+        df,
+        nonparallel=True)
 
   def test_smallest_largest(self):
     df = pd.DataFrame({'A': [1, 1, 2, 2], 'B': [2, 3, 5, 7]})
@@ -622,9 +656,8 @@
     df = df.set_index('B')
     # TODO(BEAM-11190): These aggregations can be done in index partitions, but
     # it will require a little more complex logic
-    with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(lambda df: df.groupby(level=0).sum(), df)
-      self._run_test(lambda df: df.groupby(level=0).mean(), df)
+    self._run_test(lambda df: df.groupby(level=0).sum(), df, nonparallel=True)
+    self._run_test(lambda df: df.groupby(level=0).mean(), df, nonparallel=True)
 
   def test_dataframe_eval_query(self):
     df = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
@@ -686,6 +719,14 @@
                                 r"df\.quantile\(q=0\.1, axis='columns'\)"):
       self._run_test(lambda df: df.quantile([0.1, 0.5], axis='columns'), df)
 
+  @unittest.skipIf(PD_VERSION < (1, 1), "dropna added in pandas 1.1.0")
+  def test_groupby_count_na(self):
+    # Verify we can do a groupby.count() that doesn't drop NaN values
+    self._run_test(
+        lambda df: df.groupby('foo', dropna=True).bar.count(), GROUPBY_DF)
+    self._run_test(
+        lambda df: df.groupby('foo', dropna=False).bar.count(), GROUPBY_DF)
+
 
 class AllowNonParallelTest(unittest.TestCase):
   def _use_non_parallel_operation(self):
diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index c456f2a..7e8c155 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -381,8 +381,17 @@
         self._saved_pcoders[os.path.join(*labels)])
 
   def cleanup(self):
+
     if os.path.exists(self._cache_dir):
-      shutil.rmtree(self._cache_dir)
+
+      def on_fail_to_cleanup(function, path, excinfo):
+        _LOGGER.warning(
+            'Failed to clean up temporary files: %s. You may '
+            'manually delete them if necessary. Error was: %s',
+            path,
+            excinfo)
+
+      shutil.rmtree(self._cache_dir, onerror=on_fail_to_cleanup)
     self._saved_pcoders = {}
     self._capture_sinks = {}
     self._capture_keys = set()
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
index 343ad6a..6354184 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
@@ -501,18 +501,23 @@
     class SizeLimiter(Limiter):
       def __init__(self, p):
         self.pipeline = p
+        self._rm = None
+
+      def set_recording_manager(self, rm):
+        self._rm = rm
 
       def is_triggered(self):
-        rm = ie.current_env().get_recording_manager(self.pipeline)
-        return rm.describe()['size'] > 0 if rm else False
+        return self._rm.describe()['size'] > 0 if self._rm else False
 
     # Do the first recording to get the timestamp of the first time the fragment
     # was run.
-    rm = RecordingManager(p, test_limiters=[SizeLimiter(p)])
+    size_limiter = SizeLimiter(p)
+    rm = RecordingManager(p, test_limiters=[size_limiter])
+    size_limiter.set_recording_manager(rm)
     self.assertEqual(rm.describe()['state'], PipelineState.STOPPED)
     self.assertTrue(rm.record_pipeline())
 
-    ie.current_env().set_recording_manager(rm, p)
+    # A recording is in progress, no need to start another one.
     self.assertFalse(rm.record_pipeline())
 
     for _ in range(60):
diff --git a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
index 59fedfd..f109726 100644
--- a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
+++ b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
@@ -344,21 +344,27 @@
 
 ### Syntax
 
+#### Nested mode
 ```
 CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
     event_timestamp TIMESTAMP,
-    attributes MAP<VARCHAR, VARCHAR>,
-    payload ROW<tableElement [, tableElement ]*>
+    attributes [MAP<VARCHAR, VARCHAR>, ARRAY<ROW<key VARCHAR, value VARCHAR>>],
+    payload [BYTES, ROW<tableElement [, tableElement ]*>]
 )
 TYPE pubsub
 LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
-TBLPROPERTIES '{
-    "timestampAttributeKey": "key",
-    "deadLetterQueue": "projects/[PROJECT]/topics/[TOPIC]",
-    "format": "format"
-}'
 ```
 
+#### Flattened mode
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(tableElement [, tableElement ]*)
+TYPE pubsub
+LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
+```
+
+In nested mode, the following fields hold topic metadata. The presence of the
+`attributes` field triggers nested mode usage.
+
 *   `event_timestamp`: The event timestamp associated with the Pub/Sub message
     by PubsubIO. It can be one of the following:
     *   Message publish time, which is provided by Pub/Sub. This is the default
@@ -376,6 +382,8 @@
     `deadLetterQueue` field of the `tblProperties` blob. If no dead-letter queue
     is specified in this case, an exception is thrown and the pipeline will
     crash.
+
+
 *   `LOCATION`:
     *   `PROJECT`: ID of the Google Cloud Project
     *   `TOPIC`: The Pub/Sub topic name. A subscription will be created
@@ -390,15 +398,14 @@
         payload was not parsed. If not specified, an exception is thrown for
         parsing failures.
     *   `format`: Optional. Allows you to specify the Pubsub payload format.
-        Possible values are {`json`, `avro`}. Defaults to `json`.
 
 ### Read Mode
 
-PubsubIO is currently limited to read access only.
+PubsubIO supports reading from topics by creating a new subscription.
 
 ### Write Mode
 
-Not supported. PubSubIO is currently limited to read access only in Beam SQL.
+PubsubIO supports writing to topics.
 
 ### Schema
 
@@ -411,13 +418,7 @@
 
 ### Supported Payload
 
-*   JSON Objects (Default)
-    *   Beam only supports querying messages with payload containing JSON
-        objects. Beam attempts to parse JSON to match the schema of the
-        `payload` field.
-*   Avro
-    *   An Avro schema is automatically generated from the specified schema of
-        the `payload` field. It is used to parse incoming messages.
+*   Pub/Sub supports [Generic Payload Handling](#generic-payload-handling).
 
 ### Example
 
@@ -427,38 +428,106 @@
 LOCATION 'projects/testing-integration/topics/user-location'
 ```
 
+## Pub/Sub Lite
+
+### Syntax
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
+    publish_timestamp DATETIME,
+    event_timestamp DATETIME,
+    message_key BYTES,
+    attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
+    payload [BYTES, ROW<tableElement [, tableElement ]*>]
+)
+TYPE pubsublite
+// For writing
+LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC]'
+// For reading
+LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/subscriptions/[SUBSCRIPTION]'
+```
+
+*   `LOCATION`:
+    *   `PROJECT`: ID of the Google Cloud Project
+    *   `TOPIC`: The Pub/Sub Lite topic name.
+    *   `SUBSCRIPTION`: The Pub/Sub Lite subscription name.
+    *   `GCP-LOCATION`: The location for this Pub/Sub Lite topic or subscription.
+*   `TBLPROPERTIES`:
+    *   `timestampAttributeKey`: Optional. The key which contains the event
+        timestamp associated with the Pub/Sub message. If not specified, the
+        message publish timestamp is used as an event timestamp for
+        windowing/watermarking.
+    *   `deadLetterQueue`: Optional, supports
+        [Generic DLQ Handling](#generic-dlq-handling)
+    *   `format`: Optional. Allows you to specify the payload format.
+
+### Read Mode
+
+PubsubLiteIO supports reading from subscriptions.
+
+### Write Mode
+
+PubsubLiteIO supports writing to topics.
+
+### Supported Payload
+
+*   Pub/Sub Lite supports [Generic Payload Handling](#generic-payload-handling).
+
+### Example
+
+```
+CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>)
+TYPE pubsublite
+LOCATION 'projects/testing-integration/locations/us-central1-a/topics/user-location'
+```
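+
+The example above writes to a topic. A read-oriented counterpart points
+`LOCATION` at a subscription and can set the properties described earlier. The
+sketch below is illustrative only; the subscription name, attribute key, and
+dead-letter topic are placeholders.
+
+```
+-- Hypothetical names: the subscription, attribute key, and DLQ topic are placeholders.
+CREATE EXTERNAL TABLE locations_source (
+    event_timestamp DATETIME,
+    attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
+    payload ROW<id INTEGER, location VARCHAR>
+)
+TYPE pubsublite
+LOCATION 'projects/testing-integration/locations/us-central1-a/subscriptions/user-location-sub'
+TBLPROPERTIES '{
+    "timestampAttributeKey": "event_time_ms",
+    "deadLetterQueue": "pubsublite:projects/testing-integration/locations/us-central1-a/topics/user-location-dlq",
+    "format": "json"
+}'
+```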
+
 ## Kafka
 
 KafkaIO is experimental in Beam SQL.
 
 ### Syntax
 
+#### Flattened mode
 ```
 CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
 TYPE kafka
-LOCATION 'kafka://localhost:2181/brokers'
+LOCATION 'my.company.url.com:9092/topic1'
 TBLPROPERTIES '{
-    "bootstrap.servers":"localhost:9092",
-    "topics": ["topic1", "topic2"],
-    "format": "avro"
-    [, "protoClass": "com.example.ExampleMessage" ]
+    "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
+    "topics": ["topic2", "topic3"],
+    "format": "json"
 }'
 ```
 
-*   `LOCATION`: The Kafka topic URL.
+#### Nested mode
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
+  event_timestamp DATETIME,
+  message_key BYTES,
+  headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
+  payload [BYTES, ROW<tableElement [, tableElement ]*>]
+)
+TYPE kafka
+LOCATION 'my.company.url.com:9092/topic1'
+TBLPROPERTIES '{
+    "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
+    "topics": ["topic2", "topic3"],
+    "format": "json"
+}'
+```
+
+The presence of the `headers` field triggers nested mode usage.
+
+*   `LOCATION`: A URL whose host and port identify the initial bootstrap broker
+    to use and whose path is the initial topic name.
 *   `TBLPROPERTIES`:
-    *   `bootstrap.servers`: Optional. Allows you to specify the bootstrap
-        server.
-    *   `topics`: Optional. Allows you to specify specific topics.
+    *   `bootstrap_servers`: Optional. Allows you to specify additional
+        bootstrap servers, which are used in addition to the one in `LOCATION`.
+    *   `topics`: Optional. Allows you to specify additional topics, which are
+        used in addition to the one in `LOCATION`.
     *   `format`: Optional. Allows you to specify the Kafka values format. Possible values are
-        {`csv`, `avro`, `json`, `proto`, `thrift`}. Defaults to `csv`.
-    *   `protoClass`: Optional. Use only when `format` is equal to `proto`. Allows you to
-        specify full protocol buffer java class name.
-    *   `thriftClass`: Optional. Use only when `format` is equal to `thrift`. Allows you to
-        specify full thrift java class name.
-    *   `thriftProtocolFactoryClass`: Optional. Use only when `format` is equal to `thrift`.
-        Allows you to specify full class name of the `TProtocolFactory` to use for thrift
-        serialization.
+        {`csv`, `avro`, `json`, `proto`, `thrift`}. Defaults to `csv` in
+        flattened mode or `json` in nested mode. `csv` does not support nested
+        mode. See the example after this list.
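+
+As an illustration of these defaults, the hypothetical nested-mode table below
+overrides the payload format to `avro`. The broker, topic, and payload columns
+are placeholders rather than prescribed names.
+
+```
+-- Hypothetical broker, topic, and columns; adjust to match the actual Avro records.
+CREATE EXTERNAL TABLE kafka_orders (
+  event_timestamp DATETIME,
+  message_key BYTES,
+  headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
+  payload ROW<order_id INTEGER, amount DOUBLE>
+)
+TYPE kafka
+LOCATION 'my.company.url.com:9092/orders'
+TBLPROPERTIES '{"format": "avro"}'
+```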
 
 ### Read Mode
 
@@ -473,18 +542,8 @@
 *   CSV (default)
     *   Beam parses the messages, attempting to parse fields according to the
         types specified in the schema.
-*   Avro
-    *   An Avro schema is automatically generated from the specified field
-        types. It is used to parse incoming messages and to format outgoing
-        messages.
-*   JSON Objects
-    *   Beam attempts to parse JSON to match the schema.
-*   Protocol buffers
-    *   Fields in the schema have to match the fields of the given `protoClass`.
-*   Thrift
-    *   Fields in the schema have to match the fields of the given `thriftClass`.
-    *   The `TProtocolFactory` used for thrift serialization must match the
-        provided `thriftProtocolFactoryClass`.
+*   Kafka supports all [Generic Payload Handling](#generic-payload-handling)
+    formats.
 
 ### Schema
 
@@ -582,3 +641,47 @@
 TYPE text
 LOCATION '/home/admin/orders'
 ```
+
+## Generic Payload Handling
+
+Certain data sources and sinks support generic payload handling. This handling
+parses a byte array payload field into a table schema. The following formats
+are supported. All require at least setting `"format": "<type>"`, and some
+require additional properties, as shown in the example after this list.
+
+*   `avro`: Avro
+    *   An Avro schema is automatically generated from the specified field
+        types. It is used to parse incoming messages and to format outgoing
+        messages.
+*   `json`: JSON Objects
+    *   Beam attempts to parse the byte array as UTF-8 JSON to match the schema.
+*   `proto`: Protocol Buffers
+    *   Beam locates the equivalent Protocol Buffer class and uses it to parse
+        the payload.
+    *   `protoClass`: Required. The proto class name to use. Must be built into
+        the deployed JAR.
+    *   Fields in the schema have to match the fields of the given `protoClass`.
+*   `thrift`: Thrift
+    *   Fields in the schema have to match the fields of the given
+        `thriftClass`.
+    *   `thriftClass`: Required. Allows you to specify the full Thrift Java
+        class name. Must be built into the deployed JAR.
+    *   `thriftProtocolFactoryClass`: Required. Allows you to specify the full
+        class name of the `TProtocolFactory` to use for Thrift serialization.
+        Must be built into the deployed JAR.
+    *   The `TProtocolFactory` used for Thrift serialization must match the
+        provided `thriftProtocolFactoryClass`.
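+
+For example, a hypothetical Kafka table reading Protocol Buffer payloads
+combines `format` and `protoClass` as shown below. The broker, topic, column
+names, and the class `com.example.ExampleMessage` are placeholders; the
+declared columns must match the fields of the actual proto class.
+
+```
+-- protoClass must name a class built into the deployed JAR;
+-- the declared columns must match its fields.
+CREATE EXTERNAL TABLE proto_orders (order_id INTEGER, amount DOUBLE)
+TYPE kafka
+LOCATION 'my.company.url.com:9092/orders'
+TBLPROPERTIES '{
+    "format": "proto",
+    "protoClass": "com.example.ExampleMessage"
+}'
+```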
+
+## Generic DLQ Handling
+
+Sources and sinks that support generic DLQ handling specify a parameter with
+the format `"<dlqParamName>": "[DLQ_KIND]:[DLQ_ID]"`. The following kinds of
+DLQ handling are supported (see the example after this list):
+
+*   `bigquery`: BigQuery
+    *   DLQ_ID is the table spec for an output table with an `error` string
+        field and a `payload` byte array field.
+*   `pubsub`: Pub/Sub Topic
+    *   DLQ_ID is the full path of the Pub/Sub Topic.
+*   `pubsublite`: Pub/Sub Lite Topic
+    *   DLQ_ID is the full path of the Pub/Sub Lite Topic.
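+
+For instance, a hypothetical Pub/Sub table could route messages that fail to
+parse to a BigQuery table. This sketch assumes the table's `deadLetterQueue`
+property accepts the generic form above; the bracketed values are placeholders,
+and the BigQuery table spec is written in the usual `project:dataset.table`
+form.
+
+```
+-- All bracketed values are placeholders.
+CREATE EXTERNAL TABLE user_events (event_timestamp TIMESTAMP, id INTEGER, location VARCHAR)
+TYPE pubsub
+LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
+TBLPROPERTIES '{
+    "format": "json",
+    "deadLetterQueue": "bigquery:[PROJECT]:[DATASET].[TABLE]"
+}'
+```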
\ No newline at end of file