Merge pull request #12441: [BEAM-10470] Handle null state from waitUntilFinish
diff --git a/CHANGES.md b/CHANGES.md
index 0d29362..04f3094 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -58,6 +58,9 @@
* New overloads for BigtableIO.Read.withKeyRange() and BigtableIO.Read.withRowFilter()
methods that take ValueProvider as a parameter (Java) ([BEAM-10283](https://issues.apache.org/jira/browse/BEAM-10283)).
+* The WriteToBigQuery transform (Python) in Dataflow Batch no longer relies on BigQuerySink by default. Instead, it
+  uses a new, fully featured transform based on file loads into BigQuery. To revert to the old implementation,
+  you may use `--experiments=use_legacy_bq_sink`.
* Add cross-language support to Java's JdbcIO, now available in the Python module `apache_beam.io.external.jdbc` ([BEAM-10135](https://issues.apache.org/jira/browse/BEAM-10135), [BEAM-10136](https://issues.apache.org/jira/browse/BEAM-10136)).
* Add support of AWS SDK v2 for KinesisIO.Read (Java) ([BEAM-9702](https://issues.apache.org/jira/browse/BEAM-9702)).
* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
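The changelog entry above switches Dataflow Batch to the file-loads-based sink by default. A minimal sketch, assuming a placeholder table spec and schema, of opting back into the legacy BigQuerySink via the experiment flag mentioned in the entry:

```py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Passing the experiment reverts WriteToBigQuery (Dataflow Batch) to the
# legacy BigQuerySink implementation.
options = PipelineOptions(['--experiments=use_legacy_bq_sink'])

with beam.Pipeline(options=options) as p:
  (p
   | beam.Create([{'name': 'a', 'value': 1}])
   | beam.io.WriteToBigQuery(
       'my-project:my_dataset.my_table',  # placeholder table spec
       schema='name:STRING,value:INTEGER'))
```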
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 3dee824..dfa2b37 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -19,6 +19,7 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
import com.google.api.client.http.HttpRequestInitializer;
@@ -91,6 +92,7 @@
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
@@ -119,6 +121,9 @@
private static final FluentBackoff DEFAULT_BACKOFF_FACTORY =
FluentBackoff.DEFAULT.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF);
+ // The error code for quota exceeded error (https://cloud.google.com/bigquery/docs/error-messages)
+ private static final String QUOTA_EXCEEDED = "quotaExceeded";
+
@Override
public JobService getJobService(BigQueryOptions options) {
return new JobServiceImpl(options);
@@ -813,6 +818,19 @@
try {
return insert.execute().getInsertErrors();
} catch (IOException e) {
+ GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
+ if (errorInfo == null) {
+ throw e;
+ }
+ /**
+ * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
+ * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
+ * GoogleCloudDataproc/hadoop-connectors
+ */
+ if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
+ && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
+ throw e;
+ }
LOG.info(
String.format(
"BigQuery insertAll error, retrying: %s",
@@ -922,6 +940,15 @@
ignoreInsertIds);
}
+ protected GoogleJsonError.ErrorInfo getErrorInfo(IOException e) {
+ if (!(e instanceof GoogleJsonResponseException)) {
+ return null;
+ }
+ GoogleJsonError jsonError = ((GoogleJsonResponseException) e).getDetails();
+ GoogleJsonError.ErrorInfo errorInfo = Iterables.getFirst(jsonError.getErrors(), null);
+ return errorInfo;
+ }
+
@Override
public Table patchTableDescription(
TableReference tableReference, @Nullable String tableDescription)
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 413acba..15bfc0a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -27,6 +27,7 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -35,6 +36,7 @@
import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
import com.google.api.client.googleapis.json.GoogleJsonErrorContainer;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.LowLevelHttpResponse;
import com.google.api.client.json.GenericJson;
import com.google.api.client.json.Json;
@@ -110,7 +112,6 @@
return response;
}
};
-
// A mock transport that lets us mock the API responses.
MockHttpTransport transport =
new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
@@ -715,11 +716,11 @@
}
/**
- * Tests that {@link DatasetServiceImpl#insertAll} retries other non-rate-limited,
+ * Tests that {@link DatasetServiceImpl#insertAll} will not retry other non-rate-limited,
* non-quota-exceeded attempts.
*/
@Test
- public void testInsertOtherRetry() throws Throwable {
+ public void testFailInsertOtherRetry() throws Exception {
TableReference ref =
new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
@@ -733,26 +734,29 @@
when(response.getContent())
.thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
.thenReturn(toStream(new TableDataInsertAllResponse()));
-
DatasetServiceImpl dataService =
new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
- dataService.insertAll(
- ref,
- rows,
- null,
- BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
- TEST_BACKOFF,
- new MockSleeper(),
- InsertRetryPolicy.alwaysRetry(),
- null,
- null,
- false,
- false,
- false);
- verify(response, times(2)).getStatusCode();
- verify(response, times(2)).getContent();
- verify(response, times(2)).getContentType();
- expectedLogs.verifyInfo("BigQuery insertAll error, retrying:");
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("actually forbidden");
+ try {
+ dataService.insertAll(
+ ref,
+ rows,
+ null,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+ TEST_BACKOFF,
+ new MockSleeper(),
+ InsertRetryPolicy.alwaysRetry(),
+ null,
+ null,
+ false,
+ false,
+ false);
+ } finally {
+ verify(response, times(1)).getStatusCode();
+ verify(response, times(1)).getContent();
+ verify(response, times(1)).getContentType();
+ }
}
/**
@@ -919,6 +923,23 @@
}
@Test
+ public void testGetErrorInfo() throws IOException {
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+ ErrorInfo info = new ErrorInfo();
+ List<ErrorInfo> infoList = new ArrayList<>();
+ infoList.add(info);
+ info.setReason("QuotaExceeded");
+ GoogleJsonError error = new GoogleJsonError();
+ error.setErrors(infoList);
+ HttpResponseException.Builder builder = mock(HttpResponseException.Builder.class);
+ IOException validException = new GoogleJsonResponseException(builder, error);
+ IOException invalidException = new IOException();
+ assertEquals(info.getReason(), dataService.getErrorInfo(validException).getReason());
+ assertNull(dataService.getErrorInfo(invalidException));
+ }
+
+ @Test
public void testCreateTableSucceeds() throws IOException {
TableReference ref =
new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 1f39928..c2dc3cd 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -174,7 +174,6 @@
'use_standard_sql': False,
'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
'on_success_matcher': all_of(*pipeline_verifiers),
- 'experiments': 'use_beam_bq_sink',
}
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)
@@ -198,7 +197,6 @@
'use_standard_sql': True,
'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
'on_success_matcher': all_of(*pipeline_verifiers),
- 'experiments': 'use_beam_bq_sink',
}
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)
@@ -227,6 +225,7 @@
'on_success_matcher': all_of(*pipeline_verifiers),
'kms_key': kms_key,
'native': True,
+ 'experiments': 'use_legacy_bq_sink',
}
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)
@@ -281,7 +280,6 @@
'use_standard_sql': False,
'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
'on_success_matcher': all_of(*pipeline_verifiers),
- 'experiments': 'use_beam_bq_sink',
}
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)
@@ -305,7 +303,8 @@
'use_standard_sql': False,
'native': True,
'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
- 'on_success_matcher': all_of(*pipeline_verifiers)
+ 'on_success_matcher': all_of(*pipeline_verifiers),
+ 'experiments': 'use_legacy_bq_sink',
}
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 2656a84..d011394 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1488,9 +1488,7 @@
def _compute_method(self, experiments, is_streaming_pipeline):
# The write method defaults to the pipeline mode: streaming pipelines use
# streaming inserts and batch pipelines use file loads (the Dataflow runner
# may still override this in dataflow_runner.py).
- if 'use_beam_bq_sink' not in experiments:
- return self.Method.STREAMING_INSERTS
- elif self.method == self.Method.DEFAULT and is_streaming_pipeline:
+ if self.method == self.Method.DEFAULT and is_streaming_pipeline:
return self.Method.STREAMING_INSERTS
elif self.method == self.Method.DEFAULT and not is_streaming_pipeline:
return self.Method.FILE_LOADS
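With the experiment check removed above, `_compute_method` now picks streaming inserts for streaming pipelines and file loads for batch pipelines unless a method is set explicitly. A minimal sketch, with a placeholder table spec, of forcing a method through the public `method` parameter:

```py
import apache_beam as beam

with beam.Pipeline() as p:
  (p
   | beam.Create([{'id': 1}])
   | beam.io.WriteToBigQuery(
       'my-project:my_dataset.my_table',  # placeholder table spec
       schema='id:INTEGER',
       # Explicitly request file loads; leaving `method` unset lets
       # _compute_method choose based on streaming vs. batch mode.
       method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
```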
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index f9e0212..fca7d9c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -673,8 +673,7 @@
]
args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=all_of(*pipeline_verifiers),
- experiments='use_beam_bq_sink')
+ on_success_matcher=all_of(*pipeline_verifiers))
with beam.Pipeline(argv=args) as p:
input = p | beam.Create(_ELEMENTS, reshuffle=False)
@@ -733,9 +732,7 @@
data=[(i, ) for i in range(100)])
args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=all_of(state_matcher, bq_matcher),
- experiments='use_beam_bq_sink',
- streaming=True)
+ on_success_matcher=all_of(state_matcher, bq_matcher), streaming=True)
with beam.Pipeline(argv=args) as p:
stream_source = (
TestStream().advance_watermark_to(0).advance_processing_time(
@@ -790,8 +787,7 @@
data=[])
]
- args = self.test_pipeline.get_full_options_as_args(
- experiments='use_beam_bq_sink')
+ args = self.test_pipeline.get_full_options_as_args()
with self.assertRaises(Exception):
# The pipeline below fails because neither a schema nor SCHEMA_AUTODETECT
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 5c05978..b341dc6 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -563,8 +563,7 @@
self.assertEqual(expected_dict_schema, dict_schema)
def test_schema_autodetect_not_allowed_with_avro_file_loads(self):
- with TestPipeline(
- additional_pipeline_args=["--experiments=use_beam_bq_sink"]) as p:
+ with TestPipeline() as p:
pc = p | beam.Impulse()
with self.assertRaisesRegex(ValueError, '^A schema must be provided'):
@@ -593,8 +592,7 @@
"""
FULL_OUTPUT_TABLE = 'test_project:output_table'
- p = TestPipeline(
- additional_pipeline_args=["--experiments=use_beam_bq_sink"])
+ p = TestPipeline()
# Used for testing side input parameters.
table_record_pcv = beam.pvalue.AsDict(
@@ -886,8 +884,7 @@
]
args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=hc.all_of(*pipeline_verifiers),
- experiments='use_beam_bq_sink')
+ on_success_matcher=hc.all_of(*pipeline_verifiers))
with beam.Pipeline(argv=args) as p:
input = p | beam.Create([row for row in _ELEMENTS if 'language' in row])
@@ -967,8 +964,7 @@
]
args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=hc.all_of(*pipeline_verifiers),
- experiments='use_beam_bq_sink')
+ on_success_matcher=hc.all_of(*pipeline_verifiers))
with beam.Pipeline(argv=args) as p:
if streaming:
@@ -1083,7 +1079,6 @@
args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=hc.all_of(*matchers),
wait_until_finish_duration=self.WAIT_UNTIL_FINISH_DURATION,
- experiments='use_beam_bq_sink',
streaming=True)
def add_schema_info(element):
@@ -1182,7 +1177,6 @@
args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=hc.all_of(*pipeline_verifiers),
- experiments='use_beam_bq_sink',
)
with beam.Pipeline(argv=args) as p:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 9ea75ea..a5c1ce7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -197,8 +197,7 @@
]
args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=hc.all_of(*pipeline_verifiers),
- experiments='use_beam_bq_sink')
+ on_success_matcher=hc.all_of(*pipeline_verifiers))
with beam.Pipeline(argv=args) as p:
# pylint: disable=expression-not-assigned
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 8f2a9cf..dca3a39 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -592,15 +592,13 @@
return result
def _maybe_add_unified_worker_missing_options(self, options):
- # set default beam_fn_api and use_beam_bq_sink experiment if use unified
+    # set default beam_fn_api experiment if the unified
# worker experiment flag exists, no-op otherwise.
debug_options = options.view_as(DebugOptions)
from apache_beam.runners.dataflow.internal import apiclient
if apiclient._use_unified_worker(options):
if not debug_options.lookup_experiment('beam_fn_api'):
debug_options.add_experiment('beam_fn_api')
- if not debug_options.lookup_experiment('use_beam_bq_sink'):
- debug_options.add_experiment('use_beam_bq_sink')
def _get_typehint_based_encoding(self, typehint, window_coder):
"""Returns an encoding based on a typehint object."""
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 1e4f6f9..0315af3 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -698,6 +698,7 @@
def test_write_bigquery_translation(self):
runner = DataflowRunner()
+ self.default_properties.append('--experiments=use_legacy_bq_sink')
with beam.Pipeline(runner=runner,
options=PipelineOptions(self.default_properties)) as p:
# pylint: disable=expression-not-assigned
@@ -749,12 +750,13 @@
"""Tests that WriteToBigQuery cannot have any consumers if replaced."""
runner = DataflowRunner()
- with self.assertRaises(ValueError):
+ self.default_properties.append('--experiments=use_legacy_bq_sink')
+ with self.assertRaises(Exception):
with beam.Pipeline(runner=runner,
options=PipelineOptions(self.default_properties)) as p:
# pylint: disable=expression-not-assigned
out = p | beam.Create([1]) | beam.io.WriteToBigQuery('some.table')
- out['FailedRows'] | 'MyTransform' >> beam.Map(lambda _: _)
+ out['destination_file_pairs'] | 'MyTransform' >> beam.Map(lambda _: _)
class CustomMergingWindowFn(window.WindowFn):
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 75b1db0..5743f52 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -236,7 +236,10 @@
self.visit_transform(transform_node)
def visit_transform(self, transform_node):
- if [o for o in self.outputs if o in transform_node.inputs]:
+ # Internal consumers of the outputs we're overriding are expected.
+ # We only error out on non-internal consumers.
+ if ('BigQueryBatchFileLoads' not in transform_node.full_label and
+ [o for o in self.outputs if o in transform_node.inputs]):
raise ValueError(
'WriteToBigQuery was being replaced with the native '
'BigQuerySink, but the transform "{}" has an input which will be '
@@ -250,16 +253,13 @@
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import io
- from apache_beam.runners.dataflow.internal import apiclient
-
transform = applied_ptransform.transform
if (not isinstance(transform, io.WriteToBigQuery) or
getattr(transform, 'override', False)):
return False
- use_fnapi = apiclient._use_fnapi(self.options)
experiments = self.options.view_as(DebugOptions).experiments or []
- if (use_fnapi or 'use_beam_bq_sink' in experiments):
+ if 'use_legacy_bq_sink' not in experiments:
return False
if transform.schema == io.gcp.bigquery.SCHEMA_AUTODETECT:
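A minimal sketch of the gating described in this hunk, using a hypothetical helper name: the native-sink override now applies only when the legacy experiment is present.

```py
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions

def uses_native_bq_sink(options):
  # Hypothetical helper for illustration; mirrors the experiment check above.
  experiments = options.view_as(DebugOptions).experiments or []
  return 'use_legacy_bq_sink' in experiments

print(uses_native_bq_sink(PipelineOptions(['--experiments=use_legacy_bq_sink'])))  # True
print(uses_native_bq_sink(PipelineOptions([])))  # False
```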
diff --git a/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py b/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
index a25aba0..2c84f80 100644
--- a/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
+++ b/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
@@ -24,6 +24,8 @@
import threading
+from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileHeader
+from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileRecord
from apache_beam.runners.interactive import interactive_environment as ie
@@ -36,6 +38,20 @@
raise NotImplementedError
+class ElementLimiter(Limiter):
+ """A `Limiter` that limits reading from cache based on some property of an
+ element.
+ """
+ def update(self, e):
+ # type: (Any) -> None
+
+ """Update the internal state based on some property of an element.
+
+ This is executed on every element that is read from cache.
+ """
+ raise NotImplementedError
+
+
class SizeLimiter(Limiter):
"""Limits the cache size to a specified byte limit."""
def __init__(
@@ -71,3 +87,56 @@
def is_triggered(self):
return self._triggered
+
+
+class CountLimiter(ElementLimiter):
+ """Limits by counting the number of elements seen."""
+ def __init__(self, max_count):
+ self._max_count = max_count
+ self._count = 0
+
+ def update(self, e):
+ # A TestStreamFileRecord can contain many elements at once. If e is a file
+ # record, then count the number of elements in the bundle.
+ if isinstance(e, TestStreamFileRecord):
+ if not e.recorded_event.element_event:
+ return
+ self._count += len(e.recorded_event.element_event.elements)
+
+ # Otherwise, count everything else but the header of the file since it is
+ # not an element.
+ elif not isinstance(e, TestStreamFileHeader):
+ self._count += 1
+
+ def is_triggered(self):
+ return self._count >= self._max_count
+
+
+class ProcessingTimeLimiter(ElementLimiter):
+ """Limits by how long the ProcessingTime passed in the element stream.
+
+ This measures the duration from the first element in the stream. Each
+ subsequent element has a delta "advance_duration" that moves the internal
+ clock forward. This triggers when the duration from the internal clock and
+ the start exceeds the given duration.
+ """
+ def __init__(self, max_duration_secs):
+ """Initialize the ProcessingTimeLimiter."""
+ self._max_duration_us = max_duration_secs * 1e6
+ self._start_us = 0
+ self._cur_time_us = 0
+
+ def update(self, e):
+ # Only look at TestStreamFileRecords which hold the processing time.
+ if not isinstance(e, TestStreamFileRecord):
+ return
+
+ if not e.recorded_event.processing_time_event:
+ return
+
+ if self._start_us == 0:
+ self._start_us = e.recorded_event.processing_time_event.advance_duration
+ self._cur_time_us += e.recorded_event.processing_time_event.advance_duration
+
+ def is_triggered(self):
+ return self._cur_time_us - self._start_us >= self._max_duration_us
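The docstrings above note that a single TestStreamFileRecord can carry a whole bundle of elements. A short usage sketch of CountLimiter with such a record, complementing the unit tests added below:

```py
from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileRecord
from apache_beam.runners.interactive.options.capture_limiters import CountLimiter

limiter = CountLimiter(2)

record = TestStreamFileRecord()
# One file record carrying a bundle of two elements.
record.recorded_event.element_event.elements.add()
record.recorded_event.element_event.elements.add()

limiter.update(record)
print(limiter.is_triggered())  # True: 2 elements seen, max_count is 2
```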
diff --git a/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py b/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py
new file mode 100644
index 0000000..850c56e2c
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import unittest
+
+from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileRecord
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+
+class CaptureLimitersTest(unittest.TestCase):
+ def test_count_limiter(self):
+ limiter = CountLimiter(5)
+
+ for e in range(4):
+ limiter.update(e)
+
+ self.assertFalse(limiter.is_triggered())
+ limiter.update(5)
+ self.assertTrue(limiter.is_triggered())
+
+ def test_processing_time_limiter(self):
+ limiter = ProcessingTimeLimiter(max_duration_secs=2)
+
+ r = TestStreamFileRecord()
+ r.recorded_event.processing_time_event.advance_duration = int(1 * 1e6)
+ limiter.update(r)
+ self.assertFalse(limiter.is_triggered())
+
+ r = TestStreamFileRecord()
+ r.recorded_event.processing_time_event.advance_duration = int(2 * 1e6)
+ limiter.update(r)
+ self.assertTrue(limiter.is_triggered())
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/website/www/site/content/en/documentation/dsls/sql/walkthrough.md b/website/www/site/content/en/documentation/dsls/sql/walkthrough.md
index a02e14c..59ecb82 100644
--- a/website/www/site/content/en/documentation/dsls/sql/walkthrough.md
+++ b/website/www/site/content/en/documentation/dsls/sql/walkthrough.md
@@ -129,7 +129,7 @@
.builder()
.addInt32Field("appId")
.addInt32Field("reviewerId")
- .withFloatField("rating")
+ .addFloatField("rating")
.addDateTimeField("rowtime")
.build();
diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/sample.md b/website/www/site/content/en/documentation/transforms/python/aggregation/sample.md
index 7d10eb0..33fc8f6 100644
--- a/website/www/site/content/en/documentation/transforms/python/aggregation/sample.md
+++ b/website/www/site/content/en/documentation/transforms/python/aggregation/sample.md
@@ -14,24 +14,62 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-# Sample
-<table align="left">
- <a target="_blank" class="button"
- href="https://beam.apache.org/releases/pydoc/2.12.0/apache_beam.transforms.combiners.html?#apache_beam.transforms.combiners.Sample">
- <img src="https://beam.apache.org/images/logos/sdks/python.png" width="20px" height="20px"
- alt="Pydoc" />
- Pydoc
- </a>
-</table>
-<br><br>
+# Sample
+
+{{< localstorage language language-py >}}
+
+{{< button-pydoc path="apache_beam.transforms.combiners" class="Sample" >}}
Transforms for taking samples of the elements in a collection, or
-samples of the values associated with each key in a collection of
+samples of the values associated with each key in a collection of
key-value pairs.
## Examples
-See [BEAM-7390](https://issues.apache.org/jira/browse/BEAM-7390) for updates.
-## Related transforms
-* [Top](/documentation/transforms/python/aggregation/top) finds the largest or smallest element.
\ No newline at end of file
+In the following example, we create a pipeline with a `PCollection`.
+Then, we get a random sample of elements in different ways.
+
+### Example 1: Sample elements from a PCollection
+
+We use `Sample.FixedSizeGlobally()` to get a fixed-size random sample of elements from the *entire* `PCollection`.
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample.py" sample_fixed_size_globally >}}
+{{< /highlight >}}
+
+{{< paragraph class="notebook-skip" >}}
+Output:
+{{< /paragraph >}}
+
+{{< highlight class="notebook-skip" >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample_test.py" sample >}}
+{{< /highlight >}}
+
+{{< buttons-code-snippet
+ py="sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample.py" >}}
+
+### Example 2: Sample elements for each key
+
+We use `Sample.FixedSizePerKey()` to get fixed-size random samples for each unique key in a `PCollection` of key-values.
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample.py" sample_fixed_size_per_key >}}
+{{< /highlight >}}
+
+{{< paragraph class="notebook-skip" >}}
+Output:
+{{< /paragraph >}}
+
+{{< highlight class="notebook-skip" >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample_test.py" samples_per_key >}}
+{{< /highlight >}}
+
+{{< buttons-code-snippet
+ py="sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample.py" >}}
+
+## Related transforms
+
+* [Top](/documentation/transforms/python/aggregation/top) finds the largest or smallest element.
+
+{{< button-pydoc path="apache_beam.transforms.combiners" class="Sample" >}}
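The shortcodes above pull their snippets from the examples directory. For quick reference, a minimal inline sketch of both transforms (the sampled output is random by design):

```py
import apache_beam as beam

with beam.Pipeline() as p:
  # Sample 3 elements from the entire PCollection.
  (p
   | 'Numbers' >> beam.Create(list(range(10)))
   | beam.combiners.Sample.FixedSizeGlobally(3)
   | 'PrintGlobal' >> beam.Map(print))

  # Sample 2 values for each key in a keyed PCollection.
  (p
   | 'Pairs' >> beam.Create([('a', 1), ('a', 2), ('a', 3), ('b', 4)])
   | beam.combiners.Sample.FixedSizePerKey(2)
   | 'PrintPerKey' >> beam.Map(print))
```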