Merge pull request #12441: [BEAM-10470] Handle null state from waitUntilFinish

diff --git a/CHANGES.md b/CHANGES.md
index 0d29362..04f3094 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -58,6 +58,9 @@
 
 * New overloads for BigtableIO.Read.withKeyRange() and BigtableIO.Read.withRowFilter()
   methods that take ValueProvider as a parameter (Java) ([BEAM-10283](https://issues.apache.org/jira/browse/BEAM-10283)).
+* The WriteToBigQuery transform (Python) in Dataflow Batch no longer relies on BigQuerySink by default. Instead, it
+  relies on a new, fully-featured transform based on file loads into BigQuery. To revert to the old implementation,
+  pass `--experiments=use_legacy_bq_sink`.
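+  For example, a minimal sketch of opting back into the old behavior (only the experiment flag matters here; the
+  rest of the pipeline is unchanged):
+
+  ```py
+  from apache_beam.options.pipeline_options import PipelineOptions
+
+  # Revert to the legacy BigQuerySink-based implementation.
+  options = PipelineOptions(['--experiments=use_legacy_bq_sink'])
+  ```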
 * Add cross-language support to Java's JdbcIO, now available in the Python module `apache_beam.io.external.jdbc` ([BEAM-10135](https://issues.apache.org/jira/browse/BEAM-10135), [BEAM-10136](https://issues.apache.org/jira/browse/BEAM-10136)).
 * Add support of AWS SDK v2 for KinesisIO.Read (Java) ([BEAM-9702](https://issues.apache.org/jira/browse/BEAM-9702)).
 * Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 3dee824..dfa2b37 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -19,6 +19,7 @@
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.api.client.googleapis.json.GoogleJsonError;
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
 import com.google.api.client.http.HttpRequestInitializer;
@@ -91,6 +92,7 @@
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -119,6 +121,9 @@
   private static final FluentBackoff DEFAULT_BACKOFF_FACTORY =
       FluentBackoff.DEFAULT.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF);
 
+  // The error reason for a quota-exceeded error (https://cloud.google.com/bigquery/docs/error-messages).
+  private static final String QUOTA_EXCEEDED = "quotaExceeded";
+
   @Override
   public JobService getJobService(BigQueryOptions options) {
     return new JobServiceImpl(options);
@@ -813,6 +818,19 @@
                         try {
                           return insert.execute().getInsertErrors();
                         } catch (IOException e) {
+                          GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
+                          if (errorInfo == null) {
+                            throw e;
+                          }
+                          /*
+                           * TODO(BEAM-10584): The check for the QUOTA_EXCEEDED error will be
+                           * replaced by ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next
+                           * release of GoogleCloudDataproc/hadoop-connectors.
+                           */
+                          if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
+                              && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
+                            throw e;
+                          }
                           LOG.info(
                               String.format(
                                   "BigQuery insertAll error, retrying: %s",
@@ -922,6 +940,15 @@
           ignoreInsertIds);
     }
 
+    /**
+     * Returns the first {@link GoogleJsonError.ErrorInfo} from the exception details when {@code e}
+     * is a {@link GoogleJsonResponseException}, or {@code null} otherwise.
+     */
+    protected GoogleJsonError.ErrorInfo getErrorInfo(IOException e) {
+      if (!(e instanceof GoogleJsonResponseException)) {
+        return null;
+      }
+      GoogleJsonError jsonError = ((GoogleJsonResponseException) e).getDetails();
+      return Iterables.getFirst(jsonError.getErrors(), null);
+    }
+
     @Override
     public Table patchTableDescription(
         TableReference tableReference, @Nullable String tableDescription)
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 413acba..15bfc0a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -27,6 +27,7 @@
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -35,6 +36,7 @@
 import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
 import com.google.api.client.googleapis.json.GoogleJsonErrorContainer;
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpResponseException;
 import com.google.api.client.http.LowLevelHttpResponse;
 import com.google.api.client.json.GenericJson;
 import com.google.api.client.json.Json;
@@ -110,7 +112,6 @@
             return response;
           }
         };
-
     // A mock transport that lets us mock the API responses.
     MockHttpTransport transport =
         new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
@@ -715,11 +716,11 @@
   }
 
   /**
-   * Tests that {@link DatasetServiceImpl#insertAll} retries other non-rate-limited,
+   * Tests that {@link DatasetServiceImpl#insertAll} will not retry non-rate-limited,
    * non-quota-exceeded attempts.
    */
   @Test
-  public void testInsertOtherRetry() throws Throwable {
+  public void testFailInsertOtherRetry() throws Exception {
     TableReference ref =
         new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
     List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
@@ -733,26 +734,29 @@
     when(response.getContent())
         .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
         .thenReturn(toStream(new TableDataInsertAllResponse()));
-
     DatasetServiceImpl dataService =
         new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
-    dataService.insertAll(
-        ref,
-        rows,
-        null,
-        BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
-        TEST_BACKOFF,
-        new MockSleeper(),
-        InsertRetryPolicy.alwaysRetry(),
-        null,
-        null,
-        false,
-        false,
-        false);
-    verify(response, times(2)).getStatusCode();
-    verify(response, times(2)).getContent();
-    verify(response, times(2)).getContentType();
-    expectedLogs.verifyInfo("BigQuery insertAll error, retrying:");
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("actually forbidden");
+    try {
+      dataService.insertAll(
+          ref,
+          rows,
+          null,
+          BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+          TEST_BACKOFF,
+          new MockSleeper(),
+          InsertRetryPolicy.alwaysRetry(),
+          null,
+          null,
+          false,
+          false,
+          false);
+    } finally {
+      verify(response, times(1)).getStatusCode();
+      verify(response, times(1)).getContent();
+      verify(response, times(1)).getContentType();
+    }
   }
 
   /**
@@ -919,6 +923,23 @@
   }
 
   @Test
+  public void testGetErrorInfo() throws IOException {
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+    ErrorInfo info = new ErrorInfo();
+    List<ErrorInfo> infoList = new ArrayList<>();
+    infoList.add(info);
+    info.setReason("QuotaExceeded");
+    GoogleJsonError error = new GoogleJsonError();
+    error.setErrors(infoList);
+    HttpResponseException.Builder builder = mock(HttpResponseException.Builder.class);
+    IOException validException = new GoogleJsonResponseException(builder, error);
+    IOException invalidException = new IOException();
+    assertEquals(info.getReason(), dataService.getErrorInfo(validException).getReason());
+    assertNull(dataService.getErrorInfo(invalidException));
+  }
+
+  @Test
   public void testCreateTableSucceeds() throws IOException {
     TableReference ref =
         new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 1f39928..c2dc3cd 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -174,7 +174,6 @@
         'use_standard_sql': False,
         'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
         'on_success_matcher': all_of(*pipeline_verifiers),
-        'experiments': 'use_beam_bq_sink',
     }
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
@@ -198,7 +197,6 @@
         'use_standard_sql': True,
         'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
         'on_success_matcher': all_of(*pipeline_verifiers),
-        'experiments': 'use_beam_bq_sink',
     }
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
@@ -227,6 +225,7 @@
         'on_success_matcher': all_of(*pipeline_verifiers),
         'kms_key': kms_key,
         'native': True,
+        'experiments': 'use_legacy_bq_sink',
     }
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
@@ -281,7 +280,6 @@
         'use_standard_sql': False,
         'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
         'on_success_matcher': all_of(*pipeline_verifiers),
-        'experiments': 'use_beam_bq_sink',
     }
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
@@ -305,7 +303,8 @@
         'use_standard_sql': False,
         'native': True,
         'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
-        'on_success_matcher': all_of(*pipeline_verifiers)
+        'on_success_matcher': all_of(*pipeline_verifiers),
+        'experiments': 'use_legacy_bq_sink',
     }
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 2656a84..d011394 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1488,9 +1488,7 @@
   def _compute_method(self, experiments, is_streaming_pipeline):
     # When no insert method is specified, we use streaming inserts by default
     # for streaming pipelines, and file loads by default for batch pipelines.
-    if 'use_beam_bq_sink' not in experiments:
-      return self.Method.STREAMING_INSERTS
-    elif self.method == self.Method.DEFAULT and is_streaming_pipeline:
+    if self.method == self.Method.DEFAULT and is_streaming_pipeline:
       return self.Method.STREAMING_INSERTS
     elif self.method == self.Method.DEFAULT and not is_streaming_pipeline:
       return self.Method.FILE_LOADS
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index f9e0212..fca7d9c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -673,8 +673,7 @@
     ]
 
     args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=all_of(*pipeline_verifiers),
-        experiments='use_beam_bq_sink')
+        on_success_matcher=all_of(*pipeline_verifiers))
 
     with beam.Pipeline(argv=args) as p:
       input = p | beam.Create(_ELEMENTS, reshuffle=False)
@@ -733,9 +732,7 @@
         data=[(i, ) for i in range(100)])
 
     args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=all_of(state_matcher, bq_matcher),
-        experiments='use_beam_bq_sink',
-        streaming=True)
+        on_success_matcher=all_of(state_matcher, bq_matcher), streaming=True)
     with beam.Pipeline(argv=args) as p:
       stream_source = (
           TestStream().advance_watermark_to(0).advance_processing_time(
@@ -790,8 +787,7 @@
             data=[])
     ]
 
-    args = self.test_pipeline.get_full_options_as_args(
-        experiments='use_beam_bq_sink')
+    args = self.test_pipeline.get_full_options_as_args()
 
     with self.assertRaises(Exception):
       # The pipeline below fails because neither a schema nor SCHEMA_AUTODETECT
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 5c05978..b341dc6 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -563,8 +563,7 @@
     self.assertEqual(expected_dict_schema, dict_schema)
 
   def test_schema_autodetect_not_allowed_with_avro_file_loads(self):
-    with TestPipeline(
-        additional_pipeline_args=["--experiments=use_beam_bq_sink"]) as p:
+    with TestPipeline() as p:
       pc = p | beam.Impulse()
 
       with self.assertRaisesRegex(ValueError, '^A schema must be provided'):
@@ -593,8 +592,7 @@
     """
     FULL_OUTPUT_TABLE = 'test_project:output_table'
 
-    p = TestPipeline(
-        additional_pipeline_args=["--experiments=use_beam_bq_sink"])
+    p = TestPipeline()
 
     # Used for testing side input parameters.
     table_record_pcv = beam.pvalue.AsDict(
@@ -886,8 +884,7 @@
     ]
 
     args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=hc.all_of(*pipeline_verifiers),
-        experiments='use_beam_bq_sink')
+        on_success_matcher=hc.all_of(*pipeline_verifiers))
 
     with beam.Pipeline(argv=args) as p:
       input = p | beam.Create([row for row in _ELEMENTS if 'language' in row])
@@ -967,8 +964,7 @@
       ]
 
     args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=hc.all_of(*pipeline_verifiers),
-        experiments='use_beam_bq_sink')
+        on_success_matcher=hc.all_of(*pipeline_verifiers))
 
     with beam.Pipeline(argv=args) as p:
       if streaming:
@@ -1083,7 +1079,6 @@
     args = self.test_pipeline.get_full_options_as_args(
         on_success_matcher=hc.all_of(*matchers),
         wait_until_finish_duration=self.WAIT_UNTIL_FINISH_DURATION,
-        experiments='use_beam_bq_sink',
         streaming=True)
 
     def add_schema_info(element):
@@ -1182,7 +1177,6 @@
 
     args = self.test_pipeline.get_full_options_as_args(
         on_success_matcher=hc.all_of(*pipeline_verifiers),
-        experiments='use_beam_bq_sink',
     )
 
     with beam.Pipeline(argv=args) as p:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 9ea75ea..a5c1ce7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -197,8 +197,7 @@
     ]
 
     args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=hc.all_of(*pipeline_verifiers),
-        experiments='use_beam_bq_sink')
+        on_success_matcher=hc.all_of(*pipeline_verifiers))
 
     with beam.Pipeline(argv=args) as p:
       # pylint: disable=expression-not-assigned
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 8f2a9cf..dca3a39 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -592,15 +592,13 @@
     return result
 
   def _maybe_add_unified_worker_missing_options(self, options):
-    # set default beam_fn_api and use_beam_bq_sink experiment if use unified
+    # Set the default beam_fn_api experiment if the unified
     # worker experiment flag exists, no-op otherwise.
     debug_options = options.view_as(DebugOptions)
     from apache_beam.runners.dataflow.internal import apiclient
     if apiclient._use_unified_worker(options):
       if not debug_options.lookup_experiment('beam_fn_api'):
         debug_options.add_experiment('beam_fn_api')
-      if not debug_options.lookup_experiment('use_beam_bq_sink'):
-        debug_options.add_experiment('use_beam_bq_sink')
 
   def _get_typehint_based_encoding(self, typehint, window_coder):
     """Returns an encoding based on a typehint object."""
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 1e4f6f9..0315af3 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -698,6 +698,7 @@
   def test_write_bigquery_translation(self):
     runner = DataflowRunner()
 
+    self.default_properties.append('--experiments=use_legacy_bq_sink')
     with beam.Pipeline(runner=runner,
                        options=PipelineOptions(self.default_properties)) as p:
       # pylint: disable=expression-not-assigned
@@ -749,12 +750,13 @@
     """Tests that WriteToBigQuery cannot have any consumers if replaced."""
     runner = DataflowRunner()
 
-    with self.assertRaises(ValueError):
+    self.default_properties.append('--experiments=use_legacy_bq_sink')
+    with self.assertRaises(Exception):
       with beam.Pipeline(runner=runner,
                          options=PipelineOptions(self.default_properties)) as p:
         # pylint: disable=expression-not-assigned
         out = p | beam.Create([1]) | beam.io.WriteToBigQuery('some.table')
-        out['FailedRows'] | 'MyTransform' >> beam.Map(lambda _: _)
+        out['destination_file_pairs'] | 'MyTransform' >> beam.Map(lambda _: _)
 
 
 class CustomMergingWindowFn(window.WindowFn):
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 75b1db0..5743f52 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -236,7 +236,10 @@
         self.visit_transform(transform_node)
 
       def visit_transform(self, transform_node):
-        if [o for o in self.outputs if o in transform_node.inputs]:
+        # Internal consumers of the outputs we're overriding are expected.
+        # We only error out on non-internal consumers.
+        if ('BigQueryBatchFileLoads' not in transform_node.full_label and
+            [o for o in self.outputs if o in transform_node.inputs]):
           raise ValueError(
               'WriteToBigQuery was being replaced with the native '
               'BigQuerySink, but the transform "{}" has an input which will be '
@@ -250,16 +253,13 @@
     # Imported here to avoid circular dependencies.
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam import io
-    from apache_beam.runners.dataflow.internal import apiclient
-
     transform = applied_ptransform.transform
     if (not isinstance(transform, io.WriteToBigQuery) or
         getattr(transform, 'override', False)):
       return False
 
-    use_fnapi = apiclient._use_fnapi(self.options)
     experiments = self.options.view_as(DebugOptions).experiments or []
-    if (use_fnapi or 'use_beam_bq_sink' in experiments):
+    if 'use_legacy_bq_sink' not in experiments:
       return False
 
     if transform.schema == io.gcp.bigquery.SCHEMA_AUTODETECT:
diff --git a/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py b/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
index a25aba0..2c84f80 100644
--- a/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
+++ b/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
@@ -24,6 +24,8 @@
 
 import threading
 
+from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileHeader
+from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileRecord
 from apache_beam.runners.interactive import interactive_environment as ie
 
 
@@ -36,6 +38,20 @@
     raise NotImplementedError
 
 
+class ElementLimiter(Limiter):
+  """A `Limiter` that limits reading from cache based on some property of an
+  element.
+  """
+  def update(self, e):
+    # type: (Any) -> None
+
+    """Update the internal state based on some property of an element.
+
+    This is executed on every element that is read from cache.
+    """
+    raise NotImplementedError
+
+
 class SizeLimiter(Limiter):
   """Limits the cache size to a specified byte limit."""
   def __init__(
@@ -71,3 +87,56 @@
 
   def is_triggered(self):
     return self._triggered
+
+
+class CountLimiter(ElementLimiter):
+  """Limits by counting the number of elements seen."""
+  def __init__(self, max_count):
+    self._max_count = max_count
+    self._count = 0
+
+  def update(self, e):
+    # A TestStreamFileRecord can contain many elements at once. If e is a file
+    # record, then count the number of elements in the bundle.
+    if isinstance(e, TestStreamFileRecord):
+      if not e.recorded_event.element_event:
+        return
+      self._count += len(e.recorded_event.element_event.elements)
+
+    # Otherwise, count everything else but the header of the file since it is
+    # not an element.
+    elif not isinstance(e, TestStreamFileHeader):
+      self._count += 1
+
+  def is_triggered(self):
+    return self._count >= self._max_count
+
+
+class ProcessingTimeLimiter(ElementLimiter):
+  """Limits by how long the ProcessingTime passed in the element stream.
+
+  This measures the duration from the first element in the stream. Each
+  subsequent element has a delta "advance_duration" that moves the internal
+  clock forward. This triggers when the duration from the internal clock and
+  the start exceeds the given duration.
+  """
+  def __init__(self, max_duration_secs):
+    """Initialize the ProcessingTimeLimiter."""
+    self._max_duration_us = max_duration_secs * 1e6
+    self._start_us = 0
+    self._cur_time_us = 0
+
+  def update(self, e):
+    # Only look at TestStreamFileRecords which hold the processing time.
+    if not isinstance(e, TestStreamFileRecord):
+      return
+
+    if not e.recorded_event.processing_time_event:
+      return
+
+    if self._start_us == 0:
+      self._start_us = e.recorded_event.processing_time_event.advance_duration
+    self._cur_time_us += e.recorded_event.processing_time_event.advance_duration
+
+  def is_triggered(self):
+    return self._cur_time_us - self._start_us >= self._max_duration_us
diff --git a/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py b/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py
new file mode 100644
index 0000000..850c56e2c
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import unittest
+
+from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileRecord
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+
+class CaptureLimitersTest(unittest.TestCase):
+  def test_count_limiter(self):
+    limiter = CountLimiter(5)
+
+    for e in range(4):
+      limiter.update(e)
+
+    self.assertFalse(limiter.is_triggered())
+    limiter.update(5)
+    self.assertTrue(limiter.is_triggered())
+
+  def test_processing_time_limiter(self):
+    limiter = ProcessingTimeLimiter(max_duration_secs=2)
+
+    r = TestStreamFileRecord()
+    r.recorded_event.processing_time_event.advance_duration = int(1 * 1e6)
+    limiter.update(r)
+    self.assertFalse(limiter.is_triggered())
+
+    r = TestStreamFileRecord()
+    r.recorded_event.processing_time_event.advance_duration = int(2 * 1e6)
+    limiter.update(r)
+    self.assertTrue(limiter.is_triggered())
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/website/www/site/content/en/documentation/dsls/sql/walkthrough.md b/website/www/site/content/en/documentation/dsls/sql/walkthrough.md
index a02e14c..59ecb82 100644
--- a/website/www/site/content/en/documentation/dsls/sql/walkthrough.md
+++ b/website/www/site/content/en/documentation/dsls/sql/walkthrough.md
@@ -129,7 +129,7 @@
           .builder()
           .addInt32Field("appId")
           .addInt32Field("reviewerId")
-          .withFloatField("rating")
+          .addFloatField("rating")
           .addDateTimeField("rowtime")
           .build();
     
diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/sample.md b/website/www/site/content/en/documentation/transforms/python/aggregation/sample.md
index 7d10eb0..33fc8f6 100644
--- a/website/www/site/content/en/documentation/transforms/python/aggregation/sample.md
+++ b/website/www/site/content/en/documentation/transforms/python/aggregation/sample.md
@@ -14,24 +14,62 @@
 See the License for the specific language governing permissions and
 limitations under the License.
 -->
-# Sample
-<table align="left">
-    <a target="_blank" class="button"
-        href="https://beam.apache.org/releases/pydoc/2.12.0/apache_beam.transforms.combiners.html?#apache_beam.transforms.combiners.Sample">
-      <img src="https://beam.apache.org/images/logos/sdks/python.png" width="20px" height="20px"
-           alt="Pydoc" />
-     Pydoc
-    </a>
-</table>
-<br><br>
 
+# Sample
+
+{{< localstorage language language-py >}}
+
+{{< button-pydoc path="apache_beam.transforms.combiners" class="Sample" >}}
 
 Transforms for taking samples of the elements in a collection, or
-samples of the values associated with each key in a collection of 
+samples of the values associated with each key in a collection of
 key-value pairs.
 
 ## Examples
-See [BEAM-7390](https://issues.apache.org/jira/browse/BEAM-7390) for updates. 
 
-## Related transforms 
-* [Top](/documentation/transforms/python/aggregation/top) finds the largest or smallest element.
\ No newline at end of file
+In the following example, we create a pipeline with a `PCollection`.
+Then, we get a random sample of elements in different ways.
+
+### Example 1: Sample elements from a PCollection
+
+We use `Sample.FixedSizeGlobally()` to get a fixed-size random sample of elements from the *entire* `PCollection`.
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample.py" sample_fixed_size_globally >}}
+{{< /highlight >}}
+
+{{< paragraph class="notebook-skip" >}}
+Output:
+{{< /paragraph >}}
+
+{{< highlight class="notebook-skip" >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample_test.py" sample >}}
+{{< /highlight >}}
+
+{{< buttons-code-snippet
+  py="sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample.py" >}}
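+
+Because the snippet above is included from `sample.py`, here is a minimal, self-contained sketch of the same idea (the element values are illustrative and may differ from the referenced snippet):
+
+{{< highlight py >}}
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+  sample = (
+      pipeline
+      | 'Create produce' >> beam.Create(
+          ['strawberry', 'carrot', 'eggplant', 'tomato', 'potato'])
+      | 'Sample 3 elements' >> beam.combiners.Sample.FixedSizeGlobally(3)
+      | beam.Map(print))
+{{< /highlight >}}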
+
+### Example 2: Sample elements for each key
+
+We use `Sample.FixedSizePerKey()` to get fixed-size random samples for each unique key in a `PCollection` of key-value pairs.
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample.py" sample_fixed_size_per_key >}}
+{{< /highlight >}}
+
+{{< paragraph class="notebook-skip" >}}
+Output:
+{{< /paragraph >}}
+
+{{< highlight class="notebook-skip" >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample_test.py" samples_per_key >}}
+{{< /highlight >}}
+
+{{< buttons-code-snippet
+  py="sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample.py" >}}
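+
+As above, a minimal sketch of sampling per key (again with illustrative values that may not match the referenced snippet):
+
+{{< highlight py >}}
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+  samples_per_key = (
+      pipeline
+      | 'Create pairs' >> beam.Create([
+          ('spring', 'strawberry'), ('spring', 'carrot'), ('spring', 'eggplant'),
+          ('summer', 'carrot'), ('summer', 'tomato'), ('summer', 'corn'),
+      ])
+      | 'Sample 2 values per key' >> beam.combiners.Sample.FixedSizePerKey(2)
+      | beam.Map(print))
+{{< /highlight >}}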
+
+## Related transforms
+
+* [Top](/documentation/transforms/python/aggregation/top) finds the largest or smallest element.
+
+{{< button-pydoc path="apache_beam.transforms.combiners" class="Sample" >}}