Merge pull request #31562: Cherrypick #31550 and #31602 onto release branch
diff --git a/CHANGES.md b/CHANGES.md
index 1aee828..bc522ae 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -97,6 +97,9 @@
* Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform
* All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381)
should be updated to use new snake_case parameter names.
+* Upgraded Jackson Databind to 2.15.4 (Java) ([#26743](https://github.com/apache/beam/issues/26743)).
+  jackson 2.15 has known breaking changes; notably, it imposes a default buffer limit on the parser, so very large string values fail to parse.
+  If your custom PTransform/DoFn is affected, refer to [#31580](https://github.com/apache/beam/pull/31580) for mitigation (a usage sketch follows this hunk).
## Deprecations
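A minimal sketch of the mitigation path referenced in the entry above: raise the constraint from `DoFn.setup` before large JSON values are parsed. The DoFn, its element types, and the 200 MB figure are illustrative (not part of this change), and `RowJsonUtils` is annotated `@Internal`, so treat this as a workaround rather than stable API.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.RowJsonUtils;

// Hypothetical user DoFn that parses very large JSON strings with jackson.
class ParseLargeJsonFn extends DoFn<String, String> {
  @Setup
  public void setup() {
    // Illustrative 200 MB limit; this is a no-op if an equal or higher limit
    // has already been applied (Beam's own static initializers use 100 MB).
    RowJsonUtils.increaseDefaultStreamReadConstraints(200 * 1024 * 1024);
  }

  @ProcessElement
  public void processElement(@Element String json, OutputReceiver<String> out) {
    // ... parse `json` with an ObjectMapper created after setup() has run ...
    out.output(json);
  }
}
```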
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index 7835ffa..aec6903 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -263,7 +263,8 @@
"TestSetStateClear",
"TestSetState",
- "TestTimers_EventTime_Unbounded", // Side inputs in executable stage not supported.
+ "TestTimers_EventTime_Unbounded", // Side inputs in executable stage not supported.
+ "TestTimers_ProcessingTime_Infinity", // Spark doesn't support test stream.
}
var dataflowFilters = []string{
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java
index 6538a14..408143f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java
@@ -34,6 +34,38 @@
@Internal
public class RowJsonUtils {
+  // Largest read-constraint limit applied so far; see increaseDefaultStreamReadConstraints.
+ private static int defaultBufferLimit;
+
+ /**
+ * Increase the default jackson-databind stream read constraint.
+ *
+   * <p>StreamReadConstraints was introduced in jackson 2.15 and causes parsing of strings larger
+   * than 20 MB (5 MB in 2.15.0) to fail. This caused regressions in dependent projects, including
+   * Beam. Here we override the default limit to 100 MB and expose this method so callers can
+   * request a higher limit. If needed, call this method at pipeline run time, e.g. in DoFn.setup.
+ */
+ public static void increaseDefaultStreamReadConstraints(int newLimit) {
+ if (newLimit <= defaultBufferLimit) {
+ return;
+ }
+ try {
+ Class<?> unused = Class.forName("com.fasterxml.jackson.core.StreamReadConstraints");
+
+ com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints(
+ com.fasterxml.jackson.core.StreamReadConstraints.builder()
+ .maxStringLength(newLimit)
+ .build());
+ } catch (ClassNotFoundException e) {
+      // jackson-core < 2.15: no StreamReadConstraints to override, nothing to do.
+ }
+ defaultBufferLimit = newLimit;
+ }
+
+ static {
+ increaseDefaultStreamReadConstraints(100 * 1024 * 1024);
+ }
+
public static ObjectMapper newObjectMapperWith(RowJson.RowJsonDeserializer deserializer) {
SimpleModule module = new SimpleModule("rowDeserializationModule");
module.addDeserializer(Row.class, deserializer);
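For context on why the next hunk moves `TableRowJsonCoder`'s `MAPPER` construction into a static block, here is a standalone sketch (not part of this change) of the jackson 2.15 behavior. It assumes jackson 2.15+ and Java 11+ (`String.repeat`); the payload size is illustrative.

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.util.RowJsonUtils;

public class StreamReadLimitSketch {
  public static void main(String[] args) throws Exception {
    // ~24M characters, above the 20 MB default maxStringLength in jackson 2.15.
    String json = "{\"payload\":\"" + "x".repeat(24 * 1024 * 1024) + "\"}";

    try {
      // A mapper built with untouched 2.15 defaults rejects the oversized string value.
      new ObjectMapper().readTree(json);
    } catch (JsonProcessingException e) {
      System.out.println("parse failed under default constraints: " + e.getMessage());
    }

    // Raise the default first, then build the mapper: a JsonFactory picks up the default
    // constraints when it is constructed, which is why TableRowJsonCoder now builds its
    // MAPPER only after increaseDefaultStreamReadConstraints has run.
    RowJsonUtils.increaseDefaultStreamReadConstraints(100 * 1024 * 1024);
    int len = new ObjectMapper().readTree(json).get("payload").textValue().length();
    System.out.println("parsed payload of " + len + " characters");
  }
}
```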
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index 9b80c0b..8cf3eeb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -28,6 +28,7 @@
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.RowJsonUtils;
import org.apache.beam.sdk.values.TypeDescriptor;
/** A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format. */
@@ -69,15 +70,22 @@
// FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
// TableRow.
- private static final ObjectMapper MAPPER =
- new ObjectMapper()
- .registerModule(new JavaTimeModule())
- .registerModule(new JodaModule())
- .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
- .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
+  private static final ObjectMapper MAPPER;
+ private static final TableRowJsonCoder INSTANCE;
+ private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR;
- private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder();
- private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {};
+ static {
+ RowJsonUtils.increaseDefaultStreamReadConstraints(100 * 1024 * 1024);
+
+ MAPPER =
+ new ObjectMapper()
+ .registerModule(new JavaTimeModule())
+ .registerModule(new JodaModule())
+ .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
+ .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
+ INSTANCE = new TableRowJsonCoder();
+ TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {};
+ }
private TableRowJsonCoder() {}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java
index b098320..9e767be 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java
@@ -26,6 +26,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -67,6 +68,13 @@
}
}
+ @Test
+ public void testLargeRow() throws Exception {
+ String val = StringUtils.repeat("BEAM", 10 * 1024 * 1024); // 40 MB
+ TableRow testValue = new TableRowBuilder().set("a", val).set("b", "1").build();
+ CoderProperties.coderDecodeEncodeEqual(TEST_CODER, testValue);
+ }
+
/**
* Generated data to check that the wire format has not changed. To regenerate, see {@link
* org.apache.beam.sdk.coders.PrintBase64Encodings}.
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index 3dd6bdb..2f9de24 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -452,7 +452,7 @@
class _GrpcDataChannel(DataChannel):
"""Base class for implementing a BeamFnData-based DataChannel."""
- _WRITES_FINISHED = object()
+ _WRITES_FINISHED = beam_fn_api_pb2.Elements.Data()
def __init__(self, data_buffer_time_limit_ms=0):
# type: (int) -> None
@@ -475,7 +475,7 @@
def close(self):
# type: () -> None
- self._to_send.put(self._WRITES_FINISHED) # type: ignore[arg-type]
+ self._to_send.put(self._WRITES_FINISHED)
self._closed = True
def wait(self, timeout=None):
@@ -639,8 +639,12 @@
streams = [self._to_send.get()]
try:
# Coalesce up to 100 other items.
- for _ in range(100):
- streams.append(self._to_send.get_nowait())
+ total_size_bytes = streams[0].ByteSize()
+ while (total_size_bytes < _DEFAULT_SIZE_FLUSH_THRESHOLD and
+ len(streams) <= 100):
+ data_or_timer = self._to_send.get_nowait()
+ total_size_bytes += data_or_timer.ByteSize()
+ streams.append(data_or_timer)
except queue.Empty:
pass
if streams[-1] is self._WRITES_FINISHED: