Merge pull request #31562: Cherrypick #31550 and #31602 onto release branch

diff --git a/CHANGES.md b/CHANGES.md
index 1aee828..bc522ae 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -97,6 +97,9 @@
   * Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform
   * All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381)
     should be updated to use new snake_case parameter names.
+* Upgraded Jackson Databind to 2.15.4 (Java) ([#26743](https://github.com/apache/beam/issues/26743)).
+  jackson 2.15 has known breaking changes; an important one is that it imposes a buffer size limit on the parser.
+  If your custom PTransform/DoFn is affected, refer to [#31580](https://github.com/apache/beam/pull/31580) for mitigation.
 
 ## Deprecations
 
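The changelog entry above refers to jackson 2.15's new stream read constraints. As a rough illustration (not part of this PR, and assuming stock jackson 2.15 defaults), parsing a JSON document whose string value exceeds the default 20 MB limit fails until the default constraints are overridden, which is what the Java changes below do:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustration only: with jackson 2.15 defaults, a string value larger than
// 20 MB trips the parser's StreamReadConstraints and parsing fails with
// com.fasterxml.jackson.core.exc.StreamConstraintsException.
public class LargeStringParse {
  public static void main(String[] args) throws Exception {
    String big = "x".repeat(30 * 1024 * 1024); // ~30 MB string value
    String json = "{\"payload\":\"" + big + "\"}";
    new ObjectMapper().readTree(json); // throws with default constraints
  }
}
```
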
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index 7835ffa..aec6903 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -263,7 +263,8 @@
 	"TestSetStateClear",
 	"TestSetState",
 
-	"TestTimers_EventTime_Unbounded", // Side inputs in executable stage not supported.
+	"TestTimers_EventTime_Unbounded",     // Side inputs in executable stage not supported.
+	"TestTimers_ProcessingTime_Infinity", // Spark doesn't support test stream.
 }
 
 var dataflowFilters = []string{
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java
index 6538a14..408143f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java
@@ -34,6 +34,38 @@
 @Internal
 public class RowJsonUtils {
 
+  // Largest stream read constraint applied so far; prevents lowering an already-raised limit.
+  private static int defaultBufferLimit;
+
+  /**
+   * Increases the default jackson-databind stream read constraint.
+   *
+   * <p>StreamReadConstraints was introduced in jackson 2.15 and causes parsing of strings larger
+   * than 20 MB (5 MB in 2.15.0) to fail. This has caused regressions in projects that depend on
+   * jackson, including Beam. Here we override the default buffer size limit to 100 MB and expose
+   * this method so that a higher limit can be set. If needed, call this method at pipeline run
+   * time, e.g. in DoFn.setup.
+   */
+  public static void increaseDefaultStreamReadConstraints(int newLimit) {
+    if (newLimit <= defaultBufferLimit) {
+      return;
+    }
+    try {
+      Class<?> unused = Class.forName("com.fasterxml.jackson.core.StreamReadConstraints");
+
+      com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints(
+          com.fasterxml.jackson.core.StreamReadConstraints.builder()
+              .maxStringLength(newLimit)
+              .build());
+    } catch (ClassNotFoundException e) {
+      // <2.15, do nothing
+    }
+    defaultBufferLimit = newLimit;
+  }
+
+  static {
+    increaseDefaultStreamReadConstraints(100 * 1024 * 1024);
+  }
+
   public static ObjectMapper newObjectMapperWith(RowJson.RowJsonDeserializer deserializer) {
     SimpleModule module = new SimpleModule("rowDeserializationModule");
     module.addDeserializer(Row.class, deserializer);
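The javadoc above suggests raising the limit at pipeline run time. A minimal sketch of that suggestion, using a hypothetical user DoFn and an arbitrary 200 MB value (note that RowJsonUtils is annotated @Internal):

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.RowJsonUtils;

// Hypothetical DoFn: raise the limit in @Setup before any large JSON payloads
// are parsed. 200 MB is an example value, not a recommendation from this PR.
class ParseLargeJsonFn extends DoFn<String, String> {
  @Setup
  public void setup() {
    RowJsonUtils.increaseDefaultStreamReadConstraints(200 * 1024 * 1024);
  }

  @ProcessElement
  public void processElement(@Element String json, OutputReceiver<String> out) {
    out.output(json); // real parsing logic would go here
  }
}
```
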
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index 9b80c0b..8cf3eeb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -28,6 +28,7 @@
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.RowJsonUtils;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /** A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format. */
@@ -69,15 +70,22 @@
 
   // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
   // TableRow.
-  private static final ObjectMapper MAPPER =
-      new ObjectMapper()
-          .registerModule(new JavaTimeModule())
-          .registerModule(new JodaModule())
-          .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
-          .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
+  private static final ObjectMapper MAPPER;
+  private static final TableRowJsonCoder INSTANCE;
+  private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR;
 
-  private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder();
-  private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {};
+  static {
+    RowJsonUtils.increaseDefaultStreamReadConstraints(100 * 1024 * 1024);
+
+    MAPPER =
+        new ObjectMapper()
+            .registerModule(new JavaTimeModule())
+            .registerModule(new JodaModule())
+            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
+            .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
+    INSTANCE = new TableRowJsonCoder();
+    TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {};
+  }
 
   private TableRowJsonCoder() {}
 
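The static block above raises the constraint before MAPPER is built. A short sketch of why that ordering matters, assuming jackson 2.15 semantics where a JsonFactory captures the default StreamReadConstraints at construction time:

```java
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConstraintOrdering {
  public static void main(String[] args) {
    // A mapper built before the override keeps the stock 20 MB string limit,
    // because its JsonFactory captured the default constraints when constructed.
    ObjectMapper before = new ObjectMapper();

    StreamReadConstraints.overrideDefaultStreamReadConstraints(
        StreamReadConstraints.builder().maxStringLength(100 * 1024 * 1024).build());

    // Mappers built after the override pick up the 100 MB limit, which is why
    // TableRowJsonCoder raises the constraint before constructing MAPPER.
    ObjectMapper after = new ObjectMapper();
  }
}
```
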
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java
index b098320..9e767be 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoderTest.java
@@ -26,6 +26,7 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -67,6 +68,13 @@
     }
   }
 
+  @Test
+  public void testLargeRow() throws Exception {
+    String val = StringUtils.repeat("BEAM", 10 * 1024 * 1024); // 40 MB
+    TableRow testValue = new TableRowBuilder().set("a", val).set("b", "1").build();
+    CoderProperties.coderDecodeEncodeEqual(TEST_CODER, testValue);
+  }
+
   /**
    * Generated data to check that the wire format has not changed. To regenerate, see {@link
    * org.apache.beam.sdk.coders.PrintBase64Encodings}.
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index 3dd6bdb..2f9de24 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -452,7 +452,7 @@
 class _GrpcDataChannel(DataChannel):
   """Base class for implementing a BeamFnData-based DataChannel."""
 
-  _WRITES_FINISHED = object()
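+  # The sentinel is now an empty Elements.Data proto (rather than object())
+  # so the coalescing loop below can call ByteSize() on every queued item.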
+  _WRITES_FINISHED = beam_fn_api_pb2.Elements.Data()
 
   def __init__(self, data_buffer_time_limit_ms=0):
     # type: (int) -> None
@@ -475,7 +475,7 @@
 
   def close(self):
     # type: () -> None
-    self._to_send.put(self._WRITES_FINISHED)  # type: ignore[arg-type]
+    self._to_send.put(self._WRITES_FINISHED)
     self._closed = True
 
   def wait(self, timeout=None):
@@ -639,8 +639,12 @@
       streams = [self._to_send.get()]
       try:
         # Coalesce up to 100 other items.
-        for _ in range(100):
-          streams.append(self._to_send.get_nowait())
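+        # Also bound the coalesced batch by total byte size, not just the
+        # 100-item cap, so a single send does not grow arbitrarily large.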
+        total_size_bytes = streams[0].ByteSize()
+        while (total_size_bytes < _DEFAULT_SIZE_FLUSH_THRESHOLD and
+               len(streams) <= 100):
+          data_or_timer = self._to_send.get_nowait()
+          total_size_bytes += data_or_timer.ByteSize()
+          streams.append(data_or_timer)
       except queue.Empty:
         pass
       if streams[-1] is self._WRITES_FINISHED: